首页 > 代码库 > 基于gevent全国手机号段spider蜘蛛爬虫
基于gevent全国手机号段spider蜘蛛爬虫
首先介绍下python异步执行,python有两种方法编写异步代码:
1、coroutines协程(也称为greenlets)
2、回调
gevent是greenlets的一种实现方式,可以通过pip方便的安装gevent模块。gevent执行方式实际上是代码块的交替执行,具体的可以看下这篇blog,我就不重复造轮子了。
值得一提的是,gevent封装了很多接口,其中一个是著名的猴子补丁monkey,
from gevent import monkey
monkey.patch_all()
这两行就可以在代码中改变其余包的行为,让其从同步阻塞方式变为异步非阻塞方式,非常的神奇。
我利用gevent的异步非阻塞方式写了一个手机号段蜘蛛爬虫,目前一直在服务器稳定的运行,代码详见我的github,内有福利。脚本用法:python numspiderlist.py -s [String, e.g:138,137,1393134,1700001-1709999,1450000-1459999]
#!/usr/bin/python
#-*- coding:utf-8 -*-
"""Mobile number segment spider: trimmed-down, command-line driven, SQLite backed.

Python 2 script.  Given a list of number segments it works out which
7-digit mobile prefixes are missing from the local SQLite database,
queries a remote JSON API for each of them and stores the results.

@version:1.0
@author:Kenny{Kenny.F<mailto:kennyffly@gmail.com>}
@since:2014/05/23
"""
import sys
reload(sys)
sys.setdefaultencoding('utf8')
import gevent                               # gevent coroutine package
import multiprocessing                      # multi-process support
from multiprocessing import Manager
import urllib2
from urllib import unquote,quote
import socket
socket.setdefaulttimeout(20)                # global socket timeout, see catchPage
import cookielib
import random
import simplejson as json
import os
import time
import sqlite3                              # sqlite database access
from functools import wraps                 # decorator helper
from strtodecode import strtodecode         # encoding detection / conversion


manager = Manager()                         # list shared between processes
lacknumlist = manager.list()


def _ensure_dirs():
    """Create the ./logs and ./data working directories when missing."""
    for path in ('./logs', './data'):
        if not os.path.isdir(path):
            os.makedirs(path)


def _log_error(message):
    """Append *message* verbatim to ./logs/errorlog.txt."""
    with open('./logs/errorlog.txt', 'a') as f:
        f.write(message)


def multi_run_wrapper(func):
    """Decorate *func* so it can be called with one packed argument.

    A list or tuple argument is unpacked into positional arguments; any
    other value is passed through as the single argument.
    """
    @wraps(func)
    def newF(args):
        if isinstance(args, (list, tuple)):
            return func(*args)
        return func(args)
    return newF


def getRanIp():
    """Return a random dotted IPv4 string used for the X-Forwarded-For header."""
    # 123.125.40.255 - 123.127.134.56 (Beijing Unicom, 154938 addresses)
    return "123.{0}.{1}.{2}".format(random.randint(125,127), random.randint(40,134), random.randint(56,255))


def _cookiePool(url):
    """Open *url* and print the name and value of every cookie it sets."""
    cookie = cookielib.CookieJar()
    opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie))
    response = opener.open(url)
    response.close()
    for item in cookie:
        print('Name = ' + item.name)
        print('Value = ' + item.value)


def catchPage(url=''):
    """Fetch *url* and return the response body.

    The URL is appended to ./logs/outprint.txt before the request is made.

    Returns:
        The body as a string, '' when the gevent timeout expired before a
        response object was obtained, or False when *url* is empty or the
        request failed (the failure is printed).
    """
    if not url:
        return False

    with open("./logs/outprint.txt", "a") as f:
        f.write(url + "\n")

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6',
        'Referer': 'http://www.baidu.com',
        "X-Forwarded-For": getRanIp(),
    }
    req = urllib2.Request(url=url, headers=headers)

    response = None
    try:
        timeout_cls = getattr(gevent, 'Timeout', None)
        if timeout_cls is None:
            # gevent build without Timeout: rely on urllib2's own timeout.
            response = urllib2.urlopen(req, timeout=20)
        else:
            # Timeout(20, False) leaves the with-block silently on expiry,
            # in which case response stays None.
            with timeout_cls(20, False):
                response = urllib2.urlopen(req)
        if response is None:
            return ''
        return response.read()
    except urllib2.HTTPError as e:
        # Must come before URLError: HTTPError is a subclass of it.
        print('The server couldn\'t fulfill the request.')
        print("url:{0} Httperrorcode:{1}".format(url, e.code))
    except urllib2.URLError as e:
        print('We failed to reach a server.')
        print("url:{0} Reason:{1}".format(url, e.reason))
    except socket.timeout as e:
        print('The server couldn\'t fulfill the request.')
        print("url:{0} Httperrorcode:{1}".format(url, e))
    except Exception:
        # Best effort: one bad page must not kill the whole crawl.
        print('The server couldn\'t fulfill the request.')
        print("url:{0} Server someting error".format(url))
    finally:
        if response is not None:
            response.close()
    return False


