随笔

耗时任务处理的另一选择-Worker

引言

开发系统的时候,总会遇到一些耗时的任务,比如解析文档、处理图片、跑数据分析什么的,尤其最近在写 RAG 系统就会有些很重的任务。这些活如果放在主线程里跑,那简直就是一场灾难。所以我们需要把这些任务丢给后台的 Worker 去处理,除了消息队列的另外一种选择。

Celery 差不多属于 Python 里最火的任务队列库,它和其他方案比起来有什么优缺点,Go 语言里有什么对应的工具,以及和现在很火的 Temporal 有什么区别?

Celery Worker 架构详解

基本概念

Celery 说白了就是一个任务分发系统。你把任务丢给它,它帮你分配给不同的 Worker 去执行。中间通过 Redis 或者 RabbitMQ 这些消息代理来传递任务。

核心架构:

Web App ──[发送任务]──> Message Broker ──[分发]──> Worker Pool
   │                     (Redis/RabbitMQ)              │
   └─────[查询结果]────────── Result Backend ←──[存储结果]┘
                         (Redis/Database)
# 典型的 Celery 任务定义
@celery_app.task(bind=True)
def process_document(self, file_id: str):
    return chain(
        parse_document_task.s(file_id),
        chunk_document_task.s(),
        generate_embeddings_task.s(),
        store_vectors_task.s()
    ).apply_async()

链式任务处理

Celery 最牛的地方就是支持链式任务(Chain),可以把几个任务串起来:

  1. 自动数据传递:前面任务的输出自动变成后面任务的输入
  2. 阶段性恢复:某个任务挂了,重试的时候只重跑这个任务,前面完成的不用重复
  3. 状态持久化:任务状态和结果都能保存到数据库

可靠性保证与架构考虑

通过这些配置,Celery 能保证任务不轻易丢失:

celery_app.conf.update(
    task_acks_late=True,              # 任务完成后才确认
    worker_prefetch_multiplier=1,     # 控制并发
    task_reject_on_worker_lost=True,  # Worker 丢失时拒绝任务
    result_persistent=True,           # 结果持久化
    
    # 重要:消息路由和队列设计
    task_routes={
        'tasks.heavy_document_task': {'queue': 'heavy_processing'},
        'tasks.light_notification_task': {'queue': 'notifications'},
    },
    
    # 关键:序列化安全
    task_serializer='json',           # 避免pickle安全风险
    accept_content=['json'],
    result_serializer='json',
)

Worker 配置的那些坑

为什么 prefetch_multiplier 设成 1

你可能会想,为啥 worker_prefetch_multiplier=1 这么保守?设大点不是吞吐量更高吗?

这里有个权衡:可靠性 vs 吞吐量

设置为 1 的情况:

Worker1: [Task A] ← 正在处理
Queue:   [Task B, Task C, Task D] ← 等待中

设置为 4 的情况:

Worker1: [Task A, Task B, Task C, Task D] ← 都预取到本地
Queue:   [] ← 空了

如果 Worker1 这时候挂了:

  • 设置为 1:只丢失 Task A,其他任务还在队列里
  • 设置为 4:丢失 4 个任务,都得重跑

对于文档处理这种耗时长、内存占用大的任务,保守点比较好。如果你的任务都是轻量级的,可以适当调高。

Worker 怎么启动

# 生产环境部署策略
# 1. 按队列启动不同的Worker实例
celery -A tasks worker -Q heavy_processing -c 2 --prefetch-multiplier=1
celery -A tasks worker -Q notifications -c 10 --prefetch-multiplier=4

# 2. 监控和管理
celery -A tasks flower  # Web监控界面
celery -A tasks inspect active  # 查看活跃任务
celery -A tasks purge  # 清空队列(危险操作)

# 开发环境需要启动这些服务
# Terminal 1: Redis
redis-server

# Terminal 2: Celery Worker  
celery -A tasks worker --loglevel=info

