folder
folder
folder-config
folder
folder-plugin
folder-class
folder
readme

Dpz.Core.MessageQueue

基于 RabbitMQ(v7.x, RabbitMQ.Client)的消息队列基础设施库,提供强类型消息发布/消费、Outbox 可靠投递、分布式批次追踪等能力。

核心特性

  • 约定优于配置 — 按消息类型名自动推导 Exchange/Queue/RoutingKey,也支持 [MessageRoute] 特性显式覆盖
  • 类型安全 — 所有消息继承 MessageBase,处理器通过泛型 IMessageHandler<TMessage> / IMessageHandler<TMessage, TResult> 强类型绑定
  • Outbox 可靠投递 — 发布前先写 MongoDB Outbox 记录,后台服务兜底重试失败消息(发布追踪通过 AddMessageOutbox() 开启,补发 Worker 通过 AddMessageOutboxRetryWorker() 开启)
  • 分布式批次追踪IBatchTracker 基于 FusionCache + 分布式锁,可跨实例追踪批量消息的消费进度
  • 优雅断线重连 — 消费者内置指数退避重连(2^n up to 30s),Channel 断开自动恢复
  • 手动 ACK + 重试prefetchCount=1,Handler 返回 false 时通过重发消息体(而非 requeue)安全地持久化递增 RetryCount
  • 两套处理模型 — 简单 bool 返回值 + 带 MessageHandlerResult 泛型结果的处理模型
  • 完全 DI 集成 — 借助 RegisterInject 配置驱动 + AddMessageConsumer<TMessage, THandler> 显式注册

项目结构

Dpz.Core.MessageQueue/
├── Abstractions/               # 核心接口
│   ├── IMessagePublisher<T>      # 发布者
│   ├── IMessageHandler<T>        # 处理器 (bool)
│   ├── IMessageHandler<T,TResult># 处理器 (泛型结果)
│   ├── IMessageRoutingConvention # 路由约定
│   ├── IRabbitMQConnectionFactory# 连接工厂
│   ├── IMessageOutboxStore       # Outbox 存储
│   ├── IMessageOutboxRetryPublisher # Outbox 重发
│   └── IBatchTracker             # 批次追踪
├── RabbitMQ/                   # 实现
│   ├── RabbitMQPublisher<T>      # 核心发布者
│   ├── OutboxMessagePublisher<T> # Outbox 装饰器
│   ├── MessageOutboxRetryBackgroundService # Outbox 后台补发
│   ├── MessageOutboxRetryService # Outbox 补发执行器
│   ├── RabbitMQRawPublisher      # 原始 JSON 重发
│   ├── RabbitMQConnectionFactory # 连接/Channel 工厂
│   ├── RabbitMQConsumerBackgroundService<T>      # 消费者 (bool)
│   ├── RabbitMQConsumerBackgroundServiceWithResult<T,TResult> # 消费者 (结果)
│   ├── DefaultMessageRoutingConvention # 默认路由约定
│   ├── DistributedBatchTracker   # 分布式批次追踪
│   └── NullMessageOutboxStore/NullMessageOutboxRetryPublisher # 空实现
├── Models/
│   ├── MessageHandlerResult      # 处理结果基类 + 泛型
│   ├── MessageOutboxEntry        # Outbox 条目 DTO
│   ├── MessageOutboxRetryResult  # Outbox 补发结果
│   └── MessageOutboxRetryState   # FusionCache 补发状态
├── Enums/
│   ├── ExchangeType              # Direct/Topic/Fanout/Headers
│   └── MessageQueueBusinessType  # 业务类型 (Article)
├── Attributes/
│   └── MessageRouteAttribute     # 自定义路由
└── Extensions/
    └── MessageQueueServiceExtensions # DI 注册入口

依赖的外部库:DistributedLock.CoreZiggyCreatures.FusionCacheRabbitMQ.Client(v7.x)。

快速开始

1. 定义消息

消息类需继承 Dpz.Core.Entity.Base.MessageBase,按惯例放在 Dpz.Core.Public.ViewModel.Messages

public class NewsArticleMessage : MessageBase
{
    public required string Title { get; set; }
    public required string Markdown { get; set; }
    public required string From { get; set; }
    public required DateTime PublishTime { get; set; } = DateTime.Now;
    public required VmUserInfo Author { get; set; }
    public List<string> Tags { get; set; } = [];
    public string? Introduction { get; set; }
    public double AdWeight { get; set; }
}

自动推导的路由:

  • Exchange: dpz.news.exchange
  • Queue: dpz.news.article.queue
  • RoutingKey: news.article.#

2. 配置

