using Dpz.Core.MessageQueue.Abstractions;
using Medallion.Threading;
using Microsoft.Extensions.Logging;
using ZiggyCreatures.Caching.Fusion;

namespace Dpz.Core.MessageQueue.RabbitMQ;

/// <summary>
/// 基于分布式缓存的批次追踪器
/// </summary>
public class DistributedBatchTracker(
    IFusionCache fusionCache,
    IDistributedLockProvider lockProvider,
    ILogger<DistributedBatchTracker> logger
) : IBatchTracker
{
    private readonly TimeSpan _expirationTime = TimeSpan.FromHours(24);

    /// <summary>
    /// 锁持有超时时间
    /// </summary>
    private readonly TimeSpan _lockTimeout = TimeSpan.FromSeconds(5);

    /// <summary>
    /// 尝试获取锁的超时时间
    /// </summary>
    private readonly TimeSpan _tryAcquireLockTimeout = TimeSpan.FromMilliseconds(100);

    /// <summary>
    /// 获取锁失败后的最大重试次数
    /// </summary>
    private const int MaxRetries = 5;

    /// <summary>
    /// 重试基础延迟
    /// </summary>
    private readonly TimeSpan _retryBaseDelay = TimeSpan.FromMilliseconds(50);

    public async Task RecordMessageConsumedAsync(string batchId, string messageId)
    {
        var lockKey = GetBatchLockKey(batchId);
        var retryCount = 0;

        while (true)
        {
            // 尝试快速获取锁,不要长时间阻塞
            var @lock = await TryAcquireLockAsync(lockKey);

            if (@lock == null)
            {
                // 获取锁失败,指数退避后重试
                retryCount++;
                if (retryCount > MaxRetries)
                {
                    throw new TimeoutException(
                        $"获取批次锁超时: BatchId={batchId}, MessageId={messageId}, Retries={retryCount}"
                    );
                }

                // 指数退避:50ms, 100ms, 200ms, 400ms, 800ms
                var delay = TimeSpan.FromMilliseconds(
                    _retryBaseDelay.TotalMilliseconds * Math.Pow(2, retryCount - 1)
                );
                logger.LogDebug(
                    "获取批次锁失败,等待{Delay}ms后重试 ({Retry}/{MaxRetries}): BatchId={BatchId}",
                    delay.TotalMilliseconds,
                    retryCount,
                    MaxRetries,
                    batchId
                );
                await Task.Delay(delay);
                continue;
            }

            // 成功获取锁,执行操作
            try
            {
                await using (@lock)
                {
                    var key = GetBatchKey(batchId);
                    var messagesKey = GetBatchMessagesKey(batchId);

                    // 记录消息ID到集合(用于去重)
                    var cacheValue = await fusionCache.TryGetAsync<HashSet<string>>(messagesKey);
                    var messages = new HashSet<string>();
                    if (cacheValue.HasValue)
                    {
                        messages = cacheValue.Value;
                    }

                    // 检查是否已经记录过(防止重复消费导致计数错误)
                    if (!messages.Add(messageId))
                    {
                        logger.LogWarning(
                            "消息已被记录过: BatchId={BatchId}, MessageId={MessageId}",
                            batchId,
                            messageId
                        );
                        return;
                    }

                    // 更新计数
                    var newCount = messages.Count;
                    await fusionCache.SetAsync(
                        key,
                        newCount,
                        options => options.SetDuration(_expirationTime)
                    );

                    // 更新消息集合
                    await fusionCache.SetAsync(
                        messagesKey,
                        messages,
                        options => options.SetDuration(_expirationTime)
                    );

                    // 只有在重试或达到里程碑时才记录日志,避免日志轰炸
                    var shouldLog =
                        retryCount > 0 || newCount % 10 == 0 || newCount == 1 || newCount >= 100;
                    if (shouldLog)
                    {
                        logger.LogInformation(
                            "已记录消息消费: BatchId={BatchId}, TotalConsumed={Count}, RetryCount={RetryCount}",
                            batchId,
                            newCount,
                            retryCount
                        );
                    }

                    // 成功后直接返回
                    return;
                }
            }
            catch (Exception ex)
            {
                logger.LogError(
                    ex,
                    "记录消息消费失败: BatchId={BatchId}, MessageId={MessageId}",
                    batchId,
                    messageId
                );
                throw;
            }
        }
    }

    /// <summary>
    /// 尝试获取分布式锁,快速失败
    /// </summary>
    private async Task<IDistributedSynchronizationHandle?> TryAcquireLockAsync(string lockKey)
    {
        using var cts = new CancellationTokenSource();
        cts.CancelAfter(_tryAcquireLockTimeout);

        try
        {
            return await lockProvider.AcquireLockAsync(
                lockKey,
                timeout: _lockTimeout,
                cancellationToken: cts.Token
            );
        }
        catch (OperationCanceledException)
        {
            // 超时
            return null;
        }
    }

    public async Task<bool> IsBatchCompletedAsync(string batchId, int expectedTotal)
    {
        try
        {
            var progress = await GetBatchProgressAsync(batchId);
            var completed = progress >= expectedTotal;

            if (completed)
            {
                logger.LogInformation(
                    "批次已完成: BatchId={BatchId}, Progress={Progress}/{Expected}",
                    batchId,
                    progress,
                    expectedTotal
                );
            }

            return completed;
        }
        catch (Exception ex)
        {
            logger.LogError(
                ex,
                "检查批次完成状态失败: BatchId={BatchId}, ExpectedTotal={ExpectedTotal}",
                batchId,
                expectedTotal
            );
            return false;
        }
    }

    public async Task<int> GetBatchProgressAsync(string batchId)
    {
        try
        {
            var key = GetBatchKey(batchId);
            var cacheResult = await fusionCache.TryGetAsync<int>(key);
            var count = cacheResult.HasValue ? cacheResult.Value : 0;
            return count;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "获取批次进度失败: BatchId={BatchId}", batchId);
            return 0;
        }
    }

    public async Task CleanupBatchAsync(string batchId)
    {
        try
        {
            await fusionCache.RemoveAsync(GetBatchKey(batchId));
            await fusionCache.RemoveAsync(GetBatchMessagesKey(batchId));

            logger.LogInformation("批次记录已清理: BatchId={BatchId}", batchId);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "清理批次记录失败: BatchId={BatchId}", batchId);
            throw;
        }
    }

    private static string GetBatchKey(string batchId) => $"batch:progress:{batchId}";

    private static string GetBatchMessagesKey(string batchId) => $"batch:messages:{batchId}";

    private static string GetBatchLockKey(string batchId) => $"batch:lock:{batchId}";
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个基于分布式缓存的批次追踪器实现,用于在消息队列系统中跟踪批次消息的消费进度。让我详细解释一下这个代码:

