首页 > 代码库 > tornado 学习之异步调用详解

tornado 学习之异步调用详解

本文和大家分享的主要是tornado 中异步调用相关内容,一起来看看吧,希望对大家学习python有所帮助。

一般来说,阻塞是绝对的,非阻塞则是相对的,因为任何指令或调用的执行都要占用 CPU 周期,或网络,或 IO。非阻塞只是说调用或者资源消耗不影响后续逻辑执行,他们经得起等待。非阻塞往往和异步一起出现。

tornado 是一个异步非阻塞 httpserver,同时也是一个 web framework

Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed . By using non-blocking network I/O, Tornado can scale to tens of thousands of open connections, making it ideal for long polling , WebSockets , and other applications that require a long-lived connection to each user

 tornado 中异步和非阻塞是通过 ioloop 和 Future 实现的,Future 是借助 python 的协程来实现非阻塞调用。tornado 提供了不同的异步调用形式来适配不同的调用场景,本文的目的就是为了说明不同形式是如何使用的以及它们的差别。

为了演示方便,这里使用了一个公共模块 util.py 来提供 ioloop 的启动和销毁,下文不做特殊说明皆是如此。

# util.py import tornado.ioloop

COUNTER = 0

def stop_loop(times):

global COUNTER

COUNTER += 1

if COUNTER == times:

tornado.ioloop.IOLoop.instance().stop()

print(’====> ioloop end’)

def start_loop():

print(’====> ioloop start’)

ioloop = tornado.ioloop.IOLoop.current()

ioloop.start()

最常用的调用方式

 tornado 中最普遍的使用方式是把函数的调用结果封装成 Future,利用 ioloop 执行并异步获得结果。

# -*- coding:utf-8 -*-from tornado.httpclient import AsyncHTTPClientfrom tornado.concurrent import Futurefrom tornado import gen

from util import stop_loop, start_loop

# fetch 返回的是 futuredef asyn_fetch_future(url):

http_client = AsyncHTTPClient()

return http_client.fetch(url)

def asyn_fetch_future_callback(future):

result = future.result()

print(’future_callback’)

print(result.request.url, result.code, result.reason, result.request_time)

stop_loop(1)

if __name__ == ’__main__’:

