首页 > 代码库 > 【译】在Flask中使用Celery

【译】在Flask中使用Celery

为了在后台运行任务,我们可以使用线程(或者进程)。
使用线程(或者进程)的好处是保持处理逻辑简洁。但是,在需要可扩展的生产环境中,我们也可以考虑使用Celery代替线程。
 

Celery是什么?

Celery是个异步分布式任务队列。
通过Celery在后台跑任务并不像用线程那么的简单,但是用Celery的话,能够使应用有较好的可扩展性,因为Celery是个分布式架构。下面介绍Celery的三个核心组件。
  1. 生产者(Celery client)。生产者(Celery client)发送消息。在Flask上工作时,生产者(Celery client)在Flask应用内运行。
  2. 消费者(Celery workers)。消费者用于处理后台任务。消费者(Celery client)可以是本地的也可以是远程的。我们可以在运行Flask的server上运行一个单一的消费者(Celery workers),当业务量上涨之后再去添加更多消费者(Celery workers)。
  3. 消息传递者(message broker)。生产者(Celery client)和消费者(Celery workers)的信息的交互使用的是消息队列(message queue)。Celery支持若干方式的消息队列,其中最常用的是RabbitMQ和Redis.
话不多说上代码先。代码中包含两个例子:异步发送邮件;开始一或多个异步工作,然后在网页上更新其进度。
 

Flask结合Celery

Flask与Celery结合极其简单,无需其他扩展。一个使用Celery的Flask应用的初始化过程如下:通过创建Celery类的对象,完成Celery的初始化。创建Celery对象时,需要传递应用的名称以及消息传递者(message broker)的URL。
from flask import Flaskfrom celery import Celery app = Flask(__name__)app.config[CELERY_BROKER_URL] = redis://localhost:6379/0app.config[CELERY_RESULT_BACKEND] = redis://localhost:6379/0 celery = Celery(app.name, broker=app.config[CELERY_BROKER_URL])celery.conf.update(app.config)
其中的URL参数告诉了Celery,消息传递的服务的位置。如果消息传递者用的不是Redis,或者Redis部署在其他机器,那么需要做适当的改变。
 
而通过调用  celery.conf.update() 方法,我们能够为Celery同步Flask上的配置。
仅当需要Celery存储状态即存储结果时, CELERY_RESULT_BACKEND  选项才会被用到。
下文第一个例子不需要存储状态以及存储结果,但是第二个例子是需要的,所以一次配置好。
 
任何想要在后台运行的任务,都需要使用装饰者celery.task 进行包装,如下。
@celery.taskdef my_background_task(arg1, arg2):     # some long running task here     return result 
现在Flask 应用就能够发起“在后台执行任务”的请求了,如下。
task = my_background_task.delay(10, 20)

其中 delay() 方法是 apply_async() 的快捷调用。

 

此处用apply_async()同样奏效,如下。

task = my_background_task.apply_async(args=[10, 20])
相比于 delay() 方法,当使用 apply_async() 方法时,我们能够对后台任务的执行方式有更多的控制。例如任务在何时执行等。
 
举例来说,下面的代码可以让任务在一分钟之后开始运行。
task = my_background_task.apply_async(args=[10, 20], countdown=60)
  delay() 和 apply_async() 的返回值是一个 AsyncResult 的对象。通过该对象,能够获得任务的状态。
一些其他的配置选项不再叙述。
 

例一:异步发邮件

第一个例子的需求比较广泛:发电子邮件的时候无需阻塞主应用线程。
本例使用了扩展Flask-Mail。
 
网页包含了一个Text类型的域的表单。用户需要在其中输入邮箱地址,点击提交,然后服务器向该地址发送一封测试邮件。该表单包含两个提交按钮,其中一个会立即发送邮件,而另一个会在点击后延迟一分钟后再发送。html代码如下。
<html>  <head>    <title>Flask + Celery Examples</title>  </head>  <body>    <h1>Flask + Celery Examples</h1>    <h2>Example 1: Send Asynchronous Email</h2>    {% for message in get_flashed_messages() %}    <p style="color: red;">{{ message }}</p>    {% endfor %}    <form method="POST">      <p>Send test email to: <input type="text" name="email" value="{{ email }}"></p>      <input type="submit" name="submit" value="Send">      <input type="submit" name="submit" value="Send in 1 minute">    </form>  </body></html>

 

