using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Models;
using Microsoft.Extensions.Logging;
namespace Dpz.Core.MessageQueue.RabbitMQ;
/// <summary>
/// Outbox 消息补发执行器,负责读取待重试记录并使用原始消息发布器重新投递。
/// </summary>
/// <param name="outboxStore">Outbox 存储接口。</param>
/// <param name="retryPublisher">绕过 Outbox 装饰器的原始消息发布器。</param>
/// <param name="logger">日志记录器。</param>
public sealed class MessageOutboxRetryService(
IMessageOutboxStore outboxStore,
IMessageOutboxRetryPublisher retryPublisher,
ILogger<MessageOutboxRetryService> logger
) : IMessageOutboxRetryService
{
/// <summary>
/// 执行一轮 Outbox 补发,先处理发布失败消息,再处理消费失败消息。
/// </summary>
/// <param name="batchSize">本轮每类状态最多处理的消息数量。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>本轮补发的处理结果。</returns>
public async Task<MessageOutboxRetryResult> RetryAsync(
int batchSize,
CancellationToken cancellationToken = default
)
{
var publishResult = await RetryPublishAsync(batchSize, cancellationToken);
var consumeResult = await RetryConsumeAsync(batchSize, cancellationToken);
return new MessageOutboxRetryResult(
publishResult.TotalCount,
publishResult.FailureCount,
consumeResult.TotalCount,
consumeResult.FailureCount
);
}
private async Task<RetryCount> RetryPublishAsync(
int batchSize,
CancellationToken cancellationToken
)
{
var entries = await outboxStore.GetPendingPublishRetryAsync(batchSize, cancellationToken);
if (entries.Count == 0)
{
return RetryCount.Empty;
}
logger.LogInformation("Outbox 发布重试:共 {Count} 条", entries.Count);
var failureCount = 0;
foreach (var entry in entries)
{
cancellationToken.ThrowIfCancellationRequested();
try
{
// 直接发布已持久化的 JSON 载荷,避免再次经过 Outbox 装饰器生成新记录。
await retryPublisher.PublishRawAsync(
entry.Exchange,
entry.RoutingKey,
entry.MessageId,
entry.Payload,
cancellationToken
);
await outboxStore.MarkSentAsync(entry.MessageId, cancellationToken);
logger.LogInformation(
"Outbox 发布重试成功: MessageId={MessageId}",
entry.MessageId
);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
failureCount++;
logger.LogError(ex, "Outbox 发布重试失败: MessageId={MessageId}", entry.MessageId);
await TryMarkPublishFailedAsync(entry.MessageId, ex.Message, cancellationToken);
}
}
return new RetryCount(entries.Count, failureCount);
}
private async Task<RetryCount> RetryConsumeAsync(
int batchSize,
CancellationToken cancellationToken
)
{
var entries = await outboxStore.GetPendingConsumeRetryAsync(batchSize, cancellationToken);
if (entries.Count == 0)
{
return RetryCount.Empty;
}
logger.LogInformation("Outbox 消费重试:共 {Count} 条", entries.Count);
var failureCount = 0;
foreach (var entry in entries)
{
cancellationToken.ThrowIfCancellationRequested();
try
{
// 消费失败的消息重新发布到原队列,由对应消费者按正常流程再次处理。
await retryPublisher.PublishRawAsync(
entry.Exchange,
entry.RoutingKey,
entry.MessageId,
entry.Payload,
cancellationToken
);
await outboxStore.MarkSentAsync(entry.MessageId, cancellationToken);
logger.LogInformation(
"Outbox 消费重试:消息已重新入队: MessageId={MessageId}",
entry.MessageId
);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
failureCount++;
logger.LogError(ex, "Outbox 消费重试失败: MessageId={MessageId}", entry.MessageId);
await TryMarkConsumeFailedAsync(entry.MessageId, ex.Message, cancellationToken);
}
}
return new RetryCount(entries.Count, failureCount);
}
private async Task TryMarkPublishFailedAsync(
string messageId,
string error,
CancellationToken cancellationToken
)
{
try
{
await outboxStore.MarkPublishFailedAsync(messageId, error, cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
logger.LogError(ex, "更新Outbox发布失败状态失败: MessageId={MessageId}", messageId);
}
}
private async Task TryMarkConsumeFailedAsync(
string messageId,
string error,
CancellationToken cancellationToken
)
{
try
{
await outboxStore.MarkConsumeFailedAsync(messageId, error, cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
logger.LogError(ex, "更新Outbox消费失败状态失败: MessageId={MessageId}", messageId);
}
}
private sealed record RetryCount(int TotalCount, int FailureCount)
{
public static RetryCount Empty { get; } = new(0, 0);
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个实现 Outbox 模式 的消息重试服务,用于处理消息队列中失败的消息重发。让我详细解释这个代码:
核心功能
这个 MessageOutboxRetryService 类负责从 Outbox 存储中读取失败的消息记录,并重新投递这些消息。它处理两种类型的失败场景:
- 发布失败:消息发送到队列时失败
- 消费失败:消息被消费者处理时失败
主要方法解析
1. RetryAsync - 主入口方法
public async Task<MessageOutboxRetryResult> RetryAsync(int batchSize, CancellationToken cancellationToken = default)
- 执行一轮完整的重试操作
- 先处理发布失败的消息,再处理消费失败的消息
- 返回统计结果(总数、失败数)
2. RetryPublishAsync - 重试发布失败的消息
private async Task<RetryCount> RetryPublishAsync(int batchSize, CancellationToken cancellationToken)
- 从 Outbox 存储获取待重试的发布失败消息
- 使用原始发布器重新发送消息
- 成功后标记为已发送,失败则记录错误状态
3. RetryConsumeAsync - 重试消费失败的消息
private async Task<RetryCount> RetryConsumeAsync(int batchSize, CancellationToken cancellationToken)
- 获取消费失败的消息
- 将消息重新发布到原队列,让消费者重新处理
- 处理逻辑与发布重试类似
设计特点
1. 批量处理
- 支持分批处理,通过
batchSize参数控制每次处理的消息数量 - 避免一次性处理大量消息导致的性能问题
2. 异常处理
- 区分取消异常和其他异常
- 对于普通异常,记录日志并更新失败状态
- 确保单个消息失败不影响整批处理
3. 状态管理
- 成功处理后调用
MarkSentAsync标记为已发送 - 失败时调用相应的标记方法记录错误信息
- 使用
Try前缀的方法确保状态更新失败不会抛出异常
4. 日志记录
- 详细记录重试过程和结果
- 包括批次信息、成功/失败的消息ID等
Outbox 模式的价值
这个实现体现了 Outbox 模式 的核心价值:
- 可靠性:确保消息最终能够被处理,即使出现临时故障
- 可观测性:通过日志和状态记录,可以追踪消息的处理情况
- 容错性:单个消息失败不影响其他消息的处理
- 一致性:通过持久化存储确保消息不会丢失
这是一个典型的企业级消息队列可靠性保障组件,特别适用于需要高可靠性的分布式系统场景。
评论加载中...