using System.Text.Json;
using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Microsoft.Extensions.Logging;
namespace Dpz.Core.MessageQueue.RabbitMQ;
public sealed class OutboxMessagePublisher<TMessage>(
RabbitMQPublisher<TMessage> inner,
IMessageOutboxStore outboxStore,
IMessageRoutingConvention routingConvention,
ILogger<OutboxMessagePublisher<TMessage>> logger
) : IMessagePublisher<TMessage>
where TMessage : MessageBase
{
private readonly string _exchangeName = routingConvention.GetExchangeName<TMessage>();
private readonly string _defaultRoutingKey = routingConvention.GetRoutingKey<TMessage>();
public async Task PublishAsync(
TMessage message,
string? routingKey = null,
CancellationToken cancellationToken = default
)
{
var actualRoutingKey = routingKey ?? _defaultRoutingKey;
var payload = JsonSerializer.Serialize(message);
await outboxStore.CreateAsync(
message.MessageId,
typeof(TMessage).FullName!,
_exchangeName,
actualRoutingKey,
payload,
message.Source,
cancellationToken
);
try
{
await inner.PublishAsync(message, routingKey, cancellationToken);
await outboxStore.MarkSentAsync(message.MessageId, cancellationToken);
}
catch (Exception ex)
{
logger.LogError(
ex,
"消息发布失败,已记录至Outbox: MessageId={MessageId}, Exchange={Exchange}",
message.MessageId,
_exchangeName
);
await outboxStore.MarkPublishFailedAsync(
message.MessageId,
ex.Message,
cancellationToken
);
throw;
}
}
public async Task PublishBatchAsync(
IEnumerable<TMessage> messages,
string? routingKey = null,
CancellationToken cancellationToken = default
)
{
var actualRoutingKey = routingKey ?? _defaultRoutingKey;
var messageList = messages.ToList();
foreach (var message in messageList)
{
var payload = JsonSerializer.Serialize(message);
await outboxStore.CreateAsync(
message.MessageId,
typeof(TMessage).FullName!,
_exchangeName,
actualRoutingKey,
payload,
message.Source,
cancellationToken
);
}
try
{
await inner.PublishBatchAsync(messageList, routingKey, cancellationToken);
foreach (var message in messageList)
{
await outboxStore.MarkSentAsync(message.MessageId, cancellationToken);
}
}
catch (Exception ex)
{
logger.LogError(
ex,
"批量消息发布失败,已记录至Outbox: Count={Count}, Exchange={Exchange}",
messageList.Count,
_exchangeName
);
foreach (var message in messageList)
{
await outboxStore.MarkPublishFailedAsync(
message.MessageId,
ex.Message,
cancellationToken
);
}
throw;
}
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个实现了Outbox模式的RabbitMQ消息发布器,用于确保消息发布的可靠性。让我详细解释这个类的设计和功能:
类概述
OutboxMessagePublisher<TMessage> 是一个泛型类,它装饰了内部的 RabbitMQPublisher,为消息发布添加了Outbox模式的支持。
核心设计模式
1. Outbox模式
这是分布式系统中确保数据一致性的重要模式:
- 在发布消息前,先将消息保存到本地存储(Outbox表)
- 然后尝试发布消息
- 根据发布结果更新Outbox记录的状态
2. 装饰器模式
通过包装 RabbitMQPublisher<TMessage> 来增强其功能,而不修改原有实现。
构造函数参数
public sealed class OutboxMessagePublisher<TMessage>(
RabbitMQPublisher<TMessage> inner, // 被装饰的原始发布器
IMessageOutboxStore outboxStore, // Outbox存储接口
IMessageRoutingConvention routingConvention, // 路由约定
ILogger<OutboxMessagePublisher<TMessage>> logger // 日志记录器
)
核心功能
单消息发布 (PublishAsync)
执行流程:
- 预处理:确定路由键,序列化消息
- 记录Outbox:将消息信息保存到Outbox存储
- 发布消息:调用内部发布器发送消息
- 成功处理:标记消息为已发送
- 异常处理:记录发布失败状态并重新抛出异常
批量消息发布 (PublishBatchAsync)
类似单消息发布,但针对消息集合:
- 先将所有消息记录到Outbox
- 批量发布所有消息
- 根据结果批量更新状态
关键特性
1. 可靠性保证
// 先记录到Outbox,确保消息不会丢失
await outboxStore.CreateAsync(message.MessageId, ...);
// 然后尝试发布
await inner.PublishAsync(message, routingKey, cancellationToken);
2. 状态跟踪
- 成功发布:
MarkSentAsync - 发布失败:
MarkPublishFailedAsync
3. 异常透明性
发布失败时,异常会被重新抛出,保持原有的错误处理语义。
使用场景
这种设计特别适用于:
- 微服务架构:需要确保事务完整性
- 高可靠性要求:不能丢失重要业务消息
- 分布式事务:需要最终一致性保证
优势
- 消息不丢失:即使发布失败,消息也已保存在Outbox中
- 可重试:失败的消息可以通过Outbox重新发布
- 可监控:通过Outbox可以监控消息发布状态
- 事务一致性:支持与业务操作在同一事务中执行
这是一个非常成熟的企业级消息发布解决方案,很好地解决了分布式系统中消息可靠性的问题。
评论加载中...