using Dpz.Core.MessageQueue.RabbitMQ;
using Medallion.Threading.FileSystem;
using Microsoft.Extensions.Logging.Abstractions;
using ZiggyCreatures.Caching.Fusion;
namespace Dpz.Core.MessageQueue.Test.RabbitMQ;
public class DistributedBatchTrackerTests
{
[Fact]
public async Task RecordMessageConsumedAsync_ShouldCountUniqueMessagesOnly()
{
var tracker = CreateTracker(out var tempDir);
var batchId = $"batch-{Guid.NewGuid():N}";
try
{
await tracker.RecordMessageConsumedAsync(batchId, "msg-1");
await tracker.RecordMessageConsumedAsync(batchId, "msg-1");
await tracker.RecordMessageConsumedAsync(batchId, "msg-2");
var progress = await tracker.GetBatchProgressAsync(batchId);
Assert.Equal(2, progress);
}
finally
{
CleanupTempDir(tempDir);
}
}
[Fact]
public async Task IsBatchCompletedAsync_ShouldReturnExpectedState()
{
var tracker = CreateTracker(out var tempDir);
var batchId = $"batch-{Guid.NewGuid():N}";
try
{
await tracker.RecordMessageConsumedAsync(batchId, "msg-1");
Assert.False(await tracker.IsBatchCompletedAsync(batchId, 2));
Assert.True(await tracker.IsBatchCompletedAsync(batchId, 1));
}
finally
{
CleanupTempDir(tempDir);
}
}
[Fact]
public async Task CleanupBatchAsync_ShouldResetProgress()
{
var tracker = CreateTracker(out var tempDir);
var batchId = $"batch-{Guid.NewGuid():N}";
try
{
await tracker.RecordMessageConsumedAsync(batchId, "msg-1");
Assert.Equal(1, await tracker.GetBatchProgressAsync(batchId));
await tracker.CleanupBatchAsync(batchId);
Assert.Equal(0, await tracker.GetBatchProgressAsync(batchId));
}
finally
{
CleanupTempDir(tempDir);
}
}
private static DistributedBatchTracker CreateTracker(out string tempDir)
{
var cache = new FusionCache(new FusionCacheOptions());
tempDir = Path.Combine(
Path.GetTempPath(),
"dpz.core.mq.test",
Guid.NewGuid().ToString("N")
);
Directory.CreateDirectory(tempDir);
var lockProvider = new FileDistributedSynchronizationProvider(new DirectoryInfo(tempDir));
return new DistributedBatchTracker(
cache,
lockProvider,
NullLogger<DistributedBatchTracker>.Instance
);
}
private static void CleanupTempDir(string tempDir)
{
if (Directory.Exists(tempDir))
{
Directory.Delete(tempDir, true);
}
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个用于测试 DistributedBatchTracker 类的单元测试文件。让我详细解释一下代码的结构和功能:
整体概述
这个测试类 DistributedBatchTrackerTests 专门用于测试分布式批处理跟踪器(DistributedBatchTracker)的核心功能,该跟踪器主要用于在分布式消息队列环境中跟踪消息批次的处理进度。
依赖项说明
// 自定义的 RabbitMQ 消息队列核心库
using Dpz.Core.MessageQueue.RabbitMQ;
// 基于文件系统的分布式锁
using Medallion.Threading.FileSystem;
// 空日志记录器(用于测试)
using Microsoft.Extensions.Logging.Abstractions;
// 融合缓存库
using ZiggyCreatures.Caching.Fusion;
测试方法详解
1. RecordMessageConsumedAsync_ShouldCountUniqueMessagesOnly()
目的: 验证消息消费记录功能只统计唯一消息
// 记录同一消息ID两次
await tracker.RecordMessageConsumedAsync(batchId, "msg-1");
await tracker.RecordMessageConsumedAsync(batchId, "msg-1"); // 重复
await tracker.RecordMessageConsumedAsync(batchId, "msg-2");
预期结果: 即使重复记录相同消息,进度计数器也只会统计唯一消息(期望值为2,不是3)
2. IsBatchCompletedAsync_ShouldReturnExpectedState()
目的: 验证批次完成状态检查功能
- 当已处理消息数(1) < 总消息数(2)时,返回
false - 当已处理消息数(1) = 总消息数(1)时,返回
true
3. CleanupBatchAsync_ShouldResetProgress()
目的: 验证批次清理功能
测试清理操作是否能正确重置批次进度(从1重置为0)
辅助方法
CreateTracker() 方法
创建测试用的 DistributedBatchTracker 实例:
// 创建融合缓存
var cache = new FusionCache(new FusionCacheOptions());
// 创建临时目录用于文件锁
tempDir = Path.Combine(Path.GetTempPath(), "dpz.core.mq.test", Guid.NewGuid().ToString("N"));
// 创建基于文件系统的分布式锁提供者
var lockProvider = new FileDistributedSynchronizationProvider(new DirectoryInfo(tempDir));
CleanupTempDir() 方法
测试完成后清理临时目录,确保测试环境的整洁。
设计模式和最佳实践
- 资源管理: 使用 try-finally 块确保临时目录在测试完成后被清理
- 隔离性: 每个测试使用独立的批次ID(GUID)和临时目录
- 依赖注入: 通过构造函数注入缓存、锁提供者和日志记录器
- 测试命名: 使用描述性的测试方法名,清楚表达测试意图
核心功能推断
从测试代码可以推断出 DistributedBatchTracker 的主要功能:
- 跟踪分布式环境中消息批次的处理进度
- 防止重复消息被多次计数
- 支持检查批次完成状态
- 提供批次数据清理功能
- 使用缓存和分布式锁保证数据一致性
这种设计特别适用于需要确保消息幂等性和分布式协调的消息队列场景。
评论加载中...