白季飞龙的个人主页

Celery大杂烩

Celery是什么

Celery是Python实现的消息队列中间件

Celery的安装

pip install celery

Celery的启动

celery worker

控制台输出:

bj@bogon ~/tmp/python » celery worker                                                              64 ↵


celery@bogon v4.2.1 (windowlicker)

Darwin-18.2.0-x86_64-i386-64bit 2019-03-20 18:54:27

[config]
.> app:         default:0x109013fd0 (.default.Loader)
.> transport:   amqp://guest:**@localhost:5672//
.> results:     disabled://
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[2019-03-20 18:54:27,417: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
Trying again in 2.00 seconds...

[2019-03-20 18:54:29,441: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
Trying again in 4.00 seconds...

Celery报错,本机5672端口连接被拒绝。5672是RattitMQ的默认端口,可见,RabbitMQ是Celery的默认消息后端。

安装RabbitMQ: brew cask install rabbitmq

启动RabbitMQ: RabbitMQ直接安装为GUI界面的App,直接运行即可

RabbitMQ的默认管理员帐号密码: guest guest

Celery的基本应用

Celery服务端

tasks.py

from celery import Celery
from time import sleep

app = Celery(backend='amqp')


@app.task
def add(x, y):
    sleep(1)
    return x + y

Celery客户端

import tasks
from celery.result import AsyncResult

result: AsyncResult = tasks.add.delay(3, 5)
print("Result", result)
print("Ready", result.ready())
print("Real result", result.get())

运行Celery服务端: celery worker --app tasks --loglevel=info

运行Celery客户端: 直接运行python文件即可

客户端控制台输出

Result d44d1ef6-9f30-4ab2-8257-1bbbce04d484
Ready False
Real result 8

服务端控制台输出

[2019-03-20 19:28:37,140: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2019-03-20 19:28:37,156: INFO/MainProcess] mingle: searching for neighbors
[2019-03-20 19:28:38,184: INFO/MainProcess] mingle: all alone
[2019-03-20 19:28:38,210: INFO/MainProcess] celery@bogon ready.
[2019-03-20 19:28:40,721: INFO/MainProcess] Received task: tasks.add[fe38239d-81f0-4e00-99a4-6c66b998592b]
[2019-03-20 19:28:41,753: INFO/ForkPoolWorker-8] Task tasks.add[fe38239d-81f0-4e00-99a4-6c66b998592b] succeeded in 1.0297504499999999s: 8
[2019-03-20 19:28:44,155: INFO/MainProcess] Received task: tasks.add[dfb7b8df-42ba-4dc3-a664-d890875ab02d]
[2019-03-20 19:28:45,186: INFO/ForkPoolWorker-2] Task tasks.add[dfb7b8df-42ba-4dc3-a664-d890875ab02d] succeeded in 1.0284629679999995s: 8
[2019-03-20 19:29:14,826: INFO/MainProcess] Received task: tasks.add[7d6f865b-d7dc-41cc-a3ff-8a46a053ce85]
[2019-03-20 19:29:15,856: INFO/ForkPoolWorker-4] Task tasks.add[7d6f865b-d7dc-41cc-a3ff-8a46a053ce85] succeeded in 1.026909401999994s: 8
[2019-03-20 19:34:49,731: INFO/MainProcess] Received task: tasks.add[c6606323-31eb-4eac-816b-965eaa72d43e]
[2019-03-20 19:34:50,776: INFO/ForkPoolWorker-6] Task tasks.add[c6606323-31eb-4eac-816b-965eaa72d43e] succeeded in 1.0380972709999696s: 8
[2019-03-20 19:34:59,336: INFO/MainProcess] Received task: tasks.add[d44d1ef6-9f30-4ab2-8257-1bbbce04d484]
[2019-03-20 19:35:00,359: INFO/ForkPoolWorker-8] Task tasks.add[d44d1ef6-9f30-4ab2-8257-1bbbce04d484] succeeded in 1.0218639439999606s: 8

可见,Celery开启了多个Worker

文章首发: https://baijifeilong.github.io


漫漫路,莫论逍遥;潜心修,只为悟道