using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Extensions;
using Dpz.Core.MessageQueue.RabbitMQ;
using Dpz.Core.MongodbAccess;
using Dpz.Core.Public.Entity;
using Dpz.Core.Service;
using Dpz.Core.Service.RepositoryService;
using Dpz.Core.Service.RepositoryServiceImpl;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MongoDB.Driver;
using MongoDB.Driver.Linq;

namespace Dpz.Core.MessageQueue.Test.Integration;

/// <summary>
/// 端到端集成测试:真实 RabbitMQ + 真实 MongoDB,覆盖整条 Outbox 兜底链路。
///
/// 1. <see cref="OutboxMessagePublisher{T}"/> 发布成功 -> Mongo 中状态为 Sent
/// 2. 发布失败(RabbitMQ 不可用)-> Mongo 中状态为 PublishFailed,含退避时间
/// 3. <see cref="RabbitMQRawPublisher"/> 重发已存载荷成功,可被 Outbox Job 配合使用
///
/// 依赖:appsettings.Test.json 中 ConnectionStrings:mongodb 与 RabbitMQ:HostName 等可用。
/// </summary>
[Trait("Category", "Integration")]
[Collection(nameof(OutboxEndToEndCollection))]
public class OutboxEndToEndIntegrationTests(OutboxEndToEndFixture fixture) : IAsyncLifetime
{
    private readonly string _testRunId = $"e2e-{Guid.NewGuid():N}";

    public Task InitializeAsync() => Task.CompletedTask;

    public async Task DisposeAsync()
    {
        var filter = Builders<MessageOutboxRecord>.Filter.Where(x =>
            x.MessageId.StartsWith(_testRunId)
        );
        await fixture.Repository.Collection.DeleteManyAsync(filter);
    }

    [Fact]
    public async Task PublishViaOutbox_ShouldPersistAsSent_WhenRabbitMQAvailable()
    {
        var publisher = fixture.GetPublisher<E2EMessage>();
        var message = new E2EMessage
        {
            MessageId = Id("ok"),
            Source = "e2e",
            Payload = "hello",
        };

        await publisher.PublishAsync(message);

        // 等待 Mongo 中状态变为 Sent(写入是同步的,加少量退避以应对网络抖动)
        var record = await PollUntilAsync(
            message.MessageId,
            x => x?.Status == OutboxMessageStatus.Sent
        );

        Assert.NotNull(record);
        Assert.Equal(OutboxMessageStatus.Sent, record.Status);
        Assert.NotNull(record.SentAt);
        Assert.Equal(0, record.PublishAttempts);
        Assert.Equal("e2e", record.Source);
        Assert.Contains("\"Payload\":\"hello\"", record.Payload);
    }

    [Fact]
    public async Task PublishViaOutbox_ShouldPersistAsPublishFailed_WhenRabbitMQUnavailable()
    {
        // 使用一个绑定到错误端口的发布者,强制发布失败
        var failingPublisher = fixture.GetFailingPublisher<E2EMessage>();
        var message = new E2EMessage { MessageId = Id("fail"), Source = "e2e" };

        await Assert.ThrowsAnyAsync<Exception>(() => failingPublisher.PublishAsync(message));

        var record = await PollUntilAsync(
            message.MessageId,
            x => x?.Status == OutboxMessageStatus.PublishFailed
        );

        Assert.NotNull(record);
        Assert.Equal(OutboxMessageStatus.PublishFailed, record.Status);
        Assert.Equal(1, record.PublishAttempts);
        Assert.NotNull(record.LastPublishAttemptAt);
        Assert.NotNull(record.NextPublishRetryAt);
        Assert.False(string.IsNullOrEmpty(record.LastPublishError));
    }

    [Fact]
    public async Task RetryPublisher_ShouldFlipFailedRecordToSent_WhenInvokedWithStoredPayload()
    {
        var publisher = fixture.GetFailingPublisher<E2EMessage>();
        var message = new E2EMessage { MessageId = Id("retry"), Source = "retry-src" };

        // 第一阶段:制造一条 PublishFailed 记录
        await Assert.ThrowsAnyAsync<Exception>(() => publisher.PublishAsync(message));
        var failed = await PollUntilAsync(
            message.MessageId,
            x => x?.Status == OutboxMessageStatus.PublishFailed
        );
        Assert.NotNull(failed);
        Assert.Equal(OutboxMessageStatus.PublishFailed, failed.Status);

        // 第二阶段:模拟 Outbox 后台重试执行器的工作流
        // 1) 通过 GetPendingPublishRetry 拉取待重试条目(强制 NextPublishRetryAt 已到期)
        await fixture.Repository.UpdateAsync(
            x => x.MessageId == message.MessageId,
            Builders<MessageOutboxRecord>.Update.Set(
                x => x.NextPublishRetryAt,
                DateTime.Now.AddMinutes(-1)
            )
        );

        var entries = await fixture.OutboxStore.GetPendingPublishRetryAsync(50);
        var entry = entries.FirstOrDefault(x => x.MessageId == message.MessageId);
        Assert.NotNull(entry);

        // 2) RabbitMQRawPublisher 使用真实可用 RabbitMQ 重发
        await fixture.RawPublisher.PublishRawAsync(
            entry.Exchange,
            entry.RoutingKey,
            entry.MessageId,
            entry.Payload
        );
        await fixture.OutboxStore.MarkSentAsync(entry.MessageId);

        // 3) 验证状态切换为 Sent
        var sent = await PollUntilAsync(
            message.MessageId,
            x => x?.Status == OutboxMessageStatus.Sent
        );
        Assert.NotNull(sent);
        Assert.Equal(OutboxMessageStatus.Sent, sent.Status);
        Assert.NotNull(sent.SentAt);
    }