用于发送邮件的Flask-Mail需要一些配置,主要与发送邮件的邮件服务器、发送邮件时间相关。
考虑到用户名密码安全性,作者将其放到了环境变量中。
# Flask-Mail configurationapp.config[MAIL_SERVER] = smtp.googlemail.comapp.config[MAIL_PORT] = 587app.config[MAIL_USE_TLS] = Trueapp.config[MAIL_USERNAME] = os.environ.get(MAIL_USERNAME)app.config[MAIL_PASSWORD] = os.environ.get(MAIL_PASSWORD)app.config[MAIL_DEFAULT_SENDER] = flask@example.com

 

异步发送代码如下。

@app.route(/, methods=[GET, POST])def index():    if request.method == GET:        return render_template(index.html, email=session.get(email, ‘‘))    email = request.form[email]    session[email] = email    # send the email    msg = Message(Hello from Flask,                  recipients=[request.form[email]])    msg.body = This is a test email sent from a background Celery task.    if request.form[submit] == Send:        # send right away        send_async_email.delay(msg)        flash(Sending email to {0}.format(email))    else:        # send in one minute        send_async_email.apply_async(args=[msg], countdown=60)        flash(An email will be sent to {0} in one minute.format(email))    return redirect(url_for(index))
用 session 将用户键入的信息保存,以便页面刷新时能够使用该信息。
朋友们发现了,重点在发送邮件的代码,使用的是Celery 的任务send_async_email,通过调用它的  delay()  方法或 apply_async() 进行异步发送。
 
最后来看异步任务代码。
@celery.taskdef send_async_email(msg):    """Background task to send an email with Flask-Mail."""    with app.app_context():        mail.send(msg)

 

使用装饰者 celery.task 包装  send_async_email , 使其成为后台运行的任务。因为Flask-Mail需要应用的context,所以需要在调用send方法前先创建应用的context环境。

另一点很重要,从异步调用的返回值是不会保存的,所以应用本身无法知道是否异步调用是否成功。在这个例子之中需要看Celery的消费者的输出才能确定发送邮件过程是否有问题。

 第一个例子比较简单,我们起了后台任务然后就不必再去管它了。很多应用的需求与例子一相仿。
 
然而也会有一些应用,需要监控后台任务的运行,获得任务的结果。下面来看第二个例子。
 

例二:显示状态更新进度

用户可以点击按钮以启动一个或者多个长时间任务,此时在网页使用ajax技术不断轮询服务器以更新所有的这些长时间任务们的状态。
而对于每一个长时间任务,网页上会有一个窗台条、一个进度百分比、一个状态消息与之对应,当完成时会显示相应结果。
 
状态更新时后台任务代码。
@celery.task(bind=True)def long_task(self):    """Background task that runs a long function with progress reports."""    verb = [Starting up, Booting, Repairing, Loading, Checking]    adjective = [master, radiant, silent, harmonic, fast]    noun = [solar array, particle reshaper, cosmic ray, orbiter, bit]    message = ‘‘    total = random.randint(10, 50)    for i in range(total):        if not message or random.random() < 0.25:            message = {0} {1} {2}....format(random.choice(verb),                                              random.choice(adjective),                                              random.choice(noun))        self.update_state(state=PROGRESS,                          meta={current: i, total: total,                                status: message})        time.sleep(1)    return {current: 100, total: 100, status: Task completed!,            result: 42}

代码中作者在Celery 装饰者中加入了  bind=True 参数,这使得Celery向函数中传入了self参数,因此在函数中能够记录状态更新。

