using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Models;
using Dpz.Core.MessageQueue.RabbitMQ;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;

namespace Dpz.Core.MessageQueue.Test.RabbitMQ;

/// <summary>
/// 验证 <see cref="OutboxMessagePublisher{TMessage}"/> 装饰器在发布成功/失败时
/// 正确驱动 <see cref="IMessageOutboxStore"/> 的状态转换。
/// 此处用单元测试覆盖失败路径(强制 inner publisher 抛错);
/// 成功路径由 <see cref="OutboxEndToEndIntegrationTests"/> 真实 RabbitMQ 验证。
/// </summary>
public class OutboxMessagePublisherTests
{
    [Fact]
    public async Task PublishAsync_ShouldCreateThenMarkPublishFailed_WhenInnerThrows()
    {
        var outboxStore = new InMemoryOutboxStore();
        var routing = BuildRoutingConventionMock();
        var inner = BuildFailingInnerPublisher();

        var sut = new OutboxMessagePublisher<TestMessage>(
            inner,
            outboxStore,
            routing.Object,
            NullLogger<OutboxMessagePublisher<TestMessage>>.Instance
        );

        var message = new TestMessage { MessageId = "m-fail-1", Source = "test-src" };

        var ex = await Assert.ThrowsAsync<InvalidOperationException>(() =>
            sut.PublishAsync(message)
        );

        // Outbox 操作顺序:先 Create,再 MarkPublishFailed
        Assert.Equal(2, outboxStore.Operations.Count);
        Assert.Equal(("Create", "m-fail-1"), outboxStore.Operations[0]);
        Assert.Equal(("MarkPublishFailed", "m-fail-1"), outboxStore.Operations[1]);

        // Create 时载荷字段正确
        var record = outboxStore.Records["m-fail-1"];
        Assert.Equal("test-exchange", record.Exchange);
        Assert.Equal("test-routing-key", record.RoutingKey);
        Assert.Equal(typeof(TestMessage).FullName, record.MessageType);
        Assert.Equal("test-src", record.Source);
        Assert.Contains("\"MessageId\":\"m-fail-1\"", record.Payload);
        Assert.Equal(ex.Message, record.LastError);
    }

    [Fact]
    public async Task PublishAsync_ShouldUseCustomRoutingKey_WhenProvided()
    {
        var outboxStore = new InMemoryOutboxStore();
        var routing = BuildRoutingConventionMock();
        var inner = BuildFailingInnerPublisher();

        var sut = new OutboxMessagePublisher<TestMessage>(
            inner,
            outboxStore,
            routing.Object,
            NullLogger<OutboxMessagePublisher<TestMessage>>.Instance
        );

        var message = new TestMessage { MessageId = "m-custom-rk" };

        await Assert.ThrowsAsync<InvalidOperationException>(() =>
            sut.PublishAsync(message, "custom.rk")
        );

        var record = outboxStore.Records["m-custom-rk"];
        Assert.Equal("custom.rk", record.RoutingKey);
    }

    [Fact]
    public async Task PublishBatchAsync_ShouldCreateAllThenMarkAllFailed_WhenInnerThrows()
    {
        var outboxStore = new InMemoryOutboxStore();
        var routing = BuildRoutingConventionMock();
        var inner = BuildFailingInnerPublisher();

        var sut = new OutboxMessagePublisher<TestMessage>(
            inner,
            outboxStore,
            routing.Object,
            NullLogger<OutboxMessagePublisher<TestMessage>>.Instance
        );

        var messages = new[]
        {
            new TestMessage { MessageId = "b-1" },
            new TestMessage { MessageId = "b-2" },
            new TestMessage { MessageId = "b-3" },
        };

        await Assert.ThrowsAsync<InvalidOperationException>(() => sut.PublishBatchAsync(messages));

        // 期望顺序:Create b-1, Create b-2, Create b-3, 然后 MarkPublishFailed b-1, b-2, b-3
        var ops = outboxStore.Operations;
        Assert.Equal(6, ops.Count);
        Assert.Equal(("Create", "b-1"), ops[0]);
        Assert.Equal(("Create", "b-2"), ops[1]);
        Assert.Equal(("Create", "b-3"), ops[2]);
        Assert.Equal(("MarkPublishFailed", "b-1"), ops[3]);
        Assert.Equal(("MarkPublishFailed", "b-2"), ops[4]);
        Assert.Equal(("MarkPublishFailed", "b-3"), ops[5]);
    }

