首页 > 代码库 > gevent动态随时添加任务

gevent动态随时添加任务

关于爬虫,有scrapy框架,也有requests加协程 协程 进程的方法。

相关的包很多,比如threading 、threadpool、multiprocessing,还有threadpoolexecutor、processpoolexecutor这两个。

协程gevent pool的用法和threadpool 很像,但关于使用协程我需要一个像threadpoolexecutor一样的强大方法,可以随时追加任务的。

 

# -*- coding: utf-8 -*-
import gevent,time,redis
from gevent.pool import Pool
from gevent.queue import JoinableQueue
from gevent import monkey
monkey.patch_all(time=True)
class GeventPoolExecutor(object):
    def __init__ (self,max_works):
        self._q = JoinableQueue()

        for i in range(max_works):
            gevent.spawn(self.worker)
        self._q.join()

    def worker(self):
        while True:
            fn= self._q.get()
            try:
                exec(fn)
            except Exception,e:
                print e
            finally:
                pass
                self._q.task_done()

    def submit(self,fn):
        self._q.put(fn)

def func(a,b):
    print a*10,b+100
    time.sleep(2)

if __name__=="__main__":                   ###测试

    gevent_pool_executor=GeventPoolExecutor(max_works=3)
    while(1):

        #print gevent_pool_executor.q.qsize()
        if gevent_pool_executor._q.qsize()<5:
            functionx=redis.Redis(host=127.0.0.1, port=6379, db=0).blpop(ceshi)[1]
       #functionx是一个字符串,内容例如为func(1,2),但是是个字符串,在worker中用exec来执行他 gevent_pool_executor.submit(functionx)
else: time.sleep(0.001)

 

另一个文件

#coding=utf-8

r=redis.Redis(host=‘127.0.0.1‘, port=6379, db=0)

for i in range(100,2000):

   #r.rpush(‘seeds‘, json.dumps(dictx))
r.rpush(‘ceshi‘,‘func(%s,%s)‘%(i,i))
print ‘func(%s,%s)‘%(i,i)

 

gevent动态随时添加任务