# Terminal 3: 你的 Web 应用
python app.py

架构最佳实践:

  • 🏗️ 按业务域隔离队列(文档处理、通知、报表等)
  • 📊 不同队列使用不同的Worker配置(并发数、内存限制)
  • 🔄 实现Circuit Breaker模式防止级联失败
  • 📈 监控队列长度和处理延迟

对比传统消息队列

传统消息队列的问题

1. 缺乏任务编排能力

传统消息队列(比如 Kafka、Pulsar)就是简单的发送接收,复杂的工作流得自己写:

// 传统消息队列模式 - 需要手动管理任务流程
func processWithQueue() {
    // 发送到解析队列
    producer.Send("parse_queue", parseMessage)
    
    // 得手动监听解析完成,然后发送到下一个队列
    consumer.Listen("parse_result", func(msg) {
        producer.Send("chunk_queue", chunkMessage)
    })
}

2. 状态管理头疼

  • 得自己搞个地方存状态
  • 没有内置重试机制
  • 很难知道任务跑到哪一步了

3. 错误处理麻烦

  • 死信队列得自己实现
  • 重试策略得自己写
  • 部分失败的情况很难处理

Celery 的优势

1. 内置任务编排

# Celery 原生支持复杂工作流
workflow = chain(
    task1.s(input),
    group(task2.s(), task3.s()),  # 并行执行
    task4.s()                     # 汇总结果
)

2. 丰富的任务类型

  • Chain:一个接一个执行
  • Group:同时执行多个
  • Chord:先并行执行,再汇总结果
  • Map:批量处理

3. 自动重试和错误处理

@celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3})
def robust_task(self):
    # 出错了会自动重试 3 次
    pass

Go 语言中的对应实现

Machinery - Go 版本的 Celery

Go 里面最像 Celery 的应该是 Machinery:

package main

import (
    "github.com/RichardKnill/machinery/v1"
    "github.com/RichardKnill/machinery/v1/config"
    "github.com/RichardKnill/machinery/v1/tasks"
)

func main() {
    // 配置服务器
    cnf := &config.Config{
        Broker:        "redis://localhost:6379",
        DefaultQueue:  "machinery_tasks",
        ResultBackend: "redis://localhost:6379",
    }
    
    server, _ := machinery.NewServer(cnf)
    
    // 定义任务
    taskMap := map[string]interface{}{
        "parse_document": parseDocument,
        "chunk_document": chunkDocument,
    }
    server.RegisterTasks(taskMap)
    
    // 创建工作流
    parseTask := &tasks.Signature{
        Name: "parse_document",
        Args: []tasks.Arg{
            {Type: "string", Value: "file_123"},
        },
    }
    
    chunkTask := &tasks.Signature{
        Name: "chunk_document",
    }
    
    chain, _ := tasks.NewChain(parseTask, chunkTask)
    _, err := server.SendChain(chain)
    if err != nil {
        panic(err)
    }
}

func parseDocument(fileID string) (string, error) {
    // 解析逻辑
    return "parsed_content", nil
}

func chunkDocument(content string) ([]string, error) {
    // 分块逻辑
    return []string{"chunk1", "chunk2"}, nil
}

Asynq - 简洁的任务队列

如果你喜欢简单点的,Asynq 不错:

package main

import (
    "github.com/hibiken/asynq"
    "context"
    "log"
)

func main() {
    // 客户端 - 发送任务
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
    defer client.Close()
    
    // 发送任务
    task := asynq.NewTask("process_document", []byte(`{"file_id": "123"}`))
    info, err := client.Enqueue(task)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("任务已入队: %s", info.ID)
}

// 在另一个进程中运行服务器
func runServer() {
    // 服务器 - 处理任务
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )
    
    mux := asynq.NewServeMux()
    mux.HandleFunc("process_document", processDocumentHandler)
    
    if err := srv.Run(mux); err != nil {
        log.Fatalf("服务器启动失败: %v", err)
    }
}