{
  "RabbitMQ": {
    "HostName": "localhost",
    "Port": 5672,
    "UserName": "guest",
    "Password": "guest",
    "VirtualHost": "/",
    "AutomaticRecoveryEnabled": true
  }
}

3. 注册基础服务

// AddRabbitMQ 通常在基础设施层统一调用
// 见 Dpz.Core.Service/ServiceExtensions.cs
services.AddRabbitMQ(configuration);

注册的内容:

  • RabbitMQOptions 配置绑定
  • IRabbitMQConnectionFactoryRabbitMQConnectionFactory(Singleton)
  • IMessageRoutingConventionDefaultMessageRoutingConvention(Singleton)
  • IMessagePublisher<T>RabbitMQPublisher<T>(Singleton,开放泛型)
  • IMessageOutboxStoreNullMessageOutboxStore(Singleton,占位)
  • IMessageOutboxRetryPublisherNullMessageOutboxRetryPublisher(Singleton,占位)

4. 发布消息

public class NewsService(IMessagePublisher<NewsArticleMessage> publisher)
{
    public async Task PublishNewsAsync()
    {
        var message = new NewsArticleMessage
        {
            Title = "标题",
            Markdown = "正文",
            From = "https://...",
            PublishTime = DateTime.Now,
            Author = new VmUserInfo { ... },
            Source = "NewsService"
        };
        await _publisher.PublishAsync(message);
    }
}

5. 创建处理器

public class NewsArticleMessageHandler(
    IArticleService articleService,
    ILogger<NewsArticleMessageHandler> logger
) : IMessageHandler<NewsArticleMessage>
{
    public async Task<bool> HandleAsync(
        NewsArticleMessage message,
        CancellationToken cancellationToken = default
    )
    {
        try
        {
            var noExists = await articleService.NoExistsByFromAsync(
                [message.From], cancellationToken);
            if (noExists.Count == 0)
            {
                logger.LogInformation("新闻已存在,跳过: {From}", message.From);
                return true;
            }

            // 业务处理...
            return true;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "处理失败,将重试");
            return false;
        }
    }
}

6. 注册消费者

// 在目标应用层的 DI 配置中注册
services.AddMessageConsumer<NewsArticleMessage, NewsArticleMessageHandler>();

这会注册:

  • IMessageHandler<NewsArticleMessage>NewsArticleMessageHandler(Scoped)
  • IHostedServiceRabbitMQConsumerBackgroundService<NewsArticleMessage>(Singleton)

应用启动后自动开始消费。

消息定义与项目分布

消息实体统一位于 Dpz.Core.Public.ViewModel.Messages/(而非 MessageQueue 项目内),消费者分布在三个入口应用:

应用消息处理器功能
Dpz.Core.WebClearCacheMessageClearCacheMessageHandler清除缓存
Dpz.Core.WebApiNewsArticleMessageNewsArticleMessageHandler保存爬虫文章
BatchCompletionMessageBatchCompletionMessageHandler批次完成 → 清缓存
Dpz.Core.Web.JobsDeleteMarkdownMessageDeleteMarkdownHandler删除 Markdown
RemoveImagesMessageRemoveImagesHandler删除图片
AddCommentCountMessageAddCommentCountHandler更新评论数
AddDanmakuCountMessageAddDanmakuCountHandler更新弹幕数
AnalyzeCodeMessageAnalyzeCodeHandler代码分析
SendMasterEmailMessageSendMasterEmailHandler发信给站长
SendReplyEmailMessageSendReplyEmailHandler发回复通知
SendEmailVerifyCodeMessageSendEmailVerifyCodeHandler发验证码

高级用法

