Dpz.Core.MessageQueue

基于 RabbitMQ 的消息队列基础设施库,提供优雅的消息发布和消费功能。

✨ 核心特性

  • 约定优于配置:自动根据消息类型生成 Exchange、Queue、RoutingKey
  • 类型安全:强类型消息定义和处理
  • 易于使用:简洁的 API 设计,几行代码即可完成配置
  • 可扩展:支持自定义路由约定
  • 可靠性:消息持久化、自动重试、错误处理
  • 依赖注入:完全集成.NET依赖注入系统

📦 安装

dotnet add package RabbitMQ.Client --version 6.8.1
dotnet add reference Dpz.Core.MessageQueue.csproj

🚀 快速开始

1. 定义消息

using Dpz.Core.MessageQueue.Messages;

// 最简单的方式:遵循命名约定
public class NewsArticleMessage : MessageBase
{
    public string Title { get; set; } = string.Empty;
    public string Content { get; set; } = string.Empty;
    public List<string> Tags { get; set; } = new();
}

自动生成的路由信息:

  • Exchange: dpz.news.exchange
  • Queue: dpz.news.article.queue
  • RoutingKey: news.article.#

2. 配置 appsettings.json

{
  "RabbitMQ": {
    "HostName": "localhost",
    "Port": 5672,
    "UserName": "admin",
    "Password": "your_password",
    "VirtualHost": "/",
    "AutomaticRecoveryEnabled": true
  }
}

3. 注册生产者服务

// Program.cs
builder.Services.AddRabbitMQ(builder.Configuration);

4. 发布消息

public class NewsService
{
    private readonly IMessagePublisher<NewsArticleMessage> _publisher;

    public NewsService(IMessagePublisher<NewsArticleMessage> publisher)
    {
        _publisher = publisher;
    }

    public async Task PublishNewsAsync()
    {
        var message = new NewsArticleMessage
        {
            Title = "新闻标题",
            Content = "新闻内容",
            Tags = ["技术", "RabbitMQ"],
            Source = "NewsService"
        };

        await _publisher.PublishAsync(message);
    }
}

5. 创建消息处理器

using Dpz.Core.MessageQueue.Abstractions;

public class NewsArticleHandler : IMessageHandler<NewsArticleMessage>
{
    private readonly IArticleService _articleService;
    private readonly ILogger<NewsArticleHandler> _logger;

    public NewsArticleHandler(
        IArticleService articleService,
        ILogger<NewsArticleHandler> logger)
    {
        _articleService = articleService;
        _logger = logger;
    }

    public async Task<bool> HandleAsync(
        NewsArticleMessage message,
        CancellationToken cancellationToken = default)
    {
        try
        {
            _logger.LogInformation("处理新闻: {Title}", message.Title);

            // 保存到数据库
            await _articleService.CreateAsync(new CreateArticleRequest
            {
                Title = message.Title,
                Content = message.Content,
                Tags = message.Tags
            });

            return true; // 处理成功
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "处理新闻失败: {Title}", message.Title);
            return false; // 处理失败,将触发重试
        }
    }
}

6. 注册消费者

// Program.cs
builder.Services.AddRabbitMQ(builder.Configuration);
builder.Services.AddMessageConsumer<NewsArticleMessage, NewsArticleHandler>();

// 应用启动后,会自动开始消费消息

🎨 高级用法

自定义路由配置

如果默认约定不满足需求,可以使用特性自定义:

using Dpz.Core.MessageQueue.Attributes;

[MessageRoute(
    ExchangeName = "custom.exchange",
    RoutingKey = "custom.routing.key",
    QueueName = "custom.queue"
)]
public class CustomMessage : MessageBase
{
    public string Data { get; set; } = string.Empty;
}

批量发布消息

var messages = new List<NewsArticleMessage>
{
    new() { Title = "新闻1", Content = "内容1" },
    new() { Title = "新闻2", Content = "内容2" },
    new() { Title = "新闻3", Content = "内容3" }
};

await _publisher.PublishBatchAsync(messages);

指定自定义路由键

// 使用自定义路由键(覆盖约定)
await _publisher.PublishAsync(message, routingKey: "news.article.ithome");

自定义路由约定

