首页 > 代码库 > python--分布式爬虫

python--分布式爬虫

# server
import socket, select, re, queue, redis
from multiprocessing import Pool, cpu_count
from pymongo import MongoClient

# Address the server socket binds to.
host = '192.168.1.107'
# Every socket watched by select(): the listening socket plus one per client.
ConnectionList = []
# Maximum number of bytes read from a socket per recv() call.
Recv_buffer = 4096000
# client socket -> number of work slots (cores) the client currently has free.
Client_Status = {}
# client socket -> number used to identify the client in log output.
Client_Num = {}
# Redis holds every URL seen so far; it is used to de-duplicate the crawl.
redis1 = redis.Redis(host='localhost', port=6379, db=0)
Num = 0


class Distributed_Web_Crawler:
    """Server side of the distributed crawler.

    The server owns the URL queue.  Each client first sends the number of
    cores it has; the server then hands it at most that many URLs at a
    time (space separated).  Clients answer with the downloaded page
    followed by ``PAGE_DELIMITER``.  Every page is stored in MongoDB and
    scanned for further links; links not yet recorded in Redis are queued.

    All work runs in the select() loop of this process.  The original code
    wrote ``pool.apply_async(self.method())``, which executes the method
    inline and then submits its ``None`` result to the pool, so the process
    pool never did any useful work; it has been removed.
    """

    # Marker the clients append to every page they send.
    PAGE_DELIMITER = 'Page_ContentPPPPPP///////'
    # Seed URL and prefix of every link that is followed.
    BASE_URL = 'https://movie.douban.com/'
    # Captures the part after BASE_URL of each link terminated by a double quote.
    URL_PATTERN = re.compile(r'https://movie\.douban\.com/(.*?)"')

    def __init__(self, port):
        """Bind the listening socket on *port*, seed the queue and serve.

        Note that this call does not return: it ends by entering main().
        """
        self.url_num = 1
        self.queue = queue.Queue()
        # One MongoDB client for the whole server instead of one per page.
        mongo = MongoClient()
        self.db = mongo.CrawSpider.content
        self.pages = mongo.Web.data
        # client socket -> bytes received that do not yet form a whole page.
        self.recv_buffers = {}
        # client socket -> (ip, port), kept for the disconnect message.
        self.client_addr = {}
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.bind((host, port))
        self.server_socket.listen(10)
        ConnectionList.append(self.server_socket)
        print("服务器运行在端口:" + str(port))
        address = self.BASE_URL
        self.queue.put(address)
        redis1.set(address, 0)
        self.main()

    def main(self):
        """Run the event loop: hand out URLs, accept clients, read pages."""
        while True:
            # More than one entry means at least one client is connected.
            if not self.queue.empty() and len(ConnectionList) > 1:
                self.task_manage()
            read_sockets, _, _ = select.select(ConnectionList, [], [])
            for sock in read_sockets:
                if sock is self.server_socket:
                    self._accept_client()
                else:
                    self._handle_client_data(sock)

    def _accept_client(self):
        """Accept one connection and register the client's core count."""
        global Num
        conn, addr = self.server_socket.accept()
        try:
            # The first message of a client is its core count as text.
            core_num = int(conn.recv(Recv_buffer).decode('utf8'))
        except (OSError, ValueError):
            # Not a valid handshake: refuse the client instead of crashing.
            conn.close()
            return
        # A running counter keeps client numbers unique after disconnects.
        Num += 1
        ConnectionList.append(conn)
        Client_Status[conn] = core_num
        Client_Num[conn] = Num
        self.client_addr[conn] = addr
        self.recv_buffers[conn] = b''
        print('客户端 ' + addr[0] + ':' + str(addr[1]) + '已连接,核心数: '
              + str(core_num) + '\n编号为' + str(Client_Num[conn]))

    def _handle_client_data(self, sock):
        """Read from a client and process every page that is now complete."""
        try:
            data = sock.recv(Recv_buffer)
        except OSError:
            # A reset connection is handled like an orderly shutdown.
            data = b''
        if not data:
            self._drop_client(sock)
            return
        # TCP is a byte stream: a page may arrive in several pieces, so keep
        # the unterminated tail until its delimiter has been received.
        pages, remainder = self._split_pages(
            self.recv_buffers.get(sock, b'') + data)
        self.recv_buffers[sock] = remainder
        if not pages:
            return
        # Each finished page frees exactly one work slot on that client.
        Client_Status[sock] = int(Client_Status[sock]) + len(pages)
        print('编号' + str(Client_Num[sock]) + '可用核心' + str(Client_Status[sock]))
        for raw_page in pages:
            if raw_page:
                self.web_page_resolution(
                    raw_page.decode('utf8', errors='replace'))

    def _drop_client(self, sock):
        """Close *sock* and forget everything recorded about the client."""
        addr = self.client_addr.pop(sock, None)
        if addr is not None:
            print('客户端 ' + addr[0] + ':' + str(addr[1]) + '断开连接')
        sock.close()
        Client_Status.pop(sock, None)
        Client_Num.pop(sock, None)
        self.recv_buffers.pop(sock, None)
        if sock in ConnectionList:
            ConnectionList.remove(sock)

    @classmethod
    def _split_pages(cls, buffered):
        """Split *buffered* bytes into (complete pages, unterminated tail)."""
        *pages, remainder = buffered.split(cls.PAGE_DELIMITER.encode('utf8'))
        return pages, remainder

    @classmethod
    def _extract_urls(cls, content):
        """Return the absolute URL of every BASE_URL link found in *content*."""
        return [cls.BASE_URL + tail for tail in cls.URL_PATTERN.findall(content)]

    def web_page_resolution(self, content):
        """Store the page *content* and queue every link not seen before."""
        # Collection.insert() no longer exists in PyMongo 4; insert_one()
        # is the replacement for single documents.
        self.pages.insert_one({'page_content': content})
        for url in self._extract_urls(content):
            if redis1.get(url) is None:
                redis1.set(url, self.url_num)
                self.queue.put(url)
                self.url_num += 1

    def task_manage(self):
        """Send queued URLs to the clients, one URL per free work slot."""
        # Iterate over a copy: a failed send removes the client from the list.
        for client in list(ConnectionList):
            if client is self.server_socket:
                continue
            # A fresh batch per client, so no URL is sent to two clients.
            batch = []
            while not self.queue.empty() and int(Client_Status[client]) > 0:
                batch.append(self.queue.get())
                Client_Status[client] = int(Client_Status[client]) - 1
            if not batch:
                continue
            # Every URL is followed by a space; the client splits on whitespace.
            message = ''.join(url + ' ' for url in batch)
            try:
                client.sendall(message.encode('utf8'))
            except OSError:
                # The client is gone: give its URLs back to the queue.
                for url in batch:
                    self.queue.put(url)
                self._drop_client(client)