def _row_is_valid(mobile_num, mobile_area, mobile_type, area_code, post_code):
    """Return True when a mobile_area row passes the sanity checks."""
    if not mobile_num or not mobile_area:
        return False
    if len(str(mobile_num)) > 7:
        return False
    # Text columns are NULL-able, so treat None as an empty string.
    for text in (mobile_area, mobile_type, area_code, post_code):
        if len(text or '') > 100:
            return False
    return True


def opensqlitedb():
    """Load the numbers already stored in ./data/mobile_area.db.

    The mobile_area table is created when it does not exist yet.

    Returns:
        A dict mapping str(mobile_num) to True for every valid row, or
        False when the database cannot be opened or queried.
    """
    db_file = './data/mobile_area.db'
    create_sql = ("create table if not exists mobile_area ("
                  "id integer primary key, "
                  "mobile_num integer, "
                  "mobile_area varchar(50) NULL, "
                  "mobile_type varchar(50) NULL, "
                  "area_code varchar(50) NULL, "
                  "post_code varchar(50) NULL)")
    select_sql = ("SELECT id, mobile_num, mobile_area, mobile_type, area_code, post_code "
                  "FROM mobile_area ORDER BY mobile_num ASC")
    batch_size = 10000

    mobile_dict = {}
    cx = None
    try:
        cx = sqlite3.connect(db_file)
        cu = cx.cursor()
        cu.execute(create_sql)
        cx.commit()

        cu.execute(select_sql)
        while True:
            rows = cu.fetchmany(batch_size)
            if not rows:
                break
            for row in rows:
                row_id = row[0]
                try:
                    mobile_num = int(row[1])
                except (TypeError, ValueError):
                    mobile_num = 0
                if not _row_is_valid(mobile_num, row[2], row[3], row[4], row[5]):
                    print("error id:%d" % row_id)
                    continue
                # Record the valid number.
                mobile_dict[str(mobile_num)] = True
    except sqlite3.Error:
        print("can not find sqlite db file\n")
        _log_error("can not find sqlite db file '%s'\n" % str(db_file))
        return False
    finally:
        if cx is not None:
            cx.close()

    print("get data from sqlite works down!\n")
    return mobile_dict