public class MyCustomRoutingConvention : IMessageRoutingConvention
{
    public string GetExchangeName<TMessage>()
    {
        return $"my.{typeof(TMessage).Name.ToLower()}.exchange";
    }

    public string GetQueueName<TMessage>()
    {
        return $"my.{typeof(TMessage).Name.ToLower()}.queue";
    }

    public string GetRoutingKey<TMessage>()
    {
        return typeof(TMessage).Name.ToLower();
    }

    public string GetExchangeType<TMessage>()
    {
        return "topic";
    }
}

// 注册自定义约定
builder.Services.AddRabbitMQ(builder.Configuration);
builder.Services.AddCustomRoutingConvention<MyCustomRoutingConvention>();

📊 命名约定规则

默认约定(DefaultMessageRoutingConvention)

消息类型ExchangeQueueRoutingKey
NewsArticleMessagedpz.news.exchangedpz.news.article.queuenews.article.#
NewsVideoMessagedpz.news.exchangedpz.news.video.queuenews.video.#
CacheClearMessagedpz.cache.exchangedpz.cache.clear.queuecache.clear.#
UserRegisterMessagedpz.user.exchangedpz.user.register.queueuser.register.#

命名规则:

  1. 从消息类名中提取模块名(第一个词)和类型名(剩余词)
  2. Exchange: dpz.{模块}.exchange
  3. Queue: dpz.{模块}.{类型}.queue
  4. RoutingKey: {模块}.{类型}.#(Topic模式,支持通配符)

🔧 配置选项

{
  "RabbitMQ": {
    "HostName": "localhost",              // RabbitMQ服务器地址
    "Port": 5672,                         // 端口
    "UserName": "guest",                  // 用户名
    "Password": "guest",                  // 密码
    "VirtualHost": "/",                   // 虚拟主机
    "RequestedConnectionTimeout": 30,     // 连接超时(秒)
    "RequestedHeartbeat": 60,             // 心跳超时(秒)
    "AutomaticRecoveryEnabled": true,     // 自动重连
    "NetworkRecoveryInterval": 5          // 网络恢复间隔(秒)
  }
}

⚙️ 消息处理机制

重试机制

  • 消息处理失败时自动重试
  • 最多重试 3 次(通过 message.RetryCount 控制)
  • 超过重试次数后,消息将被丢弃(或发送到死信队列)

QoS设置

  • 预取数量(prefetchCount)设为 1,确保公平调度
  • 消息必须手动确认(ACK),保证消息不丢失

消息持久化

  • Exchange、Queue、Message 均设置为持久化
  • 服务重启后消息不丢失

� 多实例消费机制

竞争消费者模式(Competing Consumers Pattern)

当有多个消费者实例连接到同一个队列时,RabbitMQ会自动实现负载均衡:

                RabbitMQ Queue (100条消息)
                        │
        ┌───────────────┼───────────────┐
        │               │               │
        ↓               ↓               ↓
   消费者实例1      消费者实例2      消费者实例3
   处理: M1,M4,M7   处理: M2,M5,M8   处理: M3,M6,M9
      (33条)          (33条)          (34条)
        │               │               │
        └───────────────┴───────────────┘
                        ↓
                    MongoDB

关键特性

1. 轮询分发(Round-robin)

  • RabbitMQ采用轮询算法分发消息
  • 消息1 → 实例1,消息2 → 实例2,消息3 → 实例3,消息4 → 实例1...
  • 确保每个实例获得大致相同数量的消息

2. 每条消息只被消费一次

// RabbitMQ内部保证
Queue: [消息A, 消息B, 消息C]
         ↓      ↓      ↓
      实例1   实例2   实例3

// 消息A不会同时发送给实例1和实例2
// 除非实例1处理失败并重新入队

3. 并发处理

// 时间轴示例:
T0: Queue有10条消息
T1: 实例1拉取M1, 实例2拉取M2, 实例3拉取M3 (同时进行)
T2: 实例2最快完成M2 → ACK → 立即拉取M4
T3: 实例1完成M1 → ACK → 拉取M5
    实例3完成M3 → ACK → 拉取M6
T4: 实例2完成M4 → ACK → 拉取M7
    ...依此类推

// 结果:吞吐量提升N倍(N=实例数)

4. 自动负载均衡

// 代码中的QoS配置
_channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

