using System.Reflection;
using System.Text;
using System.Text.Json;
using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Models;
using Dpz.Core.MessageQueue.RabbitMQ;
using Medallion.Threading.FileSystem;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using ZiggyCreatures.Caching.Fusion;
using MessageExchangeType = Dpz.Core.MessageQueue.Enums.ExchangeType;
namespace Dzp.Core.XunitBasic;
public class MessageQueueReliabilityTests
{
/// <summary>
/// Verifies that two <see cref="DistributedBatchTracker"/> instances sharing one cache and one
/// file-based lock provider report a single, de-duplicated progress count for a batch.
/// </summary>
/// <remarks>
/// Each of the 20 message IDs is recorded once through each tracker while up to four IDs are
/// processed in parallel. The test expects the progress to equal the number of unique IDs and
/// the batch to be reported as completed.
/// </remarks>
[Fact]
public async Task DistributedBatchTracker_ShouldKeepAccurateProgress_InMultiInstanceScenario()
{
    // The cache is disposable; release it when the test ends instead of leaking it.
    using var cache = CreateDistributedCache();

    // A unique lock directory per run keeps parallel test runs from sharing lock files.
    var lockDirPath = Path.Combine(
        Path.GetTempPath(),
        "dpz.core.batch-lock",
        Guid.NewGuid().ToString("N")
    );
    Directory.CreateDirectory(lockDirPath);

    try
    {
        var lockProvider = new FileDistributedSynchronizationProvider(
            new DirectoryInfo(lockDirPath)
        );

        // Two trackers built on the same cache and lock provider stand in for two instances.
        var trackerA = new DistributedBatchTracker(
            cache,
            lockProvider,
            NullLogger<DistributedBatchTracker>.Instance
        );
        var trackerB = new DistributedBatchTracker(
            cache,
            lockProvider,
            NullLogger<DistributedBatchTracker>.Instance
        );

        var batchId = $"batch-{Guid.NewGuid():N}";
        var uniqueMessageIds = Enumerable.Range(1, 20).Select(i => $"msg-{i}").ToList();

        await Parallel.ForEachAsync(
            uniqueMessageIds,
            new ParallelOptions { MaxDegreeOfParallelism = 4 },
            async (messageId, _) =>
            {
                // Report the same message on both instances to verify that concurrent,
                // multi-instance reporting is de-duplicated correctly.
                await trackerA.RecordMessageConsumedAsync(batchId, messageId);
                await trackerB.RecordMessageConsumedAsync(batchId, messageId);
            }
        );

        // Read through one tracker and check completion through the other: both must agree.
        var progress = await trackerA.GetBatchProgressAsync(batchId);
        Assert.Equal(uniqueMessageIds.Count, progress);

        var completed = await trackerB.IsBatchCompletedAsync(batchId, uniqueMessageIds.Count);
        Assert.True(completed);

        await trackerA.CleanupBatchAsync(batchId);
    }
    finally
    {
        // Remove the temporary lock directory whether or not the assertions passed.
        if (Directory.Exists(lockDirPath))
        {
            Directory.Delete(lockDirPath, recursive: true);
        }
    }
}
/// <summary>
/// Verifies that when the handler returns a failed result, the consumer republishes the message
/// to the same exchange and routing key with <c>RetryCount</c> incremented by one, acknowledges
/// the original delivery, and never negatively acknowledges it.
/// </summary>
/// <remarks>
/// The consumer is driven without a broker: a strict <see cref="IChannel"/> mock is injected into
/// its private <c>_channel</c> field and its non-public <c>MessageHandlerAsync</c> method is
/// invoked through reflection.
/// </remarks>
[Fact]
public async Task ConsumerWithResult_ShouldRepublishWithIncrementedRetryCount_WhenHandlerFails()
{
    var handler = new FailingWrappedResultHandler();

    // The provider is disposable; release it (and anything it resolved) when the test ends.
    await using var serviceProvider = new ServiceCollection()
        .AddScoped<IMessageHandler<TestMessage, MessageHandlerResult<string>>>(_ => handler)
        .BuildServiceProvider();

    var connectionFactory = new Mock<IRabbitMQConnectionFactory>();
    var routingConvention = new Mock<IMessageRoutingConvention>();
    routingConvention.Setup(x => x.GetExchangeName<TestMessage>()).Returns("test-exchange");
    routingConvention.Setup(x => x.GetQueueName<TestMessage>()).Returns("test-queue");
    routingConvention.Setup(x => x.GetRoutingKey<TestMessage>()).Returns("test-routing-key");
    routingConvention
        .Setup(x => x.GetExchangeType<TestMessage>())
        .Returns(MessageExchangeType.Direct);

    var consumer = new RabbitMQConsumerBackgroundServiceWithResult<
        TestMessage,
        MessageHandlerResult<string>
    >(
        connectionFactory.Object,
        routingConvention.Object,
        serviceProvider,
        NullLogger<
            RabbitMQConsumerBackgroundServiceWithResult<
                TestMessage,
                MessageHandlerResult<string>
            >
        >.Instance
    );

    // Strict mock: any channel call that has no setup below fails the test.
    var channel = new Mock<IChannel>(MockBehavior.Strict);
    ReadOnlyMemory<byte> republishedBody = default;
    channel
        .Setup(x =>
            x.BasicPublishAsync(
                It.IsAny<string>(),
                It.IsAny<string>(),
                It.IsAny<bool>(),
                It.IsAny<BasicProperties>(),
                It.IsAny<ReadOnlyMemory<byte>>(),
                It.IsAny<CancellationToken>()
            )
        )
        .Callback<
            string,
            string,
            bool,
            BasicProperties,
            ReadOnlyMemory<byte>,
            CancellationToken
        >(
            // Snapshot the payload: the memory belongs to the publisher and may be reused or
            // released once the publish call returns.
            (_, _, _, _, body, _) => republishedBody = body.ToArray()
        )
        .Returns(ValueTask.CompletedTask);
    channel
        .Setup(x =>
            x.BasicAckAsync(It.IsAny<ulong>(), It.IsAny<bool>(), It.IsAny<CancellationToken>())
        )
        .Returns(ValueTask.CompletedTask);
    channel
        .Setup(x =>
            x.BasicNackAsync(
                It.IsAny<ulong>(),
                It.IsAny<bool>(),
                It.IsAny<bool>(),
                It.IsAny<CancellationToken>()
            )
        )
        .Returns(ValueTask.CompletedTask);

    // Inject the mocked channel directly so no real connection has to be opened.
    SetPrivateField(consumer, "_channel", channel.Object);

    var originalMessage = new TestMessage
    {
        MessageId = Guid.NewGuid().ToString("N"),
        RetryCount = 0,
        Source = "unit-test",
        Payload = "hello",
    };
    var deliverEventArgs = new BasicDeliverEventArgs(
        consumerTag: "consumer-1",
        deliveryTag: 100,
        redelivered: false,
        exchange: "test-exchange",
        routingKey: "test-routing-key",
        properties: new BasicProperties { MessageId = originalMessage.MessageId },
        body: Encoding.UTF8.GetBytes(JsonSerializer.Serialize(originalMessage)),
        cancellationToken: CancellationToken.None
    );

    // The delivery handler is not public, so it is looked up and invoked via reflection.
    var processMethod =
        typeof(RabbitMQConsumerBackgroundServiceWithResult<
            TestMessage,
            MessageHandlerResult<string>
        >).GetMethod("MessageHandlerAsync", BindingFlags.NonPublic | BindingFlags.Instance)
        ?? throw new InvalidOperationException("未找到 MessageHandlerAsync 方法");
    if (
        processMethod.Invoke(consumer, [deliverEventArgs, CancellationToken.None])
        is not Task processTask
    )
    {
        throw new InvalidOperationException("MessageHandlerAsync 调用失败");
    }

    await processTask;

    // Exactly one republish to the original exchange/routing key.
    channel.Verify(
        x =>
            x.BasicPublishAsync(
                "test-exchange",
                "test-routing-key",
                false,
                It.IsAny<BasicProperties>(),
                It.IsAny<ReadOnlyMemory<byte>>(),
                It.IsAny<CancellationToken>()
            ),
        Times.Once
    );
    // The original delivery (tag 100) is acknowledged once and never nacked.
    channel.Verify(x => x.BasicAckAsync(100, false, It.IsAny<CancellationToken>()), Times.Once);
    channel.Verify(
        x =>
            x.BasicNackAsync(
                It.IsAny<ulong>(),
                It.IsAny<bool>(),
                It.IsAny<bool>(),
                It.IsAny<CancellationToken>()
            ),
        Times.Never
    );

    // The republished payload keeps the message identity and carries the incremented retry count.
    var republishedMessage = JsonSerializer.Deserialize<TestMessage>(republishedBody.Span);
    Assert.NotNull(republishedMessage);
    Assert.Equal(1, republishedMessage.RetryCount);
    Assert.Equal(originalMessage.MessageId, republishedMessage.MessageId);
}
/// <summary>
/// Creates the cache instance that the tracker instances under test share.
/// </summary>
/// <returns>A new <see cref="FusionCache"/> built from default <see cref="FusionCacheOptions"/>.</returns>
/// <remarks>
/// No further configuration is applied here; the multi-instance scenario relies on the callers
/// passing the same returned instance to every tracker.
/// </remarks>
private static IFusionCache CreateDistributedCache() =>
    new FusionCache(new FusionCacheOptions());
/// <summary>
/// Assigns <paramref name="value"/> to a non-public instance field of <paramref name="target"/>.
/// </summary>
/// <typeparam name="T">Static type of <paramref name="target"/>; it does not affect the lookup.</typeparam>
/// <param name="target">Object whose field is overwritten.</param>
/// <param name="fieldName">Name of the non-public instance field.</param>
/// <param name="value">Value to store in the field.</param>
/// <exception cref="ArgumentNullException"><paramref name="target"/> is <c>null</c>.</exception>
/// <exception cref="InvalidOperationException">
/// Neither the runtime type of <paramref name="target"/> nor any of its base types declares the field.
/// </exception>
private static void SetPrivateField<T>(T target, string fieldName, object value)
{
    ArgumentNullException.ThrowIfNull(target);

    const BindingFlags declaredNonPublicInstance =
        BindingFlags.NonPublic | BindingFlags.Instance | BindingFlags.DeclaredOnly;

    // Start from the runtime type rather than typeof(T): T may be inferred as a base type or
    // an interface, which would hide the field.
    var type = target.GetType();
    var field = type.GetField(fieldName, declaredNonPublicInstance);

    // Reflection does not return private fields inherited from base classes, so each level of
    // the hierarchy is inspected explicitly.
    while (field is null && type.BaseType is { } baseType)
    {
        type = baseType;
        field = type.GetField(fieldName, declaredNonPublicInstance);
    }

    if (field is null)
    {
        throw new InvalidOperationException($"未找到字段: {fieldName}");
    }

    field.SetValue(target, value);
}
/// <summary>
/// Minimal message type used by these tests; adds a string payload on top of
/// <see cref="MessageBase"/>.
/// </summary>
private sealed class TestMessage : MessageBase
{
    /// <summary>Payload text carried by the message; defaults to the empty string.</summary>
    public string Payload { get; set; } = "";
}
/// <summary>
/// Handler stub that reports a failure for every message it receives.
/// </summary>
private sealed class FailingWrappedResultHandler
    : IMessageHandler<TestMessage, MessageHandlerResult<string>>
{
    /// <summary>
    /// Returns an already-completed task holding a failed result with the reason "force retry".
    /// </summary>
    /// <param name="message">Incoming message; not inspected.</param>
    /// <param name="cancellationToken">Not observed.</param>
    /// <returns>A completed task wrapping the failed result.</returns>
    public Task<MessageHandlerResult<string>> HandleAsync(
        TestMessage message,
        CancellationToken cancellationToken = default
    )
    {
        MessageHandlerResult<string> failure = MessageHandlerResult<string>.Fail("force retry");
        return Task.FromResult(failure);
    }
}
}
⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个用于测试消息队列可靠性的单元测试类,主要测试分布式环境下的消息处理机制。让我详细解释这个代码:
主要功能
这个测试类包含两个核心测试用例,用于验证消息队列系统在分布式环境下的可靠性:
1. 分布式批量追踪器测试 (DistributedBatchTracker_ShouldKeepAccurateProgress_InMultiInstanceScenario)
测试目标:验证在多实例并发场景下,分布式批量追踪器能否准确记录消息消费进度。
关键特性:
- 分布式锁:使用文件锁确保多实例间的同步
- 分布式缓存:使用FusionCache存储进度信息
- 并发安全:通过 Parallel.ForEachAsync 模拟并发环境
- 消息去重:同一消息在两个实例上重复上报,验证去重逻辑
测试流程:
// 1. 创建临时锁目录和两个追踪器实例
var trackerA = new DistributedBatchTracker(...);
var trackerB = new DistributedBatchTracker(...);
// 2. 并发处理20条唯一消息
await Parallel.ForEachAsync(uniqueMessageIds, async (messageId, _) => {
await trackerA.RecordMessageConsumedAsync(batchId, messageId);
await trackerB.RecordMessageConsumedAsync(batchId, messageId);
});
// 3. 验证进度准确性和完成状态
2. 消费者重试机制测试 (ConsumerWithResult_ShouldRepublishWithIncrementedRetryCount_WhenHandlerFails)
测试目标:验证当消息处理失败时,消费者能否正确增加重试次数并重新发布消息。
关键特性:
- 失败处理:使用 FailingWrappedResultHandler 模拟处理失败
- 重试计数:验证重发消息的重试次数正确递增
- 消息确认:验证正确的ACK机制
- Mock验证:使用Moq框架验证RabbitMQ操作
测试流程:
// 1. 设置失败的处理器和Mock的RabbitMQ组件
var handler = new FailingWrappedResultHandler();
var channel = new Mock<IChannel>();
// 2. 通过反射调用私有的消息处理方法
var processMethod = typeof(...).GetMethod("MessageHandlerAsync", ...);
await processMethod.Invoke(consumer, [deliverEventArgs, CancellationToken.None]);
// 3. 验证重发行为和重试计数递增
技术亮点
1. 依赖注入和Mock
var serviceProvider = new ServiceCollection()
.AddScoped<IMessageHandler<TestMessage, MessageHandlerResult<string>>>(_ => handler)
.BuildServiceProvider();
2. 反射访问私有成员(设置私有字段、调用私有方法)
private static void SetPrivateField<T>(T target, string fieldName, object value)
{
var field = typeof(T).GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Instance);
field.SetValue(target, value);
}
3. 分布式锁和缓存
var lockProvider = new FileDistributedSynchronizationProvider(new DirectoryInfo(lockDirPath));
var cache = new FusionCache(new FusionCacheOptions());
测试模型
TestMessage
继承自MessageBase的简单测试消息,包含负载数据。
FailingWrappedResultHandler
专门用于测试的失败处理器,总是返回失败结果以触发重试机制。
总结
这个测试类通过模拟真实的分布式环境,验证了消息队列系统的两个关键特性:
- 分布式进度追踪:确保多实例环境下批量消息处理的准确性
- 失败重试机制:确保消息处理失败时的正确重试行为
测试覆盖了并发安全、分布式一致性、错误处理等关键场景,是一个相当全面的可靠性测试套件。
评论加载中...