if __name__ == "__main__":
    # Script entry point: serve on TCP port 8888.  Constructing the crawler
    # binds the socket and enters its event loop.
    port = 8888
    Distributed_Web_Crawler(port)
# Client
import socket, sys, select
from multiprocessing import cpu_count
from requests import get
from multiprocessing import Pool

# Worker pool sized to leave one core free; max() keeps the size at least 1,
# because Pool(0) raises ValueError on a single-core machine.
p = Pool(max(1, cpu_count() - 1))
host = '192.168.0.103'
Page_contents = []


def crawler_page(url):
    """Download *url* and send its content to the server.

    The payload is the page text followed by the end-of-page marker
    ``Page_ContentPPPPPP///////``.  If the download fails, only the marker
    is sent, so the server still receives one reply for this URL.
    """
    print("正在爬取网页" + url)
    try:
        # The timeout keeps one unresponsive site from blocking the client.
        # Undecodable bytes are replaced rather than aborting the page.
        content = get(url, timeout=10).content.decode('utf8', errors='replace')
    except OSError as exc:
        # requests' exceptions derive from OSError (RequestException).
        print(url + "爬取失败: " + str(exc))
        content = ''
    else:
        print(url + "爬取完成,正在向服务器发送数据")
    # sendall() keeps writing until the whole payload has been sent; a
    # single send() may transmit only part of a large page.
    s.sendall((content + 'Page_ContentPPPPPP///////').encode('utf8'))


def _split_complete_urls(buffered):
    """Split *buffered* bytes into (whitespace-terminated URLs, unfinished tail)."""
    if not buffered or buffered[-1:].isspace():
        return buffered.split(), b''
    *urls, tail = buffered.split()
    return urls, tail


def listing():
    """Receive URLs from the server and crawl them until told to quit.

    The server sends URLs separated by whitespace.  Because a recv() may
    end in the middle of a URL, the unfinished tail is kept and completed
    by the next chunk.  The function leaves via sys.exit() when the server
    sends ``quit`` or closes the connection.

    URLs are crawled one after another in this process.  The original
    ``p.apply_async(crawler_page(url))`` also ran the download inline and
    only submitted its ``None`` result to the pool.
    """
    pending = b''
    while True:
        # Only the server socket is watched.  sys.stdin used to be in the
        # list as well but was never read, so once it became readable
        # select() returned immediately on every iteration.
        select.select([s], [], [])
        try:
            chunk = s.recv(4096)
        except OSError:
            # A reset connection is handled like a closed one.
            chunk = b''
        if not chunk:
            print('服务器连接失败,正在退出')
            sys.exit()
        pending += chunk
        if pending.strip() == b'quit':
            print('接收到服务器关闭指令,客户端正在退出')
            sys.exit()
        urls, pending = _split_complete_urls(pending)
        # Plain iteration: removing items from the list while looping over
        # it made the original skip every second URL.
        for url in urls:
            crawler_page(url.decode('utf8'))


if __name__ == "__main__":
    # Script entry point of the client: connect to the server, report the
    # number of cores of this machine, then wait for URLs to crawl.
    port = 8888
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # The 3 second limit is meant for the connection attempt only.
    s.settimeout(3)
    try:
        s.connect(('192.168.1.107', port))
    except OSError:
        # Covers refused connections, unreachable hosts and the timeout.
        print("无法连接至服务器,请检查地址后重试")
        sys.exit()
    # Back to blocking mode, so that sending a large page later on is not
    # aborted by the connect timeout.
    s.settimeout(None)
    print("已连接至服务器,开始发送机器信息\n核心数:" + str(cpu_count()))
    s.sendall(str(cpu_count()).encode('utf8'))
    listing()

 

python--分布式爬虫