首页 > 代码库 > 基于gevent全国手机号段spider蜘蛛爬虫

基于gevent全国手机号段spider蜘蛛爬虫

 

首先介绍下python异步执行,python有两种方法编写异步代码:

1、coroutines协程(也称为greenlets)

2、回调

gevent是greenlets的一种实现方式,可以通过pip方便的安装gevent模块。gevent执行方式实际上是代码块的交替执行,具体的可以看下这篇blog,我就不重复造轮子了。

值得一提的是,gevent封装了很多接口,其中一个是著名的猴子补丁monkey,

from gevent import monkey
monkey.patch_all()

这两行就可以在代码中改变其余包的行为,让其从同步阻塞方式变为异步非阻塞方式,非常的神奇。

我利用gevent的异步非阻塞方式写了一个手机号段蜘蛛爬虫,目前一直在服务器稳定的运行,代码详见我的github,内有福利。脚本用法:python numspiderlist.py -s [String, e.g:138,137,1393134,1700001-1709999,1450000-1459999]

  #!/usr/bin/python
# -*- coding: utf-8 -*-
"""Mobile number-segment spider: trimmed command-line version backed by SQLite.

Targets Python 2 (it relies on ``urllib2``, ``cookielib`` and
``sys.setdefaultencoding``).

Usage:
    python numspiderlist.py -s 138,137,1393134,1700001-1709999

@version:1.0
@author:Kenny{Kenny.F<mailto:kennyffly@gmail.com>}
@since:2014/05/23
"""
import sys
reload(sys)
sys.setdefaultencoding('utf8')
import gevent                         # gevent coroutine package
import multiprocessing                # multi-process support
from multiprocessing import Manager
import urllib2
from urllib import unquote,quote
import socket
socket.setdefaulttimeout(20)          # global socket timeout for urllib2
import cookielib
import random
import simplejson as json
import os
import time
import sqlite3                        # sqlite database access
from functools import wraps           # decorator helper
from strtodecode import strtodecode   # encoding detection / conversion

# NOTE(review): the script never calls gevent.monkey.patch_all(), so
# urllib2 calls block the whole process and only gevent.sleep() yields
# between greenlets. Confirm whether patching was intended before enabling
# it; it interacts with the multiprocessing Manager created below.

DB_FILE = './data/mobile_area.db'
ERROR_LOG_FILE = './logs/errorlog.txt'
ERROR_NUM_FILE = './logs/errornum.txt'
OUTPRINT_FILE = './logs/outprint.txt'
NEED_LIST_FILE = './logs/needmobilelist.txt'

manager = Manager()                    # list shared between processes
lacknumlist = manager.list()


def _log_error(message):
    """Append *message* verbatim to the error log file."""
    with open(ERROR_LOG_FILE, 'a') as f:
        f.write(message)


def multi_run_wrapper(func):
    """Let *func* be called with a single list/tuple that is unpacked into
    positional arguments; any other single value is passed through as is."""
    @wraps(func)
    def newF(args):
        if isinstance(args, (list, tuple)):
            return func(*args)
        return func(args)
    return newF


def getRanIp():
    """Return a random dotted IPv4 string starting with ``123.``.

    The original note gives the intended range as
    123.125.40.255 - 123.127.134.56 (Beijing Unicom); each octet is drawn
    independently from its own interval.
    """
    return "123.{0}.{1}.{2}".format(
        random.randint(125, 127),
        random.randint(40, 134),
        random.randint(56, 255))


def _cookiePool(url):
    """Open *url* and print the name and value of every cookie received."""
    cookie = cookielib.CookieJar()
    opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie))
    opener.open(url)
    for item in cookie:
        print('Name = ' + item.name)
        print('Value = ' + item.value)