    private string Id(string suffix) => $"{_testRunId}-{suffix}";

    private async Task<MessageOutboxRecord?> PollUntilAsync(
        string messageId,
        Func<MessageOutboxRecord?, bool> predicate,
        int timeoutMs = 5000
    )
    {
        var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs);
        MessageOutboxRecord? current = null;
        while (DateTime.UtcNow < deadline)
        {
            current = await fixture
                .Repository.SearchFor(x => x.MessageId == messageId)
                .FirstOrDefaultAsync();
            if (predicate(current))
            {
                return current;
            }
            await Task.Delay(100);
        }
        return current;
    }
}

/// <summary>
/// E2E 测试用消息载荷类型。命名空间和类型独立,避免与生产消息共享 Exchange/Queue。
/// </summary>
public class E2EMessage : MessageBase
{
    public string Payload { get; set; } = string.Empty;
}

[CollectionDefinition(nameof(OutboxEndToEndCollection))]
public class OutboxEndToEndCollection : ICollectionFixture<OutboxEndToEndFixture> { }

/// <summary>
/// E2E 集成测试 fixture:
/// - 真实连接到 RabbitMQ(按 appsettings.Test.json 的 RabbitMQ:* 配置)
/// - 真实连接到 Mongo
/// - 提供两个发布者:正常发布者 (host=localhost) 与故意失败的发布者 (HostName 指向无效地址)
///
/// 实现 <see cref="IAsyncDisposable"/>:因 <c>RabbitMQPublisher&lt;T&gt;</c> 只实现了
/// <c>IAsyncDisposable</c>,<c>ServiceProvider.Dispose()</c> 会抛 InvalidOperationException,
/// 必须使用 <c>DisposeAsync()</c>。
/// </summary>
public class OutboxEndToEndFixture : IAsyncDisposable
{
    private readonly ServiceProvider _provider;
    private readonly ServiceProvider _failingProvider;
    private readonly IServiceScope _rootScope;

    public OutboxEndToEndFixture()
    {
        var configuration = new ConfigurationBuilder()
            .AddJsonFile("appsettings.Test.json", optional: true)
            .AddEnvironmentVariables()
            .Build();

        var mongoConnection = configuration.GetConnectionString("mongodb");
        if (string.IsNullOrWhiteSpace(mongoConnection))
        {
            throw new InvalidOperationException(
                "OutboxEndToEndFixture 初始化失败:appsettings.Test.json 缺少 ConnectionStrings:mongodb"
            );
        }

        _provider = BuildProvider(configuration, failing: false);
        _rootScope = _provider.CreateScope();
        Repository = _rootScope.ServiceProvider.GetRequiredService<
            IRepository<MessageOutboxRecord>
        >();
        Repository.Database.RunCommand<MongoDB.Bson.BsonDocument>("{ping:1}");

        OutboxStore = _provider.GetRequiredService<IMessageOutboxStore>();
        RawPublisher = _provider.GetRequiredService<IMessageOutboxRetryPublisher>();

        // Failing provider:复制配置但 RabbitMQ 指向无效地址
        var failingConfig = new ConfigurationBuilder()
            .AddConfiguration(configuration)
            .AddInMemoryCollection(
                new Dictionary<string, string?>
                {
                    ["RabbitMQ:HostName"] = "127.0.0.1",
                    ["RabbitMQ:Port"] = "1", // 几乎一定不可用
                    ["RabbitMQ:RequestedConnectionTimeout"] = "1",
                }
            )
            .Build();
        _failingProvider = BuildProvider(failingConfig, failing: true);
    }

    public IRepository<MessageOutboxRecord> Repository { get; }

    public IMessageOutboxStore OutboxStore { get; }

    public IMessageOutboxRetryPublisher RawPublisher { get; }

    public IMessagePublisher<T> GetPublisher<T>()
        where T : MessageBase => _provider.GetRequiredService<IMessagePublisher<T>>();

