首页 > 代码库 > Tornado异步任务的实现之一

Tornado异步任务的实现之一

前言

刚开始使用用tornado时都很迷惑:tornado不是标榜异步非阻塞解决10K问题的嘛?
但为什么我在handler中有一个步骤是耗时时,整体的响应就慢下了呢?
是不是torando根本就不好用。
其实是你用错了,因为你没有将你的耗时任务使用tornado的异步功能实现。

下面就各种torndo响应请求,进行耗时任务处理时的各种异步场景一一总结

一、异步HTTP请求的实现

tornado是一个异步web framework,说是异步,是因为tornado server与client的网络交互是异步的,底层基于io event loop。
但是如果client请求server处理的handler里面有一个阻塞的耗时操作,那么整体的server性能就会下降。
比如: 访问一个耗时的网站请求 www.douban.com/search, 这个结果要在5秒后才返回值。 
当我访问的话,肯定是要等5秒钟,这时候,要是有别的客户要连接的别的页面,不堵塞的页面,
你猜他能马上显示吗?不能的。。。 他也是要等当前这个5秒延迟过后,才能访问的。

幸运的是,tornado提供了一套异步机制,方便我们实现自己的异步操作。
当handler处理需要进行其余的网络操作的时候,tornado提供了一个AsyncHTTPClient来支持异步。
示例代码如下:

import json
import tornado.web
import tornado.gen

def MainHandler(tornado.web.RequestHandler):
     def initialize(self):  
         ... 


     @tornado.web.asynchronous
     @tornado.gen.coroutine
     def get(self):
         url = "www.douban.com/search/"
         data = http://www.mamicode.com/{}
         data[‘title‘] = ‘fury‘
         ...
         client = tornado.httpclient.AsyncHTTPClient()
         response = yield client.fetch(url, method=POST, body=json.dumps(data))
         self.on_response(response)

     def on_response(self, resp):
         body = json.loads(resp.body)
         if body == None:
             self.write(‘error‘)
         else:
             self.write(body)
         return

注意:
1. 只在每个响应处理的函数(如get, post, put,delete)前加修饰器@tornado.web.asynchronous 和  @tornado.gen.coroutine,
   不要在初始函数 initialize(self)前加修饰器,不然会造成各种程序错误
2. 在 on_response(self, resp)函数中,如果使用了 self.wirte()或self.render()函数,就不要使用 self.finish()函数,
   不然,会报异常:
       raise RuntimeError("finish() called twice.  May be caused "
       RuntimeError: finish() called twice.  May be caused by using async operations without the @asynchronous decorator.
   如果没有用self.wirte()或self.render()函数,就要使用self.finish()函数

上面的例子,主要有几个变化:
.  使用asynchronous decorator,它主要设置_auto_finish为false,
   这样handler的get函数返回的时候tornado就不会关闭与client的连接。
.  使用AsyncHttpClient,fetch的时候提供callback函数,
   这样当fetch http请求完成的时候才会去调用on_response,而不会阻塞。
.  on_response调用完成之后通过finish结束与client的连接。

更详细的用法可以参见:
http://demo.pythoner.com/itt2zh/ch5.html

二、异步数据库处理

2.1 支持mongoDB的模块

1. PyMongo

PyMongo基本使用
http://api.mongodb.org/python/current/tutorial.html
.  引用PyMongo
>>> import pymongo
.  创建连接Connection
>>> import pymongo
>>> conn = pymongo.Connection(‘localhost‘,27017)

>>> from pymongo import Connection
>>> conn = Connection(‘localhost‘,27017)
.  创建Connection时,指定host及port参数
>>> import pymongo
>>> conn = pymongo.Connection(host=‘127.0.0.1‘,port=27017)
.  连接数据库
>>> db = conn.ChatRoom

>>> db = conn[‘ChatRoom‘]
.  连接聚集
>>> account = db.Account
或 
>>> account = db["Account"]
.  查看全部聚集名称
>>> db.collection_names()
.  查看聚集的一条记录
>>> db.Account.find_one()
>>> db.Account.find_one({"UserName":"keyword"})
.  查看聚集的字段 
>>> db.Account.find_one({},{"UserName":1,"Email":1})
{u‘UserName‘: u‘libing‘, u‘_id‘: ObjectId(‘4ded95c3b7780a774a099b7c‘), u‘Email‘: u‘libing@35.cn‘} 
>>> db.Account.find_one({},{"UserName":1,"Email":1,"_id":0})
{u‘UserName‘: u‘libing‘, u‘Email‘: u‘libing@35.cn‘}
.  查看聚集的多条记录
>>> for item in db.Account.find():
        item
>>> for item in db.Account.find({"UserName":"libing"}):
        item["UserName"]
.  查看聚集的记录统计 
>>> db.Account.find().count()
>>> db.Account.find({"UserName":"keyword"}).count()
.  聚集查询结果排序 
>>> db.Account.find().sort("UserName")  --默认为升序
>>> db.Account.find().sort("UserName",pymongo.ASCENDING)   --升序
>>> db.Account.find().sort("UserName",pymongo.DESCENDING)  --降序
.  聚集查询结果多列排序
>>> db.Account.find().sort([("UserName",pymongo.ASCENDING),("Email",pymongo.DESCENDING)])
.  添加记录
>>> db.Account.insert({"AccountID":21,"UserName":"libing"})
.  修改记录
>>> db.Account.update({"UserName":"libing"},{"$set":{"Email":"libing@126.com","Password":"123"}})
.  删除记录
>>> db.Account.remove()   -- 全部删除
>>> db.Test.remove({"UserName":"keyword"})
?

2. AsyncMongo

链接 https://github.com/bitly/asyncmongo
AsyncMongo is an asynchronous library for accessing mongo which is built on the tornado ioloop.
1). Build Status
Installation
Installing: pip install asyncmongo