本例中随机挑选了一些单词作为状态的更新,同时,选取随机数作为每个后台任务运行时间。
 self.update_state() 方法用于指明 Celery如何接收任务更新。
Celery有很多内建状态比如 STARTED ,  SUCCESS 等等,当然Celery也允许程序员自定义状态。本例子中使用的是自定义状态, PROGRESS 。与 PROGRESS 一起的还有 metadata 。 metadata 是一个字典,包含当
前进度,任务大小,以及消息。
当循环跳出时,返回字典,字典中包含任务的执行结果。
 
 long_task() 函数在 Celery消费者进程中运行。下面看一下Flask应用如何启动该后台任务。
@app.route(/longtask, methods=[POST])def longtask():    task = long_task.apply_async()    return jsonify({}), 202, {Location: url_for(taskstatus,                                                  task_id=task.id)}

用户需要向 /longtask 发送 POST 请求以触发后台任务执行。服务器启动任务并存储返回值。作者使用了状态码202,在REST API中有“请求正在处理中”的意思,而加入了Location头则是为了生产者能够获取任务执行时的状态信息。url_for用于生成路由到taskstatus函数的url,并且该url包含task id,task id的值是 task.id .

 

taskstatus 函数用于获取后台任务的更新状态。

@app.route(/status/<task_id>)def taskstatus(task_id):    task = long_task.AsyncResult(task_id)    if task.state == PENDING:        // job did not start yet        response = {            state: task.state,            current: 0,            total: 1,            status: Pending...        }    elif task.state != FAILURE:        response = {            state: task.state,            current: task.info.get(current, 0),            total: task.info.get(total, 1),            status: task.info.get(status, ‘‘)        }        if result in task.info:            response[result] = task.info[result]    else:        # something went wrong in the background job        response = {            state: task.state,            current: 1,            total: 1,            status: str(task.info),  # this is the exception raised        }    return jsonify(response)

为了得到后台任务产生的数据,使用了task id作为参数创建了一个task 对象。

本函数产生了JSON响应,JSON响应中的内容与 update_state() 更新的一致。

我们使用 task.state 区分后台任务的状态:本例有未运行、未发生错误、发生错误三种状态。
我们使用 task.info 访问任务相关信息。而发生错误时, task.state 的状态是 FAILURE 时,异常会包含在 task.info 之中。
 

前端JS代码

作者用的是nanobar.js实现进度条,用了jQuery的ajax。
<script src="//cdnjs.cloudflare.com/ajax/libs/nanobar/0.2.1/nanobar.min.js"></script>
<
script src="//cdnjs.cloudflare.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>
 
