using System.Reflection;
using System.Text;
using System.Text.Json;
using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Models;
using Dpz.Core.MessageQueue.RabbitMQ;
using Medallion.Threading.FileSystem;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using ZiggyCreatures.Caching.Fusion;
using MessageExchangeType = Dpz.Core.MessageQueue.Enums.ExchangeType;

namespace Dzp.Core.XunitBasic;

/// <summary>
/// Reliability tests for the message-queue components: batch progress tracking shared by
/// several tracker instances, and retry republishing in the result-returning RabbitMQ consumer.
/// </summary>
public class MessageQueueReliabilityTests
{
    /// <summary>
    /// Two <see cref="DistributedBatchTracker"/> instances that share one cache and one
    /// file-based lock provider both record the same 20 message ids, with up to four ids
    /// processed in parallel. The recorded progress must equal the number of unique ids and
    /// the batch must be reported as completed.
    /// </summary>
    [Fact]
    public async Task DistributedBatchTracker_ShouldKeepAccurateProgress_InMultiInstanceScenario()
    {
        // The cache is disposable; release it when the test ends instead of leaking it.
        using var cache = CreateDistributedCache();
        var lockDirPath = Path.Combine(
            Path.GetTempPath(),
            "dpz.core.batch-lock",
            Guid.NewGuid().ToString("N")
        );
        Directory.CreateDirectory(lockDirPath);

        try
        {
            var lockProvider = new FileDistributedSynchronizationProvider(
                new DirectoryInfo(lockDirPath)
            );
            var trackerA = new DistributedBatchTracker(
                cache,
                lockProvider,
                NullLogger<DistributedBatchTracker>.Instance
            );
            var trackerB = new DistributedBatchTracker(
                cache,
                lockProvider,
                NullLogger<DistributedBatchTracker>.Instance
            );

            var batchId = $"batch-{Guid.NewGuid():N}";
            var uniqueMessageIds = Enumerable.Range(1, 20).Select(i => $"msg-{i}").ToList();

            await Parallel.ForEachAsync(
                uniqueMessageIds,
                new ParallelOptions { MaxDegreeOfParallelism = 4 },
                async (messageId, _) =>
                {
                    // The same message is reported on both instances to verify that
                    // multi-instance concurrency plus de-duplication stays correct.
                    await trackerA.RecordMessageConsumedAsync(batchId, messageId);
                    await trackerB.RecordMessageConsumedAsync(batchId, messageId);
                }
            );

            var progress = await trackerA.GetBatchProgressAsync(batchId);
            Assert.Equal(uniqueMessageIds.Count, progress);

            var completed = await trackerB.IsBatchCompletedAsync(batchId, uniqueMessageIds.Count);
            Assert.True(completed);

            await trackerA.CleanupBatchAsync(batchId);
        }
        finally
        {
            // Remove the per-test lock directory even when an assertion above fails.
            if (Directory.Exists(lockDirPath))
            {
                Directory.Delete(lockDirPath, recursive: true);
            }
        }
    }