类的职责和用途

DistributedBatchTracker 实现了 IBatchTracker 接口,主要用于:

  • 跟踪批次中每个消息的消费状态
  • 防止消息重复计数
  • 检查批次是否完成
  • 提供分布式环境下的线程安全保证

核心依赖

public class DistributedBatchTracker(
    IFusionCache fusionCache,           // 分布式缓存,用于存储数据
    IDistributedLockProvider lockProvider, // 分布式锁提供者,保证并发安全
    ILogger<DistributedBatchTracker> logger // 日志记录器
)

关键配置参数

  • _expirationTime: 24小时 - 缓存数据过期时间
  • _lockTimeout: 5秒 - 锁的持有超时时间
  • _tryAcquireLockTimeout: 100毫秒 - 尝试获取锁的超时时间
  • MaxRetries: 5次 - 获取锁失败的最大重试次数
  • _retryBaseDelay: 50毫秒 - 重试的基础延迟时间

核心方法分析

1. RecordMessageConsumedAsync - 记录消息消费

这是最复杂的方法,实现了以下功能:

分布式锁机制

var @lock = await TryAcquireLockAsync(lockKey);
  • 使用快速失败策略获取锁
  • 实现指数退避重试机制(50ms → 100ms → 200ms → 400ms → 800ms)

防重复消费

var messages = new HashSet<string>(); // 存储已消费的消息ID
if (!messages.Add(messageId)) {
    // 消息已被处理过,直接返回
}

原子性更新

// 同时更新计数和消息集合
await fusionCache.SetAsync(key, newCount, ...);
await fusionCache.SetAsync(messagesKey, messages, ...);

2. IsBatchCompletedAsync - 检查批次完成状态

var progress = await GetBatchProgressAsync(batchId);
return progress >= expectedTotal;

3. GetBatchProgressAsync - 获取批次进度

从缓存中读取当前已消费的消息数量。

4. CleanupBatchAsync - 清理批次数据

删除批次相关的所有缓存数据。

缓存键设计

private static string GetBatchKey(string batchId) => $"batch:progress:{batchId}";      // 存储消费计数
private static string GetBatchMessagesKey(string batchId) => $"batch:messages:{batchId}"; // 存储消息ID集合
private static string GetBatchLockKey(string batchId) => $"batch:lock:{batchId}";     // 分布式锁键

设计亮点

  1. 并发安全:使用分布式锁确保多个消费者同时处理同一批次时的数据一致性

  2. 防重复处理:通过 HashSet 存储已处理的消息ID,避免重复计数

  3. 容错机制

    • 指数退避重试
    • 异常处理和日志记录
    • 快速失败的锁获取策略
  4. 性能优化

    • 短锁持有时间
    • 条件性日志记录(避免日志轰炸)
    • 使用 HashSet 进行 O(1) 去重检查
  5. 可观测性:详细的日志记录,包括重试次数、进度里程碑等

使用场景

这个类特别适用于以下场景:

  • 消息队列的批处理任务
  • 需要确保所有消息都被处理完成的场景
  • 分布式环境下的并发消息处理
  • 需要防止消息重复处理的系统

总的来说,这是一个设计良好的分布式批次追踪器,充分考虑了并发安全、性能和可靠性。

评论加载中...