// prefetchCount: 1 的作用:
// - 每个消费者一次只拉取1条消息
// - 处理完(ACK)后才拉取下一条
// - 快的实例自动处理更多消息
// - 慢的实例自动处理更少消息

示例:
实例1处理速度: 100ms/条 → 1秒处理10条
实例2处理速度: 200ms/条 → 1秒处理5条
实例3处理速度: 50ms/条  → 1秒处理20条

在10秒内:
- 实例1: 100条
- 实例2: 50条
- 实例3: 200条
总计: 350条 (单实例只能处理100条)

实际场景演示

场景1: 3个实例,队列中有10条消息

初始状态:
Queue: [M1, M2, M3, M4, M5, M6, M7, M8, M9, M10]

T1: 每个实例拉取1条
实例1: 获得 M1 ← 开始处理
实例2: 获得 M2 ← 开始处理
实例3: 获得 M3 ← 开始处理
Queue: [M4, M5, M6, M7, M8, M9, M10]

T2: 实例2处理最快
实例2: M2处理完成 → BasicAck → 获得 M4 ← 开始处理
Queue: [M5, M6, M7, M8, M9, M10]

T3: 实例1和实例3也完成
实例1: M1处理完成 → BasicAck → 获得 M5
实例3: M3处理完成 → BasicAck → 获得 M6
Queue: [M7, M8, M9, M10]

T4: 继续轮询
实例2: M4处理完成 → 获得 M7
实例1: M5处理完成 → 获得 M8
实例3: M6处理完成 → 获得 M9
Queue: [M10]

T5: 最快的实例获得最后一条
实例2: M7处理完成 → 获得 M10
实例1: M8处理完成
实例3: M9处理完成
Queue: []

T6: 全部完成
实例1: 处理了 M1, M5, M8 (3条)
实例2: 处理了 M2, M4, M7, M10 (4条,因为最快)
实例3: 处理了 M3, M6, M9 (3条)

场景2: 处理失败与重试

T1: 实例1获得消息M1
实例1: 开始处理M1

T2: 处理过程中抛出异常
实例1: catch (Exception ex)
        {
            return false; // 处理失败
        }

T3: 消费者代码自动处理
Consumer: if (!success)
          {
              message.RetryCount++;  // 重试次数 +1
              if (message.RetryCount < 3)
              {
                  // 重新入队
                  _channel.BasicNack(deliveryTag, false, requeue: true);
              }
          }

T4: 消息M1重新回到队列
Queue: [M1, M4, M5, M6...]

T5: 可能由其他实例处理
实例2: 获得 M1 ← 再次尝试处理
实例3: 获得 M4

// 如果M1连续失败3次
实例2: M1失败 (RetryCount = 3)
Consumer: _channel.BasicNack(deliveryTag, false, requeue: false);
         // 消息被丢弃或进入死信队列

幂等性的重要性

// 为什么需要幂等性检查?

情况1: 网络波动导致超时
实例1: 处理消息M1 → 保存到数据库成功
       但在发送ACK之前,网络中断
       RabbitMQ未收到ACK,消息仍在队列中

实例2: 队列重新分发M1 → 再次尝试保存
       如果没有幂等性检查,会重复保存!

✅ 正确的处理方式:
public async Task<bool> HandleAsync(NewsArticleMessage message)
{
    // 幂等性检查
    var noExists = await _articleService.NoExistsByFromAsync([message.From]);
    if (noExists.Count == 0)
    {
        _logger.LogInformation("新闻已存在,跳过: {From}", message.From);
        return true; // 直接返回成功,避免重复保存
    }
    
    // 继续处理...
}

多实例部署配置

Docker Compose 多实例配置

version: '3.8'

services:
  # RabbitMQ服务
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
    networks:
      - app-network

  # 消费者实例1
  consumer-1:
    image: dpz.webapi:latest
    environment:
      RabbitMQ__HostName: rabbitmq
      RabbitMQ__Port: 5672
      MessageQueue__EnableConsumer: true
    depends_on:
      - rabbitmq
    networks:
      - app-network

  # 消费者实例2
  consumer-2:
    image: dpz.webapi:latest
    environment:
      RabbitMQ__HostName: rabbitmq
      RabbitMQ__Port: 5672
      MessageQueue__EnableConsumer: true
    depends_on:
      - rabbitmq
    networks:
      - app-network

  # 消费者实例3
  consumer-3:
    image: dpz.webapi:latest
    environment:
      RabbitMQ__HostName: rabbitmq
      RabbitMQ__Port: 5672
      MessageQueue__EnableConsumer: true
    depends_on:
      - rabbitmq
    networks:
      - app-network