2). Usage
asyncmongo syntax strives to be similar to pymongo.

import asyncmongo
import tornado.web

class Handler(tornado.web.RequestHandler):
    @property
    def db(self):
        if not hasattr(self, ‘_db‘):
            self._db = asyncmongo.Client(pool_id=‘mydb‘, host=‘127.0.0.1‘, port=27017, maxcached=10, maxconnections=50, dbname=‘test‘)
        return self._db

    @tornado.web.asynchronous
    def get(self):
        self.db.users.find({‘username‘: self.current_user}, limit=1, callback=self._on_response)
        # or
        # conn = self.db.connection(collectionname="...", dbname="...")
        # conn.find(..., callback=self._on_response)
    def _on_response(self, response, error):
        if error:
            raise tornado.web.HTTPError(500)
        self.render(‘template‘, full_name=response[‘full_name‘])
3). About
Some features are not currently implemented:
directly interfacing with indexes, dropping collections
retrieving results in batches instead of all at once 
(asyncmongo‘s nature means that no calls are blocking regardless 
of the number of results you are retrieving)
tailable cursors #15

4). Requirements
The following two python libraries are required
pymongo version 1.9+ for bson library
tornado

3. Motor

http://motor.readthedocs.org/en/stable/
Motor: Asynchronous Python driver for Tornado and MongoDB
About
Motor presents a Tornado callback- or Future-based API for non-blocking access to MongoDB. 
The source is on GitHub and the docs are on ReadTheDocs.
“Motor uses a clever greenlet-based approach to fully support both synchronous and 
asynchronous interfaces from a single codebase. It’s great to see companies like 
MongoDB produce first-party asynchronous drivers for their products.”
                   —Ben Darnell, Tornado maintainer

Install with:
$ pip install motor
Post questions about Motor to the mongodb-user list on Google Groups. 
For confirmed issues or feature requests, open a case in Jira in the “MOTOR” project.

它的异步请求可以见下面的链接:
http://motor.readthedocs.org/en/stable/tutorial.html

2.2 支持MySQL的模块

1. torndb

tornado在升级过程中把数据库模块独立出来了。
模块为torndb模块。模块git地址:https://github.com/bdarnell/torndb    
官方文档地址:http://torndb.readthedocs.org/en/latest/_modules/torndb.html

安装方法:执行命令 pip install torndb ,
但是这个命令默认读取的是pypi站的索引,但是其访问不是很稳定。
所以这里可以采用豆瓣的pypi索引。命令如下:
pip install -i http://pypi.douban.com/simple/  torndb

所以以后pypi站索引无法访问的时候尝试利用豆瓣的pypi索引。

使用:
连接数据库  
import torndb
db=torndb.Connection(hostaddress,database name,user,password)

查询
查询有两种查询方式,一种为get,一种为query,get是得到一行数据。query是得到一列数据。
get返回数据为封装好的dict,query得到的数据为封装好的list,单元为dict。
>>> a=db.get(‘select * from query where id=1‘)
>>> a
{‘queryc‘: ‘dac‘, ‘id‘: 1}
>>> a=db.query(‘select * from query‘)
>>> a
[{‘id‘: 2, ‘queryc‘: ‘isca‘}, {‘id‘: 1, ‘queryc‘: ‘dac‘}]
  执行sql语句

下面的命令是无返回参数的执行sql语句的方法。
string=‘dac‘
str=‘insert into query(id,queryc)values(%d,"%s")‘%(1,string)
db.execute(exe)
MySQL命令行操作创建数据库和表的时候指定编码的命令
数据库
mysql> CREATE DATABASE IF NOT EXISTS my_db DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
数据表
mysql> CREATE TABLE my_table (name VARCHAR(20) NOT NULL) type=MyISAM DEFAULT CHARSET utf8;
连接 :
>>>db = torndb.Connection(“127.0.0.1:3306″, “test”, user=“root”, password=“admin”)
默认字符集UTF8,没必要在加上 charset = “utf8″ 。
查询:
query: 得到多行记录,单行为字典
view plaincopy在CODE上查看代码片派生到我的代码片
>>>sql = ‘SELECT * FROM test WHERE name = %s AND id < %s’
>>>db.query(sql, ‘bbb’, 11)
[{‘date’: None, ‘id’: 1L, ‘name’: u‘bbb’}, {‘date’: None, ‘id’: 2L, ‘name’: u‘bbb’}]
get: 得到单行记录
>>>sql = ‘SELECT * FROM test WHERE id = %s’
>>>db.get(sql, 2)
{‘date’: None, ‘id’: 2L, ‘name’: u‘bbb’}
insert:
>>>sql = “INSERT INTO test (id,name,date) VALUES (%s,%s,%s)”
>>>db.insert(sql, 100, “aaa”, ‘0000-01-01′)
100L
insert的参数不支持列表或元组,如果想插入列表或元组的话可以用insertmany
insertmany:
#插入单行记录
sql = “INSERT INTO test (id,name,date) VALUES (%s,%s,%s)”
db.insertmany(sql,[[200,‘bbb’,None]])
200L
#插入多行记录
db.insertmany(sql,[[300,‘bbb’,None],[400,‘bbb’,None]])
400L
db.insertmany(sql,[(301,‘bbb’,None),(401,‘bbb’,None)])
401L

Tornado异步任务的实现之一