Python Celery 在 Windows 的基本使用
11/6/2024
一、准备工作
- Python 环境
- Redis 数据库(如果没有 Redis 推荐使用 Docker 部署)
- Poetry(推荐使用类似的包管理工具,虚拟环境)
二、Celery 核心概念
概念 | 简介 |
---|---|
任务 (Task) | 定义一个异步执行的函数,可以是发送邮件、处理数据等任务。 |
队列 (Queue) | 用于存储待执行的任务,Worker 从队列中取出任务执行。 |
消息代理 (Broker) | 消息传递中间件,负责在生产者与 Worker 之间传递任务消息,如 RabbitMQ 或 Redis。 |
工作节点 (Worker) | 实际执行任务的进程,从 Broker 获取任务并执行,支持多节点分布式处理。 |
结果存储 (Result Backend) | 存储任务的执行结果和状态,便于查询任务状态或获取结果,支持 Redis、数据库等。 |
Beat | 定时调度程序,用于管理周期性任务,将定时任务发送到队列中,由 Worker 执行。 |
任务重试 (Retry) | 支持自动重试机制,在任务失败时自动重新尝试执行以提高完成率。 |
链式任务 (Task Chains) | 支持任务的链式调用,通过 chain() 或 chord() 创建任务流,实现任务依赖。 |
三、安装依赖
poetry init
poetry add celery[redis] gevent
四、为什么需要 gevent?
因为在 windows 环境中会遇到问题。
Windows 对多进程的支持有限,使用
gevent
可以避免 Celery 在 Windows 上使用多进程时可能遇到的一些问题。通过协程池,你可以避免进程间通信带来的开销和复杂性。
五、docker 部署 redis
docker pull redis:latest # 拉取最新的 redis
docker run -itd --name redis-test -p 6379:6379 redis # 在 docker 中运行redis
六、使用 poetry 初始化项目
cd you project
poetry init
七、添加一个 tasks.py
from celery import Celery
import time
broker = "redis://localhost:6379/0"
backend = "redis://localhost:6379/0"
app = Celery(
"tasks", broker=broker, backend=backend
)
@app.task
def add(x, y):
print("处理中...")
time.sleep(5)
print("处理完成...")
return x + y
八、添加运行文件 main.py
from tasks import add
result = add.delay(4, 6)
print("任务状态:", result.status)
print("任务结果:", result.get())
九、启动任务
poetry run celery -A tasks worker -P gevent --loglevel=info
需要注意的是:在 windows 环境中,如果你想使用 ctrl + c
是不能停止的。
十、运行任务
poetry run main.py
输出:
任务状态: PENDING
任务结果: 10
任务输出结果:
十一、查看 redis ui
我们使用 redis ui 工具 Anthor Desktop Redis Manager 管理我们的任务。
这里我们定义两个任务,左侧是两次运行的任务,返回的结果都是 10。
十二、flower 监控
poetry add flower
celery -A tasks.app flower
然后再本地访问:http://localhost:5555/
得到 web UI界面:
十三、小结
本文主要讲解是 Python 的 Celery 在 windows 中使用 Celery + Redis (以及 windows gevent)运行基本 Celery 任务。当然也有其他的能力,任务重试、任务链、周期性任务等等。