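Learning Tornado
Asynchronous programming
Future
First, be clear about what a Future is.
It is a placeholder for an asynchronous result.
Using a Future
Running a callback when a future completes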
    from tornado.concurrent import Future
    from tornado.httpclient import AsyncHTTPClient

    def async_fetch_future(url):
        http_client = AsyncHTTPClient()
        my_future = Future()
        fetch_future = http_client.fetch(url)
        # Copy the result of fetch_future into my_future once the fetch is done.
        fetch_future.add_done_callback(
            lambda f: my_future.set_result(f.result()))
        return my_future
The add_done_callback method
It calls the function passed in, with the future itself as the argument.
    def add_done_callback(self, fn):
        """Attaches the given callback to the `Future`.

        It will be invoked with the `Future` as its argument when the Future
        has finished running and its result is available.  In Tornado
        consider using `.IOLoop.add_future` instead of calling
        `add_done_callback` directly.
        """
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)
The callback handling above means: once fetch_future completes, its result is stored in my_future, and my_future is what the function returns.
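The chaining described above does not depend on HTTP. As a minimal sketch, assuming the standard library's concurrent.futures.Future as a stand-in for tornado.concurrent.Future (its add_done_callback also passes the future itself to the callback), the same pattern can be tried without a network. The names chain, inner and outer are made up for the illustration.

```python
from concurrent.futures import Future


def chain(source):
    """Return a new Future that receives source's result once source is done."""
    target = Future()
    # The callback is invoked with `source` itself as its only argument.
    source.add_done_callback(lambda f: target.set_result(f.result()))
    return target


inner = Future()
outer = chain(inner)
pending_before = not outer.done()   # nothing has been resolved yet

inner.set_result("body")            # resolving inner triggers the callback
print(pending_before, outer.result())
```

Here outer plays the role of my_future and inner the role of fetch_future.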
gen.coroutine
Inside a @gen.coroutine function, yield a_future evaluates to the result of a_future. The example above can be rewritten like this:
    from tornado import gen
    from tornado.httpclient import AsyncHTTPClient

    @gen.coroutine
    def fetch_coroutine(url):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch(url)
        raise gen.Return(response.body)
How @gen.coroutine works
A function containing yield is a generator. All generators are asynchronous; when called they return a generator object instead of running to completion. The @gen.coroutine decorator communicates with the generator via the yield expressions, and with the coroutine's caller by returning a Future.
Here is a simplified version of the coroutine decorator's inner loop:
    # Simplified inner loop of tornado.gen.Runner
    def run(self):
        # send(x) makes the current yield return x.
        # It returns when the next yield is reached
        future = self.gen.send(self.next)
        def callback(f):
            self.next = f.result()
            self.run()
        future.add_done_callback(callback)
The decorator receives a Future from the generator, waits (without blocking) for that Future to complete, then "unwraps" the Future and sends the result back into the generator as the result of the yield expression. Most asynchronous code never touches the Future class directly except to immediately pass the Future returned by an asynchronous function to a yield expression.
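The simplified version is not very clear. For example, does @gen.coroutine interact with every yield in the decorated function? Here is the more detailed source: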
    def _make_coroutine_wrapper(func, replace_callback):
        """The inner workings of ``@gen.coroutine`` and ``@gen.engine``.

        The two decorators differ in their treatment of the ``callback``
        argument, so we cannot simply implement ``@engine`` in terms of
        ``@coroutine``.
        """
        # On Python 3.5, set the coroutine flag on our generator, to allow it
        # to be used with 'await'.
        wrapped = func
        if hasattr(types, 'coroutine'):
            func = types.coroutine(func)

        @functools.wraps(wrapped)
        def wrapper(*args, **kwargs):
            future = TracebackFuture()

            if replace_callback and 'callback' in kwargs:
                callback = kwargs.pop('callback')
                IOLoop.current().add_future(
                    future, lambda future: callback(future.result()))

            try:
                result = func(*args, **kwargs)
            except (Return, StopIteration) as e:
                result = _value_from_stopiteration(e)
            except Exception:
                future.set_exc_info(sys.exc_info())
                return future
            else:
                if isinstance(result, GeneratorType):
                    # Inline the first iteration of Runner.run.  This lets us
                    # avoid the cost of creating a Runner when the coroutine
                    # never actually yields, which in turn allows us to
                    # use "optional" coroutines in critical path code without
                    # performance penalty for the synchronous case.
                    try:
                        orig_stack_contexts = stack_context._state.contexts
                        yielded = next(result)
                        if stack_context._state.contexts is not orig_stack_contexts:
                            yielded = TracebackFuture()
                            yielded.set_exception(
                                stack_context.StackContextInconsistentError(
                                    'stack_context inconsistency (probably caused '
                                    'by yield within a "with StackContext" block)'))
                    except (StopIteration, Return) as e:
                        future.set_result(_value_from_stopiteration(e))
                    except Exception:
                        future.set_exc_info(sys.exc_info())
                    else:
                        _futures_to_runners[future] = Runner(result, future, yielded)
                    yielded = None
                    try:
                        return future
                    finally:
                        # Subtle memory optimization: if next() raised an exception,
                        # the future's exc_info contains a traceback which
                        # includes this stack frame.  This creates a cycle,
                        # which will be collected at the next full GC but has
                        # been shown to greatly increase memory usage of
                        # benchmarks (relative to the refcount-based scheme
                        # used in the absence of cycles).  We can avoid the
                        # cycle by clearing the local variable after we return it.
                        future = None
            future.set_result(result)
            return future

        wrapper.__wrapped__ = wrapped
        wrapper.__tornado_coroutine__ = True
        return wrapper
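    if replace_callback and 'callback' in kwargs:
Here the future is registered with the IOLoop through IOLoop.add_future(future, callback).
@gen.coroutine puts the result of the decorated function into the future. (If the result is a generator, it calls next on the generator to run its first step, and a Runner takes over the job of putting the final result into the future.)
Is the future not added to the IOLoop in that case?
    try:
        result = func(*args, **kwargs)
Are there requirements on func, for example that it must not block?
Is result a future?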
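To make the mechanism in this section easier to follow, here is a toy decorator that mimics the overall shape of the source above. It is only a sketch under stated assumptions: it uses concurrent.futures.Future instead of Tornado's future class, it has no IOLoop or stack_context handling, and toy_coroutine, Return and add_one are hypothetical names, not Tornado's API.

```python
import functools
import types
from concurrent.futures import Future


class Return(Exception):
    """Hypothetical stand-in for gen.Return: carries the coroutine's result."""

    def __init__(self, value=None):
        super().__init__()
        self.value = value


def toy_coroutine(func):
    """Toy decorator: always returns a Future and drives generators step by step."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = Future()
        try:
            result = func(*args, **kwargs)
        except Return as e:
            future.set_result(e.value)
            return future
        except Exception as e:
            future.set_exception(e)
            return future
        if not isinstance(result, types.GeneratorType):
            # Plain function: its return value becomes the future's result.
            future.set_result(result)
            return future

        def step(send_value=None, exc=None):
            # Advance the generator to its next yield (or to its end).
            try:
                if exc is not None:
                    yielded = result.throw(exc)
                else:
                    yielded = result.send(send_value)
            except StopIteration as e:
                future.set_result(e.value)
            except Return as e:
                future.set_result(e.value)
            except Exception as e:
                future.set_exception(e)
            else:
                # Every yield is expected to hand back a Future; resume later.
                yielded.add_done_callback(resume)

        def resume(done_future):
            error = done_future.exception()
            if error is not None:
                step(exc=error)
            else:
                step(done_future.result())

        step()
        return future

    return wrapper


@toy_coroutine
def add_one(pending):
    value = yield pending          # suspended here until `pending` is resolved
    raise Return(value + 1)


pending = Future()
outer = add_one(pending)
was_pending = not outer.done()     # add_one is parked at its yield
pending.set_result(41)
print(was_pending, outer.result())
```

In this sketch the decorator does interact with every yield: each yielded future gets a callback that sends its result back into the generator, which is the same loop as the simplified Runner.run shown earlier.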
Nested futures
    @gen.coroutine
    def divide(x, y):
        return x / y

    def bad_call():
        # This should raise a ZeroDivisionError, but it won't because
        # the coroutine is called incorrectly.
        divide(1, 0)

    @gen.coroutine
    def good_call():
        # yield will unwrap the Future returned by divide() and raise
        # the exception.
        yield divide(1, 0)
In the example above, good_call also returns a future.
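Why bad_call stays silent can be shown without Tornado. The sketch below is an illustration under assumptions: divide is written by hand to return a concurrent.futures.Future, and calling .result() stands in for what yield does inside a coroutine. The exception is captured inside the future, so it only surfaces when something unwraps it.

```python
from concurrent.futures import Future


def divide(x, y):
    """Return a Future holding either the quotient or the raised exception."""
    future = Future()
    try:
        future.set_result(x / y)
    except ZeroDivisionError as e:
        future.set_exception(e)
    return future


def bad_call():
    divide(1, 0)                    # the error lives in a Future nobody looks at
    return "no error seen"


def good_call():
    return divide(1, 0).result()    # unwrapping the Future re-raises the error


print(bad_call())
try:
    good_call()
except ZeroDivisionError as e:
    print("good_call raised:", type(e).__name__)
```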
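Most asynchronous functions in Tornado return a Future; yielding this object returns its result.
Why do it this way? @gen.coroutine puts the result into a future and yield takes the result back out of the future, so don't the two cancel each other out?
Also, I found that for a function that already returns a future, adding @gen.coroutine and yield can still fail, for example with motor's count.
When should @gen.coroutine be combined with yield, and when with return?
I found that on Python 2.7.3 a plain return works inside @gen.coroutine, as long as the function contains no yield: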
    @gen.coroutine
    def divide(x, y):
        return x / y

    >>> divide(1, 1).result()
    1
Using raise gen.Return:
    @gen.coroutine
    def divide2(x, y):
        raise gen.Return(x / y)

    divide2(1, 1).result()
    Out[16]: 1
Callbacks
Interaction with callbacks
To interact with asynchronous code that uses callbacks instead of Future, wrap the call in a Task. This will add the callback argument for you and return a Future which you can yield:
    @gen.coroutine
    def call_task():
        # Note that there are no parens on some_function.
        # This will be translated by Task into
        #   some_function(other_args, callback=callback)
        yield gen.Task(some_function, other_args)
What does the call above mean?
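One way to read that call: the adapter invokes the function for us, supplies a callback keyword argument of its own, and hands back a Future that the callback resolves. The following is a rough sketch of that idea, not Tornado's implementation. It uses concurrent.futures.Future, and task and legacy_add are hypothetical names.

```python
from concurrent.futures import Future


def task(func, *args, **kwargs):
    """Toy adapter: call func with an extra `callback` kwarg, return a Future."""
    future = Future()

    def callback(result=None):
        # Whatever the callback-style function reports becomes the result.
        future.set_result(result)

    func(*args, callback=callback, **kwargs)
    return future


def legacy_add(a, b, callback):
    """A callback-style function: reports its answer through `callback`."""
    callback(a + b)


future = task(legacy_add, 1, 2)     # note: no parentheses after legacy_add
print(future.result())
```

Inside a coroutine, yielding such a future would give back the value that was passed to the callback.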
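What gen.coroutine does
Any generator that yields objects from this module must be wrapped in either this decorator or engine.
Functions with this decorator return a Future.
When the function completes, its result is stored in the Future. Use yield future to get the result.
Why do get and post need this decorator?
The last step of get or post is usually self.write().
The following two functions have much the same effect, except that the future from the first resolves to None unless it ends with raise gen.Return: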
    @gen.coroutine
    def f():
        yield function_that_return_future()

    def f():
        return function_that_return_future()
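Both end up returning a Future. Which form should we choose?
The @gen.coroutine form is better: when reading the code, you can tell at a glance that the function returns a future.
In Tornado they are normally used with IOLoop.add_future or by yielding them in a gen.coroutine.
gen.coroutine and asynchronous
gen.coroutine returns a future, and the caller uses yield to wait for it to complete;
with asynchronous, a callback function is needed to handle the completion.
Blocking
To minimize the cost of concurrent connections, Tornado uses a single-threaded event loop. This means that all application code should aim to be asynchronous and non-blocking because only one operation can be active at a time.
Source: http://www.tornadoweb.org/en/stable/guide/async.html#asynchronous-and-non-blocking-i-o
The passage above says application code should be asynchronous and non-blocking, so why is there also a way to call blocking functions, and where is it used?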
The simplest way to call a blocking function from a coroutine is to use a ThreadPoolExecutor, which returns Futures that are compatible with coroutines:
Source: http://www.tornadoweb.org/en/stable/guide/coroutines.html#calling-blocking-functions
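    from concurrent.futures import ThreadPoolExecutor
    from tornado import gen

    thread_pool = ThreadPoolExecutor(4)

    @gen.coroutine
    def call_blocking():
        # blocking_func runs on a worker thread; the IOLoop stays free.
        yield thread_pool.submit(blocking_func, args)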
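The same idea can be tried with only the standard library. This is a sketch under assumptions: asyncio stands in for Tornado's event loop, blocking_func is a made-up function that sleeps, and heartbeat exists only to show that the loop keeps running while the worker thread is blocked.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

thread_pool = ThreadPoolExecutor(4)


def blocking_func(seconds):
    """Stand-in for any blocking call (hypothetical)."""
    time.sleep(seconds)
    return "slept %.1fs" % seconds


async def call_blocking():
    loop = asyncio.get_running_loop()
    # run_in_executor submits the call to the pool and returns an awaitable.
    return await loop.run_in_executor(thread_pool, blocking_func, 0.2)


async def main():
    ticks = []

    async def heartbeat():
        # Keeps ticking while the worker thread sleeps.
        while True:
            ticks.append(time.monotonic())
            await asyncio.sleep(0.01)

    beat = asyncio.ensure_future(heartbeat())
    result = await call_blocking()
    beat.cancel()
    return result, len(ticks)


result, tick_count = asyncio.run(main())
thread_pool.shutdown()
print(result, "heartbeats while waiting:", tick_count)
```

If blocking_func were called directly inside the coroutine instead, heartbeat would not get a chance to run until the sleep had finished.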
According to "Python Tornado - Confused how to convert a blocking function into a non-blocking function":
If the blocking function is CPU-bound (as your for/xrange example is), then threads (or processes) are the only way to make it non-blocking. Creating a thread per incoming request is expensive, but making a small ThreadPoolExecutor to handle all CPU-bound operations is not.
To make a function non-blocking without using threads, the function must be event-driven: it must be waiting on some external event (such as network I/O) so that it can be awoken when that event occurs.
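Both of the above return a future. Does everything that interacts with a coroutine need to return a future? Why?
As described earlier, a future is an instance of Future; in the callback that runs when the function finishes, the function's result is stored as an attribute of the future. With gen.coroutine and yield, the yield expression returns the function's result rather than the future. Since we can get the result out of the future anyway, why not return the result directly?
Ways to handle blocking code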
1. Find a coroutine-friendly equivalent.
For time.sleep, use tornado.gen.sleep instead.
2. Find a callback-based equivalent.
The Tornado wiki can be useful to find suitable libraries.
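    from tornado import gen
    from tornado.ioloop import IOLoop
    from tornado.web import RequestHandler

    class CoroutineTimeoutHandler(RequestHandler):
        @gen.coroutine
        def get(self):
            io_loop = IOLoop.current()
            for i in range(5):
                print(i)
                # Resume one second later without blocking the IOLoop.
                yield gen.Task(io_loop.add_timeout, io_loop.time() + 1)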
add_timeout:
IOLoop.add_timeout(deadline, callback, *args, **kwargs)
Runs the callback function at the time given by deadline.
gen.Task:
Adapts a callback-based asynchronous function for use in coroutines.
Takes a function (and optional additional arguments) and runs it with those arguments plus a callback keyword argument. The argument passed to the callback is returned as the result of the yield expression.
Where is the callback in yield gen.Task(io_loop.add_timeout, io_loop.time() + 1)?
The callback is added by gen.Task itself; we do not pass it in. When it is called, the Future returned by gen.Task is resolved.
3. Run the blocking code on another thread.
Asynchronous MongoDB
Use motor to access MongoDB asynchronously.
Should each request create its own MotorClient, or should all requests share one MotorClient?
Create only one connection to MongoDB:
It is a common mistake to create a new client object for every request; this comes at a dire performance cost. Create the client when your application starts and reuse that one client for the lifetime of the process, as shown in these examples.
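Source: https://motor.readthedocs.io/en/stable/tutorial-tornado.html#tornado-application-startup-sequence
Closing the connection
Why is the connection not closed automatically after write() when using tornado.web.asynchronous, while it is closed automatically with @tornado.gen.coroutine?
Logging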
tornado with sentry
Exception handling
Return every exception as JSON by overriding write_error.
If a handler raises an exception, Tornado will call RequestHandler.write_error to generate an error page. tornado.web.HTTPError can be used to generate a specified status code; all other exceptions return a 500 status.
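As a minimal sketch of the JSON part, the helper below builds the response body from a status code and an optional exc_info triple. build_error_body and the field names code, message and error are assumptions made for this illustration and are not part of Tornado. The commented lines show roughly how it could be called from an overridden write_error(self, status_code, **kwargs).

```python
import json
import sys
from http.client import responses


def build_error_body(status_code, exc_info=None):
    """Build a JSON error body (hypothetical helper, not part of Tornado)."""
    body = {
        "code": status_code,
        "message": responses.get(status_code, "Unknown"),
    }
    if exc_info is not None:
        # exc_info is the (type, value, traceback) triple from sys.exc_info().
        body["error"] = exc_info[0].__name__
    return json.dumps(body, sort_keys=True)


# Inside a RequestHandler subclass this could be wired up roughly as:
#     def write_error(self, status_code, **kwargs):
#         self.set_header("Content-Type", "application/json")
#         self.finish(build_error_body(status_code, kwargs.get("exc_info")))

body_404 = build_error_body(404)
try:
    1 / 0
except ZeroDivisionError:
    body_500 = build_error_body(500, sys.exc_info())

print(body_404)
print(body_500)
```

Only the exception class name is included here; whether to expose more detail to clients is a design decision left open.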
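Running and deployment
Why does Django need WSGI while Tornado does not?
Deployment options:
Run a single process;
Run as many application processes as there are CPU cores, each listening on a different port, and load-balance across them with Nginx;
Running from the command line or a configuration file
tornado.options.options is a global object
1. Define the options you need, for example host for the host to listen on and port for the port number. Once defined, they are added to the global object tornado.options.options;
2. Call parse_command_line, which copies the values passed on the command line into the corresponding options;
3. Reference the options.
When creating the Application, for example
    tornado.web.Application(urls, default_host=tornado.options.options.host)
In handlers, because the options object is a global.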
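The three steps above (define, parse, reference) can be pictured with a toy registry. This is a sketch of the flow only: ToyOptions is a hypothetical class written for this note and is much simpler than the object Tornado provides.

```python
class ToyOptions:
    """Toy registry mimicking the define / parse / read flow (not Tornado's class)."""

    def __init__(self):
        self._types = {}
        self._values = {}

    def define(self, name, default=None, type=str):
        # Step 1: register an option together with its default value.
        self._types[name] = type
        self._values[name] = default

    def parse_command_line(self, argv):
        # Step 2: copy --name=value arguments into the registered options.
        for arg in argv:
            if not arg.startswith("--"):
                continue
            name, _, raw = arg[2:].partition("=")
            if name not in self._types:
                raise KeyError("unknown option: " + name)
            self._values[name] = self._types[name](raw)

    def __getattr__(self, name):
        # Step 3: only reached when normal lookup fails, i.e. for option names.
        try:
            return self.__dict__["_values"][name]
        except KeyError:
            raise AttributeError(name)


options = ToyOptions()                      # one shared object for the process
options.define("host", default="127.0.0.1")
options.define("port", default=8888, type=int)
options.parse_command_line(["--port=9000"])
print(options.host, options.port)
```

Because options is a single module-level object, any module that imports it sees the parsed values, which is why handlers can read them directly.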
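Tornado server configuration
Typical usage:
    application = tornado.web.Application(urls, default_host=options.host)
    application.listen(
        options.port,
        xheaders=True,
        max_body_size=MAX_BODY_SIZE,
    )
application.listen is equivalent to the first two lines below; the IOLoop still has to be started separately:
    server = HTTPServer(app)
    server.listen(port)
    # listen() itself does not start the IOLoop; this call is still needed.
    IOLoop.current().start()
So listen() accepts the same arguments as HTTPServer.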
HTTPServer
xheaders
If xheaders is True, we support the X-Real-Ip/X-Forwarded-For and X-Scheme/X-Forwarded-Proto headers, which override the remote IP and URI scheme/protocol for all requests.
This ensures that request.remote_ip is the user's real IP rather than the proxy's IP.
ssl_options
HTTPServer also accepts the HTTP1ConnectionParameters arguments.
HTTP1ConnectionParameters
no_keep_alive (bool) – If true, always close the connection after one request.
chunk_size (int) – how much data to read into memory at once
max_header_size (int) – maximum amount of data for HTTP headers
header_timeout (float) – how long to wait for all headers (seconds)
max_body_size (int) – maximum amount of data for body
body_timeout (float) – how long to wait while reading body (seconds)
decompress (bool) – if true, decode incoming Content-Encoding: gzip
IOLoop
IOLoop.current() and IOLoop.instance()
IOLoop.instance()
Most applications have a single, global IOLoop running on the main thread. Use this method to get this instance from another thread. In most other cases, it is better to use current() to get the current thread's IOLoop.
IOLoop.current(instance=True)
Returns the IOLoop of the current thread.
If an IOLoop is currently running or has been marked as current by make_current, returns that instance. If there is no current IOLoop, returns IOLoop.instance() (i.e. the main thread's IOLoop, creating one if necessary) if instance is true.
Summary
IOLoop.instance() returns the single global IOLoop. If you have several threads and want all of them to share one IOLoop, use IOLoop.instance().
In general you should use IOLoop.current as the default when constructing an asynchronous object, and use IOLoop.instance when you mean to communicate to the main thread from a different one.
The documentation mentions that in a few cases you need several IOLoops, for example one IOLoop per thread. How is that done?
The vast majority of Tornado apps should have only one IOLoop, running in the main thread. You can run multiple HTTPServers (or other servers) on the same IOLoop.
It is possible to create multiple IOLoops and give each one its own thread, but this is rarely useful, because the GIL ensures that only one thread is running at a time. If you do use multiple IOLoops you must be careful to ensure that the different threads only communicate with each other through thread-safe methods (i.e. IOLoop.add_callback).
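The pattern of one loop per thread with thread-safe hand-off can be sketched using only the standard library. This is an analogy under assumptions, not Tornado code: an asyncio event loop stands in for an IOLoop, and call_soon_threadsafe plays the role that the quote assigns to IOLoop.add_callback. The names start_loop_in_thread and record are made up.

```python
import asyncio
import threading


def start_loop_in_thread():
    """Create an event loop and run it forever on its own thread."""
    loop = asyncio.new_event_loop()

    def run():
        asyncio.set_event_loop(loop)   # make it the current loop of this thread
        loop.run_forever()
        loop.close()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return loop, thread


loop, worker = start_loop_in_thread()
ran_on_worker = []
finished = threading.Event()


def record():
    # Executed by the worker's loop, not by the thread that scheduled it.
    ran_on_worker.append(threading.current_thread() is worker)
    finished.set()


# The only cross-thread interaction goes through the thread-safe entry point.
loop.call_soon_threadsafe(record)
finished.wait(timeout=5)

loop.call_soon_threadsafe(loop.stop)
worker.join(timeout=5)
print(ran_on_worker)
```

As the quote notes, this is rarely useful in practice; the sketch only shows how work can be handed to another thread's loop safely.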
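Security and authentication
Using an IP whitelist
Set xheaders=True in application.listen() so that request.remote_ip is the user's real IP rather than the proxy's IP.
I chose to implement this by overriding the RequestHandler.prepare method.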
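The check itself can be kept separate from the handler. Below is a minimal sketch using the standard ipaddress module. WHITELIST, its example networks and is_allowed are assumptions for illustration, and the commented lines show roughly where the check would go inside prepare.

```python
import ipaddress

# Hypothetical whitelist; real values would come from configuration.
WHITELIST = [ipaddress.ip_network(n) for n in ("127.0.0.1/32", "10.0.0.0/8")]


def is_allowed(remote_ip, networks=WHITELIST):
    """Return True if remote_ip parses as an IP inside one of the networks."""
    try:
        address = ipaddress.ip_address(remote_ip)
    except ValueError:
        return False                # not a valid IP string: reject
    return any(address in network for network in networks)


# Inside a RequestHandler subclass this could be used roughly as:
#     def prepare(self):
#         if not is_allowed(self.request.remote_ip):
#             raise tornado.web.HTTPError(403)

print(is_allowed("10.1.2.3"), is_allowed("8.8.8.8"), is_allowed("not-an-ip"))
```

Since prepare runs before get or post, rejecting there keeps the check out of every individual method.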