    public IMessagePublisher<T> GetFailingPublisher<T>()
        where T : MessageBase => _failingProvider.GetRequiredService<IMessagePublisher<T>>();

    private static ServiceProvider BuildProvider(IConfiguration configuration, bool failing)
    {
        var services = new ServiceCollection();
        services.AddSingleton(configuration);
        services.AddLogging();

        // RabbitMQ 基础设施
        services.AddRabbitMQ(configuration);

        // 注册 Mongo 仓储
        services.AddScoped(typeof(IRepository<>), typeof(Repository<>));

        // 注册 IMongoMessageOutboxStore(生产实现)+ 启用 Outbox 装饰器
        services.AddScoped<IMongoMessageOutboxStore, MongoMessageOutboxStore>();
        services.AddSingleton<ILogger<MongoMessageOutboxStore>>(
            NullLogger<MongoMessageOutboxStore>.Instance
        );
        services.AddMessageOutbox();

        return services.BuildServiceProvider();
    }

    public async ValueTask DisposeAsync()
    {
        _rootScope.Dispose();
        await _provider.DisposeAsync();
        await _failingProvider.DisposeAsync();
        GC.SuppressFinalize(this);
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个用于消息队列 Outbox 模式的端到端集成测试代码,主要测试 RabbitMQ + MongoDB 的完整消息发布和兜底重试机制。让我详细解释各个部分:

整体架构

这个测试实现了 Outbox 模式,这是一种确保分布式系统中消息可靠发送的设计模式:

  1. 先将消息存储到本地数据库(MongoDB)
  2. 再发送到消息队列(RabbitMQ)
  3. 如果发送失败,后台任务会重试

核心测试类:OutboxEndToEndIntegrationTests

主要测试场景

  1. 正常发布测试 (PublishViaOutbox_ShouldPersistAsSent_WhenRabbitMQAvailable)

    // 当RabbitMQ可用时,消息应该:
    // - 被成功发送到RabbitMQ
    // - 在MongoDB中状态标记为 "Sent"
    // - 记录发送时间
    
  2. 发布失败测试 (PublishViaOutbox_ShouldPersistAsPublishFailed_WhenRabbitMQUnavailable)

    // 当RabbitMQ不可用时,消息应该:
    // - 抛出异常
    // - 在MongoDB中状态标记为 "PublishFailed"
    // - 记录重试次数和下次重试时间
    
  3. 重试机制测试 (RetryPublisher_ShouldFlipFailedRecordToSent_WhenInvokedWithStoredPayload)

    // 模拟后台重试任务:
    // 1. 先制造一个失败的消息记录
    // 2. 模拟后台任务获取待重试的消息
    // 3. 使用可用的RabbitMQ重新发送
    // 4. 验证状态从 "PublishFailed" 变为 "Sent"
    

关键辅助方法

  • PollUntilAsync: 轮询等待数据库状态变化,处理异步操作的时序问题
  • Id(): 生成唯一测试ID,避免测试间相互干扰
  • DisposeAsync(): 清理测试数据

测试基础设施:OutboxEndToEndFixture

这是测试的核心支撑类,提供:

双重发布者设计

public IMessagePublisher<T> GetPublisher<T>()        // 正常发布者
public IMessagePublisher<T> GetFailingPublisher<T>() // 故意失败的发布者
  • 正常发布者:连接到真实的 RabbitMQ
  • 失败发布者:连接到无效端口 (127.0.0.1:1),用于测试失败场景

依赖注入配置

services.AddRabbitMQ(configuration);           // RabbitMQ 基础设施
services.AddScoped<IRepository<>>();           // MongoDB 仓储
services.AddScoped<IMongoMessageOutboxStore>(); // Outbox 存储实现
services.AddMessageOutbox();                   // 启用 Outbox 装饰器

配置管理

  • appsettings.Test.json 读取配置
  • 对于失败测试,动态覆盖 RabbitMQ 配置指向无效地址

消息模型:E2EMessage

public class E2EMessage : MessageBase
{
    public string Payload { get; set; } = string.Empty;
}

继承 MessageBase,包含消息ID、来源等基础字段,专门用于端到端测试。

技术特点

  1. 真实环境测试:使用真实的 RabbitMQ 和 MongoDB,不是 Mock
  2. 状态机验证:验证消息在不同场景下的状态转换
  3. 异步处理:正确处理异步操作和状态轮询
  4. 资源清理:实现了完整的资源清理机制
  5. 隔离性:每个测试使用唯一ID,避免相互影响

使用场景

这类测试适合:

  • 验证消息可靠性保证机制
  • 测试分布式事务的一致性
  • 验证故障恢复和重试逻辑
  • 端到端的系统集成验证

这是一个非常完整和专业的集成测试实现,体现了现代微服务架构中消息可靠性保证的最佳实践。

评论加载中...