首页 > 代码库 > django celery的分布式异步之路(一) hello world

django celery的分布式异步之路(一) hello world

 

设想你遇到如下场景:

1)高并发

2)请求的执行相当消耗机器资源,流量峰值的时候可能超出单机界限

3)请求返回慢,客户长时间等在页面等待任务返回

4)存在耗时的定时任务

这时你就需要一个分布式异步的框架了。

celery会是一个不错的选择。本文将一步一步的介绍如何使用celery和django进行集成,并进行分布式异步编程。

 

1、安装依赖

默认你已经有了python和pip。我使用的版本是:

python 2.7.10
pip 9.0.1
virtualenv 15.1.0

创建沙盒环境,我们生产过程中通过沙盒环境来使用各种python包的版本,各个应用的沙盒环境之间互不干扰。

$ mkdir kangaroo
$ cd kangaroo
$ virtualenv kangaroo.env
# 沙盒下面有什么,可以看到有python的bin、include和pip
$ ll kangaroo.env
total 8
drwxrwxr-x 2 data_monitor data_monitor 4096 Aug  7 16:00 bin
drwxrwxr-x 2 data_monitor data_monitor   22 Aug  7 16:00 include
drwxrwxr-x 3 data_monitor data_monitor   22 Aug  7 16:00 lib
-rw-rw-r-- 1 data_monitor data_monitor   60 Aug  7 16:00 pip-selfcheck.json
# 让沙盒环境在当前session(shell)中生效
$ source kangaroo.env/bin/activate
# 可以看到命令行首多了(kangaroo.env),而且python的路径已经变了
(kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ which python
/data/home/data_monitor/yangfan/test/kangaroo/kangaroo.env/bin/python

下面我们开始在kangaroo环境下安装相应版本的django和celery,以及django-celery集成包。

(kangaroo.env) [XXX@XXX kangaroo]$ pip install django==1.10.6
(kangaroo.env) [XXX@XXX kangaroo]$ pip install celery==3.1.25
(kangaroo.env) [XXX@XXX kangaroo]$ pip install django-celery==3.2.1

我在安装的时候写明了版本号,是因为这套版本号在我们的生产环境过玩转过。

如果你换了对应的版本号的话,可能会引发冲突,出现意想不到的问题。亲测还是有一些版本之间是有怪问题的。

 

2、创建工程

创建工程kangaroo:django-admin startproject kangaroo

# 在kangaroo.env同级目录下创建工程kangaroo
cd /home/data_monitor/yangfan/test/kangaroo
# 创建工程kangaroo (kangaroo.
env) [data_monitor@bigdata-arch-client10 kangaroo]$ django-admin startproject kangaroo (kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ ll total 0 drwxrwxr-x 3 data_monitor data_monitor 37 Aug 7 16:45 kangaroo drwxrwxr-x 5 data_monitor data_monitor 65 Aug 7 16:00 kangaroo.env

创建APP foot:python manage.py startapp foot

# 进入工程目录,你会看到manage.py文件
cd kangaroo
# 创建APP foot
(kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ python manage.py startapp foot
(kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ ll
total 4
drwxrwxr-x 3 data_monitor data_monitor 116 Aug  7 16:48 foot
drwxrwxr-x 2 data_monitor data_monitor 108 Aug  7 16:48 kangaroo
-rwxrwxr-x 1 data_monitor data_monitor 806 Aug  7 16:45 manage.py
# 进入foot app目录,看一下有什么
(kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ cd foot/
(kangaroo.env) [data_monitor@bigdata-arch-client10 foot]$ ll
total 20
-rw-rw-r-- 1 data_monitor data_monitor  63 Aug  7 16:48 admin.py
-rw-rw-r-- 1 data_monitor data_monitor 124 Aug  7 16:48 apps.py
-rw-rw-r-- 1 data_monitor data_monitor   0 Aug  7 16:48 __init__.py
drwxrwxr-x 2 data_monitor data_monitor  24 Aug  7 16:48 migrations
-rw-rw-r-- 1 data_monitor data_monitor  98 Aug  7 16:48 models.py
-rw-rw-r-- 1 data_monitor data_monitor  60 Aug  7 16:48 tests.py
-rw-rw-r-- 1 data_monitor data_monitor  63 Aug  7 16:48 views.py

 至此我们创建了工程kangaroo和app foot,下面我们介绍如何集成celery。

 

3、django-celery的集成配置

我们这里集成的方式是使用django-celery包。

集成配置要注意以下几个地方就好了,配置起来还是比较简单的。

1)修改kangaroo/settings.py文件

让djcelery模块生效

import os
import djcelery
djcelery.setup_loader()
...
INSTALLED_APPS = (
   ...
    djcelery,
    kombu.transport.django,
    ...
)

配置broker和backend

# Celery settings
# redis做broker, 第二个":"前后是redis的用户名密码,后面的2是db
# BROKER_URL = ‘redis://:password@10.93.84.53:6379/2‘
# rabbitMQ做broker,第二个":"前后是rabbitMQ的用户名密码
BROKER_URL = amqp://admin:bigdata123@10.93.21.21:5672//
# Celery的backend记录地址,这里只给出redis的配置
CELERY_RESULT_BACKEND = redis://:bigdata123@10.93.84.53:6379/3

 

4、第一个task

其实应该在创建app的时候就将appName添加到settings.py的INSTALLED_APPS中,我们没有这样做事留到现在好说明问题。

我们修改settings.py加入foot。

import os
import djcelery
djcelery.setup_loader()
...
INSTALLED_APPS = (
   ...
    djcelery,
    kombu.transport.django,
   ...
    foot,
    ...
)

当settings.py中的djcelery.setup_loader()运行时, Celery便会查看所有INSTALLED_APPS中app目录中的tasks.py文件, 找到标记为@task的function, 并将它们注册为celery task。

所以我们在foot包下创建tasks.py文件,并且添加我们的task。

foot/tasks.py

# -*- coding:utf-8 -*-
import time
import logging

from celery import task
logger = logging.getLogger(__name__)

@task()
def foot_task(param_dict):
    logger.info(foot task start! param_dict:%s‘ % param_dict)
    time.sleep(10)
    logger.info(foot task finished!)
    return

这个task等待接收一个参数字典,只是简单的打印参数,然后sleep10s就退出了。

让task在后台worker中注册,当有任务分发下来的时候就开始执行。只需执行

python manage.py celery worker --loglevel=info

 

5、分发任务dispatch

任务触发的两种方式:

1)定时调度

2)请求执行

我这里写了一个通用的函数,这个函数用于分发任务

def dispatch(task, param_dict):
    param_json = json.dumps(param_dict)
    try:
        task.apply_async(
            [param_json],
            retry=True,
            retry_policy={
                max_retries: 1,
                interval_start: 0,
                interval_step: 0.2,
                interval_max: 0.2,
            },
        )
    except Exception, ex:
        logger.info(traceback.format_exc())
        raise

如何触发foot.tasks中的任务呢,只需要

import logging
import traceback
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt

from foot.tasks import foot_task
from common.dispatcher import dispatch

logger = logging.getLogger(__name__)

@csrf_exempt
def hello(request):
    if request.method == GET:
        try:
            user = request.GET.get(username)
            dispatch(test_task, {hello: user})
            return JsonResponse({code: 0, msg:success})
        except Exception, ex:
            return JsonResponse({code: -1, msg: traceback.format_exc()})

1)当你在客户端发送请求:hello?username=‘kangaroo‘时

2)服务瞬间返回:{‘code‘: 0, ‘msg‘:‘success‘}

3)后端sleep10秒后执行成功,打印hello:kangaroo

这就是异步的效果。

django celery的分布式异步之路(一) hello world