using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Models;
using Dpz.Core.MessageQueue.RabbitMQ;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;
namespace Dpz.Core.MessageQueue.Test.RabbitMQ;
/// <summary>
/// 验证 <see cref="OutboxMessagePublisher{TMessage}"/> 装饰器在发布成功/失败时
/// 正确驱动 <see cref="IMessageOutboxStore"/> 的状态转换。
/// 此处用单元测试覆盖失败路径(强制 inner publisher 抛错);
/// 成功路径由 <see cref="OutboxEndToEndIntegrationTests"/> 真实 RabbitMQ 验证。
/// </summary>
public class OutboxMessagePublisherTests
{
[Fact]
public async Task PublishAsync_ShouldCreateThenMarkPublishFailed_WhenInnerThrows()
{
var outboxStore = new InMemoryOutboxStore();
var routing = BuildRoutingConventionMock();
var inner = BuildFailingInnerPublisher();
var sut = new OutboxMessagePublisher<TestMessage>(
inner,
outboxStore,
routing.Object,
NullLogger<OutboxMessagePublisher<TestMessage>>.Instance
);
var message = new TestMessage { MessageId = "m-fail-1", Source = "test-src" };
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() =>
sut.PublishAsync(message)
);
// Outbox 操作顺序:先 Create,再 MarkPublishFailed
Assert.Equal(2, outboxStore.Operations.Count);
Assert.Equal(("Create", "m-fail-1"), outboxStore.Operations[0]);
Assert.Equal(("MarkPublishFailed", "m-fail-1"), outboxStore.Operations[1]);
// Create 时载荷字段正确
var record = outboxStore.Records["m-fail-1"];
Assert.Equal("test-exchange", record.Exchange);
Assert.Equal("test-routing-key", record.RoutingKey);
Assert.Equal(typeof(TestMessage).FullName, record.MessageType);
Assert.Equal("test-src", record.Source);
Assert.Contains("\"MessageId\":\"m-fail-1\"", record.Payload);
Assert.Equal(ex.Message, record.LastError);
}
[Fact]
public async Task PublishAsync_ShouldUseCustomRoutingKey_WhenProvided()
{
var outboxStore = new InMemoryOutboxStore();
var routing = BuildRoutingConventionMock();
var inner = BuildFailingInnerPublisher();
var sut = new OutboxMessagePublisher<TestMessage>(
inner,
outboxStore,
routing.Object,
NullLogger<OutboxMessagePublisher<TestMessage>>.Instance
);
var message = new TestMessage { MessageId = "m-custom-rk" };
await Assert.ThrowsAsync<InvalidOperationException>(() =>
sut.PublishAsync(message, "custom.rk")
);
var record = outboxStore.Records["m-custom-rk"];
Assert.Equal("custom.rk", record.RoutingKey);
}
[Fact]
public async Task PublishBatchAsync_ShouldCreateAllThenMarkAllFailed_WhenInnerThrows()
{
var outboxStore = new InMemoryOutboxStore();
var routing = BuildRoutingConventionMock();
var inner = BuildFailingInnerPublisher();
var sut = new OutboxMessagePublisher<TestMessage>(
inner,
outboxStore,
routing.Object,
NullLogger<OutboxMessagePublisher<TestMessage>>.Instance
);
var messages = new[]
{
new TestMessage { MessageId = "b-1" },
new TestMessage { MessageId = "b-2" },
new TestMessage { MessageId = "b-3" },
};
await Assert.ThrowsAsync<InvalidOperationException>(() => sut.PublishBatchAsync(messages));
// 期望顺序:Create b-1, Create b-2, Create b-3, 然后 MarkPublishFailed b-1, b-2, b-3
var ops = outboxStore.Operations;
Assert.Equal(6, ops.Count);
Assert.Equal(("Create", "b-1"), ops[0]);
Assert.Equal(("Create", "b-2"), ops[1]);
Assert.Equal(("Create", "b-3"), ops[2]);
Assert.Equal(("MarkPublishFailed", "b-1"), ops[3]);
Assert.Equal(("MarkPublishFailed", "b-2"), ops[4]);
Assert.Equal(("MarkPublishFailed", "b-3"), ops[5]);
}
[Fact]
public async Task PublishBatchAsync_ShouldDoNothing_WhenMessagesEmpty()
{
var outboxStore = new InMemoryOutboxStore();
var routing = BuildRoutingConventionMock();
var inner = BuildFailingInnerPublisher();
var sut = new OutboxMessagePublisher<TestMessage>(
inner,
outboxStore,
routing.Object,
NullLogger<OutboxMessagePublisher<TestMessage>>.Instance
);
// OutboxMessagePublisher 当前实现:批量为空时仍调用 inner.PublishBatchAsync(空集合直接返回,不抛异常)
// 因此外层不会抛异常,但 Outbox 不会有任何 Create 记录
await sut.PublishBatchAsync([]);
Assert.Empty(outboxStore.Operations);
}
private static Mock<IMessageRoutingConvention> BuildRoutingConventionMock()
{
var routing = new Mock<IMessageRoutingConvention>();
routing.Setup(x => x.GetExchangeName<TestMessage>()).Returns("test-exchange");
routing.Setup(x => x.GetQueueName<TestMessage>()).Returns("test-queue");
routing.Setup(x => x.GetRoutingKey<TestMessage>()).Returns("test-routing-key");
routing.Setup(x => x.GetExchangeType<TestMessage>()).Returns(Enums.ExchangeType.Direct);
return routing;
}
/// <summary>
/// 构造一个 inner publisher:连接工厂在创建 Channel 时抛出,
/// 触发 RabbitMQPublisher 的失败路径,使 Outbox 装饰器进入 catch 分支。
/// </summary>
private static RabbitMQPublisher<TestMessage> BuildFailingInnerPublisher()
{
var connectionFactory = new Mock<IRabbitMQConnectionFactory>();
connectionFactory
.Setup(x => x.CreateChannelAsync())
.ThrowsAsync(new InvalidOperationException("test-broker-unavailable"));
var routing = BuildRoutingConventionMock();
return new RabbitMQPublisher<TestMessage>(
connectionFactory.Object,
routing.Object,
NullLogger<RabbitMQPublisher<TestMessage>>.Instance
);
}
private sealed class TestMessage : MessageBase { }
/// <summary>
/// 简单的 Outbox 内存实现,用于断言操作顺序与字段。
/// </summary>
private sealed class InMemoryOutboxStore : IMessageOutboxStore
{
public List<(string Op, string MessageId)> Operations { get; } = [];
public Dictionary<string, RecordSnapshot> Records { get; } = [];
public Task CreateAsync(
string messageId,
string messageType,
string exchange,
string routingKey,
string payload,
string? source,
CancellationToken cancellationToken = default
)
{
Operations.Add(("Create", messageId));
Records[messageId] = new RecordSnapshot(
messageType,
exchange,
routingKey,
payload,
source,
null
);
return Task.CompletedTask;
}
public Task MarkSentAsync(string messageId, CancellationToken cancellationToken = default)
{
Operations.Add(("MarkSent", messageId));
return Task.CompletedTask;
}
public Task MarkPublishFailedAsync(
string messageId,
string error,
CancellationToken cancellationToken = default
)
{
Operations.Add(("MarkPublishFailed", messageId));
if (Records.TryGetValue(messageId, out var existing))
{
Records[messageId] = existing with { LastError = error };
}
return Task.CompletedTask;
}
public Task MarkConsumedAsync(
string messageId,
CancellationToken cancellationToken = default
)
{
Operations.Add(("MarkConsumed", messageId));
return Task.CompletedTask;
}
public Task MarkConsumeFailedAsync(
string messageId,
string error,
CancellationToken cancellationToken = default
)
{
Operations.Add(("MarkConsumeFailed", messageId));
return Task.CompletedTask;
}
public Task<IReadOnlyList<MessageOutboxEntry>> GetPendingPublishRetryAsync(
int batchSize,
CancellationToken cancellationToken = default
) => Task.FromResult<IReadOnlyList<MessageOutboxEntry>>([]);
public Task<IReadOnlyList<MessageOutboxEntry>> GetPendingConsumeRetryAsync(
int batchSize,
CancellationToken cancellationToken = default
) => Task.FromResult<IReadOnlyList<MessageOutboxEntry>>([]);
}
private sealed record RecordSnapshot(
string MessageType,
string Exchange,
string RoutingKey,
string Payload,
string? Source,
string? LastError
);
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个单元测试文件,专门测试 OutboxMessagePublisher<TMessage> 装饰器类的行为。让我详细解释这个测试类的各个部分:
测试目的
这个测试类主要验证 OutboxMessagePublisher 装饰器在消息发布失败时能否正确地驱动 IMessageOutboxStore 的状态转换。采用了测试金字塔策略:
- 单元测试:覆盖失败路径(强制内部发布器抛异常)
- 集成测试:由
OutboxEndToEndIntegrationTests使用真实 RabbitMQ 验证成功路径
主要测试用例
1. PublishAsync_ShouldCreateThenMarkPublishFailed_WhenInnerThrows
[Fact]
public async Task PublishAsync_ShouldCreateThenMarkPublishFailed_WhenInnerThrows()
- 目的:验证单条消息发布失败时的 Outbox 操作顺序
- 验证点:
- 操作顺序:先
Create再MarkPublishFailed - Outbox 记录的字段正确性(exchange、routing key、payload等)
- 错误信息被正确记录
- 操作顺序:先
2. PublishAsync_ShouldUseCustomRoutingKey_WhenProvided
[Fact]
public async Task PublishAsync_ShouldUseCustomRoutingKey_WhenProvided()
- 目的:验证自定义 routing key 功能
- 验证点:当提供自定义 routing key 时,Outbox 记录使用该值而非默认值
3. PublishBatchAsync_ShouldCreateAllThenMarkAllFailed_WhenInnerThrows
[Fact]
public async Task PublishBatchAsync_ShouldCreateAllThenMarkAllFailed_WhenInnerThrows()
- 目的:验证批量消息发布失败时的处理
- 验证点:
- 所有消息都先执行
Create操作 - 然后所有消息都执行
MarkPublishFailed操作 - 操作顺序正确
- 所有消息都先执行
4. PublishBatchAsync_ShouldDoNothing_WhenMessagesEmpty
[Fact]
public async Task PublishBatchAsync_ShouldDoNothing_WhenMessagesEmpty()
- 目的:验证空批量的边界情况
- 验证点:空消息列表不会产生任何 Outbox 操作
辅助方法和类
Mock 构建方法
private static Mock<IMessageRoutingConvention> BuildRoutingConventionMock()
- 创建路由约定的 Mock 对象,返回固定的测试值
private static RabbitMQPublisher<TestMessage> BuildFailingInnerPublisher()
- 创建一个会失败的内部发布器
- 通过 Mock
IRabbitMQConnectionFactory在创建 Channel 时抛异常来模拟失败
测试辅助类
TestMessage
private sealed class TestMessage : MessageBase { }
- 简单的测试消息类型
InMemoryOutboxStore
private sealed class InMemoryOutboxStore : IMessageOutboxStore
- 内存实现的 Outbox 存储,用于测试
- 核心功能:
Operations:记录操作顺序(用于断言操作序列)Records:存储消息记录快照(用于断言字段值)
- 实现了
IMessageOutboxStore的所有方法,但只在测试关心的方法中记录操作
RecordSnapshot
private sealed record RecordSnapshot(...)
- 记录 Outbox 条目的快照,便于测试验证字段值
测试策略特点
- 失败路径专注:专门测试异常情况,确保错误处理正确
- 操作顺序验证:通过记录操作序列来验证 Outbox 状态机的正确转换
- 字段完整性检查:验证 Outbox 记录中的所有关键字段
- 边界情况覆盖:包含空批量等边界情况
- Mock 隔离:使用 Mock 隔离外部依赖,确保测试的独立性和可重复性
这种测试设计很好地体现了单元测试的最佳实践:专注于单一职责、快速执行、与外部依赖隔离。
评论加载中...