using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Models;
using Medallion.Threading;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using ZiggyCreatures.Caching.Fusion;

namespace Dpz.Core.MessageQueue.RabbitMQ;

/// <summary>
/// Background service that periodically scans for outbox messages whose publish or consume step
/// failed and hands them to <see cref="IMessageOutboxRetryService"/> for re-delivery.
/// </summary>
/// <remarks>
/// A run is due once <c>RetryInterval</c> has elapsed since the last completed run. The completion
/// time is read from the shared cache entry and from this instance's in-memory copy, whichever is
/// later. Every completed run — including one that found nothing to retry — advances that
/// timestamp, so an idle outbox does not make the loop spin.
/// </remarks>
/// <param name="retryService">Executor that performs one outbox retry batch.</param>
/// <param name="fusionCache">FusionCache instance used to record the state of the last completed run.</param>
/// <param name="distributedLockProvider">Lock provider used so that only one instance runs a batch at a time.</param>
/// <param name="logger">Logger.</param>
public sealed class MessageOutboxRetryBackgroundService(
    IMessageOutboxRetryService retryService,
    IFusionCache fusionCache,
    IDistributedLockProvider distributedLockProvider,
    ILogger<MessageOutboxRetryBackgroundService> logger
) : BackgroundService
{
    private const int BatchSize = 50;
    private const string StateCacheKey = "Dpz.Core.MessageQueue.OutboxRetry.LastState";
    private const string LockKey = "Dpz.Core.MessageQueue.OutboxRetry.RunLock";
    private static readonly TimeSpan RetryInterval = TimeSpan.FromMinutes(1);
    private static readonly TimeSpan StateCacheDuration = TimeSpan.FromDays(7);
    private static readonly TimeSpan LockWaitTimeout = TimeSpan.FromSeconds(3);

    // Last completion time this instance knows about (read from the cache at startup or produced by
    // its own runs). Used as a fallback when the cache entry is missing, older, or unreadable.
    private DateTimeOffset? _lastCompletedAtUtc;

    /// <summary>
    /// Runs the outbox retry loop until the host signals shutdown.
    /// </summary>
    /// <param name="stoppingToken">Host stop signal.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await LogLastRetryStateAsync(stoppingToken);
        logger.LogInformation("Outbox 重试后台服务已启动");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var delay = await GetDelayBeforeNextRunAsync(stoppingToken);
                if (delay > TimeSpan.Zero)
                {
                    await DelayAsync(delay, stoppingToken);
                    continue;
                }

                await RunOnceAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Outbox 重试后台服务执行异常");
                try
                {
                    // Back off before the next attempt so a persistent failure does not hot-loop.
                    await DelayAsync(RetryInterval, stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown arrived during the back-off: exit the loop normally so the stop log
                    // below is still written instead of the cancellation escaping ExecuteAsync.
                    break;
                }
            }
        }

        logger.LogInformation("Outbox 重试后台服务已停止");
    }

    /// <summary>
    /// Performs a single retry batch while holding the distributed lock, then records its state.
    /// Returns without doing anything if the lock is not obtained within <c>LockWaitTimeout</c>, or
    /// if another instance already completed a run within the current interval.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token (the host stop signal).</param>
    private async Task RunOnceAsync(CancellationToken cancellationToken)
    {
        await using var lockHandle = await TryAcquireRetryLockAsync(cancellationToken);
        if (lockHandle is null)
        {
            logger.LogInformation("未获取到Outbox重试锁,跳过本轮补发");
            return;
        }

        // Re-check the schedule while holding the lock: another instance may have finished a run
        // while we were waiting, so the state we saw outside the lock can be stale.
        var delay = await GetDelayBeforeNextRunAsync(cancellationToken);
        if (delay > TimeSpan.Zero)
        {
            return;
        }

        var startedAtUtc = DateTimeOffset.UtcNow;
        var result = await retryService.RetryAsync(BatchSize, cancellationToken);
        var completedAtUtc = DateTimeOffset.UtcNow;

        // Record completion even when nothing was retried. Otherwise GetDelayBeforeNextRunAsync keeps
        // returning zero and the loop immediately re-acquires the lock and re-queries the outbox.
        _lastCompletedAtUtc = completedAtUtc;

        var state = new MessageOutboxRetryState(
            startedAtUtc,
            completedAtUtc,
            result.PublishRetryCount,
            result.PublishFailureCount,
            result.ConsumeRetryCount,
            result.ConsumeFailureCount
        );

        // Persisting the timestamp lets other instances see that this interval's run is done.
        await fusionCache.SetAsync(
            StateCacheKey,
            state,
            StateCacheDuration,
            token: cancellationToken
        );

        if (result.TotalRetryCount == 0)
        {
            logger.LogDebug("Outbox 本轮无需补发: CompletedAtUtc={CompletedAtUtc}", completedAtUtc);
            return;
        }

        logger.LogInformation(
            "Outbox 重试完成: Publish={PublishCount}, PublishFailed={PublishFailed}, Consume={ConsumeCount}, ConsumeFailed={ConsumeFailed}, CompletedAtUtc={CompletedAtUtc}",
            result.PublishRetryCount,
            result.PublishFailureCount,
            result.ConsumeRetryCount,
            result.ConsumeFailureCount,
            completedAtUtc
        );
    }

    /// <summary>
    /// Tries to acquire the retry lock, waiting at most <c>LockWaitTimeout</c>.
    /// </summary>
    /// <param name="cancellationToken">Host stop signal; cancellation propagates as an exception.</param>
    /// <returns>The lock handle, or <c>null</c> if the lock was not obtained within the timeout.</returns>
    private async Task<IDistributedSynchronizationHandle?> TryAcquireRetryLockAsync(
        CancellationToken cancellationToken
    )
    {
        // TryAcquireLockAsync reports a timeout by returning null rather than throwing, so no linked
        // CancellationTokenSource or exception filter is needed to implement the wait limit.
        return await distributedLockProvider.TryAcquireLockAsync(
            LockKey,
            LockWaitTimeout,
            cancellationToken
        );
    }

    /// <summary>
    /// Computes how long to wait before the next run is due.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Zero when a run is due now (or no previous run is known); otherwise the remaining wait.</returns>
    private async Task<TimeSpan> GetDelayBeforeNextRunAsync(CancellationToken cancellationToken)
    {
        var lastCompletedAtUtc = await GetLastCompletedAtUtcAsync(cancellationToken);
        if (!lastCompletedAtUtc.HasValue)
        {
            return TimeSpan.Zero;
        }

        var nextRunAtUtc = lastCompletedAtUtc.Value.Add(RetryInterval);
        var delay = nextRunAtUtc - DateTimeOffset.UtcNow;
        return delay > TimeSpan.Zero ? delay : TimeSpan.Zero;
    }

    /// <summary>
    /// Returns the most recent known completion time: the later of the cached state and this
    /// instance's in-memory value. Falls back to the in-memory value if the cache read fails.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token; cancellation is rethrown.</param>
    private async Task<DateTimeOffset?> GetLastCompletedAtUtcAsync(
        CancellationToken cancellationToken
    )
    {
        try
        {
            var cache = await fusionCache.TryGetAsync<MessageOutboxRetryState>(
                StateCacheKey,
                token: cancellationToken
            );
            // Take the later value so a stale cache entry (e.g. after a failed write) cannot
            // override a newer local completion and trigger an immediate re-run.
            return cache.HasValue
                ? Later(cache.Value.CompletedAtUtc, _lastCompletedAtUtc)
                : _lastCompletedAtUtc;
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            throw;
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "读取Outbox重试缓存状态失败");
            return _lastCompletedAtUtc;
        }
    }

    /// <summary>
    /// Returns the later of two optional timestamps; <c>null</c> only if both are <c>null</c>.
    /// </summary>
    private static DateTimeOffset? Later(DateTimeOffset? first, DateTimeOffset? second)
    {
        if (first is null)
        {
            return second;
        }

        if (second is null)
        {
            return first;
        }

        return first.Value >= second.Value ? first : second;
    }

    /// <summary>
    /// Reads the cached state of the last completed run at startup, seeds the in-memory completion
    /// time from it, and logs it. Cache read failures are logged as warnings and otherwise ignored.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token; cancellation is rethrown.</param>
    private async Task LogLastRetryStateAsync(CancellationToken cancellationToken)
    {
        try
        {
            var cache = await fusionCache.TryGetAsync<MessageOutboxRetryState>(
                StateCacheKey,
                token: cancellationToken
            );
            if (!cache.HasValue)
            {
                return;
            }

            _lastCompletedAtUtc = cache.Value.CompletedAtUtc;
            logger.LogInformation(
                "Outbox 上次重试完成时间: CompletedAtUtc={CompletedAtUtc}, Publish={PublishCount}, Consume={ConsumeCount}",
                cache.Value.CompletedAtUtc,
                cache.Value.PublishRetryCount,
                cache.Value.ConsumeRetryCount
            );
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            throw;
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "读取Outbox上次重试状态失败");
        }
    }

    /// <summary>
    /// Waits for <paramref name="delay"/>; returns immediately for non-positive values.
    /// </summary>
    /// <param name="delay">Time to wait.</param>
    /// <param name="cancellationToken">Cancellation token; cancellation throws.</param>
    private static async Task DelayAsync(TimeSpan delay, CancellationToken cancellationToken)
    {
        if (delay <= TimeSpan.Zero)
        {
            return;
        }

        await Task.Delay(delay, cancellationToken);
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个实现了 Outbox 模式 的消息重试后台服务,主要用于处理消息队列中发布失败或消费失败的消息补发机制。让我详细解释一下这个代码:

核心功能

这个服务继承自 BackgroundService,作为一个后台服务运行,负责:

  1. 周期性扫描发布失败或消费失败的消息
  2. 重新投递这些失败的消息
  3. 记录重试状态并支持多实例部署

关键设计特点

1. 构造函数依赖注入

public sealed class MessageOutboxRetryBackgroundService(
    IMessageOutboxRetryService retryService,      // 执行具体重试逻辑
    IFusionCache fusionCache,                     // 缓存重试状态
    IDistributedLockProvider distributedLockProvider, // 分布式锁
    ILogger<MessageOutboxRetryBackgroundService> logger
)

2. 重要常量配置

private const int BatchSize = 50;                          // 每次处理50条消息
private const string StateCacheKey = "...OutboxRetry.LastState";  // 缓存键
private const string LockKey = "...OutboxRetry.RunLock";          // 分布式锁键
private static readonly TimeSpan RetryInterval = TimeSpan.FromMinutes(1); // 重试间隔1分钟

核心执行流程

1. 主循环 (ExecuteAsync)

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // 1. 记录上次重试状态
    await LogLastRetryStateAsync(stoppingToken);
    
    // 2. 进入无限循环
    while (!stoppingToken.IsCancellationRequested)
    {
        // 3. 计算是否需要延迟
        var delay = await GetDelayBeforeNextRunAsync(stoppingToken);
        if (delay > TimeSpan.Zero)
        {
            await DelayAsync(delay, stoppingToken);
            continue;
        }

        // 4. 执行一次重试
        await RunOnceAsync(stoppingToken);

        // (此处省略了实际代码中包裹循环体的 try/catch:停止信号触发的取消会退出循环,
        //  其他异常会记录日志并等待 RetryInterval 后继续)
    }
}

2. 单次执行逻辑 (RunOnceAsync)

private async Task RunOnceAsync(CancellationToken cancellationToken)
{
    // 1. 尝试获取分布式锁(防止多实例重复执行)
    await using var lockHandle = await TryAcquireRetryLockAsync(cancellationToken);
    if (lockHandle == null) return; // 未获取到锁就跳过

    // 2. 获取锁后再次检查时间
    var delay = await GetDelayBeforeNextRunAsync(cancellationToken);
    if (delay > TimeSpan.Zero) return;

    // 3. 执行重试并记录结果
    var startedAtUtc = DateTimeOffset.UtcNow;
    var result = await retryService.RetryAsync(BatchSize, cancellationToken);
    var completedAtUtc = DateTimeOffset.UtcNow;

    // 4. 保存状态到缓存
    var state = new MessageOutboxRetryState(...);
    await fusionCache.SetAsync(StateCacheKey, state, StateCacheDuration, token: cancellationToken);
}

关键技术实现

1. 分布式锁机制

private async Task<IDistributedSynchronizationHandle?> TryAcquireRetryLockAsync(...)
{
    // 使用超时机制,避免无限等待
    lockCancellationTokenSource.CancelAfter(LockWaitTimeout);
    
    return await distributedLockProvider.AcquireLockAsync(
        LockKey,
        timeout: null,
        cancellationToken: lockCancellationTokenSource.Token
    );
}

2. 智能延迟计算

private async Task<TimeSpan> GetDelayBeforeNextRunAsync(...)
{
    // 获取上次完成时间
    var lastCompletedAtUtc = await GetLastCompletedAtUtcAsync(cancellationToken);

    // 没有可用的上次完成时间时立即执行
    if (!lastCompletedAtUtc.HasValue) return TimeSpan.Zero;

    // 计算下次执行时间
    var nextRunAtUtc = lastCompletedAtUtc.Value.Add(RetryInterval);
    var delay = nextRunAtUtc - DateTimeOffset.UtcNow;
    
    return delay > TimeSpan.Zero ? delay : TimeSpan.Zero;
}

3. 状态持久化

使用 FusionCache 来缓存重试状态。在 FusionCache 配置了分布式二级缓存(如 Redis)的前提下,可以支持(仅使用内存缓存时,状态既不能跨重启保留,也无法在实例间共享):

  • 跨应用重启的状态恢复
  • 多实例间的状态同步
  • 7天的缓存持续时间

设计优势

  1. 互斥执行: 通过分布式锁确保多实例部署时同一时刻只有一个实例执行重试(获取锁最多等待 3 秒,超时则跳过本轮)
  2. 状态持久化: 重试状态写入 FusionCache;在配置了分布式二级缓存时,可在应用重启后恢复
  3. 错误恢复: 完善的异常处理机制,确保服务稳定运行
  4. 可观测性: 详细的日志记录,便于监控和排查问题
  5. 性能优化: 批处理机制和智能延迟,避免频繁无效执行

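整体上这是一个设计良好的消息重试服务,考虑了分布式环境下的互斥执行、状态共享与异常恢复。但需注意一个细节:如果某一轮没有需要补发的消息(TotalRetryCount == 0)时直接返回而不更新上次完成时间,GetDelayBeforeNextRunAsync 会持续返回零,主循环便会立即再次执行,反复获取锁并查询 Outbox,形成空转;因此每轮完成后(无论是否有消息)都应记录完成时间。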

评论加载中...