自定义路由([MessageRoute]

[MessageRoute(
    ExchangeName = "dpz.system.exchange",
    QueueName = "dpz.system.batch.queue",
    RoutingKey = "system.batch.completion"
)]
public class BatchCompletionMessage : MessageBase { ... }

支持设置 ExchangeType(默认 Topic)。

自定义路由约定

public class MyRoutingConvention : IMessageRoutingConvention
{
    public string GetExchangeName<TMessage>() => $"my.{typeof(TMessage).Name.ToLower()}.exchange";
    public string GetQueueName<TMessage>() => $"my.{typeof(TMessage).Name.ToLower()}.queue";
    public string GetRoutingKey<TMessage>() => typeof(TMessage).Name.ToLower();
    public ExchangeType GetExchangeType<TMessage>() => ExchangeType.Topic;
}

// 替换默认约定
services.AddCustomRoutingConvention<MyRoutingConvention>();

批量发布

await publisher.PublishBatchAsync(messages, routingKey: "batch.route");

带结果的处理器

// 定义结果类型
public class MyResult : MessageHandlerResult
{
    public string Output { get; set; }
}

// 处理器
public class MyHandler : IMessageHandler<MyMessage, MyResult>
{
    public async Task<MyResult> HandleAsync(MyMessage message, CancellationToken ct)
    {
        // ...
        return MessageHandlerResult<string>.Ok("done");
    }
}

// 注册时需指定 TResult
services.AddMessageConsumer<MyMessage, MyHandler, MyResult>();

Outbox 可靠投递

默认发布路径直接写入 RabbitMQ,在网络抖动或 Broker 不可用时消息会丢失。启用 Outbox 模式后,每条消息先写入 MongoDB,再发送到 RabbitMQ,发送失败由后台 BackgroundService 兜底重试。

启用 Outbox

// 在 AddRabbitMQ 之后调用
services.AddMessageOutbox();

// 在负责补发的宿主应用中启用后台重试
services.AddMessageOutboxRetryWorker();

原理

发布流程:
  生产者 → OutboxMessagePublisher<T>
           ├─ 1. IMessageOutboxStore.CreateAsync()   写入 MongoDB (Pending)
           ├─ 2. inner.PublishAsync()                发送 RabbitMQ
           │   ├─ 成功 → MarkSentAsync()             状态 → Sent
           │   └─ 失败 → MarkPublishFailedAsync()     状态 → PublishFailed
           └─ 3. 抛出异常给调用方

重试 (BackgroundService, 默认每分钟):
  MessageOutboxRetryBackgroundService
    → GetPendingPublishRetryAsync(50)
    → RabbitMQRawPublisher.PublishRawAsync()  (跳过 Outbox 装饰器)
    → MarkSentAsync()
    → FusionCache 记录本轮完成时间

消费侧:
  消费者成功 → TryMarkConsumedAsync()  (状态 → Consumed)
  消费者超限失败 → TryMarkConsumeFailedAsync() (状态 → ConsumeFailed)

消费重试:
  MessageOutboxRetryBackgroundService
    → GetPendingConsumeRetryAsync(50)
    → 重新发布到队列 → 消费者重新处理
    → FusionCache 记录本轮完成时间

实现参考:Dpz.Core.Service/MessageOutboxExtensions.csDpz.Core.Service/RepositoryServiceImpl/MongoMessageOutboxStore.csDpz.Core.MessageQueue/RabbitMQ/MessageOutboxRetryBackgroundService.cs

Scoped 适配器

IMessageOutboxStore 注册为 Singleton,但 MongoMessageOutboxStore 是 Scoped(通过 RegisterInject 扫描)。ScopedOutboxStoreAdapter 桥接两者,每次操作创建独立 Scope,避免 captive dependency。

分布式批次追踪

适用于批量导入场景,每条消息携带相同的 BatchId,消费者处理完后通过 IBatchTracker.RecordMessageConsumedAsync 记录进度。所有消息处理完后发送 BatchCompletionMessage 触发后续动作。

实现

DistributedBatchTracker 基于:

  • IFusionCache(ZiggyCreatures.FusionCache)— 存储进度计数 + 已消费消息 ID 集合
  • IDistributedLockProvider(DistributedLock)— 保护并发写入
  • 指数退避重试 — 获取锁失败时 50ms→800ms 重试,最多 5 次

使用场景 — 新闻爬虫

爬虫 (Web.Jobs)
 ├─ 生成 BatchId  (guid)
 ├─ 发布 N 条 NewsArticleMessage (每条携带 BatchId)
 └─ 发布 BatchCompletionMessage { TotalMessages = N, BatchId }

消费者 (WebApi)
 ├─ NewsArticleMessageHandler.RecordMessageConsumedAsync(batchId, msgId)
 └─ BatchCompletionMessageHandler
     ├─ WaitForBatchCompletionAsync: 自适应轮询 (前30s 500ms → 之后 3s)
     ├─ 超时 120s + 进度停滞检测 (5次无变化则警告)
     └─ 完成后发布 ClearCacheMessage + CleanupBatchAsync

消息处理机制

重试策略

不同于传统 BasicNack(requeue: true)(进程内重试次数会随进程重启丢失),本项目采用 Republish 模式

Handler 返回 false
  → message.RetryCount++
  → channel.BasicPublish(更新后的消息体)
  → channel.BasicAck(原消息)
  → 新消息被投递(可能到其他实例)

超限 (RetryCount > 3):
  → channel.BasicNack(requeue: false)  // 丢弃
  → TryMarkConsumeFailedAsync()        // 记录到 Outbox

关键代码见 RabbitMQConsumerBackgroundService.cs:44-88

断线重连

  • 自动恢复:RabbitMQConnectionFactory 启用 AutomaticRecoveryEnabled + NetworkRecoveryInterval
  • 消费者强制重连:监听 ChannelShutdownAsyncTaskCompletionSource 触发外层循环 → 指数退避至 30s
  • 发布者:异常发生时清理 Channel(CleanupChannelAsync),下次调用自动重建

QoS

  • prefetchCount: 1 — 每个消费者同时处理 1 条消息,快的实例自动多处理
  • global: false — 仅限当前 Channel

配置选项

RabbitMQOptions 完整字段(对应 JSON 节 "RabbitMQ"):

{
  "RabbitMQ": {
    "HostName": "localhost",
    "Port": 5672,
    "UserName": "guest",
    "Password": "guest",
    "VirtualHost": "/",
    "RequestedConnectionTimeout": 30,
    "RequestedHeartbeat": 60,
    "AutomaticRecoveryEnabled": true,
    "NetworkRecoveryInterval": 5
  }
}

日志级别建议:

{
  "Logging": {
    "LogLevel": {
      "Dpz.Core.MessageQueue": "Information"
    }
  }
}

与通用 MQ 库的差异

维度本项目通用 MQ 库 (如 MassTransit, EasyNetQ)
消息定义继承 MessageBase,POCO 类通常需实现特定接口 / 继承基类
路由策略约定 + [MessageRoute] 特性消息类型 → 自动映射,显式配置端点
消费者注册AddMessageConsumer<TMsg, THandler>依赖容器扫描 / 总线配置
Outbox自研 MongoDB Outbox + 装饰器模式内置 EF Core / NHibernate Outbox
错误重试Republish 模式(重发新消息体)通常重试过滤器 / 错误队列 + 重试策略
批量追踪自研分布式缓存 + 锁无标准支持
上下文.NET 10 + RabbitMQ.Client v7支持多 transports (RMQ, SQS, Azure SB)
调度无内置调度,借助 Hangfire内置 Quartz / 延迟调度

优势与劣势

优势

  • 轻量无过度抽象 — 直接使用 RabbitMQ.Client,无中间层总线概念,行为可预测
  • Outbox 开箱可配 — 本质是装饰器模式,启用/关闭无需改业务代码
  • 批次进度追踪 — 适应本项目批量爬虫场景,通用 MQ 库无此能力
  • Republish 重试 — 相比 requeue,进程重启后 RetryCount 仍保留在消息体中
  • 类型安全 + DI — 强类型处理器,自动 lifetime 管理

劣势

  • 单一 Transport — 仅支持 RabbitMQ,无法切换(但本项目不需要多 Transport)
  • 无死信队列内置 — 超出重试次数后消息直接丢弃,未内置 DLX/DLQ(可手动配置)
  • 无延时消息 — RabbitMQ 原生不直接支持,可通过 TTL+DLX 模拟(本项目未实现)
  • 无 Saga / 编排 — 不提供分布式事务协调
  • 无内置调度 — 定时/延迟发布需借助 Hangfire

架构图

基础架构

┌─────────────────────────┐     ┌─────────────────────────┐
│  生产者 (任意应用)       │     │  消费者 (Web/WebApi/    │
│  IMessagePublisher<T>   │     │       Web.Jobs)          │
│  RabbitMQPublisher<T>   │     │  RabbitMQConsumer        │
│  (可选 Outbox decorator)│     │  BackgroundService      │
└───────────┬─────────────┘     └──────────────┬──────────┘
            │                                   ▲
            │ BasicPublish                      │ BasicConsume
            ▼                                   │ autoAck=false
    ┌────────────────┐                          │
    │   RabbitMQ     │──────────────────────────┘
    │ Exchange/Queue │   推送消息
    └────────────────┘

消息生命周期(含 Outbox)

sequenceDiagram
    participant App as 业务代码
    participant Pub as IMessagePublisher
    participant Outbox as OutboxMessagePublisher
    participant Store as Outbox Store (Mongo)
    participant RMQ as RabbitMQ
    participant Worker as Outbox Retry Worker
    participant Cons as 消费者
    participant Handler as IMessageHandler

    alt 未启用 Outbox
        App->>Pub: PublishAsync(msg)
        Pub->>RMQ: BasicPublish
    else 启用 Outbox
        App->>Pub: PublishAsync(msg)
        Pub->>Outbox: 委托
        Outbox->>Store: CreateAsync (Pending)
        Outbox->>RMQ: BasicPublish
        alt 成功
            Outbox->>Store: MarkSentAsync (Sent)
            Outbox-->>App: OK
        else 失败
            Outbox->>Store: MarkPublishFailedAsync
            Outbox-->>App: throw
            Note over Worker: 默认每分钟触发
            Worker->>Store: GetPendingPublishRetryAsync
            Worker->>RMQ: PublishRawAsync (跳过Outbox)
            Worker->>Store: MarkSentAsync
        end
    end

    RMQ->>Cons: 推送消息
    Cons->>Handler: HandleAsync
    alt 成功
        Cons->>RMQ: BasicAck
        Cons->>Store: MarkConsumedAsync (可选)
    else 失败 (RetryCount < 3)
        Cons->>RMQ: BasicPublish (RetryCount+1)
        Cons->>RMQ: BasicAck (原消息)
    else 失败 (RetryCount >= 3)
        Cons->>RMQ: BasicNack (丢弃)
        Cons->>Store: MarkConsumeFailedAsync
        Note over Worker: 消费重试
        Worker->>Store: GetPendingConsumeRetryAsync
        Worker->>RMQ: PublishRawAsync (重新入队)
    end

全量消息流(新闻爬虫场景)

sequenceDiagram
    participant HF as Hangfire
    participant Crawler as ItHomeActivator
    participant Pub as RabbitMQPublisher
    participant RMQ as RabbitMQ
    participant Cons as Consumer (WebApi)
    participant Handler as NewsArticleMessageHandler
    participant Tracker as IBatchTracker
    participant Srv as ArticleService
    participant DB as MongoDB

    HF->>Crawler: 每3小时触发
    Crawler->>Crawler: 抓取RSS、解析、过滤
    loop 每篇文章
        Crawler->>Crawler: 下载HTML、转Markdown、上传图片
        Crawler->>Pub: PublishAsync(msg with BatchId)
        Pub->>RMQ: BasicPublish
    end
    Crawler->>Pub: PublishAsync(BatchCompletionMessage)

    RMQ->>Cons: 推送 NewsArticleMessage
    Cons->>Handler: HandleAsync
    Handler->>Srv: NoExistsByFromAsync
    alt 已存在
        Handler->>Tracker: RecordMessageConsumedAsync
        Handler-->>Cons: true
    else 新文章
        Handler->>Srv: CreateArticleAsync
        Srv->>DB: Insert
        Handler->>Tracker: RecordMessageConsumedAsync
        Handler-->>Cons: true
    end
    Cons->>RMQ: BasicAck

    Note over Tracker: 所有消息消费完毕
    RMQ->>Cons: 推送 BatchCompletionMessage
    Cons->>Handler: HandleAsync
    Handler->>Tracker: WaitForBatchCompletionAsync
    Handler->>Pub: PublishAsync(ClearCacheMessage)
    Handler->>Tracker: CleanupBatchAsync

多实例竞争消费

graph TB
    subgraph "RabbitMQ Queue"
        Q[dpz.news.article.queue]
    end

    subgraph "消费者集群 (Docker/K8s)"
        C1[WebApi Pod 1<br/>prefetchCount=1]
        C2[WebApi Pod 2<br/>prefetchCount=1]
        C3[WebApi Pod 3<br/>prefetchCount=1]
    end

    subgraph "MongoDB 副本集"
        DB[(MongoDB)]
    end

    Q -->|轮询分发| C1
    Q -->|轮询分发| C2
    Q -->|轮询分发| C3
    C1 --> DB
    C2 --> DB
    C3 --> DB

常见问题

Q1: 如何确保消息不丢失? A: 三重保证:持久化消息(Persistent=true) + 持久化 Exchange/Queue(durable: true) + 手动 ACK(autoAck: false)。可选启用 Outbox 模式增加 MongoDB 兜底。

Q2: 消息处理失败如何重试? A: Handler 返回 false 时,消费者将消息体序列化后重新发布(RepublishForRetryAsync),RetryCount 递增。原消息执行 BasicAck 确认。超过 3 次后 BasicNack(requeue: false) 丢弃。

Q3: 多消费者实例如何分配消息? A: RabbitMQ 轮询分发(Round-robin),配合 prefetchCount=1 确保一次只处理 1 条,快的实例自动处理更多。

Q4: 如何实现 Outbox? A: 调用 services.AddMessageOutbox() 启用发布路径,调用 services.AddMessageOutboxRetryWorker() 在负责补发的宿主应用中启用后台重试。需要 MongoDB 中实现了 IMongoMessageOutboxStore(Scoped,通过 RegisterInject 扫描注册)。后台服务默认每分钟执行一次,并通过 FusionCache 记录上次补发完成时间。

Q5: 消息到达最大重试次数后会怎样? A: 调用 BasicNack(requeue: false),消息被 Broker 丢弃。不会自动进入死信队列。如需死信,需在 RabbitMQ 侧为队列配置 x-dead-letter-exchange

Q6: IBatchTracker 需要什么基础设施? A: 需要 FusionCache(IFusionCache) + IDistributedLockProvider。生产环境推荐基于 Redis 的分布式缓存和锁。

Q7: 为什么重试用 Republish 而不是 requeue? A: BasicNack(requeue: true) 会重置消息到队列头部,但 RetryCount 存储在内存中,进程重启后归零。Republish 将 RetryCount 持久化在消息体内,跨进程/跨重启仍能正确判断。

Q8: 消息消费后 Outbox 状态何时更新? A: 消费者在 BasicAck 后调用 TryMarkConsumedAsync(忽略异常,不阻塞流程)。启用 Outbox 后此步骤可选生效。

Q9: AddRabbitMQ 已在公共基础层注册,各应用层只需 AddMessageConsumer 吗? A: 是的。Dpz.Core.Service/ServiceExtensions.cs 已调用 AddRabbitMQ,各项目只需在 DI 配置中调用 AddMessageConsumer 注册具体消费者。

Q10: 生产者和消费者是否必须在同一应用中? A: 不必。生产者可在 Web.Jobs(爬虫),消费者在 WebApi。只需连接到同一 RabbitMQ 实例。

Q11: Redis / 分布式锁不可用时会怎样? A: DistributedBatchTracker 会抛出异常。IBatchTracker 仅在调用 AddBatchTracking() 的应用中启用(目前只有 WebApi)。

跨语言通信

底层传输基于 RabbitMQ (AMQP 0-9-1),序列化使用 JSON,这两者都是语言无关的。任何语言的 RabbitMQ 客户端只要遵循下列规范,即可与本组件互通。

消息信封 (Message Envelope)

每条消息的 JSON 结构包含两层:信封字段MessageBase)+ 业务字段(消息自身属性)。