def catchPage(url=''):
    """Fetch *url* and return the response body.

    The URL is appended to ./logs/outprint.txt before the request is made.

    Returns:
        The body as a string; ``''`` when the request timed out or failed
        with an HTTP/URL/socket error; ``False`` when *url* is empty or an
        unexpected error occurred. Callers only need to test truthiness.
    """
    if not url:
        return False

    with open(OUTPRINT_FILE, "a") as f:
        f.write(url + "\n")

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6',
        'Referer': 'http://www.baidu.com',
        "X-Forwarded-For": getRanIp(),
    }
    req = urllib2.Request(url=url, headers=headers)

    html = ''
    result = None
    try:
        # gevent.Timeout(20, False) leaves the block silently on expiry,
        # in which case html stays empty and the caller records a failure.
        with gevent.Timeout(20, False):
            result = urllib2.urlopen(req)
            html = result.read()
    except urllib2.HTTPError as e:
        # Must precede URLError: HTTPError is a subclass of it.
        print('The server couldn\'t fulfill the request.')
        print("url:{0} Httperrorcode:{1}".format(url, e.code))
    except urllib2.URLError as e:
        print('We failed to reach a server.')
        print("url:{0} Reason:{1}".format(url, e.reason))
    except socket.timeout as e:
        print('The server couldn\'t fulfill the request.')
        print("url:{0} Httperrorcode:{1}".format(url, e))
    except Exception:
        # Best-effort crawler: report and let the caller schedule a retry.
        print('The server couldn\'t fulfill the request.')
        print("url:{0} Server someting error".format(url))
        return False
    finally:
        if result is not None:
            result.close()
    return html


def _field_too_long(value, limit=100):
    """Return True when *value* is not NULL and longer than *limit*."""
    return value is not None and len(value) > limit


def opensqlitedb():
    """Load the numbers already stored in the SQLite database.

    Creates the ``mobile_area`` table when the database file does not exist
    yet. Rows failing the sanity checks are reported and skipped.

    Returns:
        A dict mapping each stored number (as a string) to ``True``, or
        ``False`` when the database cannot be opened or created.
    """
    db_file = DB_FILE
    is_new = not os.path.exists(db_file)

    cx = None
    try:
        cx = sqlite3.connect(db_file)
        cu = cx.cursor()
        if is_new:
            # Create the table on first use.
            cu.execute(
                "create table mobile_area (id integer primary key, "
                "mobile_num integer, "
                "mobile_area varchar(50) NULL, "
                "mobile_type varchar(50) NULL, "
                "area_code varchar(50) NULL, "
                "post_code varchar(50) NULL)")
            cx.commit()
    except sqlite3.Error:
        print("can not find sqlite db file\n")
        _log_error("can not find sqlite db file '%s'\n" % str(db_file))
        if cx is not None:
            cx.close()
        return False

    mobile_dict = {}
    try:
        # One streaming query instead of LIMIT/OFFSET pages: every page with
        # a growing OFFSET makes SQLite re-scan the rows it already returned.
        cu.execute(
            "SELECT id, mobile_num, mobile_area, mobile_type, area_code, "
            "post_code FROM mobile_area ORDER BY mobile_num ASC")
        for row in cu:
            row_id, mobile_num, mobile_area, mobile_type, area_code, post_code = row
            try:
                mobile_num = int(mobile_num)
            except (TypeError, ValueError):
                mobile_num = 0      # NULL / non-numeric: rejected below

            invalid = (
                not mobile_area
                or not mobile_num
                or len(str(mobile_num)) > 7
                or any(_field_too_long(value) for value in
                       (mobile_area, mobile_type, area_code, post_code))
            )
            if invalid:
                print("error id:%d" % row_id)
                continue

            # Valid number: remember it.
            mobile_dict[str(mobile_num)] = True
    finally:
        cx.close()

    print("get data from sqlite works down!\n")
    return mobile_dict