    /// <summary>
    /// Feeds one delivery into the consumer's private <c>MessageHandlerAsync</c> method while
    /// the registered handler returns a failed result. The consumer is expected to publish the
    /// message once to the same exchange and routing key with <c>RetryCount</c> raised from 0
    /// to 1 and the same <c>MessageId</c>, ack the original delivery, and never nack.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The private method or field accessed through reflection does not exist, or the method
    /// did not return a <see cref="Task"/>.
    /// </exception>
    [Fact]
    public async Task ConsumerWithResult_ShouldRepublishWithIncrementedRetryCount_WhenHandlerFails()
    {
        var handler = new FailingWrappedResultHandler();

        // The built provider is disposable; dispose it when the test ends.
        await using var serviceProvider = new ServiceCollection()
            .AddScoped<IMessageHandler<TestMessage, MessageHandlerResult<string>>>(_ => handler)
            .BuildServiceProvider();

        var connectionFactory = new Mock<IRabbitMQConnectionFactory>();
        var routingConvention = new Mock<IMessageRoutingConvention>();
        routingConvention.Setup(x => x.GetExchangeName<TestMessage>()).Returns("test-exchange");
        routingConvention.Setup(x => x.GetQueueName<TestMessage>()).Returns("test-queue");
        routingConvention.Setup(x => x.GetRoutingKey<TestMessage>()).Returns("test-routing-key");
        routingConvention
            .Setup(x => x.GetExchangeType<TestMessage>())
            .Returns(MessageExchangeType.Direct);

        var consumer = new RabbitMQConsumerBackgroundServiceWithResult<
            TestMessage,
            MessageHandlerResult<string>
        >(
            connectionFactory.Object,
            routingConvention.Object,
            serviceProvider,
            NullLogger<
                RabbitMQConsumerBackgroundServiceWithResult<
                    TestMessage,
                    MessageHandlerResult<string>
                >
            >.Instance
        );

        // Strict mock: any channel call that is not set up below fails the test.
        var channel = new Mock<IChannel>(MockBehavior.Strict);
        ReadOnlyMemory<byte> republishedBody = default;

        channel
            .Setup(x =>
                x.BasicPublishAsync(
                    It.IsAny<string>(),
                    It.IsAny<string>(),
                    It.IsAny<bool>(),
                    It.IsAny<BasicProperties>(),
                    It.IsAny<ReadOnlyMemory<byte>>(),
                    It.IsAny<CancellationToken>()
                )
            )
            .Callback<
                string,
                string,
                bool,
                BasicProperties,
                ReadOnlyMemory<byte>,
                CancellationToken
            >(
                // Copy the payload: it is inspected after the publish call has returned, and
                // the publisher's buffer is not guaranteed to stay valid beyond that call.
                (_, _, _, _, body, _) => republishedBody = body.ToArray()
            )
            .Returns(ValueTask.CompletedTask);

        channel
            .Setup(x =>
                x.BasicAckAsync(It.IsAny<ulong>(), It.IsAny<bool>(), It.IsAny<CancellationToken>())
            )
            .Returns(ValueTask.CompletedTask);

        channel
            .Setup(x =>
                x.BasicNackAsync(
                    It.IsAny<ulong>(),
                    It.IsAny<bool>(),
                    It.IsAny<bool>(),
                    It.IsAny<CancellationToken>()
                )
            )
            .Returns(ValueTask.CompletedTask);

        // Inject the mocked channel directly so no real connection has to be opened.
        SetPrivateField(consumer, "_channel", channel.Object);

        var originalMessage = new TestMessage
        {
            MessageId = Guid.NewGuid().ToString("N"),
            RetryCount = 0,
            Source = "unit-test",
            Payload = "hello",
        };

        var deliverEventArgs = new BasicDeliverEventArgs(
            consumerTag: "consumer-1",
            deliveryTag: 100,
            redelivered: false,
            exchange: "test-exchange",
            routingKey: "test-routing-key",
            properties: new BasicProperties { MessageId = originalMessage.MessageId },
            body: Encoding.UTF8.GetBytes(JsonSerializer.Serialize(originalMessage)),
            cancellationToken: CancellationToken.None
        );

        // The delivery handler is private, so it is looked up and invoked through reflection.
        var processMethod =
            typeof(RabbitMQConsumerBackgroundServiceWithResult<
                TestMessage,
                MessageHandlerResult<string>
            >).GetMethod("MessageHandlerAsync", BindingFlags.NonPublic | BindingFlags.Instance)
            ?? throw new InvalidOperationException("未找到 MessageHandlerAsync 方法");

        if (
            processMethod.Invoke(consumer, [deliverEventArgs, CancellationToken.None])
            is not Task processTask
        )
        {
            throw new InvalidOperationException("MessageHandlerAsync 调用失败");
        }

        await processTask;

        // Exactly one republish to the original exchange and routing key.
        channel.Verify(
            x =>
                x.BasicPublishAsync(
                    "test-exchange",
                    "test-routing-key",
                    false,
                    It.IsAny<BasicProperties>(),
                    It.IsAny<ReadOnlyMemory<byte>>(),
                    It.IsAny<CancellationToken>()
                ),
            Times.Once
        );

        // The original delivery (tag 100) is acked once...
        channel.Verify(x => x.BasicAckAsync(100, false, It.IsAny<CancellationToken>()), Times.Once);

        // ...and nothing is ever nacked.
        channel.Verify(
            x =>
                x.BasicNackAsync(
                    It.IsAny<ulong>(),
                    It.IsAny<bool>(),
                    It.IsAny<bool>(),
                    It.IsAny<CancellationToken>()
                ),
            Times.Never
        );

        var republishedMessage = JsonSerializer.Deserialize<TestMessage>(republishedBody.Span);
        Assert.NotNull(republishedMessage);
        Assert.Equal(1, republishedMessage.RetryCount);
        Assert.Equal(originalMessage.MessageId, republishedMessage.MessageId);
    }

    /// <summary>
    /// Creates the cache instance shared by the tracker instances in a test.
    /// </summary>
    /// <returns>
    /// A new <see cref="FusionCache"/> built from default options; no distributed cache is
    /// attached to it here.
    /// </returns>
    private static IFusionCache CreateDistributedCache()
    {
        return new FusionCache(new FusionCacheOptions());
    }

    /// <summary>
    /// Sets a non-public instance field on <paramref name="target"/> through reflection.
    /// </summary>
    /// <typeparam name="T">Static type of <paramref name="target"/>.</typeparam>
    /// <param name="target">Object whose field is overwritten.</param>
    /// <param name="fieldName">Name of the non-public instance field.</param>
    /// <param name="value">Value to store in the field.</param>
    /// <exception cref="ArgumentNullException"><paramref name="target"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// No such field exists on the runtime type of <paramref name="target"/> or on any of its
    /// base types.
    /// </exception>
    private static void SetPrivateField<T>(T target, string fieldName, object value)
    {
        if (target is null)
        {
            throw new ArgumentNullException(nameof(target));
        }

        const BindingFlags flags = BindingFlags.NonPublic | BindingFlags.Instance;

        // Type.GetField does not return private fields declared on base classes, so walk the
        // hierarchy starting from the runtime type until the field is found.
        var type = target.GetType();
        var field = type.GetField(fieldName, flags);
        while (field is null && type.BaseType is { } baseType)
        {
            type = baseType;
            field = type.GetField(fieldName, flags);
        }

        if (field is null)
        {
            throw new InvalidOperationException($"未找到字段: {fieldName}");
        }

        field.SetValue(target, value);
    }

