Python Celery 在 Windows 的基本使用

11/6/2024

一、准备工作

  • Python 环境
  • Redis 数据库(如果没有 Redis 推荐使用 Docker 部署)
  • Poetry(推荐使用类似的包管理工具,虚拟环境)

二、Celery 核心概念

概念简介
任务 (Task)定义一个异步执行的函数,可以是发送邮件、处理数据等任务。
队列 (Queue)用于存储待执行的任务,Worker 从队列中取出任务执行。
消息代理 (Broker)消息传递中间件,负责在生产者与 Worker 之间传递任务消息,如 RabbitMQ 或 Redis。
工作节点 (Worker)实际执行任务的进程,从 Broker 获取任务并执行,支持多节点分布式处理。
结果存储 (Result Backend)存储任务的执行结果和状态,便于查询任务状态或获取结果,支持 Redis、数据库等。
Beat定时调度程序,用于管理周期性任务,将定时任务发送到队列中,由 Worker 执行。
任务重试 (Retry)支持自动重试机制,在任务失败时自动重新尝试执行以提高完成率。
链式任务 (Task Chains)支持任务的链式调用,通过 chain()chord() 创建任务流,实现任务依赖。

三、安装依赖

poetry init

poetry add celery[redis] gevent

四、为什么需要 gevent?

因为在 windows 环境中会遇到问题。

Windows 对多进程的支持有限,使用 gevent 可以避免 Celery 在 Windows 上使用多进程时可能遇到的一些问题。通过协程池,你可以避免进程间通信带来的开销和复杂性。

五、docker 部署 redis

docker pull redis:latest # 拉取最新的 redis

docker run -itd --name redis-test -p 6379:6379 redis # 在 docker 中运行redis

image.png

六、使用 poetry 初始化项目

cd you project
poetry init 

七、添加一个 tasks.py

from celery import Celery
import time

broker = "redis://localhost:6379/0"
backend = "redis://localhost:6379/0"
app = Celery(
    "tasks", broker=broker, backend=backend
)

@app.task
def add(x, y):
    print("处理中...")
    time.sleep(5)
    print("处理完成...")
    return x + y

八、添加运行文件 main.py

from tasks import add

result = add.delay(4, 6)

print("任务状态:", result.status)
print("任务结果:", result.get())

九、启动任务

poetry run celery -A tasks worker -P gevent --loglevel=info

需要注意的是:在 windows 环境中,如果你想使用 ctrl + c 是不能停止的。

十、运行任务

poetry run main.py

输出:

image.png

任务状态: PENDING
任务结果: 10

任务输出结果:

image.png

十一、查看 redis ui

我们使用 redis ui 工具 Anthor Desktop Redis Manager 管理我们的任务。

image.png

image.png

这里我们定义两个任务,左侧是两次运行的任务,返回的结果都是 10。

十二、flower 监控

poetry add flower

celery -A tasks.app flower

然后再本地访问:http://localhost:5555/ 得到 web UI界面:

image.png

十三、小结

本文主要讲解是 Python 的 Celery 在 windows 中使用 Celery + Redis (以及 windows gevent)运行基本 Celery 任务。当然也有其他的能力,任务重试、任务链、周期性任务等等。