func processDocumentHandler(ctx context.Context, t *asynq.Task) error {
    // 处理逻辑
    log.Printf("处理任务: %s", t.Payload())
    return nil
}

Go vs Python 对比

特性 Celery (Python) Machinery/Asynq (Go)
性能 还行 更快
内存使用 比较高 比较低
部署复杂度 还好 更简单(单个可执行文件)
生态成熟度 非常成熟 还在发展
工作流支持 很丰富 比较基础
监控工具 Flower 等 选择较少

对比 Temporal

Temporal 的特点

Temporal 是新一代的工作流引擎,功能更强大:

1. 工作流即代码

func DocumentProcessingWorkflow(ctx workflow.Context, fileID string) error {
    // 解析阶段
    var parseResult string
    err := workflow.ExecuteActivity(ctx, ParseDocument, fileID).Get(ctx, &parseResult)
    if err != nil {
        return err
    }
    
    // 分块阶段
    var chunks []string
    err = workflow.ExecuteActivity(ctx, ChunkDocument, parseResult).Get(ctx, &chunks)
    if err != nil {
        return err
    }
    
    // 并行向量化
    var futures []workflow.Future
    for _, chunk := range chunks {
        future := workflow.ExecuteActivity(ctx, GenerateEmbedding, chunk)
        futures = append(futures, future)
    }
    
    // 等待所有向量化完成
    var embeddings []string
    for _, future := range futures {
        var embedding string
        err := future.Get(ctx, &embedding)
        if err != nil {
            return err
        }
        embeddings = append(embeddings, embedding)
    }
    
    // 存储阶段
    return workflow.ExecuteActivity(ctx, StoreVectors, embeddings).Get(ctx, nil)
}

第一次看到这个代码会有个疑问:这不是顺序执行的吗?如果在第二步中断了,怎么在重启后跳过第一步?

Temporal 通过 Event Sourcing 实现这个魔法:

工作流执行历史(存储在数据库中):

  ┌─────────────────────────────────────────┐
  │ Event 1: WorkflowStarted                │
  │ Event 2: ActivityScheduled (ParseDoc)   │
  │ Event 3: ActivityCompleted (ParseDoc)   │ ← 已完成,有结果
  │ Event 4: ActivityScheduled (ChunkDoc)   │ ← 刚开始,无结果
  │ Event 5: ... (系统崩溃)                  │
  └─────────────────────────────────────────┘

恢复时:

  • 读取所有 Event
  • 重放(replay)工作流代码
  • 遇到 ParseDoc → 发现 Event 3 有结果 → 直接返回结果
  • 遇到 ChunkDoc → 发现没有完成事件 → 重新执行

这就是 Temporal 比 Celery 强大的地方 - 精确的断点恢复,而不是简单的任务重试。

2. 强一致性保证

  • 工作流状态完全持久化
  • 支持超长时间运行(几天几个月都没问题)
  • 断点恢复非常精确

3. 时间处理能力

// 支持复杂的时间逻辑
workflow.Sleep(ctx, time.Hour*24) // 等待一天
workflow.NewTimer(ctx, time.Minute*30) // 设置定时器

Temporal vs Celery 对比

特性 Celery Temporal
学习成本 比较低 比较高
状态一致性 最终一致 强一致
长期任务 支持有限 原生支持
版本管理 比较麻烦 内置支持
监控界面 Flower Temporal Web UI
复杂工作流 基础支持 非常强大

怎么选择

选择 Celery 如果:

  • 想快速搭建原型
  • 团队主要用 Python
  • 任务复杂度中等
  • 大家对 Celery 比较熟

选择 Temporal 如果:

  • 需要非常可靠的状态保证
  • 业务流程很复杂
  • 有长时间运行的任务
  • 对错误处理要求很高

选择 Go 任务队列如果:

  • 对性能要求很高
  • 服务器资源紧张
  • 任务相对简单
  • 团队主要用 Go