@multi_run_wrapper
def getNumPage(segnum='', num='', url=''):
    """Fetch the detail record for one number and insert it into SQLite.

    Because of the decorator, callers pass a single (segnum, num, url)
    tuple.  Network failures are appended to ./logs/errornum.txt as
    "segnum,num,url" so that getneednum can retry them; other failures
    are appended to ./logs/errorlog.txt.

    Returns:
        True when the row was inserted and committed, False otherwise.
    """
    if not segnum or not num or not url:
        return False

    # Random delay; this is where the greenlets yield to each other.
    gevent.sleep(random.randint(10,22)*0.81)

    db_file = './data/mobile_area.db'
    tag = segnum + ',' + num + ',' + url

    html = catchPage(url)
    if not html:
        print("catch %s num page error!" % num)
        print("url:%s\n" % (url))
        with open("./logs/errornum.txt", "a") as f:
            f.write(tag + "\n")
        return False

    # The response is JSON.
    try:
        page_temp_dict = json.loads(unquote(html))
    except (ValueError, TypeError):
        page_temp_dict = None
    if not isinstance(page_temp_dict, dict):
        print(tag + ",result error convert to dict\n")
        _log_error(tag + ",result error convert to dict\n")
        return False

    # mobile_num
    try:
        mobile_num = int(page_temp_dict.get('Mobile') or 0)
    except (TypeError, ValueError):
        mobile_num = 0
    if not mobile_num:
        _log_error(tag + ",No matching data\n")
        return False

    # mobile_area
    province = page_temp_dict.get('Province', False)
    if not province:
        _log_error(tag + ",No matching province\n")
        return False
    if province == u'未知':
        _log_error(tag + ",province is weizhi\n")
        return False
    # When the city is missing the province is repeated in its place.
    city = page_temp_dict.get('City', False) or province
    mobile_area = strtodecode(province + ' ' + city)

    # mobile_type, area_code and post_code are optional: stored as NULL
    # when the API does not return them.
    mobile_type = None
    corp = page_temp_dict.get('Corp', False)
    if corp:
        card = page_temp_dict.get('Card', False)
        mobile_type = strtodecode(corp + ' ' + card) if card else strtodecode(corp)

    area_code = None
    if page_temp_dict.get('AreaCode', False):
        area_code = strtodecode(page_temp_dict['AreaCode'])

    post_code = None
    if page_temp_dict.get('PostCode', False):
        post_code = strtodecode(page_temp_dict['PostCode'])

    try:
        cx = sqlite3.connect(db_file)
    except sqlite3.Error:
        print("can not find sqlite db file\n")
        _log_error("can not find sqlite db file '%s'\n" % str(db_file))
        return False

    try:
        cu = cx.cursor()
        cu.execute("insert into mobile_area values (?,?,?,?,?,?)",
                   (None, mobile_num, mobile_area, mobile_type, area_code, post_code))
        cx.commit()
    except sqlite3.Error:
        _log_error(tag + ",insert sqlitdb faild\n")
        return False
    finally:
        cx.close()
    return True


def getneednum(url='', step=10):
    """Fetch every number in lacknumlist, then retry the network failures.

    Args:
        url: callable returning the API URL prefix; the number is
            appended to its result.
        step: how many greenlets are spawned per batch.

    Returns:
        False when there is nothing to do or an argument is invalid,
        otherwise None once all batches and retries have run.
    """
    if not lacknumlist:
        return False
    if not url:
        return False
    if not step:
        print("step can not be null")
        return False
    if not isinstance(step, int):
        print("step should be numeric")
        return False
    if step < 0:
        print("step should be > 0")
        return False

    # Snapshot the shared list once instead of one IPC call per index.
    pending = list(lacknumlist)
    for offset in range(0, len(pending), step):
        threads = [
            gevent.spawn(getNumPage, (num[0:3], num, url() + num))
            for num in pending[offset:offset + step]
        ]
        try:
            gevent.joinall(threads)
            print("%d-%d is end\n" % (offset + 1, step + offset))
        except Exception:
            print("Gevent catch error\n")
        time.sleep(random.randint(5,80)*0.9)

    # Retry numbers that failed on network errors, at most 10 rounds.
    error_file = "./logs/errornum.txt"
    for turn in range(1, 11):
        if not os.path.exists(error_file):
            break
        with open(error_file, "r") as f:
            retry_lines = [line.strip() for line in f if line.strip()]
        if not retry_lines:
            break
        # Empty the file BEFORE retrying: getNumPage appends new failures
        # to it, and those must survive for the next round.
        with open(error_file, "w"):
            pass

        for offset in range(0, len(retry_lines), step):
            threads = []
            for line in retry_lines[offset:offset + step]:
                parts = line.split(',', 2)
                if len(parts) != 3:
                    continue        # malformed line, nothing to retry
                threads.append(gevent.spawn(getNumPage, tuple(parts)))
            if not threads:
                continue
            try:
                gevent.joinall(threads)
            except Exception:
                print("turn%d-%d Gevent catch error\n" % (turn, len(threads)))
            time.sleep(random.randint(5,80)*0.9)


