Dpz.Core.MessageQueue
基于 RabbitMQ 的消息队列基础设施库,提供优雅的消息发布和消费功能。
✨ 核心特性
- ✅ 约定优于配置:自动根据消息类型生成 Exchange、Queue、RoutingKey
- ✅ 类型安全:强类型消息定义和处理
- ✅ 易于使用:简洁的 API 设计,几行代码即可完成配置
- ✅ 可扩展:支持自定义路由约定
- ✅ 可靠性:消息持久化、自动重试、错误处理
- ✅ 依赖注入:完全集成.NET依赖注入系统
📦 安装
dotnet add package RabbitMQ.Client --version 6.8.1
dotnet add reference Dpz.Core.MessageQueue.csproj
🚀 快速开始
1. 定义消息
using Dpz.Core.MessageQueue.Messages;
// 最简单的方式:遵循命名约定
public class NewsArticleMessage : MessageBase
{
public string Title { get; set; } = string.Empty;
public string Content { get; set; } = string.Empty;
public List<string> Tags { get; set; } = new();
}
自动生成的路由信息:
- Exchange:
dpz.news.exchange - Queue:
dpz.news.article.queue - RoutingKey:
news.article.#
2. 配置 appsettings.json
{
"RabbitMQ": {
"HostName": "localhost",
"Port": 5672,
"UserName": "admin",
"Password": "your_password",
"VirtualHost": "/",
"AutomaticRecoveryEnabled": true
}
}
3. 注册生产者服务
// Program.cs
builder.Services.AddRabbitMQ(builder.Configuration);
4. 发布消息
public class NewsService
{
private readonly IMessagePublisher<NewsArticleMessage> _publisher;
public NewsService(IMessagePublisher<NewsArticleMessage> publisher)
{
_publisher = publisher;
}
public async Task PublishNewsAsync()
{
var message = new NewsArticleMessage
{
Title = "新闻标题",
Content = "新闻内容",
Tags = ["技术", "RabbitMQ"],
Source = "NewsService"
};
await _publisher.PublishAsync(message);
}
}
5. 创建消息处理器
using Dpz.Core.MessageQueue.Abstractions;
public class NewsArticleHandler : IMessageHandler<NewsArticleMessage>
{
private readonly IArticleService _articleService;
private readonly ILogger<NewsArticleHandler> _logger;
public NewsArticleHandler(
IArticleService articleService,
ILogger<NewsArticleHandler> logger)
{
_articleService = articleService;
_logger = logger;
}
public async Task<bool> HandleAsync(
NewsArticleMessage message,
CancellationToken cancellationToken = default)
{
try
{
_logger.LogInformation("处理新闻: {Title}", message.Title);
// 保存到数据库
await _articleService.CreateAsync(new CreateArticleRequest
{
Title = message.Title,
Content = message.Content,
Tags = message.Tags
});
return true; // 处理成功
}
catch (Exception ex)
{
_logger.LogError(ex, "处理新闻失败: {Title}", message.Title);
return false; // 处理失败,将触发重试
}
}
}
6. 注册消费者
// Program.cs
builder.Services.AddRabbitMQ(builder.Configuration);
builder.Services.AddMessageConsumer<NewsArticleMessage, NewsArticleHandler>();
// 应用启动后,会自动开始消费消息
🎨 高级用法
自定义路由配置
如果默认约定不满足需求,可以使用特性自定义:
using Dpz.Core.MessageQueue.Attributes;
[MessageRoute(
ExchangeName = "custom.exchange",
RoutingKey = "custom.routing.key",
QueueName = "custom.queue"
)]
public class CustomMessage : MessageBase
{
public string Data { get; set; } = string.Empty;
}
批量发布消息
var messages = new List<NewsArticleMessage>
{
new() { Title = "新闻1", Content = "内容1" },
new() { Title = "新闻2", Content = "内容2" },
new() { Title = "新闻3", Content = "内容3" }
};
await _publisher.PublishBatchAsync(messages);
指定自定义路由键
// 使用自定义路由键(覆盖约定)
await _publisher.PublishAsync(message, routingKey: "news.article.ithome");
自定义路由约定
public class MyCustomRoutingConvention : IMessageRoutingConvention
{
public string GetExchangeName<TMessage>()
{
return $"my.{typeof(TMessage).Name.ToLower()}.exchange";
}
public string GetQueueName<TMessage>()
{
return $"my.{typeof(TMessage).Name.ToLower()}.queue";
}
public string GetRoutingKey<TMessage>()
{
return typeof(TMessage).Name.ToLower();
}
public string GetExchangeType<TMessage>()
{
return "topic";
}
}
// 注册自定义约定
builder.Services.AddRabbitMQ(builder.Configuration);
builder.Services.AddCustomRoutingConvention<MyCustomRoutingConvention>();
📊 命名约定规则
默认约定(DefaultMessageRoutingConvention)
| 消息类型 | Exchange | Queue | RoutingKey |
|---|---|---|---|
| NewsArticleMessage | dpz.news.exchange | dpz.news.article.queue | news.article.# |
| NewsVideoMessage | dpz.news.exchange | dpz.news.video.queue | news.video.# |
| CacheClearMessage | dpz.cache.exchange | dpz.cache.clear.queue | cache.clear.# |
| UserRegisterMessage | dpz.user.exchange | dpz.user.register.queue | user.register.# |
命名规则:
- 从消息类名中提取模块名(第一个词)和类型名(剩余词)
- Exchange:
dpz.{模块}.exchange - Queue:
dpz.{模块}.{类型}.queue - RoutingKey:
{模块}.{类型}.#(Topic模式,支持通配符)
🔧 配置选项
{
"RabbitMQ": {
"HostName": "localhost", // RabbitMQ服务器地址
"Port": 5672, // 端口
"UserName": "guest", // 用户名
"Password": "guest", // 密码
"VirtualHost": "/", // 虚拟主机
"RequestedConnectionTimeout": 30, // 连接超时(秒)
"RequestedHeartbeat": 60, // 心跳超时(秒)
"AutomaticRecoveryEnabled": true, // 自动重连
"NetworkRecoveryInterval": 5 // 网络恢复间隔(秒)
}
}
⚙️ 消息处理机制
重试机制
- 消息处理失败时自动重试
- 最多重试 3 次(通过
message.RetryCount控制) - 超过重试次数后,消息将被丢弃(或发送到死信队列)
QoS设置
- 预取数量(prefetchCount)设为 1,确保公平调度
- 消息必须手动确认(ACK),保证消息不丢失
消息持久化
- Exchange、Queue、Message 均设置为持久化
- 服务重启后消息不丢失
� 多实例消费机制
竞争消费者模式(Competing Consumers Pattern)
当有多个消费者实例连接到同一个队列时,RabbitMQ会自动实现负载均衡:
RabbitMQ Queue (100条消息)
│
┌───────────────┼───────────────┐
│ │ │
↓ ↓ ↓
消费者实例1 消费者实例2 消费者实例3
处理: M1,M4,M7 处理: M2,M5,M8 处理: M3,M6,M9
(33条) (33条) (34条)
│ │ │
└───────────────┴───────────────┘
↓
MongoDB
关键特性
✅ 1. 轮询分发(Round-robin)
- RabbitMQ采用轮询算法分发消息
- 消息1 → 实例1,消息2 → 实例2,消息3 → 实例3,消息4 → 实例1...
- 确保每个实例获得大致相同数量的消息
✅ 2. 每条消息只被消费一次
// RabbitMQ内部保证
Queue: [消息A, 消息B, 消息C]
↓ ↓ ↓
实例1 实例2 实例3
// 消息A不会同时发送给实例1和实例2
// 除非实例1处理失败并重新入队
✅ 3. 并发处理
// 时间轴示例:
T0: Queue有10条消息
T1: 实例1拉取M1, 实例2拉取M2, 实例3拉取M3 (同时进行)
T2: 实例2最快完成M2 → ACK → 立即拉取M4
T3: 实例1完成M1 → ACK → 拉取M5
实例3完成M3 → ACK → 拉取M6
T4: 实例2完成M4 → ACK → 拉取M7
...依此类推
// 结果:吞吐量提升N倍(N=实例数)
✅ 4. 自动负载均衡
// 代码中的QoS配置
_channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
// prefetchCount: 1 的作用:
// - 每个消费者一次只拉取1条消息
// - 处理完(ACK)后才拉取下一条
// - 快的实例自动处理更多消息
// - 慢的实例自动处理更少消息
示例:
实例1处理速度: 100ms/条 → 1秒处理10条
实例2处理速度: 200ms/条 → 1秒处理5条
实例3处理速度: 50ms/条 → 1秒处理20条
在10秒内:
- 实例1: 100条
- 实例2: 50条
- 实例3: 200条
总计: 350条 (单实例只能处理100条)
实际场景演示
场景1: 3个实例,队列中有10条消息
初始状态:
Queue: [M1, M2, M3, M4, M5, M6, M7, M8, M9, M10]
T1: 每个实例拉取1条
实例1: 获得 M1 ← 开始处理
实例2: 获得 M2 ← 开始处理
实例3: 获得 M3 ← 开始处理
Queue: [M4, M5, M6, M7, M8, M9, M10]
T2: 实例2处理最快
实例2: M2处理完成 → BasicAck → 获得 M4 ← 开始处理
Queue: [M5, M6, M7, M8, M9, M10]
T3: 实例1和实例3也完成
实例1: M1处理完成 → BasicAck → 获得 M5
实例3: M3处理完成 → BasicAck → 获得 M6
Queue: [M7, M8, M9, M10]
T4: 继续轮询
实例2: M4处理完成 → 获得 M7
实例1: M5处理完成 → 获得 M8
实例3: M6处理完成 → 获得 M9
Queue: [M10]
T5: 最快的实例获得最后一条
实例2: M7处理完成 → 获得 M10
实例1: M8处理完成
实例3: M9处理完成
Queue: []
T6: 全部完成
实例1: 处理了 M1, M5, M8 (3条)
实例2: 处理了 M2, M4, M7, M10 (4条,因为最快)
实例3: 处理了 M3, M6, M9 (3条)
场景2: 处理失败与重试
T1: 实例1获得消息M1
实例1: 开始处理M1
T2: 处理过程中抛出异常
实例1: catch (Exception ex)
{
return false; // 处理失败
}
T3: 消费者代码自动处理
Consumer: if (!success)
{
message.RetryCount++; // 重试次数 +1
if (message.RetryCount < 3)
{
// 重新入队
_channel.BasicNack(deliveryTag, false, requeue: true);
}
}
T4: 消息M1重新回到队列
Queue: [M1, M4, M5, M6...]
T5: 可能由其他实例处理
实例2: 获得 M1 ← 再次尝试处理
实例3: 获得 M4
// 如果M1连续失败3次
实例2: M1失败 (RetryCount = 3)
Consumer: _channel.BasicNack(deliveryTag, false, requeue: false);
// 消息被丢弃或进入死信队列
幂等性的重要性
// 为什么需要幂等性检查?
情况1: 网络波动导致超时
实例1: 处理消息M1 → 保存到数据库成功
但在发送ACK之前,网络中断
RabbitMQ未收到ACK,消息仍在队列中
实例2: 队列重新分发M1 → 再次尝试保存
如果没有幂等性检查,会重复保存!
✅ 正确的处理方式:
public async Task<bool> HandleAsync(NewsArticleMessage message)
{
// 幂等性检查
var noExists = await _articleService.NoExistsByFromAsync([message.From]);
if (noExists.Count == 0)
{
_logger.LogInformation("新闻已存在,跳过: {From}", message.From);
return true; // 直接返回成功,避免重复保存
}
// 继续处理...
}
多实例部署配置
Docker Compose 多实例配置
version: '3.8'
services:
# RabbitMQ服务
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: password
networks:
- app-network
# 消费者实例1
consumer-1:
image: dpz.webapi:latest
environment:
RabbitMQ__HostName: rabbitmq
RabbitMQ__Port: 5672
MessageQueue__EnableConsumer: true
depends_on:
- rabbitmq
networks:
- app-network
# 消费者实例2
consumer-2:
image: dpz.webapi:latest
environment:
RabbitMQ__HostName: rabbitmq
RabbitMQ__Port: 5672
MessageQueue__EnableConsumer: true
depends_on:
- rabbitmq
networks:
- app-network
# 消费者实例3
consumer-3:
image: dpz.webapi:latest
environment:
RabbitMQ__HostName: rabbitmq
RabbitMQ__Port: 5672
MessageQueue__EnableConsumer: true
depends_on:
- rabbitmq
networks:
- app-network
networks:
app-network:
driver: bridge
Kubernetes 水平扩展配置
apiVersion: apps/v1
kind: Deployment
metadata:
name: message-consumer
spec:
replicas: 3 # 3个实例
selector:
matchLabels:
app: message-consumer
template:
metadata:
labels:
app: message-consumer
spec:
containers:
- name: consumer
image: dpz.webapi:latest
env:
- name: RabbitMQ__HostName
value: "rabbitmq-service"
- name: MessageQueue__EnableConsumer
value: "true"
resources:
requests:
memory: "256Mi"
cpu: "200m"
limits:
memory: "512Mi"
cpu: "500m"
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: message-consumer-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: message-consumer
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 80
性能对比
| 场景 | 单实例 | 3实例 | 5实例 | 10实例 |
|---|---|---|---|---|
| 处理速度 | 100条/分钟 | 300条/分钟 | 500条/分钟 | 1000条/分钟 |
| 平均延迟 | 600ms | 200ms | 120ms | 60ms |
| 容错能力 | ❌ 单点故障 | ✅ 2个实例可失败 | ✅ 4个实例可失败 | ✅ 9个实例可失败 |
| 资源消耗 | 低 | 中 | 中高 | 高 |
监控多实例消费
// 在RabbitMQ管理界面查看
// http://your-rabbitmq:15672
Queues → dpz.news.article.queue:
├─ Consumers: 3 // 当前连接的消费者数
├─ Messages ready: 50 // 待消费消息数
├─ Messages unacked: 3 // 正在处理中的消息数(每个实例1条)
├─ Publish rate: 10 msg/s // 生产速率
└─ Consumer rate: 30 msg/s // 消费速率(3个实例 × 10 msg/s)
// 如果发现队列堆积(ready消息数持续增长)
// → 增加消费者实例数
// → 检查消费者处理性能
注意事项
⚠️ 1. 避免无限扩展
消费者数量 ≈ (生产速率 / 消费速率) × 1.5
例如:生产速率 100条/分钟,单实例消费速率 40条/分钟
建议实例数 = (100 / 40) × 1.5 ≈ 4个实例
⚠️ 2. 数据库连接池
// 每个实例都会连接数据库,注意连接池配置
services.Configure<MongoSettings>(options =>
{
options.MaxConnectionPoolSize = 100; // 根据实例数调整
});
// 假设5个实例,每个实例100个连接
// MongoDB总连接数 = 5 × 100 = 500
⚠️ 3. 消息顺序
多实例消费无法保证严格顺序!
时间线:
T1: M1 → 实例1, M2 → 实例2
T2: 实例2处理完M2 → 先保存到数据库
T3: 实例1处理完M1 → 后保存到数据库
结果: M2先于M1保存(顺序颠倒)
解决方案:
- 如需严格顺序,使用单一消费者
- 或使用消息分组(相同分组key的消息发送到同一实例)
�📝 最佳实践
1. 消息设计
// ✅ 好的设计:包含所有需要的信息
public class NewsArticleMessage : MessageBase
{
public string Title { get; set; } = string.Empty;
public string Content { get; set; } = string.Empty;
public string From { get; set; } = string.Empty;
// ... 完整的数据
}
// ❌ 不好的设计:只包含ID
public class NewsArticleMessage : MessageBase
{
public string ArticleId { get; set; } = string.Empty;
// 消费者还需要调用API获取完整数据,增加耦合
}
2. 幂等性处理
public async Task<bool> HandleAsync(NewsArticleMessage message, CancellationToken ct)
{
// 检查是否已处理过
var exists = await _articleService.ExistsByFromAsync(message.From);
if (exists)
{
_logger.LogInformation("消息已处理过,跳过: {From}", message.From);
return true; // 返回成功,避免重复处理
}
// 处理消息...
}
3. 错误处理
public async Task<bool> HandleAsync(NewsArticleMessage message, CancellationToken ct)
{
try
{
// 业务逻辑
await _articleService.CreateAsync(...);
return true;
}
catch (ValidationException ex)
{
// 业务验证错误,不应重试
_logger.LogWarning(ex, "数据验证失败,忽略消息");
return true; // 返回true避免重试
}
catch (Exception ex)
{
// 系统错误或暂时性故障,应该重试
_logger.LogError(ex, "处理失败,将重试");
return false;
}
}
🔍 监控和调试
查看RabbitMQ管理界面
http://localhost:15672
用户名: admin
密码: your_password
日志输出
库已集成完整的日志记录,推荐配置日志级别:
{
"Logging": {
"LogLevel": {
"Dpz.Core.MessageQueue": "Information"
}
}
}
📚 架构说明
基础架构
┌─────────────────┐
│ 生产者服务 │
│ (任意项目) │
└────────┬────────┘
│ IMessagePublisher
↓
┌─────────────────┐
│ RabbitMQ │
│ Exchange/Queue │
└────────┬────────┘
│
↓
┌─────────────────┐
│ 消费者服务 │
│ (后台服务) │
│ IMessageHandler │
└─────────────────┘
完整消息流转时序图
sequenceDiagram
participant HF as Hangfire定时器
participant Crawler as 爬虫服务<br/>ItHomeActivator
participant Pub as 消息发布者<br/>RabbitMQPublisher
participant Ex as Exchange<br/>dpz.news.exchange
participant Q as Queue<br/>dpz.news.article.queue
participant Consumer as 消费者后台服务<br/>RabbitMQConsumerBackgroundService
participant Handler as 消息处理器<br/>NewsArticleMessageHandler
participant Service as ArticleService
participant DB as MongoDB
rect rgb(225, 245, 255)
Note over HF,Crawler: 步骤1-3: 爬取数据(家庭服务器)
HF->>Crawler: 每3小时触发 StartAsync()
Crawler->>Crawler: 获取RSS源
Crawler->>Crawler: 解析文章列表
Crawler->>Crawler: 过滤已存在的文章
loop 处理每篇文章
Crawler->>Crawler: 下载HTML内容
Crawler->>Crawler: 转换为Markdown
Crawler->>Crawler: 下载图片到又拍云
end
end
rect rgb(255, 244, 230)
Note over Crawler,Q: 步骤4-5: 发送消息到云端RabbitMQ
Crawler->>Pub: 创建 NewsArticleMessage
Pub->>Pub: 序列化为JSON
Pub->>Pub: 设置消息属性<br/>(Persistent=true, DeliveryMode=2)
Pub->>Ex: BasicPublish(exchange, routingKey, props, body)
Note over Pub,Ex: HTTPS连接<br/>家庭网络 → 云端
Ex->>Ex: 根据RoutingKey=news.article.#<br/>匹配绑定规则
Ex->>Q: 消息入队<br/>(持久化到磁盘)
Q-->>Pub: 确认接收
Pub-->>Crawler: 发送成功
end
rect rgb(232, 245, 233)
Note over Q,Handler: 步骤6-7: 多实例并发消费(云端)
par 多实例同时消费
Q->>Consumer: 实例1: BasicConsume(autoAck=false)
Consumer->>Consumer: 反序列化JSON
Consumer->>Handler: HandleAsync(message)
Handler->>Service: NoExistsByFromAsync(url)
Note over Handler,Service: 幂等性检查
Service-->>Handler: 检查结果
alt 消息已存在
Handler-->>Consumer: 返回 true
Consumer->>Q: BasicAck ✓
Note over Consumer,Q: 消息删除
else 消息不存在
Handler->>Handler: 转换为CreateArticleRequest
Handler->>Service: CreateArticleAsync(article)
Service->>DB: Insert Document
DB-->>Service: 保存成功
Service-->>Handler: ArticleResponse
Handler-->>Consumer: 返回 true
Consumer->>Q: BasicAck ✓
Note over Consumer,Q: 消息删除
end
and 其他实例同时进行
Note over Q: 实例2处理消息2
Note over Q: 实例3处理消息3
Note over Q: ...并发处理中
end
end
Note over HF,DB: 全流程完成:爬取 → 发送 → 消费 → 保存
多实例并发消费流程图
graph TB
subgraph "生产者(家庭服务器)"
P[爬虫服务<br/>生产消息]
end
subgraph "云端 RabbitMQ"
Q[Queue: dpz.news.article.queue<br/>包含100条消息]
end
subgraph "消费者集群(云端多实例)"
C1[消费者实例1<br/>QoS prefetchCount=1]
C2[消费者实例2<br/>QoS prefetchCount=1]
C3[消费者实例3<br/>QoS prefetchCount=1]
H1[Handler 1]
H2[Handler 2]
H3[Handler 3]
C1 --> H1
C2 --> H2
C3 --> H3
end
subgraph "数据库"
DB[(MongoDB)]
end
P -->|PublishAsync| Q
Q -->|轮询分发<br/>M1,M4,M7...| C1
Q -->|轮询分发<br/>M2,M5,M8...| C2
Q -->|轮询分发<br/>M3,M6,M9...| C3
H1 -->|并发写入| DB
H2 -->|并发写入| DB
H3 -->|并发写入| DB
style Q fill:#fff4e6
style C1 fill:#e8f5e9
style C2 fill:#e8f5e9
style C3 fill:#e8f5e9
style DB fill:#f3e5f5
style P fill:#e1f5ff
消息分发详细过程
graph LR
subgraph "Queue中的消息"
M1[消息1]
M2[消息2]
M3[消息3]
M4[消息4]
M5[消息5]
M6[消息6]
M7[消息7]
M8[消息8]
M9[消息9]
M10[消息10]
end
subgraph "RabbitMQ轮询分发"
RR[Round-robin<br/>轮询算法]
end
subgraph "消费者实例"
I1[实例1<br/>prefetchCount=1]
I2[实例2<br/>prefetchCount=1]
I3[实例3<br/>prefetchCount=1]
end
M1 --> RR
M2 --> RR
M3 --> RR
M4 --> RR
M5 --> RR
M6 --> RR
M7 --> RR
M8 --> RR
M9 --> RR
M10 --> RR
RR -->|第1轮| I1
RR -->|第1轮| I2
RR -->|第1轮| I3
RR -.->|第2轮<br/>实例ACK后| I1
RR -.->|第2轮<br/>实例ACK后| I2
RR -.->|第2轮<br/>实例ACK后| I3
style RR fill:#fff4e6
style I1 fill:#e8f5e9
style I2 fill:#e8f5e9
style I3 fill:#e8f5e9
云端部署架构(推荐生产环境)
graph TB
subgraph "家庭服务器内网"
A[爬虫服务<br/>Dpz.Core.Web.Jobs<br/>只负责生产消息]
A1[Hangfire定时任务<br/>每3小时触发]
A1 --> A
end
subgraph "云服务器集群"
subgraph "RabbitMQ集群(高可用)"
R1[RabbitMQ节点1<br/>主节点]
R2[RabbitMQ节点2<br/>镜像备份]
R3[RabbitMQ节点3<br/>镜像备份]
R1 -.队列镜像同步.-> R2
R2 -.队列镜像同步.-> R3
R3 -.队列镜像同步.-> R1
end
subgraph "消费者集群(水平扩展)"
C1[WebApi实例1<br/>消费者+API服务]
C2[WebApi实例2<br/>消费者+API服务]
C3[WebApi实例3<br/>消费者+API服务]
CN[WebApi实例N<br/>可动态扩展]
end
subgraph "Web服务集群"
W1[Dpz.Core.Web实例1]
W2[Dpz.Core.Web实例2]
end
LB[负载均衡器<br/>Nginx/ALB]
DB[(MongoDB集群<br/>副本集<br/>主+从1+从2)]
R1 --> C1
R1 --> C2
R1 --> C3
R1 --> CN
C1 --> DB
C2 --> DB
C3 --> DB
CN --> DB
LB --> W1
LB --> W2
LB --> C1
LB --> C2
LB --> C3
W1 --> DB
W2 --> DB
end
A -->|HTTPS/TLS<br/>通过公网发送消息| R1
Users[用户流量] -->|HTTPS| LB
style A fill:#e1f5ff
style R1 fill:#fff4e6
style R2 fill:#fff4e6
style R3 fill:#fff4e6
style C1 fill:#e8f5e9
style C2 fill:#e8f5e9
style C3 fill:#e8f5e9
style CN fill:#e8f5e9
style DB fill:#f3e5f5
style LB fill:#fce4ec
style W1 fill:#e3f2fd
style W2 fill:#e3f2fd
故障恢复流程
graph TB
Start([生产者发送消息]) --> Try{尝试发送}
Try -->|成功| Success[消息已发布]
Try -->|失败| CheckError{检查错误类型}
CheckError -->|网络延迟| Delay[等待重试<br/>AutomaticRecovery]
CheckError -->|RabbitMQ宕机| LocalQueue[保存到本地队列<br/>稍后同步]
CheckError -->|其他错误| Log[记录错误日志]
Delay --> Retry[自动重连后重试]
Retry --> Success
LocalQueue --> Background[后台任务定期检查]
Background -->|RabbitMQ恢复| Sync[同步本地队列消息]
Sync --> Success
Success --> End([消息安全送达])
style Success fill:#e8f5e9
style LocalQueue fill:#fff4e6
style End fill:#c8e6c9
🙋 常见问题
Pub->>Ex: BasicPublish(exchange, routingKey, props, body)
Ex->>Ex: 根据RoutingKey匹配绑定规则
Note over Ex,Q: 步骤5: 路由到Queue
Ex->>Q: 消息入队<br/>(持久化到磁盘)
Q-->>Pub: 确认接收
Pub-->>Crawler: 发送成功
end
Note over Consumer: 后台服务自动运行
Consumer->>Q: BasicConsume(queue, autoAck=false)
loop 消费消息
Note over Q,Consumer: 步骤6: 拉取消息
Q->>Consumer: 推送消息 (Received事件)
Consumer->>Consumer: 反序列化JSON
Note over Consumer,Handler: 步骤7: 处理消息
Consumer->>Handler: HandleAsync(message)
Handler->>Service: NoExistsByFromAsync(url)
Service-->>Handler: 检查是否已存在
alt 消息已存在
Handler-->>Consumer: 返回 true (幂等性)
Consumer->>Q: BasicAck(deliveryTag) ✓
else 消息不存在
Handler->>Handler: 转换为 CreateArticleRequest
Handler->>Service: CreateArticleAsync(article, author)
Note over Service,DB: 步骤8: 保存数据
Service->>DB: Insert Document
DB-->>Service: 保存成功
Service-->>Handler: 返回 ArticleResponse
Handler-->>Consumer: 返回 true
Consumer->>Q: BasicAck(deliveryTag) ✓
end
end
Note over Q: 消息被删除
```mermaid
graph TB
subgraph "家庭服务器内网"
A[爬虫服务<br/>只负责生产消息]
A1[Hangfire定时任务]
A1 --> A
end
subgraph "云服务器集群"
subgraph "RabbitMQ集群 (高可用)"
R1[RabbitMQ节点1<br/>主节点]
R2[RabbitMQ节点2<br/>镜像]
R3[RabbitMQ节点3<br/>镜像]
R1 -.镜像同步.-> R2
R2 -.镜像同步.-> R3
end
subgraph "消费者集群 (可水平扩展)"
C1[Web.Jobs 实例1<br/>消费者]
C2[Web.Jobs 实例2<br/>消费者]
C3[Web.Jobs 实例N<br/>消费者]
end
subgraph "应用服务集群"
W1[Dpz.Core.Web 实例1]
W2[Dpz.Core.Web 实例2]
API1[Dpz.Core.WebApi 实例1]
API2[Dpz.Core.WebApi 实例2]
end
LB[负载均衡器<br/>Nginx/HAProxy]
DB[(MongoDB集群<br/>副本集)]
R1 --> C1
R1 --> C2
R1 --> C3
C1 --> DB
C2 --> DB
C3 --> DB
LB --> W1
LB --> W2
LB --> API1
LB --> API2
W1 --> DB
W2 --> DB
API1 --> DB
API2 --> DB
end
A -->|HTTPS/TLS<br/>消息发送| R1
Users[用户] -->|HTTPS请求| LB
style A fill:#e1f5ff
style R1 fill:#fff4e6
style R2 fill:#fff4e6
style R3 fill:#fff4e6
style C1 fill:#e8f5e9
style C2 fill:#e8f5e9
style C3 fill:#e8f5e9
style DB fill:#f3e5f5
style LB fill:#fce4ec
🙋 常见问题
Q1: 如何确保消息不丢失?
A: 三重保证机制:
- 消息持久化 (
Persistent = true, DeliveryMode = 2) - Exchange和Queue持久化 (
durable: true) - 手动ACK确认机制 (
autoAck: false) - 自动重连机制 (
AutomaticRecoveryEnabled = true)
Q2: 如何处理消息顺序?
A: RabbitMQ不保证严格顺序(尤其是多实例消费)。如需顺序处理:
- 使用单一消费者实例
- 或使用消息分组(相同分组key的消息路由到同一实例)
- 或在应用层实现顺序控制(如版本号、时间戳)
Q3: 多个消费者实例如何工作?
A: 竞争消费者模式,RabbitMQ自动负载均衡:
- 轮询分发(Round-robin):消息1→实例1,消息2→实例2,消息3→实例3...
- 每条消息只发送给一个实例
- QoS
prefetchCount=1确保公平分配 - 示例:3个实例处理100条消息,每个实例约处理33条
Q4: 如何监控消息堆积?
A: RabbitMQ管理界面 http://rabbitmq:15672
Queues → dpz.news.article.queue:
├─ Messages ready: 1000 # 如果持续增长→增加消费者实例
├─ Messages unacked: 3 # 正在处理中
├─ Consumers: 3 # 当前消费者数量
├─ Publish rate: 50 msg/s # 生产速率
└─ Consumer rate: 30 msg/s # 消费速率(如果 < 生产速率→堆积)
Q5: 消息处理失败会怎样?
A: 自动重试机制:
- Handler返回
false→ 消息重新入队 - 最多重试3次(通过
message.RetryCount控制) - 超过3次 → 拒绝消息(可配置死信队列接收)
Q6: 如何实现幂等性?
A: 在Handler中检查唯一标识:
// 检查消息From字段(文章URL)是否已存在
var noExists = await _articleService.NoExistsByFromAsync([message.From]);
if (noExists.Count == 0)
{
return true; // 已存在,跳过处理
}
Q7: 如何扩展消费者实例数量?
A:
- Docker:
docker-compose up --scale consumer=5(扩展到5个实例) - Kubernetes:
kubectl scale deployment consumer --replicas=5 - 手动: 多次启动相同的应用程序即可
Q8: prefetchCount=1 是什么意思?
A: QoS(服务质量)配置,每个消费者一次只拉取1条消息:
- 处理完(ACK)后才拉取下一条
- 确保快的实例处理更多,慢的实例处理更少
- 实现动态负载均衡
Q9: 网络延迟会影响什么?
A: 主要影响生产者发送速度:
- 家庭服务器→云端RabbitMQ:延迟可能200-500ms
- 云端消费者→MongoDB:延迟<5ms
- 解决方案:实现本地消息表/本地队列降级
Q10: 如何保证高可用?
A: 多层保障:
- RabbitMQ集群部署(3节点镜像)
- 消费者多实例部署(自动容错)
- MongoDB副本集(主从备份)
- 消息持久化(服务重启不丢失)
📄 许可证
MIT License
评论加载中...