Celery是Python实现的消息队列中间件
pip install celery
celery worker
控制台输出:
bj@bogon ~/tmp/python » celery worker 64 ↵
celery@bogon v4.2.1 (windowlicker)
Darwin-18.2.0-x86_64-i386-64bit 2019-03-20 18:54:27
[config]
.> app: default:0x109013fd0 (.default.Loader)
.> transport: amqp://guest:**@localhost:5672//
.> results: disabled://
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[2019-03-20 18:54:27,417: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
Trying again in 2.00 seconds...
[2019-03-20 18:54:29,441: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
Trying again in 4.00 seconds...
Celery报错,本机5672端口连接被拒绝。5672是RattitMQ的默认端口,可见,RabbitMQ是Celery的默认消息后端。
安装RabbitMQ: brew cask install rabbitmq
启动RabbitMQ: RabbitMQ直接安装为GUI界面的App,直接运行即可
RabbitMQ的默认管理员帐号密码: guest guest
tasks.py
from celery import Celery
from time import sleep
app = Celery(backend='amqp')
@app.task
def add(x, y):
sleep(1)
return x + y
import tasks
from celery.result import AsyncResult
result: AsyncResult = tasks.add.delay(3, 5)
print("Result", result)
print("Ready", result.ready())
print("Real result", result.get())
运行Celery服务端: celery worker --app tasks --loglevel=info
运行Celery客户端: 直接运行python文件即可
Result d44d1ef6-9f30-4ab2-8257-1bbbce04d484
Ready False
Real result 8
[2019-03-20 19:28:37,140: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2019-03-20 19:28:37,156: INFO/MainProcess] mingle: searching for neighbors
[2019-03-20 19:28:38,184: INFO/MainProcess] mingle: all alone
[2019-03-20 19:28:38,210: INFO/MainProcess] celery@bogon ready.
[2019-03-20 19:28:40,721: INFO/MainProcess] Received task: tasks.add[fe38239d-81f0-4e00-99a4-6c66b998592b]
[2019-03-20 19:28:41,753: INFO/ForkPoolWorker-8] Task tasks.add[fe38239d-81f0-4e00-99a4-6c66b998592b] succeeded in 1.0297504499999999s: 8
[2019-03-20 19:28:44,155: INFO/MainProcess] Received task: tasks.add[dfb7b8df-42ba-4dc3-a664-d890875ab02d]
[2019-03-20 19:28:45,186: INFO/ForkPoolWorker-2] Task tasks.add[dfb7b8df-42ba-4dc3-a664-d890875ab02d] succeeded in 1.0284629679999995s: 8
[2019-03-20 19:29:14,826: INFO/MainProcess] Received task: tasks.add[7d6f865b-d7dc-41cc-a3ff-8a46a053ce85]
[2019-03-20 19:29:15,856: INFO/ForkPoolWorker-4] Task tasks.add[7d6f865b-d7dc-41cc-a3ff-8a46a053ce85] succeeded in 1.026909401999994s: 8
[2019-03-20 19:34:49,731: INFO/MainProcess] Received task: tasks.add[c6606323-31eb-4eac-816b-965eaa72d43e]
[2019-03-20 19:34:50,776: INFO/ForkPoolWorker-6] Task tasks.add[c6606323-31eb-4eac-816b-965eaa72d43e] succeeded in 1.0380972709999696s: 8
[2019-03-20 19:34:59,336: INFO/MainProcess] Received task: tasks.add[d44d1ef6-9f30-4ab2-8257-1bbbce04d484]
[2019-03-20 19:35:00,359: INFO/ForkPoolWorker-8] Task tasks.add[d44d1ef6-9f30-4ab2-8257-1bbbce04d484] succeeded in 1.0218639439999606s: 8
可见,Celery开启了多个Worker