def setneednum(num='', mobile_dict=None):
    """Append to lacknumlist every number of a segment not in *mobile_dict*.

    Args:
        num: a 3-digit prefix such as '138' (expanded to 1380000-1389999)
            or an explicit 'start-end' range.
        mobile_dict: dict of already known numbers keyed by str(number).

    Returns:
        False when *num* is empty, otherwise None.
    """
    if not num:
        return False
    if mobile_dict is None:
        mobile_dict = {}

    num = str(num)
    if len(num) == 3:
        start_num = int(num + '0000')
        end_num = int(num + '9999')
    else:
        num_list = num.split('-')
        start_num = int(num_list[0])
        end_num = int(num_list[1])

    missing = [str(i) for i in xrange(start_num, end_num + 1)
               if not mobile_dict.get(str(i), False)]
    if missing:
        # One proxy call for the whole batch instead of one per number.
        lacknumlist.extend(missing)


def setsegnum(segnumlist=None, mobile_dict=None):
    """Fill lacknumlist from the requested segments, one process per range.

    Each entry of *segnumlist* is a 3-digit prefix ('137'), a single
    7-digit number ('1391234') or a range within one prefix
    ('1380000-1389999').  Invalid entries are reported and skipped.

    Returns:
        False when *segnumlist* is empty, otherwise None.
    """
    if not segnumlist:
        return False
    if mobile_dict is None:
        mobile_dict = {}

    record = []
    for raw in segnumlist:
        segnum = str(raw).strip()
        if len(segnum) == 3 and segnum.isdigit():
            # A whole prefix, e.g. 137.
            is_range = True
        elif len(segnum) == 7 and segnum.isdigit():
            # One specific number, e.g. 1391234.
            if not mobile_dict.get(segnum, False):
                lacknumlist.append(segnum)      # not in sqlite yet
            continue
        else:
            # An explicit range, e.g. 1380000-1389999.
            bounds = segnum.split('-')
            is_range = (len(bounds) == 2
                        and bounds[0].isdigit() and bounds[1].isdigit()
                        and bounds[0][:3] == bounds[1][:3])

        if not is_range:
            print("%s is illegal argument\n" % str(segnum))
            continue

        process = multiprocessing.Process(target=setneednum, args=(segnum, mobile_dict))
        process.start()
        record.append(process)

    for process in record:
        process.join()

    print("all SegNum prepare works down!\n")


def callback_url_showji():
    """Return the showji API URL prefix; the number is appended by the caller."""
    showji = 'http://api.showji.com/Locating/www.showji.c.o.m.aspx?output=json'
    return "{0}&timestamp={1}&m=".format(showji, int(time.time()))


def main(param=''):
    """Run the spider for the comma separated segment list *param*.

    Returns:
        False when *param* is empty or the database cannot be read,
        otherwise None.
    """
    _ensure_dirs()

    # Reset the temporary log files.
    with open("./logs/errornum.txt", "w"):
        pass
    with open("./logs/outprint.txt", "w"):
        pass

    if not param:
        print("no argument!")
        return False

    # Known prefixes, for reference:
    #   134 135 136 137 138 139 147 150 151 152 157 158 159 182 183 187 188
    #   130 131 132 136 145 185 186
    #   133 153 180 189
    #   147 155 156 170 176 177 178 181 184
    segnumlist = str(param).split(',')

    # Numbers already stored in sqlite.
    mobile_dict = opensqlitedb()
    if mobile_dict is False:
        return False

    # Work out which numbers are still missing.
    setsegnum(segnumlist, mobile_dict)
    if lacknumlist:
        with open("./logs/needmobilelist.txt", "w") as f:
            f.write("".join(str(num) + "\n" for num in list(lacknumlist)))

    # Fetch the missing ones.
    getneednum(callback_url_showji)

    print("all works end!")


if __name__ == "__main__":
    from optparse import OptionParser
    USAGE = "usage:python numspiderlist.py -s [String, e.g:138,137,1393134,1700001-1709999,1450000-1459999]"
    parser = OptionParser(USAGE)
    parser.add_option("-s", dest="s")
    opt, args = parser.parse_args()

    if not opt.s:
        print(USAGE)
        sys.exit(1)

    main(opt.s)
如果你看得仔细,一定会发现我在代码里加了这样两行:
import socket
socket.setdefaulttimeout(20)
这是为了解决python2.6以下版本中urllib2的timeout参数无法正常生效的问题。而且在gevent异步非阻塞方式下,urllib2自身的阻塞式超时需要改用gevent.Timeout()来替代。
基于gevent全国手机号段spider蜘蛛爬虫
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。