启动后台任务的按钮的JS代码如下。
 function start_long_task() {        // add task status elements         div = $(‘<div class="progress"><div></div><div>0%</div><div>...</div><div>&nbsp;</div></div><hr>‘);        $(‘#progress‘).append(div);        // create a progress bar        var nanobar = new Nanobar({            bg: ‘#44f‘,            target: div[0].childNodes[0]        });        // send ajax POST request to start background job        $.ajax({            type: ‘POST‘,            url: ‘/longtask‘,            success: function(data, status, request) {                status_url = request.getResponseHeader(‘Location‘);                update_progress(status_url, nanobar, div[0]);            },            error: function() {                alert(‘Unexpected error‘);            }        });    }

 

其中被加入的HTML元素与任务的信息的对应关系如下。

<div class="progress">    <div></div>         <-- Progress bar    <div>0%</div>       <-- Percentage    <div>...</div>      <-- Status message    <div>&nbsp;</div>   <-- Result</div><hr>
start_long_task() 函数通过ajax向 /longtask 发送POST请求,使得后台任务开始运行。
当ajax的POST请求返回时,回调函数获得响应,响应中包含形如 /status/<task_id>的url, 其他函数(如 update_progress )用此url从 taskstatus 函数获取数据。
调用函数 update_progress() ,向函数传入 start_url 以及 nanoba r变量,用于生成进度条。
 
 update_progress 函数向/status/<task_id>发送GET请求,获得json数据然后更新相应的页面元素。
   function update_progress(status_url, nanobar, status_div) {        // send GET request to status URL        $.getJSON(status_url, function(data) {            // update UI            percent = parseInt(data[‘current‘] * 100 / data[‘total‘]);            nanobar.go(percent);            $(status_div.childNodes[1]).text(percent + ‘%‘);            $(status_div.childNodes[2]).text(data[‘status‘]);            if (data[‘state‘] != ‘PENDING‘ && data[‘state‘] != ‘PROGRESS‘) {                if (‘result‘ in data) {                    // show result                    $(status_div.childNodes[3]).text(‘Result: ‘ + data[‘result‘]);                }                else {                    // something unexpected happened                    $(status_div.childNodes[3]).text(‘Result: ‘ + data[‘state‘]);                }            }            else {                // rerun in 2 seconds                setTimeout(function() {                    update_progress(status_url, nanobar, status_div);                }, 2000);            }        });    }

当后台任务完成时,result会加载到页面之中。如果没有result的话,这就意味着任务的执行以失败告终,此时任务的状态是 FAILURE 。

任当后台任务运行时,为了能够持续获得任务状态并更新页面,作者使用了定时器,定时器每个两秒一更新直到后台任务完成。
 

运行例子

读者先安装好virtualenv(强烈推荐!但是virtualenv非必需安装)。
下载代码,安装相应库,如下。
1 $ git clone https://github.com/miguelgrinberg/flask-celery-example.git2 $ cd flask-celery-example3 $ virtualenv venv4 $ source venv/bin/activate5 (venv) $ pip install -r requirements.txt

未安装virtualenv的话直接跳过第三行第四行命令。

 
redis server端读者自行安装。安装后运行启动。
Celery 消费者也需要读者运行,使用 celery命令。
邮件用户名密码自行设置。
$ export MAIL_USERNAME=<your-mail-username>$ export MAIL_PASSWORD=<your-mail-password>$ source venv/bin/activate(venv) $ celery worker -A app.celery --loglevel=info

Celery的 -A选项是应用中的celer对象,与文章最开头的代码对应。

 --loglevel=info 则是让日志内容更为详细。
 
最后启动应用。
$ source venv/bin/activate(venv) $ python app.py

访问http://localhost:5000/ 即可。

 
对原文做了修改。
欢迎各位拍砖,欢迎交流!
原文链接:http://blog.miguelgrinberg.com/post/using-celery-with-flask
【翻译结束】
 
--------
翻译版本:0.1, 2015-01-29
来自:鱼香唐僧 [:3]
--------
拓展阅读
 
关于消息传递者(message broker)
消息传递者从生产者接受消息,并将消息路由到消费者上。消息传递者包含一个exchange和一个或者多个queue. 
 
Exchange
生产者只能通过exchange将消息发送到队列上。
exchange有类型(type),类型决定了发送消息到队列的方式。有如下类型:direct, 
 
Queue
消息队列或任务队列作为存储消息的缓冲区。
 
Bindings
Bindings是一些规则,通过这些规则exchange将消息分发给各个queue.
也就是说,将某一个exchange和某一个queue关联起来,可称之为一个binding.
 
Routing Keys
Bindings可以有一些可选的路由关键字。
 
从生产者(Celery client)发送的消息头包含四部分内容:
  • 待执行的任务的名字
  • task id
  • 任务要用到的参数
  • 其他元数据,比如retries, eta, expires,他们的含义详见Task.apply_async(from celery import Task),不再细说。
在RabbitMq中"Message Queue会在Message不能被正常消费时将其缓存起来,但是当Consumer与Message Queue之间的连接通畅时,Message Queue将Message转发给Consumer"[1],不知道Celery中的实现是不是跟RabbitMq有相似的。
最后推荐了解AMQP。
[1]RabbitMQ源码解析前奏--AMQP协议
 

【译】在Flask中使用Celery