result_future = asyn_fetch_future(’http://www.apple.com/cn/’) #1

result_future.add_done_callback(asyn_fetch_future_callback)

start_loop()

 #1 处通过tornado 提供的异步 httpclient 获得一个 Future,为 Future 添加执行结果回调函数获得执行结果,Future 的执行结果也是一个 Future

输出结果如下

====> ioloop start

future_callback

(’http://www.apple.com/cn/’, 200, ’OK’, 0.5546278953552246)

====> ioloop end

0.71 real 0.06 user 0.06 sys

这种调用方式必须手动启动 ioloop 并在调用结束之后手动销毁 ioloop,想要获得调用结果必须为 Future 添加回调。

只关心调用,不在乎结果

有时候我们并不在乎函数的调用结果,只要函数正确执行即可,ioloop 提供了 spawn_callback 来执行这样的操作:

# -*- coding:utf-8 -*-import tornado.genimport tornado.ioloop

from util import start_loop, stop_loop

@tornado.gen.coroutinedef divide(x, y):

return x / y

if __name__ == ’__main__’:

# The IOLoop will catch the exception and print a stack trace in

# the logs. Note that this doesn’t look like a normal call, since

# we pass the function object to be called by the IOLoop.

tornado.ioloop.IOLoop.current().spawn_callback(divide, 1, 0)

tornado.ioloop.IOLoop.current().add_timeout(

tornado.ioloop.time.time() + 1, stop_loop, 1)

start_loop()

借助 spawn_callback 可以直接执行函数调用,但是依然需要手动处理 ioloop 的开闭。

一次性执行

run_sync 可以自动开启 ioloop 并在函数执行结束之后关闭 ioloop,此种调用在执行一次性操作时非常有用,比如要对数据库进行一次性修改:

@tornado.gen.coroutinedef init_tag():

tb_tag = Tag() #1

for i in range(10):

result = yield tb_tag.insert({’name’: random.choice(’abcdefghjklpoiuytrewq’)})

print(result)

if __name__ == ’__main__’:

tornado.ioloop.IOLoop.current().run_sync(lambda: init_tag())

#1 是一个 motor Mongodb collection,借助 run_sync 可以非常方便对数据库执行操作,由于run_sync 只接受一个参数,如果函数调用需要传参,可以把函数封装成 lambda

使用 callback

如果调用需要与 callback 函数进行交互,可以利用 gen.task

# -*- coding:utf-8 -*-import tornado.genimport tornado.ioloop

from util import start_loop, stop_loop

@tornado.gen.coroutinedef call_task():

result = yield tornado.gen.Task(some_function, 1, 2)

raise tornado.gen.Return(result)

def fetch_coroutine_callback(future):

print(’coroutine callback ==> ’, future.result())

stop_loop(1)

def some_function(x, y, callback=None):

print(’some_function called’)

callback(x * y)

if __name__ == ’__main__’:

future = call_task()

future.add_done_callback(fetch_coroutine_callback)

start_loop()

使用 gen.task 可以直接返回一个 Future ,被调用的函数会自动增加一个 callback 函数,用来在函数执行结束之后把结果进行回调通知。

通过 threadPool 来调用阻塞操作

#-*- coding:utf-8 -*-from tornado.concurrent import Futurefrom tornado import genimport timefrom concurrent.futures import ThreadPoolExecutor

from util import start_loop, stop_loop

EXECUTOR = ThreadPoolExecutor(max_workers=4)

def fetch_coroutine_callback(future):

print(’coroutine callback’)

print(future.result())

stop_loop(2)

def sleep_func(t):

print(’sleep_func call’)

time.sleep(t)

return ’blocking func result’

@gen.coroutinedef blocking():

result = yield EXECUTOR.submit(sleep_func, *(4, ))

raise gen.Return(result)

@gen.coroutinedef not_blocking():

future = Future()

future.set_result(’not blocking func result’)

result = yield future

raise gen.Return(result)

if __name__ == ’__main__’:

"""

用多线程的方式借助协程实现异步调用

"""

blocking().add_done_callback(fetch_coroutine_callback)

not_blocking().add_done_callback(fetch_coroutine_callback)

start_loop()

如果代码中有阻塞操作,可以借助 ThreadPoolExecutor 来完成异步的调用,这样把耗时的操作交给别的线程,可以使当前的线程继续执行后续的操作。这里的阻塞操作一般是 IO 或者其他系统调用。

并行执行

tornado 的 Future 是支持并行执行的,可以对多个 future 进行 yield 操作并返回多个结果。

# -*- coding:utf-8 -*-from tornado.httpclient import AsyncHTTPClientfrom tornado.concurrent import Futurefrom tornado import gen

from util import stop_loop, start_loop

@gen.coroutinedef fetch_many_coroutine(urls):

http_client = AsyncHTTPClient()

response1, response2 = yield [http_client.fetch(urls[0]), http_client.fetch(urls[1])]

raise gen.Return([response1, response2])

@gen.coroutinedef fetch_coroutine(urls):

http_client = AsyncHTTPClient()

responses = yield [http_client.fetch(url) for url in urls]

raise gen.Return(responses)

def fetch_coroutine_callback(future):

print(’coroutine callback’)

for result in future.result():

print(

result.request.url,

result.code, result.reason, result.request_time)

stop_loop(2)

if __name__ == ’__main__’:

"""

使用gen.coroutine 可以很方便让包含 yield 的函数返回future

"""

result_future = fetch_coroutine([’https://baidu.com’, ’https://baidu.com’])

result_future.add_done_callback(fetch_coroutine_callback)

result_future = fetch_many_coroutine([’https://baidu.com’, ’https://baidu.com’])

result_future.add_done_callback(fetch_coroutine_callback)

start_loop()

 

来源:三月沙

tornado 学习之异步调用详解