A simple case to use Celery:

Prerequisites: 

 1: Install RabbitMQ, which Celery uses as its message broker. On Windows the installer creates a service; make sure that service is started.

 2: Install Celery:   pip install celery 

Meat and Potatoes:

Scenario 1: Don't specify a backend for Celery, if we don't care about the result

1. Create a module named tasks.py

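    from __future__ import absolute_import
    from celery import Celery
    import time

    # The app is named after this module; RabbitMQ is the message broker.
    app = Celery('tasks', broker='amqp://guest@localhost:5672//')

    @app.task
    def add(x, y):
        print('hello celery')
        time.sleep(10)  # simulate a slow task
        return x + y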

 

2. Start Celery worker 

    celery -A tasks worker --loglevel=INFO

The console output should look like this:

     -------------- celery@YUFA-7W v3.1.10 (Cipater)
    ---- **** -----
    --- * ***  * -- Windows-7-6.1.7601-SP1
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app:         tasks:0x36871d0
    - ** ---------- .> transport:   amqp://guest@localhost:5672//
    - ** ---------- .> results:     disabled
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ----
    --- ***** ----- [queues]
     -------------- .> celery           exchange=celery(direct) key=celery

    [tasks]
      . tasks.add

    [2014-03-26 15:43:11,263: INFO/MainProcess] Connected to amqp://guest@127.0.0.1:5672//
    [2014-03-26 15:43:11,285: INFO/MainProcess] mingle: searching for neighbors
    [2014-03-26 15:43:12,293: INFO/MainProcess] mingle: all alone
    [2014-03-26 15:43:12,302: WARNING/MainProcess] celery@YUFA-7W ready.

 

3. Test the method 

Call the task "add" asynchronously through delay():

    >>> from tasks import add
    >>> result = add.delay(3,5)
    >>>

The Celery worker console then shows something like this:

    [2014-03-26 15:55:04,117: INFO/MainProcess] Received task: tasks.add[0a52fd72-c7cd-4dc7-91a8-be51f1ff4df2]
    [2014-03-26 15:55:04,118: WARNING/Worker-1] hello celery
    [2014-03-26 15:55:14,130: INFO/MainProcess] Task tasks.add[0a52fd72-c7cd-4dc7-91a8-be51f1ff4df2] succeeded in 10.0110001564s: 8

To check the task status from the client, call "result.ready()". However, because we didn't specify a backend, Celery falls back to "DisabledBackend" by default, and the call fails with the following error:

    >>> result.ready()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "C:\Python27\lib\site-packages\celery\result.py", line 254, in ready
        return self.state in self.backend.READY_STATES
      File "C:\Python27\lib\site-packages\celery\result.py", line 390, in state
        return self._get_task_meta()['status']
      File "C:\Python27\lib\site-packages\celery\result.py", line 327, in _get_task_meta
        meta = self.backend.get_task_meta(self.id)
      File "C:\Python27\lib\site-packages\celery\backends\base.py", line 291, in get_task_meta
        meta = self._get_task_meta_for(task_id)
    AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
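To make the role of the backend concrete, here is a minimal toy model in plain Python. It is not how Celery is implemented: ToyApp, run_worker and the dict standing in for a backend are hypothetical names invented for this sketch. It only illustrates the idea that the broker carries the request to the worker, while the return value is kept only if there is somewhere to store it.

```python
from collections import deque


class ToyApp(object):
    """Toy stand-in for a task queue; NOT Celery's implementation."""

    def __init__(self, backend=None):
        self.broker = deque()    # pending messages: (task_id, func, args)
        self.backend = backend   # dict that stores results, or None
        self._next_id = 0

    def delay(self, func, *args):
        """Enqueue a call and return its task id without running it."""
        self._next_id += 1
        self.broker.append((self._next_id, func, args))
        return self._next_id

    def run_worker(self):
        """Drain the queue, keeping results only if a backend exists."""
        while self.broker:
            task_id, func, args = self.broker.popleft()
            value = func(*args)
            if self.backend is not None:
                self.backend[task_id] = value

    def ready(self, task_id):
        """Report whether a result is stored; fails without a backend."""
        if self.backend is None:
            raise RuntimeError('no result backend configured')
        return task_id in self.backend


def add(x, y):
    return x + y


# Scenario 1: no backend. The task runs, but its return value is dropped.
no_backend = ToyApp()
tid = no_backend.delay(add, 3, 5)
no_backend.run_worker()

# Scenario 2: a backend exists, so the client can ask about the result.
with_backend = ToyApp(backend={})
tid2 = with_backend.delay(add, 3, 5)
before = with_backend.ready(tid2)   # queued, not executed yet
with_backend.run_worker()
after = with_backend.ready(tid2)    # result has been stored
```

In this sketch, calling no_backend.ready(tid) raises an error for the same underlying reason as the traceback above: there is nowhere to look the result up.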

 

The second scenario resolves this issue.

 

Scenario 2: Specify a backend for Celery, if we do care about the result

1. Update the module tasks.py to pass the parameter "backend" as "amqp". For other backend options, refer to the Celery documentation.

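    from __future__ import absolute_import
    from celery import Celery
    import time

    # Same broker as before; the "amqp" backend sends results back to the client.
    app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost:5672//')

    @app.task
    def add(x, y):
        print('hello celery')
        time.sleep(10)  # simulate a slow task
        return x + y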
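As an alternative to passing the broker and backend as constructor arguments, the same two values could live in a separate configuration module. The sketch below assumes the Celery 3.1 series shown in the worker banner, where the settings use uppercase names; the file name celeryconfig.py is an assumption for illustration, not something from the original setup.

```python
# celeryconfig.py (hypothetical file name, placed next to tasks.py)
# Setting names follow the uppercase convention of Celery 3.1.

# Same RabbitMQ broker URL that tasks.py passes as "broker".
BROKER_URL = 'amqp://guest@localhost:5672//'

# Same value that tasks.py passes as "backend": results go back over AMQP.
CELERY_RESULT_BACKEND = 'amqp'
```

With such a module in place, tasks.py would create the app as app = Celery('tasks') and then call app.config_from_object('celeryconfig'). Newer Celery releases replaced the "amqp" result backend with "rpc://", so treat the backend value as specific to the version used here.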

2. Restart the Celery worker and open a new Python shell. (This is important; otherwise the code update above won't take effect.)

3. Test 

    >>> from tasks import add
    >>> result = add.delay(3,5)
    >>> result.ready()
    False
    >>> result.state
    'PENDING'
    >>> result.status
    'SUCCESS'
    >>> result.state
    'SUCCESS'
    >>> result.ready()
    True
    >>> result.get()
    8
    >>>
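The session above checks ready() by hand until the task finishes. A small polling helper can do the same in a script. This is only a sketch: wait_for and FakeResult are hypothetical names, and the helper assumes nothing more than an object with ready() and get() methods, like the one returned by add.delay(3, 5). FakeResult exists purely so the example runs without a broker.

```python
import time


def wait_for(result, timeout=15.0, interval=0.5):
    """Poll result.ready() until it is True, then return result.get().

    Hypothetical helper: `result` is any object exposing ready() and get().
    Raises RuntimeError if the task has not finished within `timeout` seconds.
    """
    deadline = time.time() + timeout
    while not result.ready():
        if time.time() >= deadline:
            raise RuntimeError('task not finished after %s seconds' % timeout)
        time.sleep(interval)
    return result.get()


class FakeResult(object):
    """Stand-in for a task result that becomes ready on the third poll."""

    def __init__(self, value):
        self.value = value
        self.polls = 0

    def ready(self):
        self.polls += 1
        return self.polls >= 3

    def get(self):
        return self.value


fake = FakeResult(8)
answer = wait_for(fake, timeout=5.0, interval=0.01)
```

Against a real worker the call would be wait_for(add.delay(3, 5)). Celery's own result.get(timeout=...) also blocks until the result arrives, so the helper is mainly useful when you want to do something else between polls.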

 

 See also: https://denibertovic.com/posts/celery-best-practices/