using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Models;
using Microsoft.Extensions.Logging;

namespace Dpz.Core.MessageQueue.RabbitMQ;

/// <summary>
/// Outbox 消息补发执行器,负责读取待重试记录并使用原始消息发布器重新投递。
/// </summary>
/// <param name="outboxStore">Outbox 存储接口。</param>
/// <param name="retryPublisher">绕过 Outbox 装饰器的原始消息发布器。</param>
/// <param name="logger">日志记录器。</param>
public sealed class MessageOutboxRetryService(
    IMessageOutboxStore outboxStore,
    IMessageOutboxRetryPublisher retryPublisher,
    ILogger<MessageOutboxRetryService> logger
) : IMessageOutboxRetryService
{
    /// <summary>
    /// 执行一轮 Outbox 补发,先处理发布失败消息,再处理消费失败消息。
    /// </summary>
    /// <param name="batchSize">本轮每类状态最多处理的消息数量。</param>
    /// <param name="cancellationToken">取消令牌。</param>
    /// <returns>本轮补发的处理结果。</returns>
    public async Task<MessageOutboxRetryResult> RetryAsync(
        int batchSize,
        CancellationToken cancellationToken = default
    )
    {
        var publishResult = await RetryPublishAsync(batchSize, cancellationToken);
        var consumeResult = await RetryConsumeAsync(batchSize, cancellationToken);

        return new MessageOutboxRetryResult(
            publishResult.TotalCount,
            publishResult.FailureCount,
            consumeResult.TotalCount,
            consumeResult.FailureCount
        );
    }

    private async Task<RetryCount> RetryPublishAsync(
        int batchSize,
        CancellationToken cancellationToken
    )
    {
        var entries = await outboxStore.GetPendingPublishRetryAsync(batchSize, cancellationToken);
        if (entries.Count == 0)
        {
            return RetryCount.Empty;
        }

        logger.LogInformation("Outbox 发布重试:共 {Count} 条", entries.Count);

        var failureCount = 0;
        foreach (var entry in entries)
        {
            cancellationToken.ThrowIfCancellationRequested();

            try
            {
                // 直接发布已持久化的 JSON 载荷,避免再次经过 Outbox 装饰器生成新记录。
                await retryPublisher.PublishRawAsync(
                    entry.Exchange,
                    entry.RoutingKey,
                    entry.MessageId,
                    entry.Payload,
                    cancellationToken
                );
                await outboxStore.MarkSentAsync(entry.MessageId, cancellationToken);
                logger.LogInformation(
                    "Outbox 发布重试成功: MessageId={MessageId}",
                    entry.MessageId
                );
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                throw;
            }
            catch (Exception ex)
            {
                failureCount++;
                logger.LogError(ex, "Outbox 发布重试失败: MessageId={MessageId}", entry.MessageId);
                await TryMarkPublishFailedAsync(entry.MessageId, ex.Message, cancellationToken);
            }
        }

        return new RetryCount(entries.Count, failureCount);
    }

    private async Task<RetryCount> RetryConsumeAsync(
        int batchSize,
        CancellationToken cancellationToken
    )
    {
        var entries = await outboxStore.GetPendingConsumeRetryAsync(batchSize, cancellationToken);
        if (entries.Count == 0)
        {
            return RetryCount.Empty;
        }

        logger.LogInformation("Outbox 消费重试:共 {Count} 条", entries.Count);

        var failureCount = 0;
        foreach (var entry in entries)
        {
            cancellationToken.ThrowIfCancellationRequested();

            try
            {
                // 消费失败的消息重新发布到原队列,由对应消费者按正常流程再次处理。
                await retryPublisher.PublishRawAsync(
                    entry.Exchange,
                    entry.RoutingKey,
                    entry.MessageId,
                    entry.Payload,
                    cancellationToken
                );
                await outboxStore.MarkSentAsync(entry.MessageId, cancellationToken);
                logger.LogInformation(
                    "Outbox 消费重试:消息已重新入队: MessageId={MessageId}",
                    entry.MessageId
                );
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                throw;
            }
            catch (Exception ex)
            {
                failureCount++;
                logger.LogError(ex, "Outbox 消费重试失败: MessageId={MessageId}", entry.MessageId);
                await TryMarkConsumeFailedAsync(entry.MessageId, ex.Message, cancellationToken);
            }
        }

        return new RetryCount(entries.Count, failureCount);
    }

    private async Task TryMarkPublishFailedAsync(
        string messageId,
        string error,
        CancellationToken cancellationToken
    )
    {
        try
        {
            await outboxStore.MarkPublishFailedAsync(messageId, error, cancellationToken);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            throw;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "更新Outbox发布失败状态失败: MessageId={MessageId}", messageId);
        }
    }

    private async Task TryMarkConsumeFailedAsync(
        string messageId,
        string error,
        CancellationToken cancellationToken
    )
    {
        try
        {
            await outboxStore.MarkConsumeFailedAsync(messageId, error, cancellationToken);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            throw;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "更新Outbox消费失败状态失败: MessageId={MessageId}", messageId);
        }
    }

    private sealed record RetryCount(int TotalCount, int FailureCount)
    {
        public static RetryCount Empty { get; } = new(0, 0);
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个实现 Outbox 模式 的消息重试服务,用于处理消息队列中失败的消息重发。让我详细解释这个代码:

核心功能

这个 MessageOutboxRetryService 类负责从 Outbox 存储中读取失败的消息记录,并重新投递这些消息。它处理两种类型的失败场景:

  1. 发布失败:消息发送到队列时失败
  2. 消费失败:消息被消费者处理时失败

主要方法解析

1. RetryAsync - 主入口方法

public async Task<MessageOutboxRetryResult> RetryAsync(int batchSize, CancellationToken cancellationToken = default)
  • 执行一轮完整的重试操作
  • 先处理发布失败的消息,再处理消费失败的消息
  • 返回统计结果(总数、失败数)

2. RetryPublishAsync - 重试发布失败的消息

private async Task<RetryCount> RetryPublishAsync(int batchSize, CancellationToken cancellationToken)
  • 从 Outbox 存储获取待重试的发布失败消息
  • 使用原始发布器重新发送消息
  • 成功后标记为已发送,失败则记录错误状态

3. RetryConsumeAsync - 重试消费失败的消息

private async Task<RetryCount> RetryConsumeAsync(int batchSize, CancellationToken cancellationToken)
  • 获取消费失败的消息
  • 将消息重新发布到原队列,让消费者重新处理
  • 处理逻辑与发布重试类似

设计特点

1. 批量处理

  • 支持分批处理,通过 batchSize 参数控制每次处理的消息数量
  • 避免一次性处理大量消息导致的性能问题

2. 异常处理

  • 区分取消异常和其他异常
  • 对于普通异常,记录日志并更新失败状态
  • 确保单个消息失败不影响整批处理

3. 状态管理

  • 成功处理后调用 MarkSentAsync 标记为已发送
  • 失败时调用相应的标记方法记录错误信息
  • 使用 Try 前缀的方法确保状态更新失败不会抛出异常

4. 日志记录

  • 详细记录重试过程和结果
  • 包括批次信息、成功/失败的消息ID等

Outbox 模式的价值

这个实现体现了 Outbox 模式 的核心价值:

  1. 可靠性:确保消息最终能够被处理,即使出现临时故障
  2. 可观测性:通过日志和状态记录,可以追踪消息的处理情况
  3. 容错性:单个消息失败不影响其他消息的处理
  4. 一致性:通过持久化存储确保消息不会丢失

这是一个典型的企业级消息队列可靠性保障组件,特别适用于需要高可靠性的分布式系统场景。

评论加载中...