networks:
  app-network:
    driver: bridge

Kubernetes 水平扩展配置

apiVersion: apps/v1
kind: Deployment
metadata:
  name: message-consumer
spec:
  replicas: 3  # 3个实例
  selector:
    matchLabels:
      app: message-consumer
  template:
    metadata:
      labels:
        app: message-consumer
    spec:
      containers:
      - name: consumer
        image: dpz.webapi:latest
        env:
        - name: RabbitMQ__HostName
          value: "rabbitmq-service"
        - name: MessageQueue__EnableConsumer
          value: "true"
        resources:
          requests:
            memory: "256Mi"
            cpu: "200m"
          limits:
            memory: "512Mi"
            cpu: "500m"
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: message-consumer-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: message-consumer
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 80

性能对比

场景单实例3实例5实例10实例
处理速度100条/分钟300条/分钟500条/分钟1000条/分钟
平均延迟600ms200ms120ms60ms
容错能力❌ 单点故障✅ 2个实例可失败✅ 4个实例可失败✅ 9个实例可失败
资源消耗中高

监控多实例消费

// 在RabbitMQ管理界面查看
// http://your-rabbitmq:15672

Queues → dpz.news.article.queue:
├─ Consumers: 3              // 当前连接的消费者数
├─ Messages ready: 50        // 待消费消息数
├─ Messages unacked: 3       // 正在处理中的消息数(每个实例1条)
├─ Publish rate: 10 msg/s    // 生产速率
└─ Consumer rate: 30 msg/s   // 消费速率(3个实例 × 10 msg/s)

// 如果发现队列堆积(ready消息数持续增长)
// → 增加消费者实例数
// → 检查消费者处理性能

注意事项

⚠️ 1. 避免无限扩展

消费者数量 ≈ (生产速率 / 消费速率) × 1.5
例如:生产速率 100条/分钟,单实例消费速率 40条/分钟
建议实例数 = (100 / 40) × 1.5 ≈ 4个实例

⚠️ 2. 数据库连接池

// 每个实例都会连接数据库,注意连接池配置
services.Configure<MongoSettings>(options =>
{
    options.MaxConnectionPoolSize = 100;  // 根据实例数调整
});

// 假设5个实例,每个实例100个连接
// MongoDB总连接数 = 5 × 100 = 500

⚠️ 3. 消息顺序

多实例消费无法保证严格顺序!

时间线:
T1: M1 → 实例1, M2 → 实例2
T2: 实例2处理完M2 → 先保存到数据库
T3: 实例1处理完M1 → 后保存到数据库

结果: M2先于M1保存(顺序颠倒)

解决方案:
- 如需严格顺序,使用单一消费者
- 或使用消息分组(相同分组key的消息发送到同一实例)

�📝 最佳实践

1. 消息设计

// ✅ 好的设计:包含所有需要的信息
public class NewsArticleMessage : MessageBase
{
    public string Title { get; set; } = string.Empty;
    public string Content { get; set; } = string.Empty;
    public string From { get; set; } = string.Empty;
    // ... 完整的数据
}

// ❌ 不好的设计:只包含ID
public class NewsArticleMessage : MessageBase
{
    public string ArticleId { get; set; } = string.Empty;
    // 消费者还需要调用API获取完整数据,增加耦合
}

2. 幂等性处理

public async Task<bool> HandleAsync(NewsArticleMessage message, CancellationToken ct)
{
    // 检查是否已处理过
    var exists = await _articleService.ExistsByFromAsync(message.From);
    if (exists)
    {
        _logger.LogInformation("消息已处理过,跳过: {From}", message.From);
        return true; // 返回成功,避免重复处理
    }

    // 处理消息...
}

3. 错误处理

