Dpz.Core.MessageQueue
基于 RabbitMQ(v7.x, RabbitMQ.Client)的消息队列基础设施库,提供强类型消息发布/消费、Outbox 可靠投递、分布式批次追踪等能力。
核心特性
- 约定优于配置 — 按消息类型名自动推导 Exchange/Queue/RoutingKey,也支持
[MessageRoute]特性显式覆盖 - 类型安全 — 所有消息继承
MessageBase,处理器通过泛型IMessageHandler<TMessage>/IMessageHandler<TMessage, TResult>强类型绑定 - Outbox 可靠投递 — 发布前先写 MongoDB Outbox 记录,后台服务兜底重试失败消息(发布追踪通过
AddMessageOutbox()开启,补发 Worker 通过AddMessageOutboxRetryWorker()开启) - 分布式批次追踪 —
IBatchTracker基于 FusionCache + 分布式锁,可跨实例追踪批量消息的消费进度 - 优雅断线重连 — 消费者内置指数退避重连(
2^nup to 30s),Channel 断开自动恢复 - 手动 ACK + 重试 —
prefetchCount=1,Handler 返回false时通过重发消息体(而非 requeue)安全地持久化递增RetryCount - 两套处理模型 — 简单
bool返回值 + 带MessageHandlerResult泛型结果的处理模型 - 完全 DI 集成 — 借助
RegisterInject配置驱动 +AddMessageConsumer<TMessage, THandler>显式注册
项目结构
Dpz.Core.MessageQueue/
├── Abstractions/ # 核心接口
│ ├── IMessagePublisher<T> # 发布者
│ ├── IMessageHandler<T> # 处理器 (bool)
│ ├── IMessageHandler<T,TResult># 处理器 (泛型结果)
│ ├── IMessageRoutingConvention # 路由约定
│ ├── IRabbitMQConnectionFactory# 连接工厂
│ ├── IMessageOutboxStore # Outbox 存储
│ ├── IMessageOutboxRetryPublisher # Outbox 重发
│ └── IBatchTracker # 批次追踪
├── RabbitMQ/ # 实现
│ ├── RabbitMQPublisher<T> # 核心发布者
│ ├── OutboxMessagePublisher<T> # Outbox 装饰器
│ ├── MessageOutboxRetryBackgroundService # Outbox 后台补发
│ ├── MessageOutboxRetryService # Outbox 补发执行器
│ ├── RabbitMQRawPublisher # 原始 JSON 重发
│ ├── RabbitMQConnectionFactory # 连接/Channel 工厂
│ ├── RabbitMQConsumerBackgroundService<T> # 消费者 (bool)
│ ├── RabbitMQConsumerBackgroundServiceWithResult<T,TResult> # 消费者 (结果)
│ ├── DefaultMessageRoutingConvention # 默认路由约定
│ ├── DistributedBatchTracker # 分布式批次追踪
│ └── NullMessageOutboxStore/NullMessageOutboxRetryPublisher # 空实现
├── Models/
│ ├── MessageHandlerResult # 处理结果基类 + 泛型
│ ├── MessageOutboxEntry # Outbox 条目 DTO
│ ├── MessageOutboxRetryResult # Outbox 补发结果
│ └── MessageOutboxRetryState # FusionCache 补发状态
├── Enums/
│ ├── ExchangeType # Direct/Topic/Fanout/Headers
│ └── MessageQueueBusinessType # 业务类型 (Article)
├── Attributes/
│ └── MessageRouteAttribute # 自定义路由
└── Extensions/
└── MessageQueueServiceExtensions # DI 注册入口
依赖的外部库:DistributedLock.Core、ZiggyCreatures.FusionCache、RabbitMQ.Client(v7.x)。
快速开始
1. 定义消息
消息类需继承 Dpz.Core.Entity.Base.MessageBase,按惯例放在 Dpz.Core.Public.ViewModel.Messages:
public class NewsArticleMessage : MessageBase
{
public required string Title { get; set; }
public required string Markdown { get; set; }
public required string From { get; set; }
public required DateTime PublishTime { get; set; } = DateTime.Now;
public required VmUserInfo Author { get; set; }
public List<string> Tags { get; set; } = [];
public string? Introduction { get; set; }
public double AdWeight { get; set; }
}
自动推导的路由:
- Exchange:
dpz.news.exchange - Queue:
dpz.news.article.queue - RoutingKey:
news.article.#
2. 配置
{
"RabbitMQ": {
"HostName": "localhost",
"Port": 5672,
"UserName": "guest",
"Password": "guest",
"VirtualHost": "/",
"AutomaticRecoveryEnabled": true
}
}
3. 注册基础服务
// AddRabbitMQ 通常在基础设施层统一调用
// 见 Dpz.Core.Service/ServiceExtensions.cs
services.AddRabbitMQ(configuration);
注册的内容:
RabbitMQOptions配置绑定IRabbitMQConnectionFactory→RabbitMQConnectionFactory(Singleton)IMessageRoutingConvention→DefaultMessageRoutingConvention(Singleton)IMessagePublisher<T>→RabbitMQPublisher<T>(Singleton,开放泛型)IMessageOutboxStore→NullMessageOutboxStore(Singleton,占位)IMessageOutboxRetryPublisher→NullMessageOutboxRetryPublisher(Singleton,占位)
4. 发布消息
public class NewsService(IMessagePublisher<NewsArticleMessage> publisher)
{
public async Task PublishNewsAsync()
{
var message = new NewsArticleMessage
{
Title = "标题",
Markdown = "正文",
From = "https://...",
PublishTime = DateTime.Now,
Author = new VmUserInfo { ... },
Source = "NewsService"
};
await _publisher.PublishAsync(message);
}
}
5. 创建处理器
public class NewsArticleMessageHandler(
IArticleService articleService,
ILogger<NewsArticleMessageHandler> logger
) : IMessageHandler<NewsArticleMessage>
{
public async Task<bool> HandleAsync(
NewsArticleMessage message,
CancellationToken cancellationToken = default
)
{
try
{
var noExists = await articleService.NoExistsByFromAsync(
[message.From], cancellationToken);
if (noExists.Count == 0)
{
logger.LogInformation("新闻已存在,跳过: {From}", message.From);
return true;
}
// 业务处理...
return true;
}
catch (Exception ex)
{
logger.LogError(ex, "处理失败,将重试");
return false;
}
}
}
6. 注册消费者
// 在目标应用层的 DI 配置中注册
services.AddMessageConsumer<NewsArticleMessage, NewsArticleMessageHandler>();
这会注册:
IMessageHandler<NewsArticleMessage>→NewsArticleMessageHandler(Scoped)IHostedService→RabbitMQConsumerBackgroundService<NewsArticleMessage>(Singleton)
应用启动后自动开始消费。
消息定义与项目分布
消息实体统一位于 Dpz.Core.Public.ViewModel.Messages/(而非 MessageQueue 项目内),消费者分布在三个入口应用:
| 应用 | 消息 | 处理器 | 功能 |
|---|---|---|---|
Dpz.Core.Web | ClearCacheMessage | ClearCacheMessageHandler | 清除缓存 |
Dpz.Core.WebApi | NewsArticleMessage | NewsArticleMessageHandler | 保存爬虫文章 |
BatchCompletionMessage | BatchCompletionMessageHandler | 批次完成 → 清缓存 | |
Dpz.Core.Web.Jobs | DeleteMarkdownMessage | DeleteMarkdownHandler | 删除 Markdown |
RemoveImagesMessage | RemoveImagesHandler | 删除图片 | |
AddCommentCountMessage | AddCommentCountHandler | 更新评论数 | |
AddDanmakuCountMessage | AddDanmakuCountHandler | 更新弹幕数 | |
AnalyzeCodeMessage | AnalyzeCodeHandler | 代码分析 | |
SendMasterEmailMessage | SendMasterEmailHandler | 发信给站长 | |
SendReplyEmailMessage | SendReplyEmailHandler | 发回复通知 | |
SendEmailVerifyCodeMessage | SendEmailVerifyCodeHandler | 发验证码 |
高级用法
自定义路由([MessageRoute])
[MessageRoute(
ExchangeName = "dpz.system.exchange",
QueueName = "dpz.system.batch.queue",
RoutingKey = "system.batch.completion"
)]
public class BatchCompletionMessage : MessageBase { ... }
支持设置 ExchangeType(默认 Topic)。
自定义路由约定
public class MyRoutingConvention : IMessageRoutingConvention
{
public string GetExchangeName<TMessage>() => $"my.{typeof(TMessage).Name.ToLower()}.exchange";
public string GetQueueName<TMessage>() => $"my.{typeof(TMessage).Name.ToLower()}.queue";
public string GetRoutingKey<TMessage>() => typeof(TMessage).Name.ToLower();
public ExchangeType GetExchangeType<TMessage>() => ExchangeType.Topic;
}
// 替换默认约定
services.AddCustomRoutingConvention<MyRoutingConvention>();
批量发布
await publisher.PublishBatchAsync(messages, routingKey: "batch.route");
带结果的处理器
// 定义结果类型
public class MyResult : MessageHandlerResult
{
public string Output { get; set; }
}
// 处理器
public class MyHandler : IMessageHandler<MyMessage, MyResult>
{
public async Task<MyResult> HandleAsync(MyMessage message, CancellationToken ct)
{
// ...
return MessageHandlerResult<string>.Ok("done");
}
}
// 注册时需指定 TResult
services.AddMessageConsumer<MyMessage, MyHandler, MyResult>();
Outbox 可靠投递
默认发布路径直接写入 RabbitMQ,在网络抖动或 Broker 不可用时消息会丢失。启用 Outbox 模式后,每条消息先写入 MongoDB,再发送到 RabbitMQ,发送失败由后台 BackgroundService 兜底重试。
启用 Outbox
// 在 AddRabbitMQ 之后调用
services.AddMessageOutbox();
// 在负责补发的宿主应用中启用后台重试
services.AddMessageOutboxRetryWorker();
原理
发布流程:
生产者 → OutboxMessagePublisher<T>
├─ 1. IMessageOutboxStore.CreateAsync() 写入 MongoDB (Pending)
├─ 2. inner.PublishAsync() 发送 RabbitMQ
│ ├─ 成功 → MarkSentAsync() 状态 → Sent
│ └─ 失败 → MarkPublishFailedAsync() 状态 → PublishFailed
└─ 3. 抛出异常给调用方
重试 (BackgroundService, 默认每分钟):
MessageOutboxRetryBackgroundService
→ GetPendingPublishRetryAsync(50)
→ RabbitMQRawPublisher.PublishRawAsync() (跳过 Outbox 装饰器)
→ MarkSentAsync()
→ FusionCache 记录本轮完成时间
消费侧:
消费者成功 → TryMarkConsumedAsync() (状态 → Consumed)
消费者超限失败 → TryMarkConsumeFailedAsync() (状态 → ConsumeFailed)
消费重试:
MessageOutboxRetryBackgroundService
→ GetPendingConsumeRetryAsync(50)
→ 重新发布到队列 → 消费者重新处理
→ FusionCache 记录本轮完成时间
实现参考:Dpz.Core.Service/MessageOutboxExtensions.cs、Dpz.Core.Service/RepositoryServiceImpl/MongoMessageOutboxStore.cs、Dpz.Core.MessageQueue/RabbitMQ/MessageOutboxRetryBackgroundService.cs。
Scoped 适配器
IMessageOutboxStore 注册为 Singleton,但 MongoMessageOutboxStore 是 Scoped(通过 RegisterInject 扫描)。ScopedOutboxStoreAdapter 桥接两者,每次操作创建独立 Scope,避免 captive dependency。
分布式批次追踪
适用于批量导入场景,每条消息携带相同的 BatchId,消费者处理完后通过 IBatchTracker.RecordMessageConsumedAsync 记录进度。所有消息处理完后发送 BatchCompletionMessage 触发后续动作。
实现
DistributedBatchTracker 基于:
IFusionCache(ZiggyCreatures.FusionCache)— 存储进度计数 + 已消费消息 ID 集合IDistributedLockProvider(DistributedLock)— 保护并发写入- 指数退避重试 — 获取锁失败时 50ms→800ms 重试,最多 5 次
使用场景 — 新闻爬虫
爬虫 (Web.Jobs)
├─ 生成 BatchId (guid)
├─ 发布 N 条 NewsArticleMessage (每条携带 BatchId)
└─ 发布 BatchCompletionMessage { TotalMessages = N, BatchId }
消费者 (WebApi)
├─ NewsArticleMessageHandler.RecordMessageConsumedAsync(batchId, msgId)
└─ BatchCompletionMessageHandler
├─ WaitForBatchCompletionAsync: 自适应轮询 (前30s 500ms → 之后 3s)
├─ 超时 120s + 进度停滞检测 (5次无变化则警告)
└─ 完成后发布 ClearCacheMessage + CleanupBatchAsync
消息处理机制
重试策略
不同于传统 BasicNack(requeue: true)(进程内重试次数会随进程重启丢失),本项目采用 Republish 模式:
Handler 返回 false
→ message.RetryCount++
→ channel.BasicPublish(更新后的消息体)
→ channel.BasicAck(原消息)
→ 新消息被投递(可能到其他实例)
超限 (RetryCount > 3):
→ channel.BasicNack(requeue: false) // 丢弃
→ TryMarkConsumeFailedAsync() // 记录到 Outbox
关键代码见 RabbitMQConsumerBackgroundService.cs:44-88。
断线重连
- 自动恢复:
RabbitMQConnectionFactory启用AutomaticRecoveryEnabled+NetworkRecoveryInterval - 消费者强制重连:监听
ChannelShutdownAsync→TaskCompletionSource触发外层循环 → 指数退避至 30s - 发布者:异常发生时清理 Channel(
CleanupChannelAsync),下次调用自动重建
QoS
prefetchCount: 1— 每个消费者同时处理 1 条消息,快的实例自动多处理global: false— 仅限当前 Channel
配置选项
RabbitMQOptions 完整字段(对应 JSON 节 "RabbitMQ"):
{
"RabbitMQ": {
"HostName": "localhost",
"Port": 5672,
"UserName": "guest",
"Password": "guest",
"VirtualHost": "/",
"RequestedConnectionTimeout": 30,
"RequestedHeartbeat": 60,
"AutomaticRecoveryEnabled": true,
"NetworkRecoveryInterval": 5
}
}
日志级别建议:
{
"Logging": {
"LogLevel": {
"Dpz.Core.MessageQueue": "Information"
}
}
}
与通用 MQ 库的差异
| 维度 | 本项目 | 通用 MQ 库 (如 MassTransit, EasyNetQ) |
|---|---|---|
| 消息定义 | 继承 MessageBase,POCO 类 | 通常需实现特定接口 / 继承基类 |
| 路由策略 | 约定 + [MessageRoute] 特性 | 消息类型 → 自动映射,显式配置端点 |
| 消费者注册 | AddMessageConsumer<TMsg, THandler> | 依赖容器扫描 / 总线配置 |
| Outbox | 自研 MongoDB Outbox + 装饰器模式 | 内置 EF Core / NHibernate Outbox |
| 错误重试 | Republish 模式(重发新消息体) | 通常重试过滤器 / 错误队列 + 重试策略 |
| 批量追踪 | 自研分布式缓存 + 锁 | 无标准支持 |
| 上下文 | .NET 10 + RabbitMQ.Client v7 | 支持多 transports (RMQ, SQS, Azure SB) |
| 调度 | 无内置调度,借助 Hangfire | 内置 Quartz / 延迟调度 |
优势与劣势
优势
- 轻量无过度抽象 — 直接使用
RabbitMQ.Client,无中间层总线概念,行为可预测 - Outbox 开箱可配 — 本质是装饰器模式,启用/关闭无需改业务代码
- 批次进度追踪 — 适应本项目批量爬虫场景,通用 MQ 库无此能力
- Republish 重试 — 相比 requeue,进程重启后
RetryCount仍保留在消息体中 - 类型安全 + DI — 强类型处理器,自动 lifetime 管理
劣势
- 单一 Transport — 仅支持 RabbitMQ,无法切换(但本项目不需要多 Transport)
- 无死信队列内置 — 超出重试次数后消息直接丢弃,未内置 DLX/DLQ(可手动配置)
- 无延时消息 — RabbitMQ 原生不直接支持,可通过 TTL+DLX 模拟(本项目未实现)
- 无 Saga / 编排 — 不提供分布式事务协调
- 无内置调度 — 定时/延迟发布需借助 Hangfire
架构图
基础架构
┌─────────────────────────┐ ┌─────────────────────────┐
│ 生产者 (任意应用) │ │ 消费者 (Web/WebApi/ │
│ IMessagePublisher<T> │ │ Web.Jobs) │
│ RabbitMQPublisher<T> │ │ RabbitMQConsumer │
│ (可选 Outbox decorator)│ │ BackgroundService │
└───────────┬─────────────┘ └──────────────┬──────────┘
│ ▲
│ BasicPublish │ BasicConsume
▼ │ autoAck=false
┌────────────────┐ │
│ RabbitMQ │──────────────────────────┘
│ Exchange/Queue │ 推送消息
└────────────────┘
消息生命周期(含 Outbox)
sequenceDiagram
participant App as 业务代码
participant Pub as IMessagePublisher
participant Outbox as OutboxMessagePublisher
participant Store as Outbox Store (Mongo)
participant RMQ as RabbitMQ
participant Worker as Outbox Retry Worker
participant Cons as 消费者
participant Handler as IMessageHandler
alt 未启用 Outbox
App->>Pub: PublishAsync(msg)
Pub->>RMQ: BasicPublish
else 启用 Outbox
App->>Pub: PublishAsync(msg)
Pub->>Outbox: 委托
Outbox->>Store: CreateAsync (Pending)
Outbox->>RMQ: BasicPublish
alt 成功
Outbox->>Store: MarkSentAsync (Sent)
Outbox-->>App: OK
else 失败
Outbox->>Store: MarkPublishFailedAsync
Outbox-->>App: throw
Note over Worker: 默认每分钟触发
Worker->>Store: GetPendingPublishRetryAsync
Worker->>RMQ: PublishRawAsync (跳过Outbox)
Worker->>Store: MarkSentAsync
end
end
RMQ->>Cons: 推送消息
Cons->>Handler: HandleAsync
alt 成功
Cons->>RMQ: BasicAck
Cons->>Store: MarkConsumedAsync (可选)
else 失败 (RetryCount < 3)
Cons->>RMQ: BasicPublish (RetryCount+1)
Cons->>RMQ: BasicAck (原消息)
else 失败 (RetryCount >= 3)
Cons->>RMQ: BasicNack (丢弃)
Cons->>Store: MarkConsumeFailedAsync
Note over Worker: 消费重试
Worker->>Store: GetPendingConsumeRetryAsync
Worker->>RMQ: PublishRawAsync (重新入队)
end
全量消息流(新闻爬虫场景)
sequenceDiagram
participant HF as Hangfire
participant Crawler as ItHomeActivator
participant Pub as RabbitMQPublisher
participant RMQ as RabbitMQ
participant Cons as Consumer (WebApi)
participant Handler as NewsArticleMessageHandler
participant Tracker as IBatchTracker
participant Srv as ArticleService
participant DB as MongoDB
HF->>Crawler: 每3小时触发
Crawler->>Crawler: 抓取RSS、解析、过滤
loop 每篇文章
Crawler->>Crawler: 下载HTML、转Markdown、上传图片
Crawler->>Pub: PublishAsync(msg with BatchId)
Pub->>RMQ: BasicPublish
end
Crawler->>Pub: PublishAsync(BatchCompletionMessage)
RMQ->>Cons: 推送 NewsArticleMessage
Cons->>Handler: HandleAsync
Handler->>Srv: NoExistsByFromAsync
alt 已存在
Handler->>Tracker: RecordMessageConsumedAsync
Handler-->>Cons: true
else 新文章
Handler->>Srv: CreateArticleAsync
Srv->>DB: Insert
Handler->>Tracker: RecordMessageConsumedAsync
Handler-->>Cons: true
end
Cons->>RMQ: BasicAck
Note over Tracker: 所有消息消费完毕
RMQ->>Cons: 推送 BatchCompletionMessage
Cons->>Handler: HandleAsync
Handler->>Tracker: WaitForBatchCompletionAsync
Handler->>Pub: PublishAsync(ClearCacheMessage)
Handler->>Tracker: CleanupBatchAsync
多实例竞争消费
graph TB
subgraph "RabbitMQ Queue"
Q[dpz.news.article.queue]
end
subgraph "消费者集群 (Docker/K8s)"
C1[WebApi Pod 1<br/>prefetchCount=1]
C2[WebApi Pod 2<br/>prefetchCount=1]
C3[WebApi Pod 3<br/>prefetchCount=1]
end
subgraph "MongoDB 副本集"
DB[(MongoDB)]
end
Q -->|轮询分发| C1
Q -->|轮询分发| C2
Q -->|轮询分发| C3
C1 --> DB
C2 --> DB
C3 --> DB
常见问题
Q1: 如何确保消息不丢失? A: 三重保证:持久化消息(Persistent=true) + 持久化 Exchange/Queue(durable: true) + 手动 ACK(autoAck: false)。可选启用 Outbox 模式增加 MongoDB 兜底。
Q2: 消息处理失败如何重试? A: Handler 返回 false 时,消费者将消息体序列化后重新发布(RepublishForRetryAsync),RetryCount 递增。原消息执行 BasicAck 确认。超过 3 次后 BasicNack(requeue: false) 丢弃。
Q3: 多消费者实例如何分配消息? A: RabbitMQ 轮询分发(Round-robin),配合 prefetchCount=1 确保一次只处理 1 条,快的实例自动处理更多。
Q4: 如何实现 Outbox? A: 调用 services.AddMessageOutbox() 启用发布路径,调用 services.AddMessageOutboxRetryWorker() 在负责补发的宿主应用中启用后台重试。需要 MongoDB 中实现了 IMongoMessageOutboxStore(Scoped,通过 RegisterInject 扫描注册)。后台服务默认每分钟执行一次,并通过 FusionCache 记录上次补发完成时间。
Q5: 消息到达最大重试次数后会怎样? A: 调用 BasicNack(requeue: false),消息被 Broker 丢弃。不会自动进入死信队列。如需死信,需在 RabbitMQ 侧为队列配置 x-dead-letter-exchange。
Q6: IBatchTracker 需要什么基础设施? A: 需要 FusionCache(IFusionCache) + IDistributedLockProvider。生产环境推荐基于 Redis 的分布式缓存和锁。
Q7: 为什么重试用 Republish 而不是 requeue? A: BasicNack(requeue: true) 会重置消息到队列头部,但 RetryCount 存储在内存中,进程重启后归零。Republish 将 RetryCount 持久化在消息体内,跨进程/跨重启仍能正确判断。
Q8: 消息消费后 Outbox 状态何时更新? A: 消费者在 BasicAck 后调用 TryMarkConsumedAsync(忽略异常,不阻塞流程)。启用 Outbox 后此步骤可选生效。
Q9: AddRabbitMQ 已在公共基础层注册,各应用层只需 AddMessageConsumer 吗? A: 是的。Dpz.Core.Service/ServiceExtensions.cs 已调用 AddRabbitMQ,各项目只需在 DI 配置中调用 AddMessageConsumer 注册具体消费者。
Q10: 生产者和消费者是否必须在同一应用中? A: 不必。生产者可在 Web.Jobs(爬虫),消费者在 WebApi。只需连接到同一 RabbitMQ 实例。
Q11: Redis / 分布式锁不可用时会怎样? A: DistributedBatchTracker 会抛出异常。IBatchTracker 仅在调用 AddBatchTracking() 的应用中启用(目前只有 WebApi)。
跨语言通信
底层传输基于 RabbitMQ (AMQP 0-9-1),序列化使用 JSON,这两者都是语言无关的。任何语言的 RabbitMQ 客户端只要遵循下列规范,即可与本组件互通。
消息信封 (Message Envelope)
每条消息的 JSON 结构包含两层:信封字段(MessageBase)+ 业务字段(消息自身属性)。
信封字段(必填)
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
messageId | string | 是 | 唯一标识,推荐 GUID 字符串 |
timestamp | string (ISO 8601 UTC) | 是 | 消息创建时间,如 "2025-01-01T00:00:00Z" |
source | string | 否 | 来源标识,默认 "" |
batchId | string | null | 否 | 批次追踪 ID,单条消息传 null |
retryCount | number | 是 | 重试次数,首次发送填 0 |
JSON 示例(NewsArticleMessage)
{
"messageId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"timestamp": "2025-03-15T08:30:00Z",
"source": "CrawlerService",
"batchId": "batch-001",
"retryCount": 0,
"title": "新闻标题",
"markdown": "Markdown 正文",
"from": "https://example.com/news/123",
"publishTime": "2025-03-15T08:00:00Z",
"author": { "id": "u1", "name": "作者" },
"tags": ["tech", "ai"],
"introduction": "摘要",
"adWeight": 0.5
}
AMQP 消息属性
发布消息时必须设置以下 BasicProperties:
| 属性 | 值 | 说明 |
|---|---|---|
ContentType | application/json | 消息体格式 |
DeliveryMode | 2 (Persistent) | 持久化到磁盘 |
MessageId | 同 messageId 字段 | AMQP 消息 ID |
Timestamp | Unix 秒级时间戳 | AMQP 时间戳 |
Headers | (可选) | 可携带额外元数据 |
路由命名规则
路由由消息类型名(去掉 Message 后缀的 PascalCase 类名)自动推导,规则如下:
- 去掉
Message后缀(如NewsArticleMessage→NewsArticle) - 按大写字母拆词(
NewsArticle→["News", "Article"]) - 第一个词为模块名(
moduleName = "news") - 剩余词用
.拼接为类型名(typeSuffix = "article") - 全部转为小写
最终路由:
Exchange: dpz.{moduleName}.exchange
Queue: dpz.{moduleName}.{typeSuffix}.queue
RoutingKey: {moduleName}.{typeSuffix}.#
ExchangeType: topic
对照表
| 消息类型名 | Exchange | Queue | RoutingKey |
|---|---|---|---|
NewsArticleMessage | dpz.news.exchange | dpz.news.article.queue | news.article.# |
ClearCacheMessage | dpz.clear.exchange | dpz.clear.cache.queue | clear.cache.# |
AddCommentCountMessage | dpz.add.exchange | dpz.add.comment.count.queue | add.comment.count.# |
DeleteMarkdownMessage | dpz.delete.exchange | dpz.delete.markdown.queue | delete.markdown.# |
SendReplyEmailMessage | dpz.send.exchange | dpz.send.reply.email.queue | send.reply.email.# |
BatchCompletionMessage | dpz.batch.exchange | dpz.batch.completion.queue | batch.completion.# |
注意:如果某个消息在 .NET 侧使用了
[MessageRoute]特性自定义路由,则以其配置为准,而非上述推导规则。跨语言服务可直接使用该显式声明的 exchange/queue/routingKey。
伪代码实现
function getRoute(className):
name = className.replace(/Message$/i, "") // 去掉 Message 后缀
words = splitByUpperCase(name) // 按大写字母拆词
module = words[0].toLower() // 第一个词 → 模块
type = words[1..].join(".").toLower() // 其余词 → 类型
exchange = "dpz." + module + ".exchange"
queue = "dpz." + module + "." + type + ".queue"
routingKey = module + "." + type + ".#"
return { exchange, queue, routingKey }
消费端约定
队列声明
- Exchange:
topic(默认,或对应[MessageRoute]声明的类型) - Queue: durable,绑定到 Exchange,routingKey 匹配
{module}.{type}.# autoAck = false,prefetchCount = 1
处理成功
- 调用
BasicAck(deliveryTag, false)
处理失败(重试)
- 读取消息体中的
retryCount字段 - 若
retryCount ≤ 3:将retryCount + 1,更新消息体,重新发布到同一 exchange/routingKey,然后BasicAck原消息(不是 requeue,而是发一条新消息) - 若
retryCount > 3:BasicNack(deliveryTag, false, requeue: false)丢弃消息
关键点:重试采用的是 Republish 模式(重新发布一条
retryCount+1的新消息)而非BasicNack(requeue: true)。这确保了retryCount持久化在消息体中,即使消费者重启也能正确计数。
反序列化失败
- 直接
BasicNack(deliveryTag, false, requeue: false)丢弃
批次追踪(可选)
- 如果消息包含
batchId,消费成功后可向批次追踪存储记录进度。其他语言实现时,可忽略此特性或自行实现。
各语言接入示例
Java (amqp-client + Jackson)
// 构造消息
Map<String, Object> message = new LinkedHashMap<>();
message.put("messageId", UUID.randomUUID().toString());
message.put("timestamp", Instant.now().toString());
message.put("source", "JavaService");
message.put("batchId", null);
message.put("retryCount", 0);
message.put("title", "Hello from Java");
message.put("markdown", "...");
String json = new ObjectMapper().writeValueAsString(message);
// 发布
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.contentType("application/json")
.deliveryMode(2)
.messageId((String) message.get("messageId"))
.timestamp(new Date())
.build();
channel.basicPublish("dpz.news.exchange", "news.article.#", props, json.getBytes(StandardCharsets.UTF_8));
// 消费
channel.basicConsume("dpz.news.article.queue", false, (consumerTag, delivery) -> {
String json = new String(delivery.getBody(), StandardCharsets.UTF_8);
Map<String, Object> msg = new ObjectMapper().readValue(json, Map.class);
try {
// 业务处理...
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
int retryCount = (int) msg.getOrDefault("retryCount", 0);
if (retryCount < 3) {
msg.put("retryCount", retryCount + 1);
String retryJson = new ObjectMapper().writeValueAsString(msg);
channel.basicPublish("dpz.news.exchange", "news.article.#",
delivery.getProperties(), retryJson.getBytes(StandardCharsets.UTF_8));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} else {
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
}
}
}, consumerTag -> {});
Python (pika)
import json, uuid
from datetime import datetime, timezone
import pika
message = {
"messageId": str(uuid.uuid4()),
"timestamp": datetime.now(timezone.utc).isoformat(),
"source": "PythonService",
"batchId": None,
"retryCount": 0,
"title": "Hello from Python",
"markdown": "..."
}
body = json.dumps(message).encode("utf-8")
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.basic_publish(
exchange="dpz.news.exchange",
routing_key="news.article.#",
body=body,
properties=pika.BasicProperties(
content_type="application/json",
delivery_mode=2,
message_id=message["messageId"],
timestamp=int(datetime.now(timezone.utc).timestamp())
)
)
connection.close()
# 消费
def on_message(ch, method, properties, body):
msg = json.loads(body)
try:
# 业务处理...
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
retry = msg.get("retryCount", 0)
if retry < 3:
msg["retryCount"] = retry + 1
ch.basic_publish(
exchange=method.exchange,
routing_key=method.routing_key,
body=json.dumps(msg).encode("utf-8"),
properties=properties
)
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
channel.basic_consume(queue="dpz.news.article.queue", on_message_callback=on_message, auto_ack=False)
channel.start_consuming()
Go (amqp091-go)
import (
"encoding/json"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/google/uuid"
)
type Envelope struct {
MessageID string `json:"messageId"`
Timestamp string `json:"timestamp"`
Source string `json:"source"`
BatchID *string `json:"batchId"`
RetryCount int `json:"retryCount"`
Title string `json:"title"`
Markdown string `json:"markdown"`
}
msg := Envelope{
MessageID: uuid.New().String(),
Timestamp: time.Now().UTC().Format(time.RFC3339),
Source: "GoService",
RetryCount: 0,
Title: "Hello from Go",
Markdown: "...",
}
body, _ := json.Marshal(msg)
ch.PublishWithContext(ctx,
"dpz.news.exchange", // exchange
"news.article.#", // routing key
false, false,
amqp.Publishing{
ContentType: "application/json",
DeliveryMode: amqp.Persistent,
MessageId: msg.MessageID,
Timestamp: time.Now(),
Body: body,
},
)
// 消费
msgs, _ := ch.Consume("dpz.news.article.queue", "", false, false, false, false, nil)
for d := range msgs {
var msg Envelope
json.Unmarshal(d.Body, &msg)
err := process(msg)
if err == nil {
d.Ack(false)
} else if msg.RetryCount < 3 {
msg.RetryCount++
retryBody, _ := json.Marshal(msg)
ch.PublishWithContext(ctx, d.Exchange, d.RoutingKey, false, false,
amqp.Publishing{
ContentType: "application/json",
DeliveryMode: amqp.Persistent,
MessageId: msg.MessageID,
Body: retryBody,
})
d.Ack(false)
} else {
d.Nack(false, false)
}
}
Node.js (amqplib)
const amqp = require('amqplib');
const { v4: uuidv4 } = require('uuid');
const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createChannel();
const message = {
messageId: uuidv4(),
timestamp: new Date().toISOString(),
source: 'NodeService',
batchId: null,
retryCount: 0,
title: 'Hello from Node.js',
markdown: '...'
};
const body = Buffer.from(JSON.stringify(message));
ch.publish('dpz.news.exchange', 'news.article.#', body, {
contentType: 'application/json',
persistent: true,
messageId: message.messageId,
timestamp: Math.floor(Date.now() / 1000)
});
// 消费
ch.consume('dpz.news.article.queue', (msg) => {
if (!msg) return;
const data = JSON.parse(msg.content.toString());
try {
// 业务处理...
ch.ack(msg);
} catch (err) {
if (data.retryCount < 3) {
data.retryCount++;
ch.publish(msg.fields.exchange, msg.fields.routingKey,
Buffer.from(JSON.stringify(data)), msg.properties);
ch.ack(msg);
} else {
ch.nack(msg, false, false);
}
}
}, { noAck: false });
PHP (php-amqplib)
<?php
require __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();
// 构造消息
$message = [
'messageId' => bin2hex(random_bytes(16)), // 或使用 ramsey/uuid
'timestamp' => gmdate('Y-m-d\TH:i:s\Z'),
'source' => 'PhpService',
'batchId' => null,
'retryCount' => 0,
'title' => 'Hello from PHP',
'markdown' => '...',
];
$body = json_encode($message, JSON_UNESCAPED_UNICODE);
// 发布
$props = [
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => $message['messageId'],
'timestamp' => time(),
];
$msg = new AMQPMessage($body, $props);
$channel->basic_publish($msg, 'dpz.news.exchange', 'news.article.#');
$channel->close();
$connection->close();
<?php
// 消费
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();
$callback = function (AMQPMessage $msg) use ($channel) {
$data = json_decode($msg->getBody(), true);
if ($data === null) {
$msg->nack(false); // JSON 解析失败,丢弃
return;
}
try {
// 业务处理...
$msg->ack();
} catch (\Throwable $e) {
$retryCount = $data['retryCount'] ?? 0;
if ($retryCount < 3) {
$data['retryCount'] = $retryCount + 1;
$retryBody = json_encode($data, JSON_UNESCAPED_UNICODE);
$retryMsg = new AMQPMessage($retryBody, [
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => $data['messageId'],
]);
$channel->basic_publish($retryMsg, $msg->getExchange(), $msg->getRoutingKey());
$msg->ack();
} else {
$msg->nack(false); // requeue = false,丢弃
}
}
};
$channel->basic_consume(
'dpz.news.article.queue', // queue
'', // consumer tag
false, // no_local
false, // no_ack
false, // exclusive
false, // nowait
$callback
);
while ($channel->is_consuming()) {
$channel->wait();
}
许可证
MIT License