    [Fact]
    public async Task PublishBatchAsync_ShouldDoNothing_WhenMessagesEmpty()
    {
        var outboxStore = new InMemoryOutboxStore();
        var routing = BuildRoutingConventionMock();
        var inner = BuildFailingInnerPublisher();

        var sut = new OutboxMessagePublisher<TestMessage>(
            inner,
            outboxStore,
            routing.Object,
            NullLogger<OutboxMessagePublisher<TestMessage>>.Instance
        );

        // OutboxMessagePublisher 当前实现:批量为空时仍调用 inner.PublishBatchAsync(空集合直接返回,不抛异常)
        // 因此外层不会抛异常,但 Outbox 不会有任何 Create 记录
        await sut.PublishBatchAsync([]);

        Assert.Empty(outboxStore.Operations);
    }

    private static Mock<IMessageRoutingConvention> BuildRoutingConventionMock()
    {
        var routing = new Mock<IMessageRoutingConvention>();
        routing.Setup(x => x.GetExchangeName<TestMessage>()).Returns("test-exchange");
        routing.Setup(x => x.GetQueueName<TestMessage>()).Returns("test-queue");
        routing.Setup(x => x.GetRoutingKey<TestMessage>()).Returns("test-routing-key");
        routing.Setup(x => x.GetExchangeType<TestMessage>()).Returns(Enums.ExchangeType.Direct);
        return routing;
    }

    /// <summary>
    /// 构造一个 inner publisher:连接工厂在创建 Channel 时抛出,
    /// 触发 RabbitMQPublisher 的失败路径,使 Outbox 装饰器进入 catch 分支。
    /// </summary>
    private static RabbitMQPublisher<TestMessage> BuildFailingInnerPublisher()
    {
        var connectionFactory = new Mock<IRabbitMQConnectionFactory>();
        connectionFactory
            .Setup(x => x.CreateChannelAsync())
            .ThrowsAsync(new InvalidOperationException("test-broker-unavailable"));

        var routing = BuildRoutingConventionMock();
        return new RabbitMQPublisher<TestMessage>(
            connectionFactory.Object,
            routing.Object,
            NullLogger<RabbitMQPublisher<TestMessage>>.Instance
        );
    }

    private sealed class TestMessage : MessageBase { }

    /// <summary>
    /// 简单的 Outbox 内存实现,用于断言操作顺序与字段。
    /// </summary>
    private sealed class InMemoryOutboxStore : IMessageOutboxStore
    {
        public List<(string Op, string MessageId)> Operations { get; } = [];
        public Dictionary<string, RecordSnapshot> Records { get; } = [];

        public Task CreateAsync(
            string messageId,
            string messageType,
            string exchange,
            string routingKey,
            string payload,
            string? source,
            CancellationToken cancellationToken = default
        )
        {
            Operations.Add(("Create", messageId));
            Records[messageId] = new RecordSnapshot(
                messageType,
                exchange,
                routingKey,
                payload,
                source,
                null
            );
            return Task.CompletedTask;
        }

        public Task MarkSentAsync(string messageId, CancellationToken cancellationToken = default)
        {
            Operations.Add(("MarkSent", messageId));
            return Task.CompletedTask;
        }

        public Task MarkPublishFailedAsync(
            string messageId,
            string error,
            CancellationToken cancellationToken = default
        )
        {
            Operations.Add(("MarkPublishFailed", messageId));
            if (Records.TryGetValue(messageId, out var existing))
            {
                Records[messageId] = existing with { LastError = error };
            }
            return Task.CompletedTask;
        }

        public Task MarkConsumedAsync(
            string messageId,
            CancellationToken cancellationToken = default
        )
        {
            Operations.Add(("MarkConsumed", messageId));
            return Task.CompletedTask;
        }

        public Task MarkConsumeFailedAsync(
            string messageId,
            string error,
            CancellationToken cancellationToken = default
        )
        {
            Operations.Add(("MarkConsumeFailed", messageId));
            return Task.CompletedTask;
        }

        public Task<IReadOnlyList<MessageOutboxEntry>> GetPendingPublishRetryAsync(
            int batchSize,
            CancellationToken cancellationToken = default
        ) => Task.FromResult<IReadOnlyList<MessageOutboxEntry>>([]);