信封字段(必填)

字段类型必填说明
messageIdstring唯一标识,推荐 GUID 字符串
timestampstring (ISO 8601 UTC)消息创建时间,如 "2025-01-01T00:00:00Z"
sourcestring来源标识,默认 ""
batchIdstring | null批次追踪 ID,单条消息传 null
retryCountnumber重试次数,首次发送填 0

JSON 示例(NewsArticleMessage

{
  "messageId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "timestamp": "2025-03-15T08:30:00Z",
  "source": "CrawlerService",
  "batchId": "batch-001",
  "retryCount": 0,
  "title": "新闻标题",
  "markdown": "Markdown 正文",
  "from": "https://example.com/news/123",
  "publishTime": "2025-03-15T08:00:00Z",
  "author": { "id": "u1", "name": "作者" },
  "tags": ["tech", "ai"],
  "introduction": "摘要",
  "adWeight": 0.5
}

AMQP 消息属性

发布消息时必须设置以下 BasicProperties

属性说明
ContentTypeapplication/json消息体格式
DeliveryMode2 (Persistent)持久化到磁盘
MessageIdmessageId 字段AMQP 消息 ID
TimestampUnix 秒级时间戳AMQP 时间戳
Headers(可选)可携带额外元数据

路由命名规则

路由由消息类型名(去掉 Message 后缀的 PascalCase 类名)自动推导,规则如下:

  1. 去掉 Message 后缀(如 NewsArticleMessageNewsArticle
  2. 大写字母拆词(NewsArticle["News", "Article"]
  3. 第一个词为模块名(moduleName = "news"
  4. 剩余词. 拼接为类型名(typeSuffix = "article"
  5. 全部转为小写

最终路由:

Exchange:    dpz.{moduleName}.exchange
Queue:       dpz.{moduleName}.{typeSuffix}.queue
RoutingKey:  {moduleName}.{typeSuffix}.#
ExchangeType: topic

对照表

消息类型名ExchangeQueueRoutingKey
NewsArticleMessagedpz.news.exchangedpz.news.article.queuenews.article.#
ClearCacheMessagedpz.clear.exchangedpz.clear.cache.queueclear.cache.#
AddCommentCountMessagedpz.add.exchangedpz.add.comment.count.queueadd.comment.count.#
DeleteMarkdownMessagedpz.delete.exchangedpz.delete.markdown.queuedelete.markdown.#
SendReplyEmailMessagedpz.send.exchangedpz.send.reply.email.queuesend.reply.email.#
BatchCompletionMessagedpz.batch.exchangedpz.batch.completion.queuebatch.completion.#

注意:如果某个消息在 .NET 侧使用了 [MessageRoute] 特性自定义路由,则以其配置为准,而非上述推导规则。跨语言服务可直接使用该显式声明的 exchange/queue/routingKey。

伪代码实现

function getRoute(className):
    name = className.replace(/Message$/i, "")    // 去掉 Message 后缀
    words = splitByUpperCase(name)                // 按大写字母拆词
    module = words[0].toLower()                   // 第一个词 → 模块
    type   = words[1..].join(".").toLower()       // 其余词 → 类型

    exchange    = "dpz." + module + ".exchange"
    queue       = "dpz." + module + "." + type + ".queue"
    routingKey  = module + "." + type + ".#"

    return { exchange, queue, routingKey }

消费端约定

队列声明

  • Exchange: topic(默认,或对应 [MessageRoute] 声明的类型)
  • Queue: durable,绑定到 Exchange,routingKey 匹配 {module}.{type}.#
  • autoAck = falseprefetchCount = 1

处理成功

  • 调用 BasicAck(deliveryTag, false)

处理失败(重试)

  1. 读取消息体中的 retryCount 字段
  2. retryCount ≤ 3:将 retryCount + 1,更新消息体,重新发布到同一 exchange/routingKey,然后 BasicAck 原消息(不是 requeue,而是发一条新消息)
  3. retryCount > 3BasicNack(deliveryTag, false, requeue: false) 丢弃消息

关键点:重试采用的是 Republish 模式(重新发布一条 retryCount+1 的新消息)而非 BasicNack(requeue: true)。这确保了 retryCount 持久化在消息体中,即使消费者重启也能正确计数。

反序列化失败

  • 直接 BasicNack(deliveryTag, false, requeue: false) 丢弃

批次追踪(可选)

  • 如果消息包含 batchId,消费成功后可向批次追踪存储记录进度。其他语言实现时,可忽略此特性或自行实现。

各语言接入示例

Java (amqp-client + Jackson)

// 构造消息
Map<String, Object> message = new LinkedHashMap<>();
message.put("messageId", UUID.randomUUID().toString());
message.put("timestamp", Instant.now().toString());
message.put("source", "JavaService");
message.put("batchId", null);
message.put("retryCount", 0);
message.put("title", "Hello from Java");
message.put("markdown", "...");

String json = new ObjectMapper().writeValueAsString(message);

// 发布
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .contentType("application/json")
    .deliveryMode(2)
    .messageId((String) message.get("messageId"))
    .timestamp(new Date())
    .build();

channel.basicPublish("dpz.news.exchange", "news.article.#", props, json.getBytes(StandardCharsets.UTF_8));
// 消费
channel.basicConsume("dpz.news.article.queue", false, (consumerTag, delivery) -> {
    String json = new String(delivery.getBody(), StandardCharsets.UTF_8);
    Map<String, Object> msg = new ObjectMapper().readValue(json, Map.class);
    try {
        // 业务处理...
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    } catch (Exception e) {
        int retryCount = (int) msg.getOrDefault("retryCount", 0);
        if (retryCount < 3) {
            msg.put("retryCount", retryCount + 1);
            String retryJson = new ObjectMapper().writeValueAsString(msg);
            channel.basicPublish("dpz.news.exchange", "news.article.#",
                delivery.getProperties(), retryJson.getBytes(StandardCharsets.UTF_8));
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        } else {
            channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
        }
    }
}, consumerTag -> {});

Python (pika)

import json, uuid
from datetime import datetime, timezone
import pika

message = {
    "messageId": str(uuid.uuid4()),
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "source": "PythonService",
    "batchId": None,
    "retryCount": 0,
    "title": "Hello from Python",
    "markdown": "..."
}
body = json.dumps(message).encode("utf-8")

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.basic_publish(
    exchange="dpz.news.exchange",
    routing_key="news.article.#",
    body=body,
    properties=pika.BasicProperties(
        content_type="application/json",
        delivery_mode=2,
        message_id=message["messageId"],
        timestamp=int(datetime.now(timezone.utc).timestamp())
    )
)
connection.close()
# 消费
def on_message(ch, method, properties, body):
    msg = json.loads(body)
    try:
        # 业务处理...
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        retry = msg.get("retryCount", 0)
        if retry < 3:
            msg["retryCount"] = retry + 1
            ch.basic_publish(
                exchange=method.exchange,
                routing_key=method.routing_key,
                body=json.dumps(msg).encode("utf-8"),
                properties=properties
            )
            ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

channel.basic_consume(queue="dpz.news.article.queue", on_message_callback=on_message, auto_ack=False)
channel.start_consuming()

Go (amqp091-go)

import (
    "encoding/json"
    "time"
    amqp "github.com/rabbitmq/amqp091-go"
    "github.com/google/uuid"
)

type Envelope struct {
    MessageID  string  `json:"messageId"`
    Timestamp  string  `json:"timestamp"`
    Source     string  `json:"source"`
    BatchID    *string `json:"batchId"`
    RetryCount int     `json:"retryCount"`
    Title      string  `json:"title"`
    Markdown   string  `json:"markdown"`
}

msg := Envelope{
    MessageID: uuid.New().String(),
    Timestamp: time.Now().UTC().Format(time.RFC3339),
    Source:    "GoService",
    RetryCount: 0,
    Title:     "Hello from Go",
    Markdown:  "...",
}
body, _ := json.Marshal(msg)

ch.PublishWithContext(ctx,
    "dpz.news.exchange",  // exchange
    "news.article.#",     // routing key
    false, false,
    amqp.Publishing{
        ContentType:  "application/json",
        DeliveryMode: amqp.Persistent,
        MessageId:    msg.MessageID,
        Timestamp:    time.Now(),
        Body:         body,
    },
)
// 消费
msgs, _ := ch.Consume("dpz.news.article.queue", "", false, false, false, false, nil)
for d := range msgs {
    var msg Envelope
    json.Unmarshal(d.Body, &msg)
    err := process(msg)
    if err == nil {
        d.Ack(false)
    } else if msg.RetryCount < 3 {
        msg.RetryCount++
        retryBody, _ := json.Marshal(msg)
        ch.PublishWithContext(ctx, d.Exchange, d.RoutingKey, false, false,
            amqp.Publishing{
                ContentType:  "application/json",
                DeliveryMode: amqp.Persistent,
                MessageId:    msg.MessageID,
                Body:         retryBody,
            })
        d.Ack(false)
    } else {
        d.Nack(false, false)
    }
}

Node.js (amqplib)

const amqp = require('amqplib');
const { v4: uuidv4 } = require('uuid');

const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createChannel();

const message = {
    messageId: uuidv4(),
    timestamp: new Date().toISOString(),
    source: 'NodeService',
    batchId: null,
    retryCount: 0,
    title: 'Hello from Node.js',
    markdown: '...'
};
const body = Buffer.from(JSON.stringify(message));

ch.publish('dpz.news.exchange', 'news.article.#', body, {
    contentType: 'application/json',
    persistent: true,
    messageId: message.messageId,
    timestamp: Math.floor(Date.now() / 1000)
});
// 消费
ch.consume('dpz.news.article.queue', (msg) => {
    if (!msg) return;
    const data = JSON.parse(msg.content.toString());
    try {
        // 业务处理...
        ch.ack(msg);
    } catch (err) {
        if (data.retryCount < 3) {
            data.retryCount++;
            ch.publish(msg.fields.exchange, msg.fields.routingKey,
                Buffer.from(JSON.stringify(data)), msg.properties);
            ch.ack(msg);
        } else {
            ch.nack(msg, false, false);
        }
    }
}, { noAck: false });

PHP (php-amqplib)

<?php

require __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();

// 构造消息
$message = [
    'messageId'  => bin2hex(random_bytes(16)),  // 或使用 ramsey/uuid
    'timestamp'  => gmdate('Y-m-d\TH:i:s\Z'),
    'source'     => 'PhpService',
    'batchId'    => null,
    'retryCount' => 0,
    'title'      => 'Hello from PHP',
    'markdown'   => '...',
];
$body = json_encode($message, JSON_UNESCAPED_UNICODE);

// 发布
$props = [
    'content_type'  => 'application/json',
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'message_id'    => $message['messageId'],
    'timestamp'     => time(),
];
$msg = new AMQPMessage($body, $props);
$channel->basic_publish($msg, 'dpz.news.exchange', 'news.article.#');

$channel->close();
$connection->close();
<?php
// 消费

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();

$callback = function (AMQPMessage $msg) use ($channel) {
    $data = json_decode($msg->getBody(), true);
    if ($data === null) {
        $msg->nack(false);  // JSON 解析失败,丢弃
        return;
    }
    try {
        // 业务处理...
        $msg->ack();
    } catch (\Throwable $e) {
        $retryCount = $data['retryCount'] ?? 0;
        if ($retryCount < 3) {
            $data['retryCount'] = $retryCount + 1;
            $retryBody = json_encode($data, JSON_UNESCAPED_UNICODE);
            $retryMsg = new AMQPMessage($retryBody, [
                'content_type'  => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id'    => $data['messageId'],
            ]);
            $channel->basic_publish($retryMsg, $msg->getExchange(), $msg->getRoutingKey());
            $msg->ack();
        } else {
            $msg->nack(false);  // requeue = false,丢弃
        }
    }
};

$channel->basic_consume(
    'dpz.news.article.queue',  // queue
    '',                         // consumer tag
    false,                      // no_local
    false,                      // no_ack
    false,                      // exclusive
    false,                      // nowait
    $callback
);

while ($channel->is_consuming()) {
    $channel->wait();
}

许可证

MIT License

评论加载中...