    /// <summary>
    /// Minimal message type used by these tests: <see cref="MessageBase"/> plus a string payload.
    /// </summary>
    private sealed class TestMessage : MessageBase
    {
        public string Payload { get; set; } = string.Empty;
    }

    /// <summary>
    /// Handler that always returns a failed result with the reason "force retry".
    /// </summary>
    private sealed class FailingWrappedResultHandler
        : IMessageHandler<TestMessage, MessageHandlerResult<string>>
    {
        /// <summary>
        /// Ignores the message and returns an already-completed task holding a failed result.
        /// </summary>
        public Task<MessageHandlerResult<string>> HandleAsync(
            TestMessage message,
            CancellationToken cancellationToken = default
        )
        {
            return Task.FromResult(MessageHandlerResult<string>.Fail("force retry"));
        }
    }
}
⚠ 以下内容为 AI 分析的结果,请根据实际情况进行判断。

这是一个用于测试消息队列可靠性的单元测试类,主要测试分布式环境下的消息处理机制。让我详细解释这个代码:

主要功能

这个测试类包含两个核心测试用例,用于验证消息队列系统在分布式环境下的可靠性:

1. 分布式批量追踪器测试 (DistributedBatchTracker_ShouldKeepAccurateProgress_InMultiInstanceScenario)

测试目标:验证在多实例并发场景下,分布式批量追踪器能否准确记录消息消费进度。

关键特性

  • 分布式锁:使用文件锁确保多实例间的同步
  • 共享缓存:两个追踪器实例共用同一个FusionCache实例(测试中为进程内缓存,未配置分布式缓存层)来存储进度信息
  • 并发安全:通过Parallel.ForEachAsync模拟并发环境
  • 消息去重:同一消息在两个实例上重复上报,验证去重逻辑

测试流程

// 1. 创建临时锁目录和两个追踪器实例
var trackerA = new DistributedBatchTracker(...);
var trackerB = new DistributedBatchTracker(...);

// 2. 并发处理20条唯一消息
await Parallel.ForEachAsync(uniqueMessageIds, async (messageId, _) => {
    await trackerA.RecordMessageConsumedAsync(batchId, messageId);
    await trackerB.RecordMessageConsumedAsync(batchId, messageId);
});

// 3. 验证进度准确性和完成状态

2. 消费者重试机制测试 (ConsumerWithResult_ShouldRepublishWithIncrementedRetryCount_WhenHandlerFails)

测试目标:验证当消息处理失败时,消费者能否正确增加重试次数并重新发布消息。

关键特性

  • 失败处理:使用FailingWrappedResultHandler模拟处理失败
  • 重试计数:验证重发消息的重试次数正确递增
  • 消息确认:验证正确的ACK机制
  • Mock验证:使用Moq框架验证RabbitMQ操作

测试流程

// 1. 设置失败的处理器和Mock的RabbitMQ组件
var handler = new FailingWrappedResultHandler();
var channel = new Mock<IChannel>();

// 2. 通过反射调用私有的消息处理方法
var processMethod = typeof(...).GetMethod("MessageHandlerAsync", ...);
await processMethod.Invoke(consumer, [deliverEventArgs, CancellationToken.None]);

// 3. 验证重发行为和重试计数递增

技术亮点

1. 依赖注入和Mock

var serviceProvider = new ServiceCollection()
    .AddScoped<IMessageHandler<TestMessage, MessageHandlerResult<string>>>(_ => handler)
    .BuildServiceProvider();

2. 反射访问私有成员(设置私有字段)

private static void SetPrivateField<T>(T target, string fieldName, object value)
{
    var field = typeof(T).GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Instance);
    field.SetValue(target, value);
}

3. 分布式锁和缓存

var lockProvider = new FileDistributedSynchronizationProvider(new DirectoryInfo(lockDirPath));
var cache = new FusionCache(new FusionCacheOptions());

测试模型

TestMessage

继承自MessageBase的简单测试消息,包含负载数据。

FailingWrappedResultHandler

专门用于测试的失败处理器,总是返回失败结果以触发重试机制。

总结

这个测试类通过模拟真实的分布式环境,验证了消息队列系统的两个关键特性:

  1. 分布式进度追踪:确保多实例环境下批量消息处理的准确性
  2. 失败重试机制:确保消息处理失败时的正确重试行为

测试覆盖了并发安全、分布式一致性、错误处理等关键场景,是一个相当全面的可靠性测试套件。

评论加载中...