        public Task<IReadOnlyList<MessageOutboxEntry>> GetPendingConsumeRetryAsync(
            int batchSize,
            CancellationToken cancellationToken = default
        ) => Task.FromResult<IReadOnlyList<MessageOutboxEntry>>([]);
    }

    private sealed record RecordSnapshot(
        string MessageType,
        string Exchange,
        string RoutingKey,
        string Payload,
        string? Source,
        string? LastError
    );
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个单元测试文件,专门测试 OutboxMessagePublisher<TMessage> 装饰器类的行为。让我详细解释这个测试类的各个部分:

测试目的

这个测试类主要验证 OutboxMessagePublisher 装饰器在消息发布失败时能否正确地驱动 IMessageOutboxStore 的状态转换。采用了测试金字塔策略:

  • 单元测试:覆盖失败路径(强制内部发布器抛异常)
  • 集成测试:由 OutboxEndToEndIntegrationTests 使用真实 RabbitMQ 验证成功路径

主要测试用例

1. PublishAsync_ShouldCreateThenMarkPublishFailed_WhenInnerThrows

[Fact]
public async Task PublishAsync_ShouldCreateThenMarkPublishFailed_WhenInnerThrows()
  • 目的:验证单条消息发布失败时的 Outbox 操作顺序
  • 验证点
    • 操作顺序:先 CreateMarkPublishFailed
    • Outbox 记录的字段正确性(exchange、routing key、payload等)
    • 错误信息被正确记录

2. PublishAsync_ShouldUseCustomRoutingKey_WhenProvided

[Fact]
public async Task PublishAsync_ShouldUseCustomRoutingKey_WhenProvided()
  • 目的:验证自定义 routing key 功能
  • 验证点:当提供自定义 routing key 时,Outbox 记录使用该值而非默认值

3. PublishBatchAsync_ShouldCreateAllThenMarkAllFailed_WhenInnerThrows

[Fact]
public async Task PublishBatchAsync_ShouldCreateAllThenMarkAllFailed_WhenInnerThrows()
  • 目的:验证批量消息发布失败时的处理
  • 验证点
    • 所有消息都先执行 Create 操作
    • 然后所有消息都执行 MarkPublishFailed 操作
    • 操作顺序正确

4. PublishBatchAsync_ShouldDoNothing_WhenMessagesEmpty

[Fact]
public async Task PublishBatchAsync_ShouldDoNothing_WhenMessagesEmpty()
  • 目的:验证空批量的边界情况
  • 验证点:空消息列表不会产生任何 Outbox 操作

辅助方法和类

Mock 构建方法

private static Mock<IMessageRoutingConvention> BuildRoutingConventionMock()
  • 创建路由约定的 Mock 对象,返回固定的测试值
private static RabbitMQPublisher<TestMessage> BuildFailingInnerPublisher()
  • 创建一个会失败的内部发布器
  • 通过 Mock IRabbitMQConnectionFactory 在创建 Channel 时抛异常来模拟失败

测试辅助类

TestMessage

private sealed class TestMessage : MessageBase { }
  • 简单的测试消息类型

InMemoryOutboxStore

private sealed class InMemoryOutboxStore : IMessageOutboxStore
  • 内存实现的 Outbox 存储,用于测试
  • 核心功能
    • Operations:记录操作顺序(用于断言操作序列)
    • Records:存储消息记录快照(用于断言字段值)
  • 实现了 IMessageOutboxStore 的所有方法,但只在测试关心的方法中记录操作

RecordSnapshot

private sealed record RecordSnapshot(...)
  • 记录 Outbox 条目的快照,便于测试验证字段值

测试策略特点

  1. 失败路径专注:专门测试异常情况,确保错误处理正确
  2. 操作顺序验证:通过记录操作序列来验证 Outbox 状态机的正确转换
  3. 字段完整性检查:验证 Outbox 记录中的所有关键字段
  4. 边界情况覆盖:包含空批量等边界情况
  5. Mock 隔离:使用 Mock 隔离外部依赖,确保测试的独立性和可重复性

这种测试设计很好地体现了单元测试的最佳实践:专注于单一职责、快速执行、与外部依赖隔离。

评论加载中...