
Learning Tornado

Asynchronous Programming

Future

Figure out what a Future is.

A placeholder for an asynchronous result.

Using Future

Callback handling after a future completes:

from tornado.concurrent import Future
from tornado.httpclient import AsyncHTTPClient

def async_fetch_future(url):
    http_client = AsyncHTTPClient()
    my_future = Future()
    fetch_future = http_client.fetch(url)
    fetch_future.add_done_callback(
        lambda f: my_future.set_result(f.result()))
    return my_future

The add_done_callback method

It calls the function passed in, with the Future itself as the argument:

def add_done_callback(self, fn):
    """Attaches the given callback to the `Future`.

    It will be invoked with the `Future` as its argument when the Future
    has finished running and its result is available.  In Tornado
    consider using `.IOLoop.add_future` instead of calling
    `add_done_callback` directly.
    """
    if self._done:
        fn(self)
    else:
        self._callbacks.append(fn)

The callback above means: once fetch_future completes, store fetch_future's result in my_future, and finally return my_future.

gen.coroutine

Inside a @gen.coroutine, yield a_future returns the result of a_future, so the example above can be written as:

from tornado import gen
from tornado.httpclient import AsyncHTTPClient

@gen.coroutine
def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url)
    raise gen.Return(response.body)

How @gen.coroutine works

A function containing yield is a generator. All generators are asynchronous; when called they return a generator object instead of running to completion. The @gen.coroutine decorator communicates with the generator via the yield expressions, and with the coroutine’s caller by returning a Future.

Here is a simplified version of the coroutine decorator’s inner loop:

# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)

    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)

The decorator receives a Future from the generator, waits (without blocking) for that Future to complete, then “unwraps” the Future and sends the result back into the generator as the result of the yield expression. Most asynchronous code never touches the Future class directly except to immediately pass the Future returned by an asynchronous function to a yield expression.

The simplified version is not very clear. For example, does @gen.coroutine interact with every yield in the decorated function? The more detailed source code is below:

def _make_coroutine_wrapper(func, replace_callback):
    """The inner workings of ``@gen.coroutine`` and ``@gen.engine``.

    The two decorators differ in their treatment of the ``callback``
    argument, so we cannot simply implement ``@engine`` in terms of
    ``@coroutine``.
    """
    # On Python 3.5, set the coroutine flag on our generator, to allow it
    # to be used with 'await'.
    wrapped = func
    if hasattr(types, 'coroutine'):
        func = types.coroutine(func)

    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        future = TracebackFuture()

        if replace_callback and 'callback' in kwargs:
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))

        try:
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            result = _value_from_stopiteration(e)
        except Exception:
            future.set_exc_info(sys.exc_info())
            return future
        else:
            if isinstance(result, GeneratorType):
                # Inline the first iteration of Runner.run.  This lets us
                # avoid the cost of creating a Runner when the coroutine
                # never actually yields, which in turn allows us to
                # use "optional" coroutines in critical path code without
                # performance penalty for the synchronous case.
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)
                    if stack_context._state.contexts is not orig_stack_contexts:
                        yielded = TracebackFuture()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    future.set_result(_value_from_stopiteration(e))
                except Exception:
                    future.set_exc_info(sys.exc_info())
                else:
                    _futures_to_runners[future] = Runner(result, future, yielded)
                yielded = None
                try:
                    return future
                finally:
                    # Subtle memory optimization: if next() raised an exception,
                    # the future's exc_info contains a traceback which
                    # includes this stack frame.  This creates a cycle,
                    # which will be collected at the next full GC but has
                    # been shown to greatly increase memory usage of
                    # benchmarks (relative to the refcount-based scheme
                    # used in the absence of cycles).  We can avoid the
                    # cycle by clearing the local variable after we return it.
                    future = None
        future.set_result(result)
        return future

    wrapper.__wrapped__ = wrapped
    wrapper.__tornado_coroutine__ = True
    return wrapper

if replace_callback and 'callback' in kwargs:

The future is handed to the IOLoop via IOLoop.add_future(future, callback).

 

Through @gen.coroutine, the result of the decorated function is put into a future (if the decorated function's result is a generator, next() is called on that generator and the result is put into the future).

Doesn't it get added to the IOLoop?

try:
    result = func(*args, **kwargs)

What are the requirements on func? Must it not block?

Is result a future?

 

Nested futures

@gen.coroutine
def divide(x, y):
    return x / y

def bad_call():
    # This should raise a ZeroDivisionError, but it won't because
    # the coroutine is called incorrectly.
    divide(1, 0)

@gen.coroutine
def good_call():
    # yield will unwrap the Future returned by divide() and raise
    # the exception.
    yield divide(1, 0)

In the example above, what good_call returns is also a future.

Most asynchronous functions in Tornado return a Future; yielding this object returns its result

Why do it this way? @gen.coroutine puts the result into a future and yield takes the result back out of the future; don't the two just cancel each other out?

Also, I found that a function that already returns a future can still fail when @gen.coroutine and yield are added on top, for example Motor's count.

When should we use @gen.coroutine with yield, and when @gen.coroutine with return?

 

 

I found that in Python 2.7.3 you can use return inside a function decorated with @gen.coroutine (this works here because divide contains no yield, so it is not a generator; a Python 2 generator cannot return a value):

@gen.coroutine
def divide(x, y):
    return x / y

>>> divide(1, 1).result()
1

Using raise gen.Return:

@gen.coroutine
def divide2(x, y):
    raise gen.Return(x / y)

In [16]: divide2(1, 1).result()
Out[16]: 1

  

Callbacks

Interaction with callbacks
To interact with asynchronous code that uses callbacks instead of Future, wrap the call in a Task. This will add the callback argument for you and return a Future which you can yield:

@gen.coroutine
def call_task():
    # Note that there are no parens on some_function.
    # This will be translated by Task into
    #   some_function(other_args, callback=callback)
    yield gen.Task(some_function, other_args)

What does this call mean?

What does gen.coroutine do?

Any generator that yields objects from this module must be wrapped in either this decorator or engine.

Functions with this decorator return a Future.

When the function finishes, the result is stored in the Future. Using yield future then gives you the result.

 

Why do get and post need this decorator?

The last step of get and post is usually self.write().
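For instance, a handler whose get() awaits an asynchronous call and ends with self.write() might look like the sketch below (the handler name and URL are made up for illustration):

from tornado import gen, web
from tornado.httpclient import AsyncHTTPClient

class MainHandler(web.RequestHandler):
    @gen.coroutine
    def get(self):
        http_client = AsyncHTTPClient()
        # yield suspends get() without blocking the IOLoop;
        # the decorator turns this generator into a Future-returning method
        response = yield http_client.fetch("http://example.com")
        self.write(response.body)   # the usual last step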

 

The following two functions have the same effect:

@gen.coroutine
def f():
    yield function_that_return_future()

def f():
    return function_that_return_future()

Both ultimately return a Future. Which form should we choose?

The version with @gen.coroutine is better: when reading the code you know this function returns a future.

 In Tornado they are normally used with IOLoop.add_future or by yielding them in a gen.coroutine

gen.coroutine and asynchronous

gen.coroutine returns a future, and yield is used to wait for it to complete;

asynchronous needs a callback function to do the processing after it completes.
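A sketch of the two styles side by side (the handler names and URL are made up); the callback version must call self.finish() explicitly, while the coroutine version finishes when the generator returns:

from tornado import gen, web
from tornado.httpclient import AsyncHTTPClient

class CallbackHandler(web.RequestHandler):
    @web.asynchronous
    def get(self):
        AsyncHTTPClient().fetch("http://example.com", callback=self.on_fetch)

    def on_fetch(self, response):
        self.write(response.body)
        self.finish()               # must end the request explicitly

class CoroutineHandler(web.RequestHandler):
    @gen.coroutine
    def get(self):
        response = yield AsyncHTTPClient().fetch("http://example.com")
        self.write(response.body)   # finished automatically when get() returns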

 

Blocking

To minimize the cost of concurrent connections, Tornado uses a single-threaded event loop. This means that all application code should aim to be asynchronous and non-blocking because only one operation can be active at a time. -- http://www.tornadoweb.org/en/stable/guide/async.html#asynchronous-and-non-blocking-i-o

The above says application code should be asynchronous and non-blocking, so why is there also a way to call blocking functions, and where is it used?

The simplest way to call a blocking function from a coroutine is to use a ThreadPoolExecutor, which returns Futures that are compatible with coroutines:

Source: http://www.tornadoweb.org/en/stable/guide/coroutines.html#calling-blocking-functions

from concurrent.futures import ThreadPoolExecutor
from tornado import gen

thread_pool = ThreadPoolExecutor(4)

@gen.coroutine
def call_blocking():
    yield thread_pool.submit(blocking_func, args)

According to Python Tornado - Confused how to convert a blocking function into a non-blocking function:

If the blocking function is CPU-bound (as your for/xrange example is), then threads (or processes) are the only way to make it non-blocking. Creating a thread per incoming request is expensive, but making a small ThreadPoolExecutor to handle all CPU-bound operations is not.

To make a function non-blocking without using threads, the function must be event-driven: it must be waiting on some external event (such as network I/O) so that it can be awoken when that event occurs.
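A sketch of the thread-pool option applied to a request handler (fib and the handler are made up for illustration); the CPU-bound work runs on a worker thread while the IOLoop stays free:

from concurrent.futures import ThreadPoolExecutor
from tornado import gen, web

executor = ThreadPoolExecutor(4)        # one small pool shared by all requests

def fib(n):
    # deliberately CPU-bound; would block the IOLoop if called directly
    return n if n < 2 else fib(n - 1) + fib(n - 2)

class FibHandler(web.RequestHandler):
    @gen.coroutine
    def get(self):
        n = int(self.get_argument("n", "25"))
        result = yield executor.submit(fib, n)   # yieldable concurrent Future
        self.write({"n": n, "fib": result})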

 

Both of the examples above return futures. Does anything that interacts with a coroutine have to return a future? Why?

From the discussion above, a future is an instance of Future: in the callback run after the function finishes, the function's result is stored as an attribute of the future. If gen.coroutine and yield are used, yield returns the function's result rather than the future. Since we can get the result out of the future anyway, why not just return the result directly?

Ways to handle blocking code

1. Find a coroutine-friendly equivalent. 

For time.sleep, use tornado.gen.sleep instead
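For example, a minimal sketch of a non-blocking pause inside a coroutine (the function name is made up):

from tornado import gen

@gen.coroutine
def pause_then_continue():
    # unlike time.sleep, gen.sleep yields control back to the IOLoop,
    # so other handlers keep running during the wait
    yield gen.sleep(1)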

2. Find a callback-based equivalent. 

the Tornado wiki can be useful to find suitable libraries.

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.web import RequestHandler

class CoroutineTimeoutHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        io_loop = IOLoop.current()
        for i in range(5):
            print(i)
            yield gen.Task(io_loop.add_timeout, io_loop.time() + 1)

add_timeout:

IOLoop.add_timeout(deadline, callback, *args, **kwargs)

Runs the callback at the point in time given by deadline.

gen.Task: 

Adapts a callback-based asynchronous function for use in coroutines.

Takes a function (and optional additional arguments) and runs it with those arguments plus a callback keyword argument. The argument passed to the callback is returned as the result of the yield expression.

Where is the callback in yield gen.Task(io_loop.add_timeout, io_loop.time() + 1)?

The callback is added by gen.Task itself, not passed in by us; it is used to produce the Future that yield waits on.
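Roughly, the gen.Task call above amounts to something like this hand-written version (sleep_future is a made-up helper): the callback's only job is to resolve a Future that the coroutine can yield.

from tornado.concurrent import Future
from tornado.ioloop import IOLoop

def sleep_future(seconds):
    io_loop = IOLoop.current()
    future = Future()
    # the timeout callback does nothing except resolve the future
    io_loop.add_timeout(io_loop.time() + seconds,
                        lambda: future.set_result(None))
    return future

# inside a @gen.coroutine:  yield sleep_future(1)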

3. Run the blocking code on another thread.

Asynchronous MongoDB

Use Motor to make MongoDB access asynchronous.

Should a new MotorClient be created for every request, or should many requests share one MotorClient?

Create only a single MongoDB client:

It is a common mistake to create a new client object for every request; this comes at a dire performance cost. Create the client when your application starts and reuse that one client for the lifetime of the process, as shown in these examples.

Source: https://motor.readthedocs.io/en/stable/tutorial-tornado.html#tornado-application-startup-sequence
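A sketch of that startup sequence (the database and collection names are made up): one MotorClient is created when the application starts and shared with handlers through the Application settings.

import motor.motor_tornado
import tornado.gen
import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        db = self.settings["db"]                 # the shared database object
        doc = yield db.my_collection.find_one({})
        self.write({"doc": repr(doc)})

def make_app():
    client = motor.motor_tornado.MotorClient("mongodb://localhost:27017")
    return tornado.web.Application(
        [(r"/", MainHandler)],
        db=client.my_database,                   # one client for the whole process
    )

if __name__ == "__main__":
    make_app().listen(8888)
    tornado.ioloop.IOLoop.current().start()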

 

Closing the connection

Why is it that with tornado.web.asynchronous the connection is not closed automatically after write(), while with @tornado.gen.coroutine it is?

Logging

Tornado with Sentry

Exception handling

Return all exceptions as JSON, by overriding write_error.

If a handler raises an exception, Tornado will call RequestHandler.write_error to generate an error page. tornado.web.HTTPError can be used to generate a specified status code; all other exceptions return a 500 status.
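A sketch of such a handler (the field names are just an example); every handler that inherits from it will render errors as JSON:

import traceback
import tornado.web

class BaseHandler(tornado.web.RequestHandler):
    def write_error(self, status_code, **kwargs):
        body = {"code": status_code, "message": self._reason}
        if "exc_info" in kwargs and self.settings.get("serve_traceback"):
            # include the traceback only when debug mode is on
            body["traceback"] = "".join(
                traceback.format_exception(*kwargs["exc_info"]))
        # finishing with a dict makes Tornado serialize it as JSON
        self.finish(body)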

 

Running and deployment

Why does Django need WSGI while Tornado does not?

Deployment options:

Run just one instance;

Run as many application instances as there are CPU cores, each listening on a different port, and load-balance across them with Nginx.

Running via the command line or a config file

tornado.options is a global object.

1. Define the options you need, e.g. host for the host to listen on and port for the port number; once defined they are added to the global tornado.options object;

2. Call parse_command_line so the values passed on the command line are filled into the corresponding tornado.options entries;

3. Reference tornado.options (a full sketch of all three steps follows below).

Reference it when creating the Application, for example:

  

tornado.web.Application(urls, default_host=options.host)

Reference it inside handlers, which works because tornado.options is a global variable.
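A sketch that puts the three steps together (urls is left as an empty placeholder route list):

import tornado.ioloop
import tornado.web
from tornado.options import define, options, parse_command_line

define("host", default="127.0.0.1", help="host to listen on")
define("port", default=8888, type=int, help="port to listen on")

urls = []   # route list omitted here

if __name__ == "__main__":
    parse_command_line()    # e.g. python app.py --port=9000 --host=0.0.0.0
    application = tornado.web.Application(urls, default_host=options.host)
    application.listen(options.port)
    tornado.ioloop.IOLoop.current().start()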

Tornado server configuration

The usual pattern is:

application = tornado.web.Application(urls, default_host=options.host)
application.listen(options.port, xheaders=True, max_body_size=MAX_BODY_SIZE)

application.listen is roughly equivalent to:

server = HTTPServer(app)
server.listen(port)
IOLoop.current().start()   # starting the IOLoop is still a separate step

So listen() takes the same parameters as HTTPServer.

HTTPServer

xheaders

If xheaders is True, we support the X-Real-Ip/X-Forwarded-For and X-Scheme/X-Forwarded-Proto headers, which override the remote IP and URI scheme/protocol for all requests. 

This ensures request.remote_ip is the user's real IP rather than the proxy's IP.

ssl_options

 

HTTPServer also accepts the HTTP1ConnectionParameters arguments.

HTTP1ConnectionParameters

no_keep_alive (bool) – If true, always close the connection after one request.
chunk_size (int) – how much data to read into memory at once
max_header_size (int) – maximum amount of data for HTTP headers
header_timeout (float) – how long to wait for all headers (seconds)
max_body_size (int) – maximum amount of data for body
body_timeout (float) – how long to wait while reading body (seconds)
decompress (bool) – if true, decode incoming Content-Encoding: gzip
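HTTPServer takes most of these directly as keyword arguments (the gzip option is spelled decompress_request there). A sketch reusing the application and options.port from earlier; the values are only illustrative:

from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop

server = HTTPServer(
    application,
    xheaders=True,                    # trust X-Real-Ip / X-Forwarded-For
    max_body_size=10 * 1024 * 1024,   # 10 MB request body limit
    body_timeout=60.0,                # seconds to wait while reading the body
    decompress_request=True,          # decode gzip-encoded request bodies
)
server.listen(options.port)
IOLoop.current().start()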

IOLoop

IOLoop.current() and IOLoop.instance()

IOLoop.instance()

Most applications have a single, global IOLoop running on the main thread. Use this method to get this instance from another thread. In most other cases, it is better to use current() to get the current thread’s IOLoop.

IOLoop.current(instance=True)

Returns the current thread's IOLoop.

If an IOLoop is currently running or has been marked as current by make_current, returns that instance. If there is no current IOLoop, returns IOLoop.instance() (i.e. the main thread’s IOLoop, creating one if necessary) if instance is true.

Summary

It is a global singleton. If you have multiple threads and want them all to share one IOLoop, use IOLoop.instance().

In general you should use IOLoop.current as the default when constructing an asynchronous object, and use IOLoop.instance when you mean to communicate to the main thread from a different one.

The documentation mentions that in a few cases you may need multiple IOLoops, for example one per thread; how is that done?

The vast majority of Tornado apps should have only one IOLoop, running in the main thread. You can run multiple HTTPServers (or other servers) on the same IOLoop.

It is possible to create multiple IOLoops and give each one its own thread, but this is rarely useful, because the GIL ensures that only one thread is running at a time. If you do use multiple IOLoops you must be careful to ensure that the different threads only communicate with each other through thread-safe methods (i.e. IOLoop.add_callback).
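A sketch of that thread-safe communication (the worker function is made up): a background thread hands its result back to the main thread's IOLoop with add_callback.

import threading
from tornado.ioloop import IOLoop

main_loop = IOLoop.current()

def handle_result(result):
    # runs on the main IOLoop's thread
    print("got result: %r" % result)

def worker():
    result = 42                     # work done off the main thread
    # add_callback is the documented thread-safe way to reach the IOLoop
    main_loop.add_callback(handle_result, result)

threading.Thread(target=worker).start()
main_loop.start()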

 

Security and authentication

Using an IP whitelist

Set xheaders=True in application.listen(); this ensures request.remote_ip is the user's real IP rather than the proxy's IP.

I chose to implement this by overriding the RequestHandler.prepare method.
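A sketch of that approach (the whitelist contents are made up); with xheaders=True, request.remote_ip is the client's real address, so prepare() can reject anything not on the list:

import tornado.web

IP_WHITELIST = {"127.0.0.1", "10.0.0.5"}

class WhitelistedHandler(tornado.web.RequestHandler):
    def prepare(self):
        if self.request.remote_ip not in IP_WHITELIST:
            # 403 is rendered by write_error (see the exception-handling section)
            raise tornado.web.HTTPError(403)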
