using System.Text;
using Dpz.Core.MessageQueue.Abstractions;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;

namespace Dpz.Core.MessageQueue.RabbitMQ;

public sealed class RabbitMQRawPublisher(
    IRabbitMQConnectionFactory connectionFactory,
    ILogger<RabbitMQRawPublisher> logger
) : IMessageOutboxRetryPublisher
{
    public async Task PublishRawAsync(
        string exchange,
        string routingKey,
        string messageId,
        string jsonPayload,
        CancellationToken cancellationToken = default
    )
    {
        var channel = await connectionFactory.CreateChannelAsync();
        await using var _ = channel;

        var body = Encoding.UTF8.GetBytes(jsonPayload);
        var properties = new BasicProperties
        {
            Persistent = true,
            MessageId = messageId,
            Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()),
            ContentType = "application/json",
            DeliveryMode = DeliveryModes.Persistent,
        };

        await channel.BasicPublishAsync(
            exchange: exchange,
            routingKey: routingKey,
            mandatory: false,
            basicProperties: properties,
            body: body,
            cancellationToken: cancellationToken
        );

        logger.LogInformation(
            "Outbox重试发布: MessageId={MessageId}, Exchange={Exchange}, RoutingKey={RoutingKey}",
            messageId,
            exchange,
            routingKey
        );
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个基于RabbitMQ的消息发布器类,专门用于处理消息出站箱(Outbox)的重试发布功能。让我详细解释一下这个代码:

类结构和依赖注入

public sealed class RabbitMQRawPublisher(
    IRabbitMQConnectionFactory connectionFactory,
    ILogger<RabbitMQRawPublisher> logger
) : IMessageOutboxRetryPublisher
  • 使用了C# 12的主构造函数语法
  • 实现了 IMessageOutboxRetryPublisher 接口
  • 依赖注入了两个服务:
    • IRabbitMQConnectionFactory:用于创建RabbitMQ连接和通道
    • ILogger<RabbitMQRawPublisher>:用于日志记录

核心方法:PublishRawAsync

这个方法负责将原始JSON消息发布到RabbitMQ:

1. 通道管理

var channel = await connectionFactory.CreateChannelAsync();
await using var _ = channel;
  • 异步创建RabbitMQ通道
  • 使用 await using 确保通道在方法结束时被正确释放

2. 消息体准备

var body = Encoding.UTF8.GetBytes(jsonPayload);
  • 将JSON字符串转换为UTF-8字节数组

3. 消息属性配置

var properties = new BasicProperties
{
    Persistent = true,                    // 消息持久化
    MessageId = messageId,               // 消息ID
    Timestamp = new AmqpTimestamp(...),  // 时间戳
    ContentType = "application/json",    // 内容类型
    DeliveryMode = DeliveryModes.Persistent, // 持久化投递模式
};

关键属性说明:

  • Persistent: 确保消息在RabbitMQ重启后仍然存在
  • MessageId: 用于消息去重和追踪
  • Timestamp: 记录消息发送时间
  • ContentType: 标识消息格式为JSON
  • DeliveryMode: 设置为持久化模式

4. 消息发布

await channel.BasicPublishAsync(
    exchange: exchange,
    routingKey: routingKey,
    mandatory: false,
    basicProperties: properties,
    body: body,
    cancellationToken: cancellationToken
);

参数说明:

  • exchange: 交换机名称
  • routingKey: 路由键
  • mandatory: false表示如果消息无法路由也不报错
  • basicProperties: 消息属性
  • body: 消息内容
  • cancellationToken: 取消令牌支持

5. 日志记录

logger.LogInformation(
    "Outbox重试发布: MessageId={MessageId}, Exchange={Exchange}, RoutingKey={RoutingKey}",
    messageId, exchange, routingKey
);

记录发布成功的日志,包含关键信息用于追踪和调试。

设计特点

  1. Outbox模式支持: 专门用于消息出站箱的重试发布,这是一种确保数据一致性的模式
  2. 异步操作: 全程使用异步方法,提高性能
  3. 资源管理: 正确使用 await using 管理RabbitMQ通道生命周期
  4. 消息持久化: 确保消息在系统故障时不会丢失
  5. 可观测性: 通过日志记录提供良好的可追踪性
  6. 取消支持: 支持 CancellationToken 用于优雅取消操作

这个类是消息队列基础设施的重要组成部分,特别适用于需要保证消息最终一致性的分布式系统场景。

评论加载中...