using System.Text;
using Dpz.Core.MessageQueue.Abstractions;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
namespace Dpz.Core.MessageQueue.RabbitMQ;
public sealed class RabbitMQRawPublisher(
IRabbitMQConnectionFactory connectionFactory,
ILogger<RabbitMQRawPublisher> logger
) : IMessageOutboxRetryPublisher
{
public async Task PublishRawAsync(
string exchange,
string routingKey,
string messageId,
string jsonPayload,
CancellationToken cancellationToken = default
)
{
var channel = await connectionFactory.CreateChannelAsync();
await using var _ = channel;
var body = Encoding.UTF8.GetBytes(jsonPayload);
var properties = new BasicProperties
{
Persistent = true,
MessageId = messageId,
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()),
ContentType = "application/json",
DeliveryMode = DeliveryModes.Persistent,
};
await channel.BasicPublishAsync(
exchange: exchange,
routingKey: routingKey,
mandatory: false,
basicProperties: properties,
body: body,
cancellationToken: cancellationToken
);
logger.LogInformation(
"Outbox重试发布: MessageId={MessageId}, Exchange={Exchange}, RoutingKey={RoutingKey}",
messageId,
exchange,
routingKey
);
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个基于RabbitMQ的消息发布器类,专门用于处理消息出站箱(Outbox)的重试发布功能。让我详细解释一下这个代码:
类结构和依赖注入
public sealed class RabbitMQRawPublisher(
IRabbitMQConnectionFactory connectionFactory,
ILogger<RabbitMQRawPublisher> logger
) : IMessageOutboxRetryPublisher
- 使用了C# 12的主构造函数语法
- 实现了
IMessageOutboxRetryPublisher接口 - 依赖注入了两个服务:
IRabbitMQConnectionFactory:用于创建RabbitMQ连接和通道ILogger<RabbitMQRawPublisher>:用于日志记录
核心方法:PublishRawAsync
这个方法负责将原始JSON消息发布到RabbitMQ:
1. 通道管理
var channel = await connectionFactory.CreateChannelAsync();
await using var _ = channel;
- 异步创建RabbitMQ通道
- 使用
await using确保通道在方法结束时被正确释放
2. 消息体准备
var body = Encoding.UTF8.GetBytes(jsonPayload);
- 将JSON字符串转换为UTF-8字节数组
3. 消息属性配置
var properties = new BasicProperties
{
Persistent = true, // 消息持久化
MessageId = messageId, // 消息ID
Timestamp = new AmqpTimestamp(...), // 时间戳
ContentType = "application/json", // 内容类型
DeliveryMode = DeliveryModes.Persistent, // 持久化投递模式
};
关键属性说明:
- Persistent: 确保消息在RabbitMQ重启后仍然存在
- MessageId: 用于消息去重和追踪
- Timestamp: 记录消息发送时间
- ContentType: 标识消息格式为JSON
- DeliveryMode: 设置为持久化模式
4. 消息发布
await channel.BasicPublishAsync(
exchange: exchange,
routingKey: routingKey,
mandatory: false,
basicProperties: properties,
body: body,
cancellationToken: cancellationToken
);
参数说明:
- exchange: 交换机名称
- routingKey: 路由键
- mandatory: false表示如果消息无法路由也不报错
- basicProperties: 消息属性
- body: 消息内容
- cancellationToken: 取消令牌支持
5. 日志记录
logger.LogInformation(
"Outbox重试发布: MessageId={MessageId}, Exchange={Exchange}, RoutingKey={RoutingKey}",
messageId, exchange, routingKey
);
记录发布成功的日志,包含关键信息用于追踪和调试。
设计特点
- Outbox模式支持: 专门用于消息出站箱的重试发布,这是一种确保数据一致性的模式
- 异步操作: 全程使用异步方法,提高性能
- 资源管理: 正确使用
await using管理RabbitMQ通道生命周期 - 消息持久化: 确保消息在系统故障时不会丢失
- 可观测性: 通过日志记录提供良好的可追踪性
- 取消支持: 支持
CancellationToken用于优雅取消操作
这个类是消息队列基础设施的重要组成部分,特别适用于需要保证消息最终一致性的分布式系统场景。
评论加载中...