public async Task<bool> HandleAsync(NewsArticleMessage message, CancellationToken ct)
{
    try
    {
        // 业务逻辑
        await _articleService.CreateAsync(...);
        return true;
    }
    catch (ValidationException ex)
    {
        // 业务验证错误,不应重试
        _logger.LogWarning(ex, "数据验证失败,忽略消息");
        return true; // 返回true避免重试
    }
    catch (Exception ex)
    {
        // 系统错误或暂时性故障,应该重试
        _logger.LogError(ex, "处理失败,将重试");
        return false;
    }
}

🔍 监控和调试

查看RabbitMQ管理界面

http://localhost:15672
用户名: admin
密码: your_password

日志输出

库已集成完整的日志记录,推荐配置日志级别:

{
  "Logging": {
    "LogLevel": {
      "Dpz.Core.MessageQueue": "Information"
    }
  }
}

📚 架构说明

基础架构

┌─────────────────┐
│  生产者服务      │
│  (任意项目)      │
└────────┬────────┘
         │ IMessagePublisher
         ↓
┌─────────────────┐
│  RabbitMQ       │
│  Exchange/Queue │
└────────┬────────┘
         │
         ↓
┌─────────────────┐
│  消费者服务      │
│  (后台服务)      │
│ IMessageHandler  │
└─────────────────┘

完整消息流转时序图

sequenceDiagram
    participant HF as Hangfire定时器
    participant Crawler as 爬虫服务<br/>ItHomeActivator
    participant Pub as 消息发布者<br/>RabbitMQPublisher
    participant Ex as Exchange<br/>dpz.news.exchange
    participant Q as Queue<br/>dpz.news.article.queue
    participant Consumer as 消费者后台服务<br/>RabbitMQConsumerBackgroundService
    participant Handler as 消息处理器<br/>NewsArticleMessageHandler
    participant Service as ArticleService
    participant DB as MongoDB

    rect rgb(225, 245, 255)
        Note over HF,Crawler: 步骤1-3: 爬取数据(家庭服务器)
        HF->>Crawler: 每3小时触发 StartAsync()
        Crawler->>Crawler: 获取RSS源
        Crawler->>Crawler: 解析文章列表
        Crawler->>Crawler: 过滤已存在的文章
        
        loop 处理每篇文章
            Crawler->>Crawler: 下载HTML内容
            Crawler->>Crawler: 转换为Markdown
            Crawler->>Crawler: 下载图片到又拍云
        end
    end
    
    rect rgb(255, 244, 230)
        Note over Crawler,Q: 步骤4-5: 发送消息到云端RabbitMQ
        Crawler->>Pub: 创建 NewsArticleMessage
        Pub->>Pub: 序列化为JSON
        Pub->>Pub: 设置消息属性<br/>(Persistent=true, DeliveryMode=2)
        
        Pub->>Ex: BasicPublish(exchange, routingKey, props, body)
        Note over Pub,Ex: HTTPS连接<br/>家庭网络 → 云端
        Ex->>Ex: 根据RoutingKey=news.article.#<br/>匹配绑定规则
        Ex->>Q: 消息入队<br/>(持久化到磁盘)
        Q-->>Pub: 确认接收
        Pub-->>Crawler: 发送成功
    end
    
    rect rgb(232, 245, 233)
        Note over Q,Handler: 步骤6-7: 多实例并发消费(云端)
        
        par 多实例同时消费
            Q->>Consumer: 实例1: BasicConsume(autoAck=false)
            Consumer->>Consumer: 反序列化JSON
            Consumer->>Handler: HandleAsync(message)
            
            Handler->>Service: NoExistsByFromAsync(url)
            Note over Handler,Service: 幂等性检查
            Service-->>Handler: 检查结果
            
            alt 消息已存在
                Handler-->>Consumer: 返回 true
                Consumer->>Q: BasicAck ✓
                Note over Consumer,Q: 消息删除
            else 消息不存在
                Handler->>Handler: 转换为CreateArticleRequest
                Handler->>Service: CreateArticleAsync(article)
                Service->>DB: Insert Document
                DB-->>Service: 保存成功
                Service-->>Handler: ArticleResponse
                Handler-->>Consumer: 返回 true
                Consumer->>Q: BasicAck ✓
                Note over Consumer,Q: 消息删除
            end
        and 其他实例同时进行
            Note over Q: 实例2处理消息2
            Note over Q: 实例3处理消息3
            Note over Q: ...并发处理中
        end
    end
    
    Note over HF,DB: 全流程完成:爬取 → 发送 → 消费 → 保存

