引言
开发系统的时候,总会遇到一些耗时的任务,比如解析文档、处理图片、跑数据分析什么的,尤其最近在写 RAG 系统就会有些很重的任务。这些活如果放在主线程里跑,那简直就是一场灾难。所以我们需要把这些任务丢给后台的 Worker 去处理,除了消息队列的另外一种选择。
Celery 差不多属于 Python 里最火的任务队列库,它和其他方案比起来有什么优缺点,Go 语言里有什么对应的工具,以及和现在很火的 Temporal 有什么区别?
Celery Worker 架构详解
基本概念
Celery 说白了就是一个任务分发系统。你把任务丢给它,它帮你分配给不同的 Worker 去执行。中间通过 Redis 或者 RabbitMQ 这些消息代理来传递任务。
核心架构:
Web App ──[发送任务]──> Message Broker ──[分发]──> Worker Pool
│ (Redis/RabbitMQ) │
└─────[查询结果]────────── Result Backend ←──[存储结果]┘
(Redis/Database)
# 典型的 Celery 任务定义
@celery_app.task(bind=True)
def process_document(self, file_id: str):
return chain(
parse_document_task.s(file_id),
chunk_document_task.s(),
generate_embeddings_task.s(),
store_vectors_task.s()
).apply_async()
链式任务处理
Celery 最牛的地方就是支持链式任务(Chain),可以把几个任务串起来:
- 自动数据传递:前面任务的输出自动变成后面任务的输入
- 阶段性恢复:某个任务挂了,重试的时候只重跑这个任务,前面完成的不用重复
- 状态持久化:任务状态和结果都能保存到数据库
可靠性保证与架构考虑
通过这些配置,Celery 能保证任务不轻易丢失:
celery_app.conf.update(
task_acks_late=True, # 任务完成后才确认
worker_prefetch_multiplier=1, # 控制并发
task_reject_on_worker_lost=True, # Worker 丢失时拒绝任务
result_persistent=True, # 结果持久化
# 重要:消息路由和队列设计
task_routes={
'tasks.heavy_document_task': {'queue': 'heavy_processing'},
'tasks.light_notification_task': {'queue': 'notifications'},
},
# 关键:序列化安全
task_serializer='json', # 避免pickle安全风险
accept_content=['json'],
result_serializer='json',
)
Worker 配置的那些坑
为什么 prefetch_multiplier 设成 1
你可能会想,为啥 worker_prefetch_multiplier=1
这么保守?设大点不是吞吐量更高吗?
这里有个权衡:可靠性 vs 吞吐量
设置为 1 的情况:
Worker1: [Task A] ← 正在处理
Queue: [Task B, Task C, Task D] ← 等待中
设置为 4 的情况:
Worker1: [Task A, Task B, Task C, Task D] ← 都预取到本地
Queue: [] ← 空了
如果 Worker1 这时候挂了:
- 设置为 1:只丢失 Task A,其他任务还在队列里
- 设置为 4:丢失 4 个任务,都得重跑
对于文档处理这种耗时长、内存占用大的任务,保守点比较好。如果你的任务都是轻量级的,可以适当调高。
Worker 怎么启动
# 生产环境部署策略
# 1. 按队列启动不同的Worker实例
celery -A tasks worker -Q heavy_processing -c 2 --prefetch-multiplier=1
celery -A tasks worker -Q notifications -c 10 --prefetch-multiplier=4
# 2. 监控和管理
celery -A tasks flower # Web监控界面
celery -A tasks inspect active # 查看活跃任务
celery -A tasks purge # 清空队列(危险操作)
# 开发环境需要启动这些服务
# Terminal 1: Redis
redis-server
# Terminal 2: Celery Worker
celery -A tasks worker --loglevel=info
# Terminal 3: 你的 Web 应用
python app.py
架构最佳实践:
- 🏗️ 按业务域隔离队列(文档处理、通知、报表等)
- 📊 不同队列使用不同的Worker配置(并发数、内存限制)
- 🔄 实现Circuit Breaker模式防止级联失败
- 📈 监控队列长度和处理延迟
对比传统消息队列
传统消息队列的问题
1. 缺乏任务编排能力
传统消息队列(比如 Kafka、Pulsar)就是简单的发送接收,复杂的工作流得自己写:
// 传统消息队列模式 - 需要手动管理任务流程
func processWithQueue() {
// 发送到解析队列
producer.Send("parse_queue", parseMessage)
// 得手动监听解析完成,然后发送到下一个队列
consumer.Listen("parse_result", func(msg) {
producer.Send("chunk_queue", chunkMessage)
})
}
2. 状态管理头疼
- 得自己搞个地方存状态
- 没有内置重试机制
- 很难知道任务跑到哪一步了
3. 错误处理麻烦
- 死信队列得自己实现
- 重试策略得自己写
- 部分失败的情况很难处理
Celery 的优势
1. 内置任务编排
# Celery 原生支持复杂工作流
workflow = chain(
task1.s(input),
group(task2.s(), task3.s()), # 并行执行
task4.s() # 汇总结果
)
2. 丰富的任务类型
- Chain:一个接一个执行
- Group:同时执行多个
- Chord:先并行执行,再汇总结果
- Map:批量处理
3. 自动重试和错误处理
@celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3})
def robust_task(self):
# 出错了会自动重试 3 次
pass
Go 语言中的对应实现
Machinery - Go 版本的 Celery
Go 里面最像 Celery 的应该是 Machinery:
package main
import (
"github.com/RichardKnill/machinery/v1"
"github.com/RichardKnill/machinery/v1/config"
"github.com/RichardKnill/machinery/v1/tasks"
)
func main() {
// 配置服务器
cnf := &config.Config{
Broker: "redis://localhost:6379",
DefaultQueue: "machinery_tasks",
ResultBackend: "redis://localhost:6379",
}
server, _ := machinery.NewServer(cnf)
// 定义任务
taskMap := map[string]interface{}{
"parse_document": parseDocument,
"chunk_document": chunkDocument,
}
server.RegisterTasks(taskMap)
// 创建工作流
parseTask := &tasks.Signature{
Name: "parse_document",
Args: []tasks.Arg{
{Type: "string", Value: "file_123"},
},
}
chunkTask := &tasks.Signature{
Name: "chunk_document",
}
chain, _ := tasks.NewChain(parseTask, chunkTask)
_, err := server.SendChain(chain)
if err != nil {
panic(err)
}
}
func parseDocument(fileID string) (string, error) {
// 解析逻辑
return "parsed_content", nil
}
func chunkDocument(content string) ([]string, error) {
// 分块逻辑
return []string{"chunk1", "chunk2"}, nil
}
Asynq - 简洁的任务队列
如果你喜欢简单点的,Asynq 不错:
package main
import (
"github.com/hibiken/asynq"
"context"
"log"
)
func main() {
// 客户端 - 发送任务
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
defer client.Close()
// 发送任务
task := asynq.NewTask("process_document", []byte(`{"file_id": "123"}`))
info, err := client.Enqueue(task)
if err != nil {
log.Fatal(err)
}
log.Printf("任务已入队: %s", info.ID)
}
// 在另一个进程中运行服务器
func runServer() {
// 服务器 - 处理任务
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("process_document", processDocumentHandler)
if err := srv.Run(mux); err != nil {
log.Fatalf("服务器启动失败: %v", err)
}
}
func processDocumentHandler(ctx context.Context, t *asynq.Task) error {
// 处理逻辑
log.Printf("处理任务: %s", t.Payload())
return nil
}
Go vs Python 对比
特性 | Celery (Python) | Machinery/Asynq (Go) |
---|---|---|
性能 | 还行 | 更快 |
内存使用 | 比较高 | 比较低 |
部署复杂度 | 还好 | 更简单(单个可执行文件) |
生态成熟度 | 非常成熟 | 还在发展 |
工作流支持 | 很丰富 | 比较基础 |
监控工具 | Flower 等 | 选择较少 |
对比 Temporal
Temporal 的特点
Temporal 是新一代的工作流引擎,功能更强大:
1. 工作流即代码
func DocumentProcessingWorkflow(ctx workflow.Context, fileID string) error {
// 解析阶段
var parseResult string
err := workflow.ExecuteActivity(ctx, ParseDocument, fileID).Get(ctx, &parseResult)
if err != nil {
return err
}
// 分块阶段
var chunks []string
err = workflow.ExecuteActivity(ctx, ChunkDocument, parseResult).Get(ctx, &chunks)
if err != nil {
return err
}
// 并行向量化
var futures []workflow.Future
for _, chunk := range chunks {
future := workflow.ExecuteActivity(ctx, GenerateEmbedding, chunk)
futures = append(futures, future)
}
// 等待所有向量化完成
var embeddings []string
for _, future := range futures {
var embedding string
err := future.Get(ctx, &embedding)
if err != nil {
return err
}
embeddings = append(embeddings, embedding)
}
// 存储阶段
return workflow.ExecuteActivity(ctx, StoreVectors, embeddings).Get(ctx, nil)
}
第一次看到这个代码会有个疑问:这不是顺序执行的吗?如果在第二步中断了,怎么在重启后跳过第一步?
Temporal 通过 Event Sourcing 实现这个魔法:
工作流执行历史(存储在数据库中):
┌─────────────────────────────────────────┐
│ Event 1: WorkflowStarted │
│ Event 2: ActivityScheduled (ParseDoc) │
│ Event 3: ActivityCompleted (ParseDoc) │ ← 已完成,有结果
│ Event 4: ActivityScheduled (ChunkDoc) │ ← 刚开始,无结果
│ Event 5: ... (系统崩溃) │
└─────────────────────────────────────────┘
恢复时:
- 读取所有 Event
- 重放(replay)工作流代码
- 遇到 ParseDoc → 发现 Event 3 有结果 → 直接返回结果
- 遇到 ChunkDoc → 发现没有完成事件 → 重新执行
这就是 Temporal 比 Celery 强大的地方 - 精确的断点恢复,而不是简单的任务重试。
2. 强一致性保证
- 工作流状态完全持久化
- 支持超长时间运行(几天几个月都没问题)
- 断点恢复非常精确
3. 时间处理能力
// 支持复杂的时间逻辑
workflow.Sleep(ctx, time.Hour*24) // 等待一天
workflow.NewTimer(ctx, time.Minute*30) // 设置定时器
Temporal vs Celery 对比
特性 | Celery | Temporal |
---|---|---|
学习成本 | 比较低 | 比较高 |
状态一致性 | 最终一致 | 强一致 |
长期任务 | 支持有限 | 原生支持 |
版本管理 | 比较麻烦 | 内置支持 |
监控界面 | Flower | Temporal Web UI |
复杂工作流 | 基础支持 | 非常强大 |
怎么选择
选择 Celery 如果:
- 想快速搭建原型
- 团队主要用 Python
- 任务复杂度中等
- 大家对 Celery 比较熟
选择 Temporal 如果:
- 需要非常可靠的状态保证
- 业务流程很复杂
- 有长时间运行的任务
- 对错误处理要求很高
选择 Go 任务队列如果:
- 对性能要求很高
- 服务器资源紧张
- 任务相对简单
- 团队主要用 Go
Redis 可靠性问题与架构设计
说到 Celery,就不得不提 Redis 的可靠性问题。很多人问:Redis 挂了怎么办?会不会丢失所有任务?这其实是一个**单点故障(SPOF)**的经典问题。
Redis 挂了会怎样
立即影响:
❌ 队列中等待的任务 → 丢失(如果Redis没有持久化)
❌ 正在传递的任务消息 → 丢失
❌ 任务结果缓存 → 丢失(如果存储在Redis中)
✅ 正在执行的任务 → 继续跑(已经在 Worker 内存里了)
✅ 已完成并确认的任务 → 不受影响
架构层面的思考:
这暴露了一个根本问题:Broker 和 Result Backend 的职责混淆。理想的架构应该是:
- Message Broker:专注消息传递,追求高吞吐和低延迟
- Result Backend:专注数据持久化,追求一致性和可靠性
解决方案
方案1:Redis 持久化配置
# redis.conf
save 900 1 # 15分钟内有1次写入就保存
save 300 10 # 5分钟内有10次写入就保存
save 60 10000 # 1分钟内有10000次写入就保存
appendonly yes # 开启AOF日志
appendfsync everysec # 每秒同步一次
# 重要:内存策略
maxmemory 2gb
maxmemory-policy allkeys-lru # 内存不足时清理策略
方案2:架构分离 - 数据库后端存储结果(推荐)
# 职责分离:Redis做消息队列,PostgreSQL存结果
DATABASE_URL = 'postgresql://doc_user:doc_password@localhost:5432/doc_analysis'
celery_app = Celery(
'doc_processor',
broker='redis://localhost:6379/0', # Redis 专门做消息传递
backend=f'db+{DATABASE_URL}' # 数据库专门存结果
)
celery_app.conf.update(
result_backend=f'db+{DATABASE_URL}',
result_persistent=True,
database_table_schemas={
'task': 'celery',
'group': 'celery'
},
)
方案3:Redis 高可用集群(生产级别)
# Redis Sentinel 配置
from kombu import Connection
# 主从复制 + 哨兵监控
REDIS_SENTINEL_CONFIG = {
'sentinels': [
('sentinel1', 26379),
('sentinel2', 26379),
('sentinel3', 26379)
],
'service_name': 'mymaster',
'socket_timeout': 0.5,
}
BROKER_URL = 'sentinel://;' + ';'.join([
f'{h}:{p}' for h, p in REDIS_SENTINEL_CONFIG['sentinels']
]) + f';service_name={REDIS_SENTINEL_CONFIG["service_name"]}'
方案4:换成 RabbitMQ(企业级选择)
# RabbitMQ 集群配置
celery_app = Celery(
'doc_processor',
broker='pyamqp://guest@rabbitmq-cluster-lb:5672//',
backend='rpc://' # 或者继续用数据库
)
# RabbitMQ 的优势
celery_app.conf.update(
task_serializer='json',
broker_connection_retry_on_startup=True,
broker_transport_options={
'master_name': 'rabbitmq-cluster',
'fanout_prefix': True,
'fanout_patterns': True,
}
)
实际建议
架构演进路径:
阶段1:快速启动(适合小团队)
- ✅ Redis 单机 + 基础持久化
- ✅ 数据库存储任务结果
- ✅ 基础监控
阶段2:可靠性提升(适合成长期)
- ✅ Redis Sentinel 主从
- ✅ 队列分离和专用Worker
- ✅ Circuit Breaker 和限流
阶段3:企业级(适合大规模)
- ✅ RabbitMQ 集群
- ✅ 多数据中心部署
- ✅ 完整的可观测性体系
关键架构原则:
- 📊 监控先行:没有监控就没有可靠性
- 🔄 渐进式改进:不要一步到位,分阶段演进
- 🎯 明确 SLA:99.9% 还是 99.99% 决定了架构复杂度
- 💡 故障演练:定期测试故障场景和恢复流程
实际项目选型建议
文档处理系统场景
拿我们的文档处理系统(解析→分块→向量化→存储)来说,各个方案的适用性:
Celery 方案(现在在用)
# 优势:简单直观,Python 生态丰富
chain(
parse_document_task.s(file_id),
chunk_document_task.s(),
generate_embeddings_task.s(),
store_vectors_task.s()
)
- ✅ 快速上手
- ✅ AI/ML 库很丰富
- ❌ Python GIL 限制性能
- ❌ 内存用得比较多
Temporal 方案
// 优势:超级可靠,支持复杂流程
func DocumentWorkflow(ctx workflow.Context, fileID string) error {
// 可以处理各种复杂情况:条件分支、重试策略、超时等
}
- ✅ 企业级可靠性
- ✅ 复杂流程支持很好
- ❌ 学习成本高
- ❌ 基础设施比较复杂
Go + Redis 方案
// 优势:性能高,资源占用少
client.Enqueue(asynq.NewTask("process_doc", payload))
- ✅ 性能很好
- ✅ 资源占用少
- ❌ AI/ML 生态不如 Python
- ❌ 需要写更多代码
总结:架构师视角的技术选型
从软件架构的角度看,每种技术方案都有其适用的业务场景和团队阶段:
技术选型矩阵
维度 | Celery | 传统MQ | Go队列 | Temporal |
---|---|---|---|---|
学习成本 | 低 | 中 | 中 | 高 |
运维复杂度 | 中 | 低 | 低 | 高 |
水平扩展性 | 中 | 高 | 高 | 高 |
状态一致性 | 最终一致 | 无状态 | 无状态 | 强一致 |
故障恢复 | 任务级 | 无 | 无 | 精确恢复 |
可观测性 | 中(Flower) | 需自建 | 需自建 | 强(内置) |
架构演进建议
Stage 1: MVP阶段(<10万任务/天)
- 选择: Celery + Redis 单机
- 理由: 快速上线,学习成本低
- 风险: 单点故障,状态丢失
Stage 2: 成长阶段(10万-100万任务/天)
- 选择: Celery + Redis Sentinel + 数据库后端
- 理由: 高可用,状态持久化
- 投入: 增加运维复杂度
Stage 3: 规模化阶段(100万+任务/天)
- 选择:
- 继续Celery但换RabbitMQ集群
- 或迁移到Temporal处理复杂流程
- 理由: 企业级可靠性,复杂业务支持
关键架构决策
1. 状态管理策略
# ❌ 错误:状态散布在各处
task_status = redis.get(f"task:{task_id}")
result = database.get_result(task_id)
progress = cache.get(f"progress:{task_id}")
# ✅ 正确:集中状态管理
class TaskStateManager:
def get_task_state(self, task_id):
# 统一状态查询入口
pass
2. 失败处理模式
- 快速失败: 轻量任务,失败后立即报警
- 重试机制: 网络相关任务,指数退避重试
- 补偿机制: 金融业务,记录所有操作支持回滚
3. 监控体系设计
# 关键指标监控
METRICS = {
'queue_length': '队列堆积情况',
'task_duration_p95': '任务处理延迟',
'worker_health': 'Worker健康状态',
'error_rate': '任务失败率',
'throughput': '系统吞吐量'
}
选择的时候主要考虑这几个因素:团队能力、业务复杂度、可靠性需求、长期演进。
对于大部分 Web 应用的后台任务处理,Celery 还是很好的起点。但如果你的业务流程很复杂,对可靠性要求很高,那 Temporal 值得好好研究一下。
记住一点:架构是演进的,不是一蹴而就的。从简单开始,随着业务复杂度提升再逐步演进,这才是务实的架构之道。