Redis 可靠性问题与架构设计

说到 Celery,就不得不提 Redis 的可靠性问题。很多人问:Redis 挂了怎么办?会不会丢失所有任务?这其实是一个**单点故障(SPOF)**的经典问题。

Redis 挂了会怎样

立即影响:

❌ 队列中等待的任务 → 丢失(如果Redis没有持久化)
❌ 正在传递的任务消息 → 丢失  
❌ 任务结果缓存 → 丢失(如果存储在Redis中)
✅ 正在执行的任务 → 继续跑(已经在 Worker 内存里了)
✅ 已完成并确认的任务 → 不受影响

架构层面的思考:
这暴露了一个根本问题:Broker 和 Result Backend 的职责混淆。理想的架构应该是:

  • Message Broker:专注消息传递,追求高吞吐和低延迟
  • Result Backend:专注数据持久化,追求一致性和可靠性

解决方案

方案1:Redis 持久化配置

# redis.conf
save 900 1          # 15分钟内有1次写入就保存
save 300 10         # 5分钟内有10次写入就保存  
save 60 10000       # 1分钟内有10000次写入就保存
appendonly yes      # 开启AOF日志
appendfsync everysec # 每秒同步一次

# 重要:内存策略
maxmemory 2gb
maxmemory-policy allkeys-lru  # 内存不足时清理策略

方案2:架构分离 - 数据库后端存储结果(推荐)

# 职责分离:Redis做消息队列,PostgreSQL存结果
DATABASE_URL = 'postgresql://doc_user:doc_password@localhost:5432/doc_analysis'

celery_app = Celery(
    'doc_processor',
    broker='redis://localhost:6379/0',     # Redis 专门做消息传递
    backend=f'db+{DATABASE_URL}'          # 数据库专门存结果
)

celery_app.conf.update(
    result_backend=f'db+{DATABASE_URL}',
    result_persistent=True,
    database_table_schemas={
        'task': 'celery',
        'group': 'celery'
    },
)

方案3:Redis 高可用集群(生产级别)

# Redis Sentinel 配置
from kombu import Connection

# 主从复制 + 哨兵监控
REDIS_SENTINEL_CONFIG = {
    'sentinels': [
        ('sentinel1', 26379),
        ('sentinel2', 26379),
        ('sentinel3', 26379)
    ],
    'service_name': 'mymaster',
    'socket_timeout': 0.5,
}

BROKER_URL = 'sentinel://;' + ';'.join([
    f'{h}:{p}' for h, p in REDIS_SENTINEL_CONFIG['sentinels']
]) + f';service_name={REDIS_SENTINEL_CONFIG["service_name"]}'

方案4:换成 RabbitMQ(企业级选择)

# RabbitMQ 集群配置
celery_app = Celery(
    'doc_processor',
    broker='pyamqp://guest@rabbitmq-cluster-lb:5672//',
    backend='rpc://'  # 或者继续用数据库
)

# RabbitMQ 的优势
celery_app.conf.update(
    task_serializer='json',
    broker_connection_retry_on_startup=True,
    broker_transport_options={
        'master_name': 'rabbitmq-cluster',
        'fanout_prefix': True,
        'fanout_patterns': True,
    }
)

实际建议

架构演进路径:

阶段1:快速启动(适合小团队)

  1. ✅ Redis 单机 + 基础持久化
  2. ✅ 数据库存储任务结果
  3. ✅ 基础监控

阶段2:可靠性提升(适合成长期)

  1. ✅ Redis Sentinel 主从
  2. ✅ 队列分离和专用Worker
  3. ✅ Circuit Breaker 和限流

阶段3:企业级(适合大规模)

  1. ✅ RabbitMQ 集群
  2. ✅ 多数据中心部署
  3. ✅ 完整的可观测性体系

