using System.Text;
using System.Text.Json;
using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Models;
using Dpz.Core.MessageQueue.RabbitMQ;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Dpz.Core.MessageQueue.Test.RabbitMQ;

public class RabbitMQConsumerBackgroundServiceTests
{
    /// <summary>
    /// A handler that returns <c>true</c> must result in exactly one acknowledgement of the
    /// delivery (tag 100), with no negative acknowledgement and no publish on the channel.
    /// </summary>
    [Fact]
    public async Task ConsumerCore_ShouldAck_WhenHandlerSucceeds()
    {
        // Arrange: container with an always-succeeding handler, plus a mocked channel.
        await using var services = BuildRegularConsumerProvider(
            new DelegatingMessageHandler(static (_, _) => Task.FromResult(true))
        );
        var sut = services.GetRequiredService<TestableRabbitMQConsumerBackgroundService>();
        var channelMock = BuildChannelMock();
        var incoming = new TestMessage { MessageId = "m-success", RetryCount = 0 };
        var delivery = CreateEventArgs(incoming.MessageId);

        // Act
        await sut.InvokeHandleMessageAsyncCore(
            incoming,
            delivery,
            channelMock.Object,
            CancellationToken.None
        );

        // Assert: acknowledged once ...
        channelMock.Verify(
            c => c.BasicAckAsync(100, false, It.IsAny<CancellationToken>()),
            Times.Once()
        );

        // ... never rejected ...
        channelMock.Verify(
            c =>
                c.BasicNackAsync(
                    It.IsAny<ulong>(),
                    It.IsAny<bool>(),
                    It.IsAny<bool>(),
                    It.IsAny<CancellationToken>()
                ),
            Times.Never()
        );

        // ... and nothing was published.
        channelMock.Verify(
            c =>
                c.BasicPublishAsync(
                    It.IsAny<string>(),
                    It.IsAny<string>(),
                    It.IsAny<bool>(),
                    It.IsAny<BasicProperties>(),
                    It.IsAny<ReadOnlyMemory<byte>>(),
                    It.IsAny<CancellationToken>()
                ),
            Times.Never()
        );
    }

    /// <summary>
    /// A handler that returns <c>false</c> for a message with <c>RetryCount = 0</c> must cause
    /// one publish to "test-exchange" / "test-routing-key" and one acknowledgement of the
    /// original delivery. The published payload must deserialize to a message with the same
    /// <c>MessageId</c> and <c>RetryCount = 1</c>.
    /// </summary>
    [Fact]
    public async Task ConsumerCore_ShouldRepublishAndAck_WhenHandlerFailsBeforeMaxRetry()
    {
        // Arrange
        var handler = new DelegatingMessageHandler((_, _) => Task.FromResult(false));
        await using var provider = BuildRegularConsumerProvider(handler);
        var consumer = provider.GetRequiredService<TestableRabbitMQConsumerBackgroundService>();

        var channel = BuildChannelMock();
        ReadOnlyMemory<byte> republishedBody = default;
        channel
            .Setup(x =>
                x.BasicPublishAsync(
                    It.IsAny<string>(),
                    It.IsAny<string>(),
                    It.IsAny<bool>(),
                    It.IsAny<BasicProperties>(),
                    It.IsAny<ReadOnlyMemory<byte>>(),
                    It.IsAny<CancellationToken>()
                )
            )
            .Callback<
                string,
                string,
                bool,
                BasicProperties,
                ReadOnlyMemory<byte>,
                CancellationToken
            >(
                // Copy the payload at call time: the ReadOnlyMemory handed to the channel is
                // only a view, and the caller is free to reuse its backing buffer once the
                // publish returns. A private copy keeps the later assertions independent of that.
                (_, _, _, _, body, _) => republishedBody = body.ToArray()
            )
            .Returns(ValueTask.CompletedTask);

        var message = new TestMessage { MessageId = "m-retry", RetryCount = 0 };
        var eventArgs = CreateEventArgs(message.MessageId);

        // Act
        await consumer.InvokeHandleMessageAsyncCore(
            message,
            eventArgs,
            channel.Object,
            CancellationToken.None
        );

        // Assert: republished once to the routing-convention exchange and key ...
        channel.Verify(
            x =>
                x.BasicPublishAsync(
                    "test-exchange",
                    "test-routing-key",
                    false,
                    It.IsAny<BasicProperties>(),
                    It.IsAny<ReadOnlyMemory<byte>>(),
                    It.IsAny<CancellationToken>()
                ),
            Times.Once
        );

        // ... and the original delivery was acknowledged.
        channel.Verify(x => x.BasicAckAsync(100, false, It.IsAny<CancellationToken>()), Times.Once);

        // The republished payload carries the same id with the retry counter incremented.
        var republishedMessage = JsonSerializer.Deserialize<TestMessage>(republishedBody.Span);
        Assert.NotNull(republishedMessage);
        Assert.Equal(1, republishedMessage.RetryCount);
        Assert.Equal(message.MessageId, republishedMessage.MessageId);
    }

    /// <summary>
    /// A handler that returns <c>false</c> for a message whose <c>RetryCount</c> is already 3
    /// (the value this test treats as the retry limit) must produce a single
    /// <c>BasicNackAsync(100, false, false, ...)</c>, with no acknowledgement and no publish.
    /// </summary>
    [Fact]
    public async Task ConsumerCore_ShouldNackWithoutRequeue_WhenHandlerFailsAtMaxRetry()
    {
        // Arrange: container with an always-failing handler, plus a mocked channel.
        await using var services = BuildRegularConsumerProvider(
            new DelegatingMessageHandler(static (_, _) => Task.FromResult(false))
        );
        var sut = services.GetRequiredService<TestableRabbitMQConsumerBackgroundService>();
        var channelMock = BuildChannelMock();
        var exhausted = new TestMessage { MessageId = "m-drop", RetryCount = 3 };
        var delivery = CreateEventArgs(exhausted.MessageId);

        // Act
        await sut.InvokeHandleMessageAsyncCore(
            exhausted,
            delivery,
            channelMock.Object,
            CancellationToken.None
        );

        // Assert: rejected exactly once with both boolean flags false ...
        channelMock.Verify(
            c => c.BasicNackAsync(100, false, false, It.IsAny<CancellationToken>()),
            Times.Once()
        );

        // ... never acknowledged ...
        channelMock.Verify(
            c =>
                c.BasicAckAsync(It.IsAny<ulong>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()),
            Times.Never()
        );

        // ... and never republished.
        channelMock.Verify(
            c =>
                c.BasicPublishAsync(
                    It.IsAny<string>(),
                    It.IsAny<string>(),
                    It.IsAny<bool>(),
                    It.IsAny<BasicProperties>(),
                    It.IsAny<ReadOnlyMemory<byte>>(),
                    It.IsAny<CancellationToken>()
                ),
            Times.Never()
        );
    }

    /// <summary>
    /// A result handler that returns <c>MessageHandlerResult&lt;string&gt;.Ok("ok")</c> must
    /// result in exactly one acknowledgement of the delivery (tag 100) and no negative
    /// acknowledgement.
    /// </summary>
    [Fact]
    public async Task ConsumerWithResultCore_ShouldAck_WhenResultSuccess()
    {
        // Arrange: container with a handler that always reports success.
        await using var services = BuildResultConsumerProvider(
            new DelegatingResultHandler(
                static (_, _) => Task.FromResult(MessageHandlerResult<string>.Ok("ok"))
            )
        );
        var sut =
            services.GetRequiredService<TestableRabbitMQConsumerBackgroundServiceWithResult>();
        var channelMock = BuildChannelMock();
        var incoming = new TestMessage { MessageId = "r-success", RetryCount = 0 };
        var delivery = CreateEventArgs(incoming.MessageId);

        // Act
        await sut.InvokeHandleMessageAsyncCore(
            incoming,
            delivery,
            channelMock.Object,
            CancellationToken.None
        );

        // Assert: acknowledged once, never rejected.
        channelMock.Verify(
            c => c.BasicAckAsync(100, false, It.IsAny<CancellationToken>()),
            Times.Once()
        );
        channelMock.Verify(
            c =>
                c.BasicNackAsync(
                    It.IsAny<ulong>(),
                    It.IsAny<bool>(),
                    It.IsAny<bool>(),
                    It.IsAny<CancellationToken>()
                ),
            Times.Never()
        );
    }

    /// <summary>
    /// A result handler that returns <c>MessageHandlerResult&lt;string&gt;.Fail("failed")</c>
    /// for a message with <c>RetryCount = 0</c> must cause one publish to "test-exchange" /
    /// "test-routing-key" and one acknowledgement of the original delivery. The published
    /// payload must deserialize to a message with the same <c>MessageId</c> and
    /// <c>RetryCount = 1</c>.
    /// </summary>
    [Fact]
    public async Task ConsumerWithResultCore_ShouldRepublishAndAck_WhenResultFailedBeforeMaxRetry()
    {
        // Arrange
        var handler = new DelegatingResultHandler(
            (_, _) => Task.FromResult(MessageHandlerResult<string>.Fail("failed"))
        );
        await using var provider = BuildResultConsumerProvider(handler);
        var consumer =
            provider.GetRequiredService<TestableRabbitMQConsumerBackgroundServiceWithResult>();

        var channel = BuildChannelMock();
        ReadOnlyMemory<byte> republishedBody = default;
        channel
            .Setup(x =>
                x.BasicPublishAsync(
                    It.IsAny<string>(),
                    It.IsAny<string>(),
                    It.IsAny<bool>(),
                    It.IsAny<BasicProperties>(),
                    It.IsAny<ReadOnlyMemory<byte>>(),
                    It.IsAny<CancellationToken>()
                )
            )
            .Callback<
                string,
                string,
                bool,
                BasicProperties,
                ReadOnlyMemory<byte>,
                CancellationToken
            >(
                // Copy the payload at call time: the ReadOnlyMemory handed to the channel is
                // only a view, and the caller is free to reuse its backing buffer once the
                // publish returns. A private copy keeps the later assertions independent of that.
                (_, _, _, _, body, _) => republishedBody = body.ToArray()
            )
            .Returns(ValueTask.CompletedTask);

        var message = new TestMessage { MessageId = "r-retry", RetryCount = 0 };
        var eventArgs = CreateEventArgs(message.MessageId);

        // Act
        await consumer.InvokeHandleMessageAsyncCore(
            message,
            eventArgs,
            channel.Object,
            CancellationToken.None
        );

        // Assert: republished once to the routing-convention exchange and key ...
        channel.Verify(
            x =>
                x.BasicPublishAsync(
                    "test-exchange",
                    "test-routing-key",
                    false,
                    It.IsAny<BasicProperties>(),
                    It.IsAny<ReadOnlyMemory<byte>>(),
                    It.IsAny<CancellationToken>()
                ),
            Times.Once
        );

        // ... and the original delivery was acknowledged.
        channel.Verify(x => x.BasicAckAsync(100, false, It.IsAny<CancellationToken>()), Times.Once);

        // The republished payload carries the same id with the retry counter incremented.
        var republishedMessage = JsonSerializer.Deserialize<TestMessage>(republishedBody.Span);
        Assert.NotNull(republishedMessage);
        Assert.Equal(1, republishedMessage.RetryCount);
        Assert.Equal(message.MessageId, republishedMessage.MessageId);
    }

    /// <summary>
    /// A result handler that returns <c>MessageHandlerResult&lt;string&gt;.Fail("failed")</c>
    /// for a message whose <c>RetryCount</c> is already 3 (the value this test treats as the
    /// retry limit) must produce a single <c>BasicNackAsync(100, false, false, ...)</c> and
    /// no acknowledgement.
    /// </summary>
    [Fact]
    public async Task ConsumerWithResultCore_ShouldNackWithoutRequeue_WhenResultFailedAtMaxRetry()
    {
        // Arrange: container with a handler that always reports failure.
        await using var services = BuildResultConsumerProvider(
            new DelegatingResultHandler(
                static (_, _) => Task.FromResult(MessageHandlerResult<string>.Fail("failed"))
            )
        );
        var sut =
            services.GetRequiredService<TestableRabbitMQConsumerBackgroundServiceWithResult>();
        var channelMock = BuildChannelMock();
        var exhausted = new TestMessage { MessageId = "r-drop", RetryCount = 3 };
        var delivery = CreateEventArgs(exhausted.MessageId);

        // Act
        await sut.InvokeHandleMessageAsyncCore(
            exhausted,
            delivery,
            channelMock.Object,
            CancellationToken.None
        );

        // Assert: rejected exactly once with both boolean flags false, never acknowledged.
        channelMock.Verify(
            c => c.BasicNackAsync(100, false, false, It.IsAny<CancellationToken>()),
            Times.Once()
        );
        channelMock.Verify(
            c =>
                c.BasicAckAsync(It.IsAny<ulong>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()),
            Times.Never()
        );
    }

    /// <summary>
    /// Builds a service provider from which the bool-returning testable consumer can be
    /// resolved. Registers a mocked connection factory, the mocked routing convention, a null
    /// logger factory, and <paramref name="handler"/> both as its concrete type and as the
    /// scoped <c>IMessageHandler&lt;TestMessage&gt;</c>.
    /// </summary>
    /// <param name="handler">Handler instance the consumer under test should resolve.</param>
    /// <returns>The built provider; the caller is responsible for disposing it.</returns>
    private static ServiceProvider BuildRegularConsumerProvider(DelegatingMessageHandler handler)
    {
        IMessageRoutingConvention routingConvention = BuildRoutingConventionMock().Object;
        IRabbitMQConnectionFactory connectionFactory =
            new Mock<IRabbitMQConnectionFactory>().Object;

        return new ServiceCollection()
            .AddSingleton(connectionFactory)
            .AddSingleton(routingConvention)
            .AddSingleton<ILoggerFactory>(NullLoggerFactory.Instance)
            .AddSingleton(handler)
            .AddScoped<IMessageHandler<TestMessage>>(_ => handler)
            .AddTransient<TestableRabbitMQConsumerBackgroundService>()
            .BuildServiceProvider();
    }

    /// <summary>
    /// Builds a service provider from which the result-returning testable consumer can be
    /// resolved. Registers a mocked connection factory, the mocked routing convention, a null
    /// logger factory, and <paramref name="handler"/> both as its concrete type and as the
    /// scoped <c>IMessageHandler&lt;TestMessage, MessageHandlerResult&lt;string&gt;&gt;</c>.
    /// </summary>
    /// <param name="handler">Handler instance the consumer under test should resolve.</param>
    /// <returns>The built provider; the caller is responsible for disposing it.</returns>
    private static ServiceProvider BuildResultConsumerProvider(DelegatingResultHandler handler)
    {
        IMessageRoutingConvention routingConvention = BuildRoutingConventionMock().Object;
        IRabbitMQConnectionFactory connectionFactory =
            new Mock<IRabbitMQConnectionFactory>().Object;

        return new ServiceCollection()
            .AddSingleton(connectionFactory)
            .AddSingleton(routingConvention)
            .AddSingleton<ILoggerFactory>(NullLoggerFactory.Instance)
            .AddSingleton(handler)
            .AddScoped<IMessageHandler<TestMessage, MessageHandlerResult<string>>>(_ => handler)
            .AddTransient<TestableRabbitMQConsumerBackgroundServiceWithResult>()
            .BuildServiceProvider();
    }

    /// <summary>
    /// Creates a routing-convention mock that answers, for <c>TestMessage</c>, with the fixed
    /// names "test-exchange", "test-queue" and "test-routing-key" and a direct exchange type.
    /// </summary>
    /// <returns>The configured mock, so callers can use or further adjust it.</returns>
    private static Mock<IMessageRoutingConvention> BuildRoutingConventionMock()
    {
        Mock<IMessageRoutingConvention> convention = new();

        convention.Setup(c => c.GetExchangeType<TestMessage>()).Returns(Enums.ExchangeType.Direct);
        convention.Setup(c => c.GetExchangeName<TestMessage>()).Returns("test-exchange");
        convention.Setup(c => c.GetRoutingKey<TestMessage>()).Returns("test-routing-key");
        convention.Setup(c => c.GetQueueName<TestMessage>()).Returns("test-queue");

        return convention;
    }

    /// <summary>
    /// Creates a channel mock whose <c>BasicPublishAsync</c>, <c>BasicNackAsync</c> and
    /// <c>BasicAckAsync</c> accept any arguments and return an already-completed
    /// <see cref="ValueTask"/>, so the code under test can await them.
    /// </summary>
    /// <returns>The configured mock; tests call <c>Verify</c> on it afterwards.</returns>
    private static Mock<IChannel> BuildChannelMock()
    {
        Mock<IChannel> mock = new();

        // Publish: any exchange, routing key, mandatory flag, properties, body and token.
        mock.Setup(c =>
                c.BasicPublishAsync(
                    It.IsAny<string>(),
                    It.IsAny<string>(),
                    It.IsAny<bool>(),
                    It.IsAny<BasicProperties>(),
                    It.IsAny<ReadOnlyMemory<byte>>(),
                    It.IsAny<CancellationToken>()
                )
            )
            .Returns(ValueTask.CompletedTask);

        // Negative acknowledgement: any delivery tag and flags.
        mock.Setup(c =>
                c.BasicNackAsync(
                    It.IsAny<ulong>(),
                    It.IsAny<bool>(),
                    It.IsAny<bool>(),
                    It.IsAny<CancellationToken>()
                )
            )
            .Returns(ValueTask.CompletedTask);

        // Acknowledgement: any delivery tag and flag.
        mock.Setup(c =>
                c.BasicAckAsync(It.IsAny<ulong>(), It.IsAny<bool>(), It.IsAny<CancellationToken>())
            )
            .Returns(ValueTask.CompletedTask);

        return mock;
    }

    /// <summary>
    /// Creates the delivery event passed to the consumer under test: consumer tag
    /// "consumer-1", delivery tag 100, not redelivered, exchange "test-exchange", routing key
    /// "test-routing-key", and a body holding the UTF-8 bytes of <c>{}</c>.
    /// </summary>
    /// <param name="messageId">Value stored in the delivery's <c>MessageId</c> property.</param>
    /// <returns>A new <see cref="BasicDeliverEventArgs"/> with the values above.</returns>
    private static BasicDeliverEventArgs CreateEventArgs(string messageId)
    {
        var deliveryProperties = new BasicProperties { MessageId = messageId };
        byte[] emptyJsonObject = Encoding.UTF8.GetBytes("{}");

        var delivery = new BasicDeliverEventArgs(
            consumerTag: "consumer-1",
            deliveryTag: 100,
            redelivered: false,
            exchange: "test-exchange",
            routingKey: "test-routing-key",
            properties: deliveryProperties,
            body: emptyJsonObject,
            cancellationToken: CancellationToken.None
        );
        return delivery;
    }

    /// <summary>
    /// Test subclass of <c>RabbitMQConsumerBackgroundService&lt;TestMessage&gt;</c> that
    /// forwards its constructor arguments to the base class (with the null outbox store) and
    /// exposes the inherited <c>HandleMessageAsyncCore</c> through a public method.
    /// </summary>
    private sealed class TestableRabbitMQConsumerBackgroundService
        : RabbitMQConsumerBackgroundService<TestMessage>
    {
        public TestableRabbitMQConsumerBackgroundService(
            IRabbitMQConnectionFactory connectionFactory,
            IMessageRoutingConvention routingConvention,
            IServiceProvider serviceProvider,
            ILoggerFactory loggerFactory
        )
            : base(
                connectionFactory,
                routingConvention,
                serviceProvider,
                NullMessageOutboxStore.Instance.Value,
                loggerFactory
            ) { }

        /// <summary>
        /// Calls the inherited <c>HandleMessageAsyncCore</c> with the given arguments and
        /// returns its task unchanged.
        /// </summary>
        public Task InvokeHandleMessageAsyncCore(
            TestMessage message,
            BasicDeliverEventArgs eventArgs,
            IChannel channel,
            CancellationToken cancellationToken
        )
        {
            return HandleMessageAsyncCore(message, eventArgs, channel, cancellationToken);
        }
    }

    /// <summary>
    /// Test subclass of
    /// <c>RabbitMQConsumerBackgroundServiceWithResult&lt;TestMessage, MessageHandlerResult&lt;string&gt;&gt;</c>
    /// that forwards its constructor arguments to the base class (with the null outbox store)
    /// and exposes the inherited <c>HandleMessageAsyncCore</c> through a public method.
    /// </summary>
    private sealed class TestableRabbitMQConsumerBackgroundServiceWithResult
        : RabbitMQConsumerBackgroundServiceWithResult<TestMessage, MessageHandlerResult<string>>
    {
        public TestableRabbitMQConsumerBackgroundServiceWithResult(
            IRabbitMQConnectionFactory connectionFactory,
            IMessageRoutingConvention routingConvention,
            IServiceProvider serviceProvider,
            ILoggerFactory loggerFactory
        )
            : base(
                connectionFactory,
                routingConvention,
                serviceProvider,
                NullMessageOutboxStore.Instance.Value,
                loggerFactory
            ) { }

        /// <summary>
        /// Calls the inherited <c>HandleMessageAsyncCore</c> with the given arguments and
        /// returns its task unchanged.
        /// </summary>
        public Task InvokeHandleMessageAsyncCore(
            TestMessage message,
            BasicDeliverEventArgs eventArgs,
            IChannel channel,
            CancellationToken cancellationToken
        )
        {
            return HandleMessageAsyncCore(message, eventArgs, channel, cancellationToken);
        }
    }

    /// <summary>
    /// <c>IMessageHandler&lt;TestMessage&gt;</c> implementation whose behaviour is supplied by
    /// the test as a delegate.
    /// </summary>
    private sealed class DelegatingMessageHandler : IMessageHandler<TestMessage>
    {
        private readonly Func<TestMessage, CancellationToken, Task<bool>> _callback;

        /// <param name="handler">Delegate invoked for every handled message.</param>
        public DelegatingMessageHandler(Func<TestMessage, CancellationToken, Task<bool>> handler)
        {
            _callback = handler;
        }

        /// <summary>Forwards both arguments to the delegate and returns its task.</summary>
        public Task<bool> HandleAsync(
            TestMessage message,
            CancellationToken cancellationToken = default
        ) => _callback(message, cancellationToken);
    }

    /// <summary>
    /// <c>IMessageHandler&lt;TestMessage, MessageHandlerResult&lt;string&gt;&gt;</c>
    /// implementation whose behaviour is supplied by the test as a delegate.
    /// </summary>
    private sealed class DelegatingResultHandler
        : IMessageHandler<TestMessage, MessageHandlerResult<string>>
    {
        private readonly Func<
            TestMessage,
            CancellationToken,
            Task<MessageHandlerResult<string>>
        > _callback;

        /// <param name="handler">Delegate invoked for every handled message.</param>
        public DelegatingResultHandler(
            Func<TestMessage, CancellationToken, Task<MessageHandlerResult<string>>> handler
        )
        {
            _callback = handler;
        }

        /// <summary>Forwards both arguments to the delegate and returns its task.</summary>
        public Task<MessageHandlerResult<string>> HandleAsync(
            TestMessage message,
            CancellationToken cancellationToken = default
        ) => _callback(message, cancellationToken);
    }

    /// <summary>
    /// Message type used by these tests. It derives from <c>MessageBase</c> and adds a single
    /// string property.
    /// </summary>
    private sealed class TestMessage : MessageBase
    {
        /// <summary>Text content of the message; an empty string unless set.</summary>
        public string Payload { get; set; } = "";
    }
}
⚠ 以下内容为 AI 分析的结果,请根据实际情况进行判断。

这是一个针对 RabbitMQ 消费者后台服务的完整单元测试类。让我来详细解释一下这个代码的结构和功能:

主要功能

这个测试类 RabbitMQConsumerBackgroundServiceTests 用于测试两种类型的 RabbitMQ 消费者:

  1. 常规消费者:返回 bool 类型结果的消息处理器
  2. 结果消费者:返回 MessageHandlerResult<T> 类型结果的消息处理器

测试场景

常规消费者测试场景

  1. ConsumerCore_ShouldAck_WhenHandlerSucceeds

    • 测试消息处理成功时,应该发送 ACK 确认
    • 验证不会发送 NACK 或重新发布消息
  2. ConsumerCore_ShouldRepublishAndAck_WhenHandlerFailsBeforeMaxRetry

    • 测试在达到最大重试次数前处理失败时的行为
    • 验证会重新发布消息(RetryCount +1)并发送 ACK
  3. ConsumerCore_ShouldNackWithoutRequeue_WhenHandlerFailsAtMaxRetry

    • 测试达到最大重试次数后处理失败的行为
    • 验证会发送 NACK(不重新入队)且不会重新发布

结果消费者测试场景

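  4–6. 针对返回 MessageHandlerResult<string> 的消费者重复相同的三种场景测试:ConsumerWithResultCore_ShouldAck_WhenResultSuccess、ConsumerWithResultCore_ShouldRepublishAndAck_WhenResultFailedBeforeMaxRetry、ConsumerWithResultCore_ShouldNackWithoutRequeue_WhenResultFailedAtMaxRetry(其中成功场景与达到重试上限场景未额外断言“不会重新发布消息”)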

核心组件

辅助类

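// 可测试的消费者服务,通过 InvokeHandleMessageAsyncCore 暴露基类的 HandleMessageAsyncCore 方法
private sealed class TestableRabbitMQConsumerBackgroundService
private sealed class TestableRabbitMQConsumerBackgroundServiceWithResult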

// 委托式消息处理器,允许通过 lambda 表达式定义处理逻辑
private sealed class DelegatingMessageHandler
private sealed class DelegatingResultHandler

// 测试用的消息类型
private sealed class TestMessage : MessageBase

Mock 对象构建

// 构建模拟的路由约定
private static Mock<IMessageRoutingConvention> BuildRoutingConventionMock()

// 构建模拟的 RabbitMQ 通道
private static Mock<IChannel> BuildChannelMock()

// 创建测试用的事件参数
private static BasicDeliverEventArgs CreateEventArgs(string messageId)

依赖注入容器构建

// 为常规消费者构建服务提供者
private static ServiceProvider BuildRegularConsumerProvider(DelegatingMessageHandler handler)

// 为结果消费者构建服务提供者  
private static ServiceProvider BuildResultConsumerProvider(DelegatingResultHandler handler)

测试策略

  1. 隔离测试:使用 Mock 对象隔离外部依赖(RabbitMQ 连接、通道等)
  2. 行为验证:通过 Verify 方法验证特定方法的调用次数和参数
  3. 状态验证:检查重新发布的消息内容(如 RetryCount 递增)
  4. 异步测试:所有测试方法都是异步的,符合现代 C# 测试实践

关键测试点

  • 消息确认机制:验证 ACK/NACK 的正确使用
  • 重试逻辑:确保重试计数正确递增
  • 消息重新发布:验证失败消息的重新发布逻辑
  • 最大重试限制:确保达到重试上限后正确处理消息

这个测试类很好地覆盖了消息队列消费者的核心功能,确保在各种场景下都能正确处理消息的生命周期。

评论加载中...