using Dpz.Core.MessageQueue.Abstractions;
using Medallion.Threading;
using Microsoft.Extensions.Logging;
using ZiggyCreatures.Caching.Fusion;
namespace Dpz.Core.MessageQueue.RabbitMQ;
/// <summary>
/// 基于分布式缓存的批次追踪器
/// </summary>
public class DistributedBatchTracker(
IFusionCache fusionCache,
IDistributedLockProvider lockProvider,
ILogger<DistributedBatchTracker> logger
) : IBatchTracker
{
private readonly TimeSpan _expirationTime = TimeSpan.FromHours(24);
/// <summary>
/// 锁持有超时时间
/// </summary>
private readonly TimeSpan _lockTimeout = TimeSpan.FromSeconds(5);
/// <summary>
/// 尝试获取锁的超时时间
/// </summary>
private readonly TimeSpan _tryAcquireLockTimeout = TimeSpan.FromMilliseconds(100);
/// <summary>
/// 获取锁失败后的最大重试次数
/// </summary>
private const int MaxRetries = 5;
/// <summary>
/// 重试基础延迟
/// </summary>
private readonly TimeSpan _retryBaseDelay = TimeSpan.FromMilliseconds(50);
public async Task RecordMessageConsumedAsync(string batchId, string messageId)
{
var lockKey = GetBatchLockKey(batchId);
var retryCount = 0;
while (true)
{
// 尝试快速获取锁,不要长时间阻塞
var @lock = await TryAcquireLockAsync(lockKey);
if (@lock == null)
{
// 获取锁失败,指数退避后重试
retryCount++;
if (retryCount > MaxRetries)
{
throw new TimeoutException(
$"获取批次锁超时: BatchId={batchId}, MessageId={messageId}, Retries={retryCount}"
);
}
// 指数退避:50ms, 100ms, 200ms, 400ms, 800ms
var delay = TimeSpan.FromMilliseconds(
_retryBaseDelay.TotalMilliseconds * Math.Pow(2, retryCount - 1)
);
logger.LogDebug(
"获取批次锁失败,等待{Delay}ms后重试 ({Retry}/{MaxRetries}): BatchId={BatchId}",
delay.TotalMilliseconds,
retryCount,
MaxRetries,
batchId
);
await Task.Delay(delay);
continue;
}
// 成功获取锁,执行操作
try
{
await using (@lock)
{
var key = GetBatchKey(batchId);
var messagesKey = GetBatchMessagesKey(batchId);
// 记录消息ID到集合(用于去重)
var cacheValue = await fusionCache.TryGetAsync<HashSet<string>>(messagesKey);
var messages = new HashSet<string>();
if (cacheValue.HasValue)
{
messages = cacheValue.Value;
}
// 检查是否已经记录过(防止重复消费导致计数错误)
if (!messages.Add(messageId))
{
logger.LogWarning(
"消息已被记录过: BatchId={BatchId}, MessageId={MessageId}",
batchId,
messageId
);
return;
}
// 更新计数
var newCount = messages.Count;
await fusionCache.SetAsync(
key,
newCount,
options => options.SetDuration(_expirationTime)
);
// 更新消息集合
await fusionCache.SetAsync(
messagesKey,
messages,
options => options.SetDuration(_expirationTime)
);
// 只有在重试或达到里程碑时才记录日志,避免日志轰炸
var shouldLog =
retryCount > 0 || newCount % 10 == 0 || newCount == 1 || newCount >= 100;
if (shouldLog)
{
logger.LogInformation(
"已记录消息消费: BatchId={BatchId}, TotalConsumed={Count}, RetryCount={RetryCount}",
batchId,
newCount,
retryCount
);
}
// 成功后直接返回
return;
}
}
catch (Exception ex)
{
logger.LogError(
ex,
"记录消息消费失败: BatchId={BatchId}, MessageId={MessageId}",
batchId,
messageId
);
throw;
}
}
}
/// <summary>
/// 尝试获取分布式锁,快速失败
/// </summary>
private async Task<IDistributedSynchronizationHandle?> TryAcquireLockAsync(string lockKey)
{
using var cts = new CancellationTokenSource();
cts.CancelAfter(_tryAcquireLockTimeout);
try
{
return await lockProvider.AcquireLockAsync(
lockKey,
timeout: _lockTimeout,
cancellationToken: cts.Token
);
}
catch (OperationCanceledException)
{
// 超时
return null;
}
}
public async Task<bool> IsBatchCompletedAsync(string batchId, int expectedTotal)
{
try
{
var progress = await GetBatchProgressAsync(batchId);
var completed = progress >= expectedTotal;
if (completed)
{
logger.LogInformation(
"批次已完成: BatchId={BatchId}, Progress={Progress}/{Expected}",
batchId,
progress,
expectedTotal
);
}
return completed;
}
catch (Exception ex)
{
logger.LogError(
ex,
"检查批次完成状态失败: BatchId={BatchId}, ExpectedTotal={ExpectedTotal}",
batchId,
expectedTotal
);
return false;
}
}
public async Task<int> GetBatchProgressAsync(string batchId)
{
try
{
var key = GetBatchKey(batchId);
var cacheResult = await fusionCache.TryGetAsync<int>(key);
var count = cacheResult.HasValue ? cacheResult.Value : 0;
return count;
}
catch (Exception ex)
{
logger.LogError(ex, "获取批次进度失败: BatchId={BatchId}", batchId);
return 0;
}
}
public async Task CleanupBatchAsync(string batchId)
{
try
{
await fusionCache.RemoveAsync(GetBatchKey(batchId));
await fusionCache.RemoveAsync(GetBatchMessagesKey(batchId));
logger.LogInformation("批次记录已清理: BatchId={BatchId}", batchId);
}
catch (Exception ex)
{
logger.LogError(ex, "清理批次记录失败: BatchId={BatchId}", batchId);
throw;
}
}
private static string GetBatchKey(string batchId) => $"batch:progress:{batchId}";
private static string GetBatchMessagesKey(string batchId) => $"batch:messages:{batchId}";
private static string GetBatchLockKey(string batchId) => $"batch:lock:{batchId}";
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个基于分布式缓存的批次追踪器实现,用于在消息队列系统中跟踪批次消息的消费进度。让我详细解释一下这个代码:
类的职责和用途
DistributedBatchTracker 实现了 IBatchTracker 接口,主要用于:
- 跟踪批次中每个消息的消费状态
- 防止消息重复计数
- 检查批次是否完成
- 提供分布式环境下的线程安全保证
核心依赖
public class DistributedBatchTracker(
IFusionCache fusionCache, // 分布式缓存,用于存储数据
IDistributedLockProvider lockProvider, // 分布式锁提供者,保证并发安全
ILogger<DistributedBatchTracker> logger // 日志记录器
)
关键配置参数
_expirationTime: 24小时 - 缓存数据过期时间_lockTimeout: 5秒 - 锁的持有超时时间_tryAcquireLockTimeout: 100毫秒 - 尝试获取锁的超时时间MaxRetries: 5次 - 获取锁失败的最大重试次数_retryBaseDelay: 50毫秒 - 重试的基础延迟时间
核心方法分析
1. RecordMessageConsumedAsync - 记录消息消费
这是最复杂的方法,实现了以下功能:
分布式锁机制:
var @lock = await TryAcquireLockAsync(lockKey);
- 使用快速失败策略获取锁
- 实现指数退避重试机制(50ms → 100ms → 200ms → 400ms → 800ms)
防重复消费:
var messages = new HashSet<string>(); // 存储已消费的消息ID
if (!messages.Add(messageId)) {
// 消息已被处理过,直接返回
}
原子性更新:
// 同时更新计数和消息集合
await fusionCache.SetAsync(key, newCount, ...);
await fusionCache.SetAsync(messagesKey, messages, ...);
2. IsBatchCompletedAsync - 检查批次完成状态
var progress = await GetBatchProgressAsync(batchId);
return progress >= expectedTotal;
3. GetBatchProgressAsync - 获取批次进度
从缓存中读取当前已消费的消息数量。
4. CleanupBatchAsync - 清理批次数据
删除批次相关的所有缓存数据。
缓存键设计
private static string GetBatchKey(string batchId) => $"batch:progress:{batchId}"; // 存储消费计数
private static string GetBatchMessagesKey(string batchId) => $"batch:messages:{batchId}"; // 存储消息ID集合
private static string GetBatchLockKey(string batchId) => $"batch:lock:{batchId}"; // 分布式锁键
设计亮点
并发安全:使用分布式锁确保多个消费者同时处理同一批次时的数据一致性
防重复处理:通过 HashSet 存储已处理的消息ID,避免重复计数
容错机制:
- 指数退避重试
- 异常处理和日志记录
- 快速失败的锁获取策略
性能优化:
- 短锁持有时间
- 条件性日志记录(避免日志轰炸)
- 使用 HashSet 进行 O(1) 去重检查
可观测性:详细的日志记录,包括重试次数、进度里程碑等
使用场景
这个类特别适用于以下场景:
- 消息队列的批处理任务
- 需要确保所有消息都被处理完成的场景
- 分布式环境下的并发消息处理
- 需要防止消息重复处理的系统
总的来说,这是一个设计良好的分布式批次追踪器,充分考虑了并发安全、性能和可靠性。
评论加载中...