@multi_run_wrapper
def getNumPage(segnum='', num='', url=''):
    """Fetch the detail page for one number and insert it into SQLite.

    Because of ``multi_run_wrapper`` this is called with a single tuple
    ``(segnum, num, url)``. Network failures are appended to
    ./logs/errornum.txt as ``segnum,num,url`` so they can be retried; all
    other failures go to ./logs/errorlog.txt.

    Returns:
        ``True`` when the row was inserted, ``False`` otherwise.
    """
    if not segnum or not num or not url:
        return False

    # Random delay; this is where the greenlets yield to each other.
    gevent.sleep(random.randint(10, 22) * 0.81)

    db_file = DB_FILE
    record = segnum + ',' + num + ',' + url

    html = catchPage(url)
    if not html:
        print("catch %s num page error!" % num)
        print("url:%s\n" % (url))
        with open(ERROR_NUM_FILE, "a") as f:
            f.write(record + "\n")
        return False

    # The response is expected to be a JSON object.
    try:
        page_temp_dict = json.loads(unquote(html))
    except ValueError:
        page_temp_dict = None
    if not isinstance(page_temp_dict, dict):
        print(record + ",result error convert to dict\n")
        _log_error(record + ",result error convert to dict\n")
        return False

    insdata = {}
    # mobile_num
    if page_temp_dict.get('Mobile', False):
        insdata['mobile_num'] = int(page_temp_dict['Mobile'])
    else:
        _log_error(record + ",No matching data\n")
        return False    # no number

    # mobile_area
    province = page_temp_dict.get('Province', False)
    if not province:
        _log_error(record + ",No matching province\n")
        return False    # no region
    if province == u'未知':
        _log_error(record + ",province is weizhi\n")
        return False    # unknown region
    # Without a city the province is repeated, as in the original script.
    city = page_temp_dict.get('City', False) or province
    insdata['mobile_area'] = strtodecode(province + ' ' + city)

    # mobile_type
    if page_temp_dict.get('Corp', False):
        if page_temp_dict.get('Card', False):
            insdata['mobile_type'] = strtodecode(page_temp_dict['Corp'] + ' ' + page_temp_dict['Card'])
        else:
            insdata['mobile_type'] = strtodecode(page_temp_dict['Corp'])
    # area_code
    if page_temp_dict.get('AreaCode', False):
        insdata['area_code'] = strtodecode(page_temp_dict['AreaCode'])
    # post_code
    if page_temp_dict.get('PostCode', False):
        insdata['post_code'] = strtodecode(page_temp_dict['PostCode'])

    row = (
        None,                           # id: assigned by SQLite
        insdata['mobile_num'],
        insdata['mobile_area'],
        insdata.get('mobile_type'),     # optional fields are stored as NULL
        insdata.get('area_code'),
        insdata.get('post_code'),
    )

    try:
        cx = sqlite3.connect(db_file)
    except sqlite3.Error:
        print("can not find sqlite db file\n")
        _log_error("can not find sqlite db file '%s'\n" % str(db_file))
        return False

    try:
        cu = cx.cursor()
        cu.execute("insert into mobile_area values (?,?,?,?,?,?)", row)
        cx.commit()
    except sqlite3.Error:
        _log_error(record + ",insert sqlitdb faild\n")
        return False
    else:
        return True
    finally:
        cx.close()


def _join_greenlets(threads, label):
    """Wait for *threads*; print *label* if gevent itself raises."""
    try:
        gevent.joinall(threads)
    except Exception:
        print(label)


def getneednum(url='', step=10):
    """Crawl every number in ``lacknumlist``, then retry network failures.

    Args:
        url: a callable returning the API URL prefix; the number is
            appended to it (``main`` passes ``callback_url_showji``).
        step: how many numbers are fetched concurrently per batch.

    Returns:
        ``False`` when there is nothing to do or an argument is invalid,
        otherwise ``None``.
    """
    if not lacknumlist:
        return False
    if not url:
        return False
    if not step:
        print("step can not be null")
        return False
    if not isinstance(step, int):
        print("step should be numeric")
        return False
    if step < 0:
        print("step should be > 0")
        return False

    # Snapshot the shared list once instead of one manager call per item.
    pending = lacknumlist[:]
    total = len(pending)
    for offset in range(0, total, step):
        threads = [
            gevent.spawn(getNumPage, (num[0:3], num, url() + num))
            for num in pending[offset:offset + step]
        ]
        _join_greenlets(threads, "Gevent catch error\n")
        print("%d-%d is end\n" % (offset + 1, min(offset + step, total)))
        time.sleep(random.randint(5, 80) * 0.9)

    # Retry the numbers that failed on network errors, at most 10 rounds.
    for turn in range(1, 11):
        if not os.path.exists(ERROR_NUM_FILE):
            break
        with open(ERROR_NUM_FILE, "r") as f:
            lines = [line.strip() for line in f if line.strip()]
        if not lines:
            break

        # Truncate BEFORE retrying: failures of this round are re-appended
        # by getNumPage and are therefore seen by the next round. Truncating
        # afterwards would erase them and leave later rounds with nothing.
        with open(ERROR_NUM_FILE, "w"):
            pass

        for offset in range(0, len(lines), step):
            threads = []
            for line in lines[offset:offset + step]:
                errnum_tuple = line.split(',', 2)
                if len(errnum_tuple) != 3:
                    continue    # malformed record
                threads.append(gevent.spawn(getNumPage, tuple(errnum_tuple)))
            _join_greenlets(
                threads,
                "turn%d-%d Gevent catch error\n" % (turn, offset + len(threads)))
            time.sleep(random.randint(5, 80) * 0.9)