关键架构原则:

  • 📊 监控先行:没有监控就没有可靠性
  • 🔄 渐进式改进:不要一步到位,分阶段演进
  • 🎯 明确 SLA:99.9% 还是 99.99% 决定了架构复杂度
  • 💡 故障演练:定期测试故障场景和恢复流程

实际项目选型建议

文档处理系统场景

拿我们的文档处理系统(解析→分块→向量化→存储)来说,各个方案的适用性:

Celery 方案(现在在用)

# 优势:简单直观,Python 生态丰富
chain(
    parse_document_task.s(file_id),
    chunk_document_task.s(),
    generate_embeddings_task.s(),
    store_vectors_task.s()
)
  • ✅ 快速上手
  • ✅ AI/ML 库很丰富
  • ❌ Python GIL 限制性能
  • ❌ 内存用得比较多

Temporal 方案

// 优势:超级可靠,支持复杂流程
func DocumentWorkflow(ctx workflow.Context, fileID string) error {
    // 可以处理各种复杂情况:条件分支、重试策略、超时等
}
  • ✅ 企业级可靠性
  • ✅ 复杂流程支持很好
  • ❌ 学习成本高
  • ❌ 基础设施比较复杂

Go + Redis 方案

// 优势:性能高,资源占用少
client.Enqueue(asynq.NewTask("process_doc", payload))
  • ✅ 性能很好
  • ✅ 资源占用少
  • ❌ AI/ML 生态不如 Python
  • ❌ 需要写更多代码

总结:架构师视角的技术选型

从软件架构的角度看,每种技术方案都有其适用的业务场景和团队阶段:

技术选型矩阵

维度 Celery 传统MQ Go队列 Temporal
学习成本
运维复杂度
水平扩展性
状态一致性 最终一致 无状态 无状态 强一致
故障恢复 任务级 精确恢复
可观测性 中(Flower) 需自建 需自建 强(内置)

架构演进建议

Stage 1: MVP阶段(<10万任务/天)

  • 选择: Celery + Redis 单机
  • 理由: 快速上线,学习成本低
  • 风险: 单点故障,状态丢失

Stage 2: 成长阶段(10万-100万任务/天)

  • 选择: Celery + Redis Sentinel + 数据库后端
  • 理由: 高可用,状态持久化
  • 投入: 增加运维复杂度

Stage 3: 规模化阶段(100万+任务/天)

  • 选择:
    • 继续Celery但换RabbitMQ集群
    • 或迁移到Temporal处理复杂流程
  • 理由: 企业级可靠性,复杂业务支持

关键架构决策

1. 状态管理策略

# ❌ 错误:状态散布在各处
task_status = redis.get(f"task:{task_id}")
result = database.get_result(task_id)
progress = cache.get(f"progress:{task_id}")

# ✅ 正确:集中状态管理
class TaskStateManager:
    def get_task_state(self, task_id):
        # 统一状态查询入口
        pass

2. 失败处理模式

  • 快速失败: 轻量任务,失败后立即报警
  • 重试机制: 网络相关任务,指数退避重试
  • 补偿机制: 金融业务,记录所有操作支持回滚

3. 监控体系设计

# 关键指标监控
METRICS = {
    'queue_length': '队列堆积情况',
    'task_duration_p95': '任务处理延迟',
    'worker_health': 'Worker健康状态',
    'error_rate': '任务失败率',
    'throughput': '系统吞吐量'
}

选择的时候主要考虑这几个因素:团队能力、业务复杂度、可靠性需求、长期演进

对于大部分 Web 应用的后台任务处理,Celery 还是很好的起点。但如果你的业务流程很复杂,对可靠性要求很高,那 Temporal 值得好好研究一下。

记住一点:架构是演进的,不是一蹴而就的。从简单开始,随着业务复杂度提升再逐步演进,这才是务实的架构之道。

本文链接:https://note.lilonghe.net/post/time-consuming-task-processing-alternative-worker.html

-- EOF --