多实例并发消费流程图

graph TB
    subgraph "生产者(家庭服务器)"
        P[爬虫服务<br/>生产消息]
    end
    
    subgraph "云端 RabbitMQ"
        Q[Queue: dpz.news.article.queue<br/>包含100条消息]
    end
    
    subgraph "消费者集群(云端多实例)"
        C1[消费者实例1<br/>QoS prefetchCount=1]
        C2[消费者实例2<br/>QoS prefetchCount=1]
        C3[消费者实例3<br/>QoS prefetchCount=1]
        
        H1[Handler 1]
        H2[Handler 2]
        H3[Handler 3]
        
        C1 --> H1
        C2 --> H2
        C3 --> H3
    end
    
    subgraph "数据库"
        DB[(MongoDB)]
    end
    
    P -->|PublishAsync| Q
    
    Q -->|轮询分发<br/>M1,M4,M7...| C1
    Q -->|轮询分发<br/>M2,M5,M8...| C2
    Q -->|轮询分发<br/>M3,M6,M9...| C3
    
    H1 -->|并发写入| DB
    H2 -->|并发写入| DB
    H3 -->|并发写入| DB
    
    style Q fill:#fff4e6
    style C1 fill:#e8f5e9
    style C2 fill:#e8f5e9
    style C3 fill:#e8f5e9
    style DB fill:#f3e5f5
    style P fill:#e1f5ff

消息分发详细过程

graph LR
    subgraph "Queue中的消息"
        M1[消息1]
        M2[消息2]
        M3[消息3]
        M4[消息4]
        M5[消息5]
        M6[消息6]
        M7[消息7]
        M8[消息8]
        M9[消息9]
        M10[消息10]
    end
    
    subgraph "RabbitMQ轮询分发"
        RR[Round-robin<br/>轮询算法]
    end
    
    subgraph "消费者实例"
        I1[实例1<br/>prefetchCount=1]
        I2[实例2<br/>prefetchCount=1]
        I3[实例3<br/>prefetchCount=1]
    end
    
    M1 --> RR
    M2 --> RR
    M3 --> RR
    M4 --> RR
    M5 --> RR
    M6 --> RR
    M7 --> RR
    M8 --> RR
    M9 --> RR
    M10 --> RR
    
    RR -->|第1轮| I1
    RR -->|第1轮| I2
    RR -->|第1轮| I3
    
    RR -.->|第2轮<br/>实例ACK后| I1
    RR -.->|第2轮<br/>实例ACK后| I2
    RR -.->|第2轮<br/>实例ACK后| I3
    
    style RR fill:#fff4e6
    style I1 fill:#e8f5e9
    style I2 fill:#e8f5e9
    style I3 fill:#e8f5e9

云端部署架构(推荐生产环境)

graph TB
    subgraph "家庭服务器内网"
        A[爬虫服务<br/>Dpz.Core.Web.Jobs<br/>只负责生产消息]
        A1[Hangfire定时任务<br/>每3小时触发]
        
        A1 --> A
    end
    
    subgraph "云服务器集群"
        subgraph "RabbitMQ集群(高可用)"
            R1[RabbitMQ节点1<br/>主节点]
            R2[RabbitMQ节点2<br/>镜像备份]
            R3[RabbitMQ节点3<br/>镜像备份]
            
            R1 -.队列镜像同步.-> R2
            R2 -.队列镜像同步.-> R3
            R3 -.队列镜像同步.-> R1
        end
        
        subgraph "消费者集群(水平扩展)"
            C1[WebApi实例1<br/>消费者+API服务]
            C2[WebApi实例2<br/>消费者+API服务]
            C3[WebApi实例3<br/>消费者+API服务]
            CN[WebApi实例N<br/>可动态扩展]
        end
        
        subgraph "Web服务集群"
            W1[Dpz.Core.Web实例1]
            W2[Dpz.Core.Web实例2]
        end
        
        LB[负载均衡器<br/>Nginx/ALB]
        
        DB[(MongoDB集群<br/>副本集<br/>主+从1+从2)]
        
        R1 --> C1
        R1 --> C2
        R1 --> C3
        R1 --> CN
        
        C1 --> DB
        C2 --> DB
        C3 --> DB
        CN --> DB
        
        LB --> W1
        LB --> W2
        LB --> C1
        LB --> C2
        LB --> C3
        
        W1 --> DB
        W2 --> DB
    end
    
    A -->|HTTPS/TLS<br/>通过公网发送消息| R1
    
    Users[用户流量] -->|HTTPS| LB
    
    style A fill:#e1f5ff
    style R1 fill:#fff4e6
    style R2 fill:#fff4e6
    style R3 fill:#fff4e6
    style C1 fill:#e8f5e9
    style C2 fill:#e8f5e9
    style C3 fill:#e8f5e9
    style CN fill:#e8f5e9
    style DB fill:#f3e5f5
    style LB fill:#fce4ec
    style W1 fill:#e3f2fd
    style W2 fill:#e3f2fd