def setneednum(num='', mobile_dict=None):
    """Append to ``lacknumlist`` every number of a segment that is missing.

    Args:
        num: a 3-digit segment such as ``'138'`` (expands to
            1380000-1389999) or an explicit range ``'1380000-1389999'``.
        mobile_dict: numbers already stored, keyed by their string form.

    Returns:
        ``False`` when *num* is empty, otherwise ``None``.
    """
    if not num:
        return False
    mobile_dict = mobile_dict or {}

    num = str(num)
    if len(num) == 3:
        start_num = int(num + '0000')
        end_num = int(num + '9999')
    else:
        num_list = num.split('-')
        start_num = int(num_list[0])
        end_num = int(num_list[1])

    missing = []
    i = start_num
    while i <= end_num:
        if not mobile_dict.get(str(i), False):      # not stored yet
            missing.append(str(i))
        i += 1

    # A single round trip to the manager instead of one per number.
    if missing:
        lacknumlist.extend(missing)


def setsegnum(segnumlist=None, mobile_dict=None):
    """Start one process per segment/range to collect the missing numbers.

    Accepted items: a 3-digit segment (``137``), a single 7-digit number
    (``1391234``) or a range within one segment (``1380000-1389999``).
    Anything else is reported as an illegal argument and skipped.

    Returns:
        ``False`` when *segnumlist* is empty, otherwise ``None``.
    """
    if not segnumlist:
        return False
    mobile_dict = mobile_dict or {}

    record = []
    for raw in segnumlist:
        segnum = str(raw).strip()

        if segnum.isdigit() and len(segnum) == 7:
            # A single explicit number: no worker process needed.
            if not mobile_dict.get(segnum, False):
                lacknumlist.append(segnum)
            continue

        if segnum.isdigit() and len(segnum) == 3:
            legal = True
        else:
            bounds = segnum.split('-')
            legal = (
                len(bounds) == 2
                and bounds[0].isdigit()
                and bounds[1].isdigit()
                and bounds[0][:3] == bounds[1][:3]   # same segment only
            )

        if not legal:
            print("%s is illegal argument\n" % str(segnum))
            continue

        process = multiprocessing.Process(target=setneednum, args=(segnum, mobile_dict))
        process.start()
        record.append(process)

    for process in record:
        process.join()

    print("all SegNum prepare works down!\n")


def callback_url_showji():
    """Return the showji API URL prefix; the caller appends the number."""
    showji = 'http://api.showji.com/Locating/www.showji.c.o.m.aspx?output=json'
    return "{0}&timestamp={1}&m=".format(showji, int(time.time()))


def main(param=''):
    """Entry point: *param* is the comma-separated segment list from ``-s``.

    Returns:
        ``False`` when no argument was given or the database cannot be
        opened, otherwise ``None``.
    """
    for directory in ('./logs', './data'):
        if not os.path.isdir(directory):
            os.makedirs(directory)

    # Reset the temporary files of the previous run.
    for path in (ERROR_NUM_FILE, OUTPRINT_FILE):
        with open(path, "w"):
            pass

    if not param:
        print("no argument!")
        return False

    # Example segments: 134,135,136,137,138,139,147,150,151,152,157,158,159,
    # 182,183,187,188,130,131,132,145,185,186,133,153,180,189,155,156,170,
    # 176,177,178,181,184
    segnumlist = str(param).split(',')

    # Numbers already stored in SQLite.
    mobile_dict = opensqlitedb()
    if mobile_dict is False:
        print("can not open sqlite db, abort!")
        return False

    # Work out which numbers are still missing.
    setsegnum(segnumlist, mobile_dict)
    pending = lacknumlist[:]
    if pending:
        with open(NEED_LIST_FILE, "w") as f:
            f.write("".join(str(num) + "\n" for num in pending))

    # Fetch the missing numbers.
    getneednum(callback_url_showji)

    print("all works end!")


if __name__ == "__main__":
    from optparse import OptionParser
    USAGE = "usage:python numspiderlist.py -s [String, e.g:138,137,1393134,1700001-1709999,1450000-1459999]"
    parser = OptionParser(USAGE)
    parser.add_option("-s", dest="s")
    opt, args = parser.parse_args()

    if not opt.s:
        print(USAGE)
        sys.exit(1)

    main(opt.s)

如果你看得仔细,一定会发现我在代码中加了这样两行:

import socket
socket.setdefaulttimeout(20)

这是为了兼容python2.6以下的版本:在这些版本中,urllib2的timeout参数无法正常生效。另外,在gevent异步非阻塞方式下,urllib2自身的阻塞式超时需要改用gevent.Timeout()来替代。

基于gevent全国手机号段spider蜘蛛爬虫