using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Models;
using Medallion.Threading;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using ZiggyCreatures.Caching.Fusion;
namespace Dpz.Core.MessageQueue.RabbitMQ;
/// <summary>
/// Background service that resends outbox messages: it periodically scans for messages whose
/// publish or consume step failed and delivers them again.
/// </summary>
/// <param name="retryService">Executor that performs the actual outbox resend.</param>
/// <param name="fusionCache">FusionCache instance used to record the state of the last finished resend.</param>
/// <param name="distributedLockProvider">Distributed lock provider used so that instances do not resend concurrently.</param>
/// <param name="logger">Logger.</param>
public sealed class MessageOutboxRetryBackgroundService(
    IMessageOutboxRetryService retryService,
    IFusionCache fusionCache,
    IDistributedLockProvider distributedLockProvider,
    ILogger<MessageOutboxRetryBackgroundService> logger
) : BackgroundService
{
    private const int BatchSize = 50;
    private const string StateCacheKey = "Dpz.Core.MessageQueue.OutboxRetry.LastState";
    private const string LockKey = "Dpz.Core.MessageQueue.OutboxRetry.RunLock";
    private static readonly TimeSpan RetryInterval = TimeSpan.FromMinutes(1);
    private static readonly TimeSpan StateCacheDuration = TimeSpan.FromDays(7);
    private static readonly TimeSpan LockWaitTimeout = TimeSpan.FromSeconds(3);

    /// <summary>
    /// Time at which this instance last finished a resend round (including rounds that found
    /// nothing to resend), or the value restored from the cache at start-up.
    /// </summary>
    private DateTimeOffset? _lastCompletedAtUtc;

    /// <summary>
    /// Runs the outbox resend loop until the host requests a stop.
    /// </summary>
    /// <param name="stoppingToken">Host stop signal.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await LogLastRetryStateAsync(stoppingToken);
        logger.LogInformation("Outbox 重试后台服务已启动");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var delay = await GetDelayBeforeNextRunAsync(stoppingToken);
                if (delay > TimeSpan.Zero)
                {
                    await DelayAsync(delay, stoppingToken);
                    continue;
                }

                await RunOnceAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Outbox 重试后台服务执行异常");

                // Back off before the next attempt. A stop request during this wait must end the
                // loop normally instead of escaping ExecuteAsync as an exception.
                try
                {
                    await DelayAsync(RetryInterval, stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    break;
                }
            }
        }

        logger.LogInformation("Outbox 重试后台服务已停止");
    }

    /// <summary>
    /// Performs a single resend round under the distributed lock and, when anything was resent,
    /// stores the resulting state in the cache.
    /// </summary>
    /// <param name="cancellationToken">Token that aborts the round.</param>
    private async Task RunOnceAsync(CancellationToken cancellationToken)
    {
        await using var lockHandle = await TryAcquireRetryLockAsync(cancellationToken);
        if (lockHandle is null)
        {
            logger.LogInformation("未获取到Outbox重试锁,跳过本轮补发");
            return;
        }

        // Re-check the schedule after taking the lock so that several instances do not all act
        // on the same stale cached state they read before acquiring it.
        var delay = await GetDelayBeforeNextRunAsync(cancellationToken);
        if (delay > TimeSpan.Zero)
        {
            return;
        }

        var startedAtUtc = DateTimeOffset.UtcNow;
        var result = await retryService.RetryAsync(BatchSize, cancellationToken);
        var completedAtUtc = DateTimeOffset.UtcNow;

        // Remember the finish time locally for every round, even an empty one. Without this an
        // empty outbox leaves the computed delay at zero and the main loop spins without pause.
        _lastCompletedAtUtc = completedAtUtc;

        if (result.TotalRetryCount == 0)
        {
            // Nothing was resent: keep the previously cached state untouched.
            return;
        }

        var state = new MessageOutboxRetryState(
            startedAtUtc,
            completedAtUtc,
            result.PublishRetryCount,
            result.PublishFailureCount,
            result.ConsumeRetryCount,
            result.ConsumeFailureCount
        );

        await fusionCache.SetAsync(
            StateCacheKey,
            state,
            StateCacheDuration,
            token: cancellationToken
        );

        logger.LogInformation(
            "Outbox 重试完成: Publish={PublishCount}, PublishFailed={PublishFailed}, Consume={ConsumeCount}, ConsumeFailed={ConsumeFailed}, CompletedAtUtc={CompletedAtUtc}",
            result.PublishRetryCount,
            result.PublishFailureCount,
            result.ConsumeRetryCount,
            result.ConsumeFailureCount,
            completedAtUtc
        );
    }

    /// <summary>
    /// Tries to take the resend lock, waiting at most <see cref="LockWaitTimeout"/>.
    /// </summary>
    /// <param name="cancellationToken">Token that aborts the wait.</param>
    /// <returns>The lock handle, or <c>null</c> when the lock was not obtained within the timeout.</returns>
    private async Task<IDistributedSynchronizationHandle?> TryAcquireRetryLockAsync(
        CancellationToken cancellationToken
    )
    {
        // The Try* variant reports a timeout as null, so no linked cancellation source or
        // exception filter is needed to implement the wait limit.
        return await distributedLockProvider.TryAcquireLockAsync(
            LockKey,
            LockWaitTimeout,
            cancellationToken
        );
    }

    /// <summary>
    /// Computes how long to wait until the next round is due.
    /// </summary>
    /// <param name="cancellationToken">Token that aborts the cache read.</param>
    /// <returns>
    /// The remaining time until <see cref="RetryInterval"/> has elapsed since the last finished
    /// round, or <see cref="TimeSpan.Zero"/> when a round is due or no previous round is known.
    /// </returns>
    private async Task<TimeSpan> GetDelayBeforeNextRunAsync(CancellationToken cancellationToken)
    {
        var lastCompletedAtUtc = await GetLastCompletedAtUtcAsync(cancellationToken);
        if (!lastCompletedAtUtc.HasValue)
        {
            return TimeSpan.Zero;
        }

        var remaining = lastCompletedAtUtc.Value.Add(RetryInterval) - DateTimeOffset.UtcNow;
        return remaining > TimeSpan.Zero ? remaining : TimeSpan.Zero;
    }

    /// <summary>
    /// Returns the most recent known finish time, taking the later of the cached state and the
    /// timestamp remembered by this instance.
    /// </summary>
    /// <param name="cancellationToken">Token that aborts the cache read.</param>
    /// <returns>The latest known finish time, or <c>null</c> when none is known.</returns>
    private async Task<DateTimeOffset?> GetLastCompletedAtUtcAsync(
        CancellationToken cancellationToken
    )
    {
        try
        {
            var cached = await fusionCache.TryGetAsync<MessageOutboxRetryState>(
                StateCacheKey,
                token: cancellationToken
            );
            if (!cached.HasValue)
            {
                return _lastCompletedAtUtc;
            }

            // The cache is only written when something was resent, so the local timestamp can
            // be newer (after an empty round); the newer of the two drives the schedule.
            DateTimeOffset? cachedCompletedAtUtc = cached.Value.CompletedAtUtc;
            return Latest(cachedCompletedAtUtc, _lastCompletedAtUtc);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            throw;
        }
        catch (Exception ex)
        {
            // A cache read failure is not fatal: fall back to the locally remembered time.
            logger.LogWarning(ex, "读取Outbox重试缓存状态失败");
            return _lastCompletedAtUtc;
        }
    }

    /// <summary>
    /// Reads the cached state once at start-up, seeds the local timestamp from it and logs it.
    /// </summary>
    /// <param name="cancellationToken">Token that aborts the cache read.</param>
    private async Task LogLastRetryStateAsync(CancellationToken cancellationToken)
    {
        try
        {
            var cached = await fusionCache.TryGetAsync<MessageOutboxRetryState>(
                StateCacheKey,
                token: cancellationToken
            );
            if (!cached.HasValue)
            {
                return;
            }

            _lastCompletedAtUtc = cached.Value.CompletedAtUtc;
            logger.LogInformation(
                "Outbox 上次重试完成时间: CompletedAtUtc={CompletedAtUtc}, Publish={PublishCount}, Consume={ConsumeCount}",
                cached.Value.CompletedAtUtc,
                cached.Value.PublishRetryCount,
                cached.Value.ConsumeRetryCount
            );
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            throw;
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "读取Outbox上次重试状态失败");
        }
    }

    /// <summary>
    /// Returns the later of two optional timestamps; a missing value never wins over a present one.
    /// </summary>
    private static DateTimeOffset? Latest(DateTimeOffset? first, DateTimeOffset? second)
    {
        if (!first.HasValue)
        {
            return second;
        }

        if (!second.HasValue)
        {
            return first;
        }

        return first.Value >= second.Value ? first : second;
    }

    /// <summary>
    /// Waits for <paramref name="delay"/>; returns immediately for a zero or negative delay.
    /// </summary>
    private static async Task DelayAsync(TimeSpan delay, CancellationToken cancellationToken)
    {
        if (delay <= TimeSpan.Zero)
        {
            return;
        }

        await Task.Delay(delay, cancellationToken);
    }
}
⚠ 以下内容为 AI 分析的结果,请根据实际情况进行判断。
这是一个实现了 Outbox 模式的消息重试后台服务,主要用于处理消息队列中发布失败或消费失败的消息补发机制。下面详细解释这段代码:
核心功能
这个服务继承自 BackgroundService,作为一个后台服务运行,负责:
- 周期性扫描发布失败或消费失败的消息
- 重新投递这些失败的消息
- 记录重试状态并支持多实例部署
关键设计特点
1. 构造函数依赖注入
public sealed class MessageOutboxRetryBackgroundService(
IMessageOutboxRetryService retryService, // 执行具体重试逻辑
IFusionCache fusionCache, // 缓存重试状态
IDistributedLockProvider distributedLockProvider, // 分布式锁
ILogger<MessageOutboxRetryBackgroundService> logger
)
2. 重要常量配置
private const int BatchSize = 50; // 每次处理50条消息
private const string StateCacheKey = "...OutboxRetry.LastState"; // 缓存键
private const string LockKey = "...OutboxRetry.RunLock"; // 分布式锁键
private static readonly TimeSpan RetryInterval = TimeSpan.FromMinutes(1); // 重试间隔1分钟
核心执行流程
1. 主循环 (ExecuteAsync)
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 1. 记录上次重试状态
await LogLastRetryStateAsync(stoppingToken);
// 2. 进入无限循环
while (!stoppingToken.IsCancellationRequested)
{
// 3. 计算是否需要延迟
var delay = await GetDelayBeforeNextRunAsync(stoppingToken);
if (delay > TimeSpan.Zero)
{
await DelayAsync(delay, stoppingToken);
continue;
}
// 4. 执行一次重试
await RunOnceAsync(stoppingToken);
}
}
2. 单次执行逻辑 (RunOnceAsync)
private async Task RunOnceAsync(CancellationToken cancellationToken)
{
// 1. 尝试获取分布式锁(防止多实例重复执行)
await using var lockHandle = await TryAcquireRetryLockAsync(cancellationToken);
if (lockHandle == null) return; // 未获取到锁就跳过
// 2. 获取锁后再次检查时间
var delay = await GetDelayBeforeNextRunAsync(cancellationToken);
if (delay > TimeSpan.Zero) return;
// 3. 执行重试并记录结果
var startedAtUtc = DateTimeOffset.UtcNow;
var result = await retryService.RetryAsync(BatchSize, cancellationToken);
var completedAtUtc = DateTimeOffset.UtcNow;
// 4. 保存状态到缓存
var state = new MessageOutboxRetryState(...);
await fusionCache.SetAsync(StateCacheKey, state, StateCacheDuration, token: cancellationToken);
}
关键技术实现
1. 分布式锁机制
private async Task<IDistributedSynchronizationHandle?> TryAcquireRetryLockAsync(...)
{
// 使用超时机制,避免无限等待
lockCancellationTokenSource.CancelAfter(LockWaitTimeout);
return await distributedLockProvider.AcquireLockAsync(
LockKey,
timeout: null,
cancellationToken: lockCancellationTokenSource.Token
);
}
2. 智能延迟计算
private async Task<TimeSpan> GetDelayBeforeNextRunAsync(...)
{
// 获取上次完成时间
var lastCompletedAtUtc = await GetLastCompletedAtUtcAsync(cancellationToken);
// 计算下次执行时间
var nextRunAtUtc = lastCompletedAtUtc.Value.Add(RetryInterval);
var delay = nextRunAtUtc - DateTimeOffset.UtcNow;
return delay > TimeSpan.Zero ? delay : TimeSpan.Zero;
}
3. 状态持久化
使用 FusionCache 来缓存重试状态,支持:
- 跨应用重启的状态恢复
- 多实例间的状态同步
- 7天的缓存持续时间
设计优势
- 高可用性: 通过分布式锁确保多实例部署时只有一个实例执行重试
- 状态持久化: 重试状态缓存到外部存储,支持应用重启后恢复
- 错误恢复: 完善的异常处理机制,确保服务稳定运行
- 可观测性: 详细的日志记录,便于监控和排查问题
- 性能优化: 批处理机制和智能延迟,避免频繁无效执行
这是一个设计良好的企业级消息重试服务,充分考虑了分布式环境下的各种复杂场景。
评论加载中...