故障恢复流程

graph TB
    Start([生产者发送消息]) --> Try{尝试发送}
    
    Try -->|成功| Success[消息已发布]
    Try -->|失败| CheckError{检查错误类型}
    
    CheckError -->|网络延迟| Delay[等待重试<br/>AutomaticRecovery]
    CheckError -->|RabbitMQ宕机| LocalQueue[保存到本地队列<br/>稍后同步]
    CheckError -->|其他错误| Log[记录错误日志]
    
    Delay --> Retry[自动重连后重试]
    Retry --> Success
    
    LocalQueue --> Background[后台任务定期检查]
    Background -->|RabbitMQ恢复| Sync[同步本地队列消息]
    Sync --> Success
    
    Success --> End([消息安全送达])
    
    style Success fill:#e8f5e9
    style LocalQueue fill:#fff4e6
    style End fill:#c8e6c9

🙋 常见问题

    Pub->>Ex: BasicPublish(exchange, routingKey, props, body)
    Ex->>Ex: 根据RoutingKey匹配绑定规则
    
    Note over Ex,Q: 步骤5: 路由到Queue
    Ex->>Q: 消息入队<br/>(持久化到磁盘)
    Q-->>Pub: 确认接收
    Pub-->>Crawler: 发送成功
end

Note over Consumer: 后台服务自动运行
Consumer->>Q: BasicConsume(queue, autoAck=false)

loop 消费消息
    Note over Q,Consumer: 步骤6: 拉取消息
    Q->>Consumer: 推送消息 (Received事件)
    Consumer->>Consumer: 反序列化JSON
    
    Note over Consumer,Handler: 步骤7: 处理消息
    Consumer->>Handler: HandleAsync(message)
    
    Handler->>Service: NoExistsByFromAsync(url)
    Service-->>Handler: 检查是否已存在
    
    alt 消息已存在
        Handler-->>Consumer: 返回 true (幂等性)
        Consumer->>Q: BasicAck(deliveryTag) ✓
    else 消息不存在
        Handler->>Handler: 转换为 CreateArticleRequest
        Handler->>Service: CreateArticleAsync(article, author)
        
        Note over Service,DB: 步骤8: 保存数据
        Service->>DB: Insert Document
        DB-->>Service: 保存成功
        Service-->>Handler: 返回 ArticleResponse
        Handler-->>Consumer: 返回 true
        Consumer->>Q: BasicAck(deliveryTag) ✓
    end
end

Note over Q: 消息被删除

