using System.Text;
using System.Text.Json;
using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Models;
using Dpz.Core.MessageQueue.RabbitMQ;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Dpz.Core.MessageQueue.Test.RabbitMQ;
public class RabbitMQConsumerBackgroundServiceTests
{
/// <summary>
/// When the handler returns <c>true</c>, the consumer is expected to ack delivery tag 100
/// exactly once and to issue neither a nack nor a publish on the channel.
/// </summary>
[Fact]
public async Task ConsumerCore_ShouldAck_WhenHandlerSucceeds()
{
    // Arrange
    var succeedingHandler = new DelegatingMessageHandler((_, _) => Task.FromResult(true));
    await using var services = BuildRegularConsumerProvider(succeedingHandler);
    var sut = services.GetRequiredService<TestableRabbitMQConsumerBackgroundService>();
    var channelMock = BuildChannelMock();
    var incoming = new TestMessage { MessageId = "m-success", RetryCount = 0 };
    var delivery = CreateEventArgs(incoming.MessageId);

    // Act
    await sut.InvokeHandleMessageAsyncCore(
        incoming,
        delivery,
        channelMock.Object,
        CancellationToken.None
    );

    // Assert: a single, non-multiple ack for the delivery tag produced by CreateEventArgs.
    channelMock.Verify(
        c => c.BasicAckAsync(100, false, It.IsAny<CancellationToken>()),
        Times.Once()
    );

    // Assert: no negative acknowledgement of any kind.
    channelMock.Verify(
        c =>
            c.BasicNackAsync(
                It.IsAny<ulong>(),
                It.IsAny<bool>(),
                It.IsAny<bool>(),
                It.IsAny<CancellationToken>()
            ),
        Times.Never()
    );

    // Assert: nothing was republished.
    channelMock.Verify(
        c =>
            c.BasicPublishAsync(
                It.IsAny<string>(),
                It.IsAny<string>(),
                It.IsAny<bool>(),
                It.IsAny<BasicProperties>(),
                It.IsAny<ReadOnlyMemory<byte>>(),
                It.IsAny<CancellationToken>()
            ),
        Times.Never()
    );
}
/// <summary>
/// When the handler returns <c>false</c> for a message whose <c>RetryCount</c> is 0, the
/// consumer is expected to republish the message to "test-exchange" / "test-routing-key"
/// with <c>RetryCount</c> incremented to 1, ack the original delivery, and not nack it.
/// </summary>
[Fact]
public async Task ConsumerCore_ShouldRepublishAndAck_WhenHandlerFailsBeforeMaxRetry()
{
    // Arrange
    var handler = new DelegatingMessageHandler((_, _) => Task.FromResult(false));
    await using var provider = BuildRegularConsumerProvider(handler);
    var consumer = provider.GetRequiredService<TestableRabbitMQConsumerBackgroundService>();
    var channel = BuildChannelMock();

    ReadOnlyMemory<byte> republishedBody = default;
    channel
        .Setup(x =>
            x.BasicPublishAsync(
                It.IsAny<string>(),
                It.IsAny<string>(),
                It.IsAny<bool>(),
                It.IsAny<BasicProperties>(),
                It.IsAny<ReadOnlyMemory<byte>>(),
                It.IsAny<CancellationToken>()
            )
        )
        .Callback<
            string,
            string,
            bool,
            BasicProperties,
            ReadOnlyMemory<byte>,
            CancellationToken
        >(
            // Snapshot the payload: the ReadOnlyMemory handed to the channel belongs to the
            // publisher and may be reused or returned to a pool once the publish call
            // completes, so holding on to it directly could make the assertions below read
            // stale bytes.
            (_, _, _, _, body, _) => republishedBody = body.ToArray()
        )
        .Returns(ValueTask.CompletedTask);

    var message = new TestMessage { MessageId = "m-retry", RetryCount = 0 };
    var eventArgs = CreateEventArgs(message.MessageId);

    // Act
    await consumer.InvokeHandleMessageAsyncCore(
        message,
        eventArgs,
        channel.Object,
        CancellationToken.None
    );

    // Assert: exactly one republish to the routing taken from the routing convention mock.
    channel.Verify(
        x =>
            x.BasicPublishAsync(
                "test-exchange",
                "test-routing-key",
                false,
                It.IsAny<BasicProperties>(),
                It.IsAny<ReadOnlyMemory<byte>>(),
                It.IsAny<CancellationToken>()
            ),
        Times.Once
    );

    // Assert: the original delivery is acked once...
    channel.Verify(x => x.BasicAckAsync(100, false, It.IsAny<CancellationToken>()), Times.Once);

    // ...and never nacked as well; a delivery tag must only be settled once.
    channel.Verify(
        x =>
            x.BasicNackAsync(
                It.IsAny<ulong>(),
                It.IsAny<bool>(),
                It.IsAny<bool>(),
                It.IsAny<CancellationToken>()
            ),
        Times.Never
    );

    // Assert: the republished payload carries the same id with the retry counter bumped.
    Assert.False(republishedBody.IsEmpty);
    var republishedMessage = JsonSerializer.Deserialize<TestMessage>(republishedBody.Span);
    Assert.NotNull(republishedMessage);
    Assert.Equal(1, republishedMessage.RetryCount);
    Assert.Equal(message.MessageId, republishedMessage.MessageId);
}
/// <summary>
/// When the handler returns <c>false</c> for a message that already has
/// <c>RetryCount = 3</c> (presumably the consumer's retry limit, per the test name), the
/// consumer is expected to nack delivery tag 100 without requeue, and neither ack nor
/// republish.
/// </summary>
[Fact]
public async Task ConsumerCore_ShouldNackWithoutRequeue_WhenHandlerFailsAtMaxRetry()
{
    // Arrange
    var failingHandler = new DelegatingMessageHandler((_, _) => Task.FromResult(false));
    await using var services = BuildRegularConsumerProvider(failingHandler);
    var sut = services.GetRequiredService<TestableRabbitMQConsumerBackgroundService>();
    var channelMock = BuildChannelMock();
    var exhausted = new TestMessage { MessageId = "m-drop", RetryCount = 3 };
    var delivery = CreateEventArgs(exhausted.MessageId);

    // Act
    await sut.InvokeHandleMessageAsyncCore(
        exhausted,
        delivery,
        channelMock.Object,
        CancellationToken.None
    );

    // Assert: one nack with multiple = false and requeue = false.
    channelMock.Verify(
        c => c.BasicNackAsync(100, false, false, It.IsAny<CancellationToken>()),
        Times.Once()
    );

    // Assert: the delivery is not acked.
    channelMock.Verify(
        c => c.BasicAckAsync(It.IsAny<ulong>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()),
        Times.Never()
    );

    // Assert: the message is not republished.
    channelMock.Verify(
        c =>
            c.BasicPublishAsync(
                It.IsAny<string>(),
                It.IsAny<string>(),
                It.IsAny<bool>(),
                It.IsAny<BasicProperties>(),
                It.IsAny<ReadOnlyMemory<byte>>(),
                It.IsAny<CancellationToken>()
            ),
        Times.Never()
    );
}
/// <summary>
/// When the result-returning handler yields <c>MessageHandlerResult&lt;string&gt;.Ok("ok")</c>,
/// the consumer is expected to ack delivery tag 100 exactly once and never nack.
/// </summary>
[Fact]
public async Task ConsumerWithResultCore_ShouldAck_WhenResultSuccess()
{
    // Arrange
    var okHandler = new DelegatingResultHandler(
        (_, _) => Task.FromResult(MessageHandlerResult<string>.Ok("ok"))
    );
    await using var services = BuildResultConsumerProvider(okHandler);
    var sut =
        services.GetRequiredService<TestableRabbitMQConsumerBackgroundServiceWithResult>();
    var channelMock = BuildChannelMock();
    var incoming = new TestMessage { MessageId = "r-success", RetryCount = 0 };
    var delivery = CreateEventArgs(incoming.MessageId);

    // Act
    await sut.InvokeHandleMessageAsyncCore(
        incoming,
        delivery,
        channelMock.Object,
        CancellationToken.None
    );

    // Assert: a single, non-multiple ack for the delivery tag.
    channelMock.Verify(
        c => c.BasicAckAsync(100, false, It.IsAny<CancellationToken>()),
        Times.Once()
    );

    // Assert: no negative acknowledgement of any kind.
    channelMock.Verify(
        c =>
            c.BasicNackAsync(
                It.IsAny<ulong>(),
                It.IsAny<bool>(),
                It.IsAny<bool>(),
                It.IsAny<CancellationToken>()
            ),
        Times.Never()
    );
}
/// <summary>
/// When the result-returning handler yields <c>MessageHandlerResult&lt;string&gt;.Fail("failed")</c>
/// for a message whose <c>RetryCount</c> is 0, the consumer is expected to republish the
/// message to "test-exchange" / "test-routing-key" with <c>RetryCount</c> incremented to 1,
/// ack the original delivery, and not nack it.
/// </summary>
[Fact]
public async Task ConsumerWithResultCore_ShouldRepublishAndAck_WhenResultFailedBeforeMaxRetry()
{
    // Arrange
    var handler = new DelegatingResultHandler(
        (_, _) => Task.FromResult(MessageHandlerResult<string>.Fail("failed"))
    );
    await using var provider = BuildResultConsumerProvider(handler);
    var consumer =
        provider.GetRequiredService<TestableRabbitMQConsumerBackgroundServiceWithResult>();
    var channel = BuildChannelMock();

    ReadOnlyMemory<byte> republishedBody = default;
    channel
        .Setup(x =>
            x.BasicPublishAsync(
                It.IsAny<string>(),
                It.IsAny<string>(),
                It.IsAny<bool>(),
                It.IsAny<BasicProperties>(),
                It.IsAny<ReadOnlyMemory<byte>>(),
                It.IsAny<CancellationToken>()
            )
        )
        .Callback<
            string,
            string,
            bool,
            BasicProperties,
            ReadOnlyMemory<byte>,
            CancellationToken
        >(
            // Snapshot the payload: the ReadOnlyMemory handed to the channel belongs to the
            // publisher and may be reused or returned to a pool once the publish call
            // completes, so holding on to it directly could make the assertions below read
            // stale bytes.
            (_, _, _, _, body, _) => republishedBody = body.ToArray()
        )
        .Returns(ValueTask.CompletedTask);

    var message = new TestMessage { MessageId = "r-retry", RetryCount = 0 };
    var eventArgs = CreateEventArgs(message.MessageId);

    // Act
    await consumer.InvokeHandleMessageAsyncCore(
        message,
        eventArgs,
        channel.Object,
        CancellationToken.None
    );

    // Assert: exactly one republish to the routing taken from the routing convention mock.
    channel.Verify(
        x =>
            x.BasicPublishAsync(
                "test-exchange",
                "test-routing-key",
                false,
                It.IsAny<BasicProperties>(),
                It.IsAny<ReadOnlyMemory<byte>>(),
                It.IsAny<CancellationToken>()
            ),
        Times.Once
    );

    // Assert: the original delivery is acked once...
    channel.Verify(x => x.BasicAckAsync(100, false, It.IsAny<CancellationToken>()), Times.Once);

    // ...and never nacked as well; a delivery tag must only be settled once.
    channel.Verify(
        x =>
            x.BasicNackAsync(
                It.IsAny<ulong>(),
                It.IsAny<bool>(),
                It.IsAny<bool>(),
                It.IsAny<CancellationToken>()
            ),
        Times.Never
    );

    // Assert: the republished payload carries the same id with the retry counter bumped.
    Assert.False(republishedBody.IsEmpty);
    var republishedMessage = JsonSerializer.Deserialize<TestMessage>(republishedBody.Span);
    Assert.NotNull(republishedMessage);
    Assert.Equal(1, republishedMessage.RetryCount);
    Assert.Equal(message.MessageId, republishedMessage.MessageId);
}
/// <summary>
/// When the result-returning handler yields <c>MessageHandlerResult&lt;string&gt;.Fail("failed")</c>
/// for a message that already has <c>RetryCount = 3</c> (presumably the consumer's retry
/// limit, per the test name), the consumer is expected to nack delivery tag 100 without
/// requeue and never ack.
/// </summary>
[Fact]
public async Task ConsumerWithResultCore_ShouldNackWithoutRequeue_WhenResultFailedAtMaxRetry()
{
    // Arrange
    var failingHandler = new DelegatingResultHandler(
        (_, _) => Task.FromResult(MessageHandlerResult<string>.Fail("failed"))
    );
    await using var services = BuildResultConsumerProvider(failingHandler);
    var sut =
        services.GetRequiredService<TestableRabbitMQConsumerBackgroundServiceWithResult>();
    var channelMock = BuildChannelMock();
    var exhausted = new TestMessage { MessageId = "r-drop", RetryCount = 3 };
    var delivery = CreateEventArgs(exhausted.MessageId);

    // Act
    await sut.InvokeHandleMessageAsyncCore(
        exhausted,
        delivery,
        channelMock.Object,
        CancellationToken.None
    );

    // Assert: one nack with multiple = false and requeue = false.
    channelMock.Verify(
        c => c.BasicNackAsync(100, false, false, It.IsAny<CancellationToken>()),
        Times.Once()
    );

    // Assert: the delivery is not acked.
    channelMock.Verify(
        c => c.BasicAckAsync(It.IsAny<ulong>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()),
        Times.Never()
    );
}
/// <summary>
/// Builds a service provider wired for the bool-returning consumer under test.
/// </summary>
/// <param name="handler">
/// Handler instance registered both as itself (singleton) and as the scoped
/// <c>IMessageHandler&lt;TestMessage&gt;</c> implementation.
/// </param>
/// <returns>
/// A provider containing a mocked connection factory, the mocked routing convention,
/// a null logger factory, the handler, and a transient
/// <c>TestableRabbitMQConsumerBackgroundService</c>.
/// </returns>
private static ServiceProvider BuildRegularConsumerProvider(DelegatingMessageHandler handler)
{
    var connectionFactoryMock = new Mock<IRabbitMQConnectionFactory>();
    var routingMock = BuildRoutingConventionMock();

    return new ServiceCollection()
        .AddSingleton(connectionFactoryMock.Object)
        .AddSingleton(routingMock.Object)
        .AddSingleton<ILoggerFactory>(NullLoggerFactory.Instance)
        .AddSingleton(handler)
        .AddScoped<IMessageHandler<TestMessage>>(_ => handler)
        .AddTransient<TestableRabbitMQConsumerBackgroundService>()
        .BuildServiceProvider();
}
/// <summary>
/// Builds a service provider wired for the result-returning consumer under test.
/// </summary>
/// <param name="handler">
/// Handler instance registered both as itself (singleton) and as the scoped
/// <c>IMessageHandler&lt;TestMessage, MessageHandlerResult&lt;string&gt;&gt;</c> implementation.
/// </param>
/// <returns>
/// A provider containing a mocked connection factory, the mocked routing convention,
/// a null logger factory, the handler, and a transient
/// <c>TestableRabbitMQConsumerBackgroundServiceWithResult</c>.
/// </returns>
private static ServiceProvider BuildResultConsumerProvider(DelegatingResultHandler handler)
{
    var connectionFactoryMock = new Mock<IRabbitMQConnectionFactory>();
    var routingMock = BuildRoutingConventionMock();

    return new ServiceCollection()
        .AddSingleton(connectionFactoryMock.Object)
        .AddSingleton(routingMock.Object)
        .AddSingleton<ILoggerFactory>(NullLoggerFactory.Instance)
        .AddSingleton(handler)
        .AddScoped<IMessageHandler<TestMessage, MessageHandlerResult<string>>>(_ => handler)
        .AddTransient<TestableRabbitMQConsumerBackgroundServiceWithResult>()
        .BuildServiceProvider();
}
/// <summary>
/// Creates a routing convention mock that maps <c>TestMessage</c> to fixed test names:
/// exchange "test-exchange", queue "test-queue", routing key "test-routing-key",
/// and a direct exchange type.
/// </summary>
/// <returns>The configured mock; callers use its <c>Object</c>.</returns>
private static Mock<IMessageRoutingConvention> BuildRoutingConventionMock()
{
    var convention = new Mock<IMessageRoutingConvention>();

    convention.Setup(c => c.GetExchangeType<TestMessage>()).Returns(Enums.ExchangeType.Direct);
    convention.Setup(c => c.GetExchangeName<TestMessage>()).Returns("test-exchange");
    convention.Setup(c => c.GetRoutingKey<TestMessage>()).Returns("test-routing-key");
    convention.Setup(c => c.GetQueueName<TestMessage>()).Returns("test-queue");

    return convention;
}
/// <summary>
/// Creates an <c>IChannel</c> mock whose ack, nack and publish calls accept any arguments
/// and return an already-completed <c>ValueTask</c>, so tests can await them and verify
/// the calls afterwards.
/// </summary>
/// <returns>The configured channel mock.</returns>
private static Mock<IChannel> BuildChannelMock()
{
    var channelMock = new Mock<IChannel>();
    var completed = ValueTask.CompletedTask;

    // Publish: any exchange / routing key / mandatory flag / properties / body.
    channelMock
        .Setup(c =>
            c.BasicPublishAsync(
                It.IsAny<string>(),
                It.IsAny<string>(),
                It.IsAny<bool>(),
                It.IsAny<BasicProperties>(),
                It.IsAny<ReadOnlyMemory<byte>>(),
                It.IsAny<CancellationToken>()
            )
        )
        .Returns(completed);

    // Negative acknowledgement: any delivery tag and flags.
    channelMock
        .Setup(c =>
            c.BasicNackAsync(
                It.IsAny<ulong>(),
                It.IsAny<bool>(),
                It.IsAny<bool>(),
                It.IsAny<CancellationToken>()
            )
        )
        .Returns(completed);

    // Positive acknowledgement: any delivery tag and flag.
    channelMock
        .Setup(c =>
            c.BasicAckAsync(It.IsAny<ulong>(), It.IsAny<bool>(), It.IsAny<CancellationToken>())
        )
        .Returns(completed);

    return channelMock;
}
/// <summary>
/// Creates delivery event args for a fake delivery: consumer tag "consumer-1",
/// delivery tag 100, not redelivered, exchange "test-exchange", routing key
/// "test-routing-key", and a body holding the UTF-8 bytes of "{}".
/// </summary>
/// <param name="messageId">Value assigned to the AMQP properties' <c>MessageId</c>.</param>
/// <returns>The populated <c>BasicDeliverEventArgs</c>.</returns>
private static BasicDeliverEventArgs CreateEventArgs(string messageId)
{
    var amqpProperties = new BasicProperties { MessageId = messageId };
    byte[] emptyJsonObject = Encoding.UTF8.GetBytes("{}");

    return new BasicDeliverEventArgs(
        consumerTag: "consumer-1",
        deliveryTag: 100,
        redelivered: false,
        exchange: "test-exchange",
        routingKey: "test-routing-key",
        properties: amqpProperties,
        body: emptyJsonObject,
        cancellationToken: CancellationToken.None
    );
}
/// <summary>
/// Test subclass of <c>RabbitMQConsumerBackgroundService&lt;TestMessage&gt;</c> that passes
/// <c>NullMessageOutboxStore.Instance.Value</c> as the outbox store and offers a public
/// entry point forwarding to <c>HandleMessageAsyncCore</c>.
/// </summary>
private sealed class TestableRabbitMQConsumerBackgroundService(
    IRabbitMQConnectionFactory connectionFactory,
    IMessageRoutingConvention routingConvention,
    IServiceProvider serviceProvider,
    ILoggerFactory loggerFactory
)
    : RabbitMQConsumerBackgroundService<TestMessage>(
        connectionFactory,
        routingConvention,
        serviceProvider,
        NullMessageOutboxStore.Instance.Value,
        loggerFactory
    )
{
    /// <summary>
    /// Forwards all arguments unchanged to <c>HandleMessageAsyncCore</c> and returns its task.
    /// </summary>
    public Task InvokeHandleMessageAsyncCore(
        TestMessage message,
        BasicDeliverEventArgs eventArgs,
        IChannel channel,
        CancellationToken cancellationToken
    )
    {
        return HandleMessageAsyncCore(message, eventArgs, channel, cancellationToken);
    }
}
/// <summary>
/// Test subclass of
/// <c>RabbitMQConsumerBackgroundServiceWithResult&lt;TestMessage, MessageHandlerResult&lt;string&gt;&gt;</c>
/// that passes <c>NullMessageOutboxStore.Instance.Value</c> as the outbox store and offers
/// a public entry point forwarding to <c>HandleMessageAsyncCore</c>.
/// </summary>
private sealed class TestableRabbitMQConsumerBackgroundServiceWithResult(
    IRabbitMQConnectionFactory connectionFactory,
    IMessageRoutingConvention routingConvention,
    IServiceProvider serviceProvider,
    ILoggerFactory loggerFactory
)
    : RabbitMQConsumerBackgroundServiceWithResult<TestMessage, MessageHandlerResult<string>>(
        connectionFactory,
        routingConvention,
        serviceProvider,
        NullMessageOutboxStore.Instance.Value,
        loggerFactory
    )
{
    /// <summary>
    /// Forwards all arguments unchanged to <c>HandleMessageAsyncCore</c> and returns its task.
    /// </summary>
    public Task InvokeHandleMessageAsyncCore(
        TestMessage message,
        BasicDeliverEventArgs eventArgs,
        IChannel channel,
        CancellationToken cancellationToken
    )
    {
        return HandleMessageAsyncCore(message, eventArgs, channel, cancellationToken);
    }
}
/// <summary>
/// <c>IMessageHandler&lt;TestMessage&gt;</c> implementation whose behavior is supplied by the
/// delegate passed to the constructor, letting each test define the outcome inline.
/// </summary>
private sealed class DelegatingMessageHandler(
    Func<TestMessage, CancellationToken, Task<bool>> handler
) : IMessageHandler<TestMessage>
{
    /// <summary>Invokes the wrapped delegate with the given message and token.</summary>
    /// <returns>The task produced by the delegate.</returns>
    public Task<bool> HandleAsync(
        TestMessage message,
        CancellationToken cancellationToken = default
    ) => handler(message, cancellationToken);
}
/// <summary>
/// <c>IMessageHandler&lt;TestMessage, MessageHandlerResult&lt;string&gt;&gt;</c> implementation
/// whose behavior is supplied by the delegate passed to the constructor, letting each test
/// define the result inline.
/// </summary>
private sealed class DelegatingResultHandler(
    Func<TestMessage, CancellationToken, Task<MessageHandlerResult<string>>> handler
) : IMessageHandler<TestMessage, MessageHandlerResult<string>>
{
    /// <summary>Invokes the wrapped delegate with the given message and token.</summary>
    /// <returns>The task produced by the delegate.</returns>
    public Task<MessageHandlerResult<string>> HandleAsync(
        TestMessage message,
        CancellationToken cancellationToken = default
    ) => handler(message, cancellationToken);
}
/// <summary>
/// Message type used by these tests; derives from <c>MessageBase</c> and adds a single
/// string payload.
/// </summary>
private sealed class TestMessage : MessageBase
{
    /// <summary>Free-form payload; defaults to the empty string.</summary>
    public string Payload { get; set; } = "";
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个针对 RabbitMQ 消费者后台服务的完整单元测试类。让我来详细解释一下这个代码的结构和功能:
主要功能
这个测试类 RabbitMQConsumerBackgroundServiceTests 用于测试两种类型的 RabbitMQ 消费者:
- 常规消费者:返回 bool 类型结果的消息处理器
- 结果消费者:返回 MessageHandlerResult<T> 类型结果的消息处理器
测试场景
常规消费者测试场景
1. ConsumerCore_ShouldAck_WhenHandlerSucceeds
   - 测试消息处理成功时,应该发送 ACK 确认
   - 验证不会发送 NACK 或重新发布消息
2. ConsumerCore_ShouldRepublishAndAck_WhenHandlerFailsBeforeMaxRetry
   - 测试在达到最大重试次数前处理失败时的行为
   - 验证会重新发布消息(RetryCount +1)并发送 ACK
3. ConsumerCore_ShouldNackWithoutRequeue_WhenHandlerFailsAtMaxRetry
   - 测试达到最大重试次数后处理失败的行为
   - 验证会发送 NACK(不重新入队)且不会重新发布
结果消费者测试场景
4-6. 针对返回 MessageHandlerResult<string> 的消费者进行相同的三种场景测试
核心组件
辅助类
// 可测试的消费者服务,暴露了内部的 HandleMessageAsyncCore 方法
private sealed class TestableRabbitMQConsumerBackgroundService
// 委托式消息处理器,允许通过 lambda 表达式定义处理逻辑
private sealed class DelegatingMessageHandler
private sealed class DelegatingResultHandler
// 测试用的消息类型
private sealed class TestMessage : MessageBase
Mock 对象构建
// 构建模拟的路由约定
private static Mock<IMessageRoutingConvention> BuildRoutingConventionMock()
// 构建模拟的 RabbitMQ 通道
private static Mock<IChannel> BuildChannelMock()
// 创建测试用的事件参数
private static BasicDeliverEventArgs CreateEventArgs(string messageId)
依赖注入容器构建
// 为常规消费者构建服务提供者
private static ServiceProvider BuildRegularConsumerProvider(DelegatingMessageHandler handler)
// 为结果消费者构建服务提供者
private static ServiceProvider BuildResultConsumerProvider(DelegatingResultHandler handler)
测试策略
- 隔离测试:使用 Mock 对象隔离外部依赖(RabbitMQ 连接、通道等)
- 行为验证:通过 Verify 方法验证特定方法的调用次数和参数
- 状态验证:检查重新发布的消息内容(如 RetryCount 递增)
- 异步测试:所有测试方法都是异步的,符合现代 C# 测试实践
关键测试点
- 消息确认机制:验证 ACK/NACK 的正确使用
- 重试逻辑:确保重试计数正确递增
- 消息重新发布:验证失败消息的重新发布逻辑
- 最大重试限制:确保达到重试上限后正确处理消息
这个测试类很好地覆盖了消息队列消费者的核心功能,确保在各种场景下都能正确处理消息的生命周期。
评论加载中...