[Translation] Using Celery with Flask
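What is Celery?
Celery is a distributed task queue. A setup that uses it has three parts:
- Producer (Celery client). The client sends messages. When used with Flask, the client runs inside the Flask application.
- Consumers (Celery workers). Workers run the background tasks. A worker can be local or remote: you can start with a single worker on the server that runs Flask and add more workers as the load grows.
- Message broker. The client and the workers communicate through a message queue. Celery supports several message queues; the most commonly used are RabbitMQ and Redis.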
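To make the three roles concrete, here is an in-process analogy that uses only the Python standard library: a queue.Queue stands in for the message broker, the main thread plays the client, and a thread plays the worker. This is a sketch, not how Celery is implemented (a real broker such as Redis or RabbitMQ runs as a separate service, and workers are separate processes); the names add, TASKS, worker and send_task are hypothetical.

```python
import queue
import threading

# The "broker": a thread-safe FIFO queue of task messages.
broker = queue.Queue()
# Results collected by the worker, keyed by task id.
results = {}

def add(x, y):
    """A hypothetical background task."""
    return x + y

# Registry mapping task names to functions, as a worker would know them.
TASKS = {'add': add}

def worker():
    """The "consumer": take messages off the queue and execute them."""
    while True:
        message = broker.get()
        if message is None:  # sentinel value: shut the worker down
            broker.task_done()
            break
        func = TASKS[message['task']]
        results[message['id']] = func(*message['args'])
        broker.task_done()

def send_task(task_id, name, *args):
    """The "producer": put a message describing the call on the queue."""
    broker.put({'id': task_id, 'task': name, 'args': args})

t = threading.Thread(target=worker)
t.start()
send_task('t1', 'add', 10, 20)
send_task('t2', 'add', 1, 2)
broker.put(None)  # stop the worker once the queued tasks are done
t.join()
print(results)  # {'t1': 30, 't2': 3}
```

The client never calls add() itself; it only describes the call, which is why the worker can live in another process or on another machine.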
Combining Flask with Celery
from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
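Any function that should run as a background task is wrapped with the celery.task decorator, as follows:

@celery.task
def my_background_task(arg1, arg2):
    # some long running task here
    result = arg1 + arg2  # hypothetical placeholder for the real work
    return result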
The Flask application can then request that this task be executed in the background:

task = my_background_task.delay(10, 20)
The delay() method is a shortcut for apply_async(). Calling apply_async() directly works just as well:
task = my_background_task.apply_async(args=[10, 20])
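apply_async() also accepts execution options. For example, countdown=60 tells Celery to start the task no earlier than 60 seconds from now:

task = my_background_task.apply_async(args=[10, 20], countdown=60)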
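The relationship between delay() and apply_async() can be sketched with a toy task class. In Celery, delay(*args, **kwargs) forwards its arguments to apply_async(args, kwargs); everything else below (the ToyTask class, the sent list, turning countdown into an absolute ETA) is a simplified, hypothetical stand-in rather than Celery's own code.

```python
from datetime import datetime, timedelta, timezone

class ToyTask:
    """Hypothetical stand-in for a Celery task; records messages instead of sending them."""

    def __init__(self, name):
        self.name = name
        self.sent = []  # messages that would go to the broker

    def apply_async(self, args=None, kwargs=None, countdown=None):
        eta = None
        if countdown is not None:
            # countdown is relative (seconds from now); store it as an absolute ETA
            eta = datetime.now(timezone.utc) + timedelta(seconds=countdown)
        message = {'task': self.name, 'args': list(args or ()),
                   'kwargs': dict(kwargs or {}), 'eta': eta}
        self.sent.append(message)
        return message

    def delay(self, *args, **kwargs):
        # Shortcut: only the call arguments, no execution options.
        return self.apply_async(args, kwargs)

task = ToyTask('my_background_task')
a = task.delay(10, 20)
b = task.apply_async(args=[10, 20])
c = task.apply_async(args=[10, 20], countdown=60)
print(a == b)                # True: both calls produce the same message
print(c['eta'] is not None)  # True
```

This is why delay() is convenient for plain calls, while options such as countdown require apply_async().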
Example 1: Sending email asynchronously
The page is rendered from the index.html template, which shows flashed messages and a form with two submit buttons:

<html>
  <head>
    <title>Flask + Celery Examples</title>
  </head>
  <body>
    <h1>Flask + Celery Examples</h1>
    <h2>Example 1: Send Asynchronous Email</h2>
    {% for message in get_flashed_messages() %}
    <p style="color: red;">{{ message }}</p>
    {% endfor %}
    <form method="POST">
      <p>Send test email to: <input type="text" name="email" value="{{ email }}"></p>
      <input type="submit" name="submit" value="Send">
      <input type="submit" name="submit" value="Send in 1 minute">
    </form>
  </body>
</html>
import os
from flask_mail import Mail, Message

# Flask-Mail configuration
app.config['MAIL_SERVER'] = 'smtp.googlemail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD')
app.config['MAIL_DEFAULT_SENDER'] = 'flask@example.com'

# Flask-Mail instance used by the background task
mail = Mail(app)
The view function that handles the form and sends the email asynchronously:
from flask import flash, redirect, render_template, request, session, url_for

# Note: session and flash() require app.secret_key (SECRET_KEY) to be set.
@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html', email=session.get('email', ''))
    email = request.form['email']
    session['email'] = email

    # send the email
    msg = Message('Hello from Flask',
                  recipients=[request.form['email']])
    msg.body = 'This is a test email sent from a background Celery task.'
    if request.form['submit'] == 'Send':
        # send right away
        send_async_email.delay(msg)
        flash('Sending email to {0}'.format(email))
    else:
        # send in one minute
        send_async_email.apply_async(args=[msg], countdown=60)
        flash('An email will be sent to {0} in one minute'.format(email))

    return redirect(url_for('index'))
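# Passing a Message object as a task argument requires a serializer that can
# handle it, such as pickle; Celery 4 and later default to JSON.
@celery.task
def send_async_email(msg):
    """Background task to send an email with Flask-Mail."""
    with app.app_context():
        mail.send(msg)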
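Flask-Mail needs an application context to send, so the task pushes one with app.app_context() before calling mail.send(). Another important point is that the return value of this asynchronous call is not kept, so the application itself cannot tell whether the call succeeded. In this example, you have to look at the output of the Celery worker to find out whether there was a problem sending the email.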
Example 2: Showing status updates and progress
import random
import time

@celery.task(bind=True)
def long_task(self):
    """Background task that runs a long function with progress reports."""
    verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
    adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
    noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
    message = ''
    total = random.randint(10, 50)
    for i in range(total):
        if not message or random.random() < 0.25:
            message = '{0} {1} {2}...'.format(random.choice(verb),
                                              random.choice(adjective),
                                              random.choice(noun))
        self.update_state(state='PROGRESS',
                          meta={'current': i, 'total': total,
                                'status': message})
        time.sleep(1)
    return {'current': 100, 'total': 100, 'status': 'Task completed!',
            'result': 42}
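Here the author adds the bind=True argument to the Celery decorator. This makes Celery pass the task instance to the function as the self argument, so the function can record status updates by calling self.update_state().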
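What bind=True changes can be imitated without Celery: the decorated object passes itself as the first argument, so the function body can call methods on it such as update_state(). The toy_task decorator and ToyBoundTask class below are hypothetical; in Celery, update_state() writes the state and meta to the result backend instead of to a list.

```python
class ToyBoundTask:
    """Hypothetical stand-in for a bound Celery task instance."""

    def __init__(self, func):
        self.func = func
        self.history = []  # every (state, meta) pair reported by the task

    def update_state(self, state, meta):
        # Celery would store this in the result backend; here we just keep it.
        self.history.append((state, meta))

    def __call__(self, *args, **kwargs):
        # The key point of bind=True: the task instance becomes `self`.
        return self.func(self, *args, **kwargs)

def toy_task(func):
    """Hypothetical decorator mimicking @celery.task(bind=True)."""
    return ToyBoundTask(func)

@toy_task
def count_to(self, total):
    for i in range(total):
        self.update_state(state='PROGRESS', meta={'current': i, 'total': total})
    return {'current': total, 'total': total, 'status': 'Task completed!'}

result = count_to(3)
print(count_to.history[-1])  # ('PROGRESS', {'current': 2, 'total': 3})
print(result['status'])      # Task completed!
```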
from flask import jsonify, url_for

@app.route('/longtask', methods=['POST'])
def longtask():
    task = long_task.apply_async()
    return jsonify({}), 202, {'Location': url_for('taskstatus',
                                                  task_id=task.id)}
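The client sends a POST request to /longtask to start the background task. The server starts the task and keeps the returned object. The author uses status code 202, which in REST APIs means the request has been accepted and is still being processed, and adds a Location header so that the client can fetch status information while the task runs. url_for generates the URL that routes to the taskstatus function; that URL contains the task id, whose value is task.id.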
The taskstatus function reports the status updates of the background task.
@app.route('/status/<task_id>')
def taskstatus(task_id):
    task = long_task.AsyncResult(task_id)
    if task.state == 'PENDING':
        # job did not start yet
        response = {
            'state': task.state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif task.state != 'FAILURE':
        response = {
            'state': task.state,
            'current': task.info.get('current', 0),
            'total': task.info.get('total', 1),
            'status': task.info.get('status', '')
        }
        if 'result' in task.info:
            response['result'] = task.info['result']
    else:
        # something went wrong in the background job
        response = {
            'state': task.state,
            'current': 1,
            'total': 1,
            'status': str(task.info),  # this is the exception raised
        }
    return jsonify(response)
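To access the data produced by the background task, the function creates a task object (an AsyncResult) from the task id.
The function returns a JSON response whose contents match the metadata recorded with update_state(). While the task is PENDING there is no metadata yet, so default values are returned; after a FAILURE, task.info holds the exception that was raised.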
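The branching in taskstatus can be pulled out into a plain function, which makes the mapping from task state to JSON easy to check without a running worker. status_response is a hypothetical helper; state and info play the roles of task.state and task.info (the meta dict while the task runs, the returned dict on success, the raised exception after FAILURE).

```python
def status_response(state, info):
    """Build the status payload the way taskstatus() does (hypothetical helper)."""
    if state == 'PENDING':
        # job did not start yet (Celery also reports PENDING for unknown ids)
        return {'state': state, 'current': 0, 'total': 1, 'status': 'Pending...'}
    if state != 'FAILURE':
        response = {
            'state': state,
            'current': info.get('current', 0),
            'total': info.get('total', 1),
            'status': info.get('status', ''),
        }
        if 'result' in info:
            response['result'] = info['result']
        return response
    # FAILURE: info holds the exception raised inside the task
    return {'state': state, 'current': 1, 'total': 1, 'status': str(info)}

p = status_response('PROGRESS', {'current': 3, 'total': 10, 'status': 'Booting...'})
f = status_response('FAILURE', ValueError('boom'))
print(f['status'])  # boom
```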
Front-end JavaScript code
<script src="//cdnjs.cloudflare.com/ajax/libs/nanobar/0.2.1/nanobar.min.js"></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>
function start_long_task() {
    // add task status elements
    var div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div> </div></div><hr>');
    $('#progress').append(div);

    // create a progress bar
    var nanobar = new Nanobar({
        bg: '#44f',
        target: div[0].childNodes[0]
    });

    // send ajax POST request to start background job
    $.ajax({
        type: 'POST',
        url: '/longtask',
        success: function(data, status, request) {
            var status_url = request.getResponseHeader('Location');
            update_progress(status_url, nanobar, div[0]);
        },
        error: function() {
            alert('Unexpected error');
        }
    });
}
The HTML elements added by this function map to the task information as follows:
<div class="progress">
    <div></div>        <-- Progress bar
    <div>0%</div>      <-- Percentage
    <div>...</div>     <-- Status message
    <div> </div>       <-- Result
</div>
<hr>
function update_progress(status_url, nanobar, status_div) {
    // send GET request to status URL
    $.getJSON(status_url, function(data) {
        // update UI
        var percent = parseInt(data['current'] * 100 / data['total']);
        nanobar.go(percent);
        $(status_div.childNodes[1]).text(percent + '%');
        $(status_div.childNodes[2]).text(data['status']);
        if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
            if ('result' in data) {
                // show result
                $(status_div.childNodes[3]).text('Result: ' + data['result']);
            }
            else {
                // something unexpected happened
                $(status_div.childNodes[3]).text('Result: ' + data['state']);
            }
        }
        else {
            // rerun in 2 seconds
            setTimeout(function() {
                update_progress(status_url, nanobar, status_div);
            }, 2000);
        }
    });
}
When the background task finishes, its result is shown on the page. If there is no result, the task ended in failure, and the page shows the task state, FAILURE, instead.
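The same polling logic, written in Python for comparison: compute the percentage, keep polling while the state is PENDING or PROGRESS, and stop on any other state. fetch_status is a hypothetical callable standing in for the GET request to the Location URL; in the usage example it replays canned responses and the sleep is disabled, whereas the page waits 2 seconds between polls.

```python
import time

def poll(fetch_status, interval=2.0, sleep=time.sleep):
    """Poll until the task leaves PENDING/PROGRESS; return (percents, final_text)."""
    percents = []
    while True:
        data = fetch_status()
        percent = int(data['current'] * 100 / data['total'])
        percents.append(percent)
        if data['state'] not in ('PENDING', 'PROGRESS'):
            if 'result' in data:
                return percents, 'Result: {}'.format(data['result'])
            # something unexpected happened: show the state instead
            return percents, 'Result: {}'.format(data['state'])
        sleep(interval)  # rerun after the interval

responses = iter([
    {'state': 'PENDING', 'current': 0, 'total': 1, 'status': 'Pending...'},
    {'state': 'PROGRESS', 'current': 5, 'total': 20, 'status': 'Booting...'},
    {'state': 'SUCCESS', 'current': 100, 'total': 100,
     'status': 'Task completed!', 'result': 42},
])
percents, text = poll(lambda: next(responses), sleep=lambda s: None)
print(percents)  # [0, 25, 100]
print(text)      # Result: 42
```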
Running the example
$ git clone https://github.com/miguelgrinberg/flask-celery-example.git
$ cd flask-celery-example
$ virtualenv venv
$ source venv/bin/activate
(venv) $ pip install -r requirements.txt
If you don't have virtualenv installed, skip the third and fourth commands (virtualenv venv and source venv/bin/activate).
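Make sure a Redis server is running on localhost:6379, the broker URL configured at the beginning. Then, in a second terminal, set the mail credentials and start a Celery worker:

$ export MAIL_USERNAME=<your-mail-username>
$ export MAIL_PASSWORD=<your-mail-password>
$ source venv/bin/activate
(venv) $ celery -A app.celery worker --loglevel=info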
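Celery's -A option names the celery object inside the application: app.celery refers to the celery instance in app.py, created by the code at the beginning of this article.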
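Roughly speaking, app.celery is read as "import the module app, then take its attribute celery". The resolve function below is a simplified, hypothetical illustration of that lookup using importlib; Celery's own loader accepts more forms than this.

```python
import importlib
import json

def resolve(spec):
    """Resolve 'package.module.attribute' to the object it names (simplified)."""
    module_name, _, attr = spec.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, attr)

# With the article's app.py on the import path, resolve('app.celery')
# would return the Celery instance created at the top of that file.
print(resolve('json.dumps') is json.dumps)  # True
```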
Finally, start the Flask application in another terminal:

$ source venv/bin/activate
(venv) $ python app.py
Then open http://localhost:5000/ in your browser.
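Each task message that the client sends through the broker contains:
- the name of the task to execute
- the task id
- the arguments the task needs
- other metadata, such as retries, eta and expires; see Task.apply_async (from celery import Task) for their meaning, which is not covered here.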
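A simplified sketch of such a message, built as a plain dict and serialized to JSON. The field names follow the body of Celery's older (version 1) task message protocol; the make_message helper and the task name 'app.my_background_task' are hypothetical examples, and real messages carry additional headers and properties.

```python
import json
import uuid
from datetime import datetime, timedelta, timezone

def make_message(task_name, args=(), kwargs=None, countdown=None, expires_in=None):
    """Build a simplified task message (hypothetical helper)."""
    now = datetime.now(timezone.utc)
    eta = now + timedelta(seconds=countdown) if countdown is not None else None
    expires = now + timedelta(seconds=expires_in) if expires_in is not None else None
    return {
        'task': task_name,        # name of the task to execute
        'id': str(uuid.uuid4()),  # task id
        'args': list(args),       # positional arguments
        'kwargs': kwargs or {},   # keyword arguments
        'retries': 0,             # how many times the task has been retried
        'eta': eta.isoformat() if eta else None,              # earliest run time
        'expires': expires.isoformat() if expires else None,  # discard after this
    }

msg = make_message('app.my_background_task', args=[10, 20], countdown=60)
body = json.dumps(msg)  # what would travel through the broker
print(json.loads(body)['args'])  # [10, 20]
```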