```mermaid
graph TB
    subgraph "家庭服务器内网"
        A[爬虫服务<br/>只负责生产消息]
        A1[Hangfire定时任务]
        
        A1 --> A
    end
    
    subgraph "云服务器集群"
        subgraph "RabbitMQ集群 (高可用)"
            R1[RabbitMQ节点1<br/>主节点]
            R2[RabbitMQ节点2<br/>镜像]
            R3[RabbitMQ节点3<br/>镜像]
            
            R1 -.镜像同步.-> R2
            R2 -.镜像同步.-> R3
        end
        
        subgraph "消费者集群 (可水平扩展)"
            C1[Web.Jobs 实例1<br/>消费者]
            C2[Web.Jobs 实例2<br/>消费者]
            C3[Web.Jobs 实例N<br/>消费者]
        end
        
        subgraph "应用服务集群"
            W1[Dpz.Core.Web 实例1]
            W2[Dpz.Core.Web 实例2]
            API1[Dpz.Core.WebApi 实例1]
            API2[Dpz.Core.WebApi 实例2]
        end
        
        LB[负载均衡器<br/>Nginx/HAProxy]
        
        DB[(MongoDB集群<br/>副本集)]
        
        R1 --> C1
        R1 --> C2
        R1 --> C3
        
        C1 --> DB
        C2 --> DB
        C3 --> DB
        
        LB --> W1
        LB --> W2
        LB --> API1
        LB --> API2
        
        W1 --> DB
        W2 --> DB
        API1 --> DB
        API2 --> DB
    end
    
    A -->|HTTPS/TLS<br/>消息发送| R1
    
    Users[用户] -->|HTTPS请求| LB
    
    style A fill:#e1f5ff
    style R1 fill:#fff4e6
    style R2 fill:#fff4e6
    style R3 fill:#fff4e6
    style C1 fill:#e8f5e9
    style C2 fill:#e8f5e9
    style C3 fill:#e8f5e9
    style DB fill:#f3e5f5
    style LB fill:#fce4ec

🙋 常见问题

Q1: 如何确保消息不丢失?
A: 三重保证机制:

  • 消息持久化 (Persistent = true, DeliveryMode = 2)
  • Exchange和Queue持久化 (durable: true)
  • 手动ACK确认机制 (autoAck: false)
  • 自动重连机制 (AutomaticRecoveryEnabled = true)

Q2: 如何处理消息顺序?
A: RabbitMQ不保证严格顺序(尤其是多实例消费)。如需顺序处理:

  • 使用单一消费者实例
  • 或使用消息分组(相同分组key的消息路由到同一实例)
  • 或在应用层实现顺序控制(如版本号、时间戳)

Q3: 多个消费者实例如何工作?
A: 竞争消费者模式,RabbitMQ自动负载均衡:

  • 轮询分发(Round-robin):消息1→实例1,消息2→实例2,消息3→实例3...
  • 每条消息只发送给一个实例
  • QoS prefetchCount=1 确保公平分配
  • 示例:3个实例处理100条消息,每个实例约处理33条

Q4: 如何监控消息堆积?
A: RabbitMQ管理界面 http://rabbitmq:15672

Queues → dpz.news.article.queue:
├─ Messages ready: 1000      # 如果持续增长→增加消费者实例
├─ Messages unacked: 3       # 正在处理中
├─ Consumers: 3              # 当前消费者数量
├─ Publish rate: 50 msg/s    # 生产速率
└─ Consumer rate: 30 msg/s   # 消费速率(如果 < 生产速率→堆积)

Q5: 消息处理失败会怎样?
A: 自动重试机制:

  • Handler返回false → 消息重新入队
  • 最多重试3次(通过message.RetryCount控制)
  • 超过3次 → 拒绝消息(可配置死信队列接收)

Q6: 如何实现幂等性?
A: 在Handler中检查唯一标识:

// 检查消息From字段(文章URL)是否已存在
var noExists = await _articleService.NoExistsByFromAsync([message.From]);
if (noExists.Count == 0)
{
    return true; // 已存在,跳过处理
}

Q7: 如何扩展消费者实例数量?
A:

  • Docker: docker-compose up --scale consumer=5 (扩展到5个实例)
  • Kubernetes: kubectl scale deployment consumer --replicas=5
  • 手动: 多次启动相同的应用程序即可

Q8: prefetchCount=1 是什么意思?
A: QoS(服务质量)配置,每个消费者一次只拉取1条消息:

  • 处理完(ACK)后才拉取下一条
  • 确保快的实例处理更多,慢的实例处理更少
  • 实现动态负载均衡

Q9: 网络延迟会影响什么?
A: 主要影响生产者发送速度:

  • 家庭服务器→云端RabbitMQ:延迟可能200-500ms
  • 云端消费者→MongoDB:延迟<5ms
  • 解决方案:实现本地消息表/本地队列降级

Q10: 如何保证高可用?
A: 多层保障:

  • RabbitMQ集群部署(3节点镜像)
  • 消费者多实例部署(自动容错)
  • MongoDB副本集(主从备份)
  • 消息持久化(服务重启不丢失)

📄 许可证

MIT License

评论加载中...