using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MongodbAccess;
using Dpz.Core.Public.Entity;
using Dpz.Core.Service.RepositoryService;
using Dpz.Core.Service.RepositoryServiceImpl;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using MongoDB.Driver;
using MongoDB.Driver.Linq;

namespace Dpz.Core.MessageQueue.Test.Integration;

/// <summary>
/// 真实 MongoDB 集成测试,验证 <see cref="MongoMessageOutboxStore"/> 的状态机与查询语义。
/// 依赖 appsettings.Test.json 中 ConnectionStrings:mongodb 指向可用 Mongo(与其他 *.Test 项目共用)。
/// 每个用例使用独立 MessageId 前缀避免相互干扰,并在结束时清理。
/// </summary>
[Trait("Category", "Integration")]
[Collection(nameof(MongoOutboxTestCollection))]
public class MongoMessageOutboxStoreIntegrationTests(MongoOutboxFixture fixture) : IAsyncLifetime
{
    private readonly string _testRunId = $"test-{Guid.NewGuid():N}";

    public Task InitializeAsync() => Task.CompletedTask;

    public async Task DisposeAsync()
    {
        var filter = Builders<MessageOutboxRecord>.Filter.Where(x =>
            x.MessageId.StartsWith(_testRunId)
        );
        await fixture.Repository.Collection.DeleteManyAsync(filter);
    }

    [Fact]
    public async Task CreateAsync_ShouldPersistRecordWithPendingStatus()
    {
        var sut = fixture.BuildStore();
        var messageId = Id("create");

        await sut.CreateAsync(
            messageId,
            "Some.Test.Type",
            "ex.create",
            "rk.create",
            "{\"foo\":\"bar\"}",
            "src-create"
        );

        var record = await GetRecordAsync(messageId);
        Assert.NotNull(record);
        Assert.Equal("Some.Test.Type", record!.MessageType);
        Assert.Equal("ex.create", record.Exchange);
        Assert.Equal("rk.create", record.RoutingKey);
        Assert.Equal("{\"foo\":\"bar\"}", record.Payload);
        Assert.Equal("src-create", record.Source);
        Assert.Equal(OutboxMessageStatus.Pending, record.Status);
        Assert.Equal(0, record.PublishAttempts);
        Assert.Null(record.SentAt);
        Assert.Null(record.LastPublishAttemptAt);
    }

    [Fact]
    public async Task MarkSentAsync_ShouldFlipStatusToSentAndStampSentAt()
    {
        var sut = fixture.BuildStore();
        var messageId = Id("sent");
        await sut.CreateAsync(messageId, "T", "ex", "rk", "{}", null);

        await sut.MarkSentAsync(messageId);

        var record = await GetRecordAsync(messageId);
        Assert.NotNull(record);
        Assert.Equal(OutboxMessageStatus.Sent, record!.Status);
        Assert.NotNull(record.SentAt);
    }

    [Fact]
    public async Task MarkPublishFailedAsync_ShouldRecordAttemptsAndScheduleNextRetry()
    {
        var sut = fixture.BuildStore();
        var messageId = Id("publish-failed");
        await sut.CreateAsync(messageId, "T", "ex", "rk", "{}", null);

        await sut.MarkPublishFailedAsync(messageId, "boom-1");

        var record = await GetRecordAsync(messageId);
        Assert.NotNull(record);
        Assert.Equal(OutboxMessageStatus.PublishFailed, record!.Status);
        Assert.Equal(1, record.PublishAttempts);
        Assert.Equal("boom-1", record.LastPublishError);
        Assert.NotNull(record.LastPublishAttemptAt);
        Assert.NotNull(record.NextPublishRetryAt);

        // 第一次失败:退避 = 2^1 = 2 分钟
        var expectedDelay = TimeSpan.FromMinutes(2);
        var actualDelay = record.NextPublishRetryAt!.Value - record.LastPublishAttemptAt!.Value;
        Assert.InRange(
            actualDelay,
            expectedDelay - TimeSpan.FromSeconds(2),
            expectedDelay + TimeSpan.FromSeconds(2)
        );

        // 第二次失败:attempts=2,退避 = 2^2 = 4 分钟,PublishAttempts 累加
        await sut.MarkPublishFailedAsync(messageId, "boom-2");
        record = await GetRecordAsync(messageId);
        Assert.Equal(2, record!.PublishAttempts);
        Assert.Equal("boom-2", record.LastPublishError);
        Assert.Equal(OutboxMessageStatus.PublishFailed, record.Status);
    }

    [Fact]
    public async Task MarkPublishFailedAsync_ShouldNotThrow_WhenRecordMissing()
    {
        var sut = fixture.BuildStore();
        // 未先 Create 直接 MarkFail - 不应抛异常,仅警告日志(实现内会 return 早出)
        await sut.MarkPublishFailedAsync(Id("missing"), "no record");
    }

    [Fact]
    public async Task MarkConsumedAsync_ShouldFlipStatusToConsumed()
    {
        var sut = fixture.BuildStore();
        var messageId = Id("consumed");
        await sut.CreateAsync(messageId, "T", "ex", "rk", "{}", null);
        await sut.MarkSentAsync(messageId);

        await sut.MarkConsumedAsync(messageId);

        var record = await GetRecordAsync(messageId);
        Assert.Equal(OutboxMessageStatus.Consumed, record!.Status);
        Assert.NotNull(record.ConsumedAt);
    }

    [Fact]
    public async Task MarkConsumeFailedAsync_ShouldAccumulateAttemptsWithBackoff()
    {
        var sut = fixture.BuildStore();
        var messageId = Id("consume-failed");
        await sut.CreateAsync(messageId, "T", "ex", "rk", "{}", null);

        await sut.MarkConsumeFailedAsync(messageId, "consume err 1");
        var record = await GetRecordAsync(messageId);
        Assert.Equal(OutboxMessageStatus.ConsumeFailed, record!.Status);
        Assert.Equal(1, record.ConsumeAttempts);
        Assert.Equal("consume err 1", record.LastConsumeError);
        Assert.NotNull(record.NextConsumeRetryAt);

        await sut.MarkConsumeFailedAsync(messageId, "consume err 2");
        record = await GetRecordAsync(messageId);
        Assert.Equal(2, record!.ConsumeAttempts);
        Assert.Equal("consume err 2", record.LastConsumeError);
    }

    [Fact]
    public async Task GetPendingPublishRetryAsync_ShouldReturnDueFailedRecordsOnly()
    {
        var sut = fixture.BuildStore();

        var dueId = Id("pub-due");
        var notDueId = Id("pub-not-due");
        var sentId = Id("pub-sent");

        await sut.CreateAsync(dueId, "T", "ex.due", "rk.due", "{\"a\":1}", null);
        await sut.CreateAsync(notDueId, "T", "ex.nd", "rk.nd", "{}", null);
        await sut.CreateAsync(sentId, "T", "ex.sent", "rk.sent", "{}", null);

        // dueId: 标记为发布失败 + 强制让 NextPublishRetryAt 已到期
        await sut.MarkPublishFailedAsync(dueId, "err");
        await ForcePublishRetryAtAsync(dueId, DateTime.Now.AddMinutes(-1));

        // notDueId: 标记失败但保持默认 NextPublishRetryAt(未来)
        await sut.MarkPublishFailedAsync(notDueId, "err");

        // sentId: 已发送 - 不应被返回
        await sut.MarkSentAsync(sentId);

        var entries = await sut.GetPendingPublishRetryAsync(50);

        var entryIds = entries.Select(x => x.MessageId).ToList();
        Assert.Contains(dueId, entryIds);
        Assert.DoesNotContain(notDueId, entryIds);
        Assert.DoesNotContain(sentId, entryIds);

        var entry = entries.Single(x => x.MessageId == dueId);
        Assert.Equal("ex.due", entry.Exchange);
        Assert.Equal("rk.due", entry.RoutingKey);
        Assert.Equal("{\"a\":1}", entry.Payload);
        Assert.Equal(1, entry.PublishAttempts);
    }

    [Fact]
    public async Task GetPendingConsumeRetryAsync_ShouldReturnDueFailedRecordsOnly()
    {
        var sut = fixture.BuildStore();

        var dueId = Id("con-due");
        var notDueId = Id("con-not-due");

        await sut.CreateAsync(dueId, "T", "ex.cdue", "rk.cdue", "{}", null);
        await sut.CreateAsync(notDueId, "T", "ex.cnd", "rk.cnd", "{}", null);

        await sut.MarkConsumeFailedAsync(dueId, "err");
        await ForceConsumeRetryAtAsync(dueId, DateTime.Now.AddMinutes(-1));

        await sut.MarkConsumeFailedAsync(notDueId, "err");

        var entries = await sut.GetPendingConsumeRetryAsync(50);

        Assert.Contains(entries, x => x.MessageId == dueId);
        Assert.DoesNotContain(entries, x => x.MessageId == notDueId);
    }

    [Fact]
    public async Task GetPendingPublishRetryAsync_ShouldRespectBatchSize()
    {
        var sut = fixture.BuildStore();

        for (var i = 0; i < 5; i++)
        {
            var id = Id($"batch-{i}");
            await sut.CreateAsync(id, "T", "ex", "rk", "{}", null);
            await sut.MarkPublishFailedAsync(id, "err");
            await ForcePublishRetryAtAsync(id, DateTime.Now.AddMinutes(-1));
        }

        var top2 = await sut.GetPendingPublishRetryAsync(2);

        // batchSize 限制返回数量
        Assert.True(top2.Count <= 2);
    }

    private string Id(string suffix) => $"{_testRunId}-{suffix}";

    private Task<MessageOutboxRecord?> GetRecordAsync(string messageId) =>
        fixture.Repository.SearchFor(x => x.MessageId == messageId).FirstOrDefaultAsync()!;

    private Task ForcePublishRetryAtAsync(string messageId, DateTime when) =>
        fixture.Repository.UpdateAsync(
            x => x.MessageId == messageId,
            Builders<MessageOutboxRecord>.Update.Set(x => x.NextPublishRetryAt, when)
        );

    private Task ForceConsumeRetryAtAsync(string messageId, DateTime when) =>
        fixture.Repository.UpdateAsync(
            x => x.MessageId == messageId,
            Builders<MessageOutboxRecord>.Update.Set(x => x.NextConsumeRetryAt, when)
        );
}

[CollectionDefinition(nameof(MongoOutboxTestCollection))]
public class MongoOutboxTestCollection : ICollectionFixture<MongoOutboxFixture> { }

public class MongoOutboxFixture : IDisposable
{
    private readonly ServiceProvider _provider;
    private readonly IServiceScope _rootScope;

    public MongoOutboxFixture()
    {
        var configuration = new ConfigurationBuilder()
            .AddJsonFile("appsettings.Test.json", optional: true)
            .AddEnvironmentVariables()
            .Build();

        var connection = configuration.GetConnectionString("mongodb");
        if (string.IsNullOrWhiteSpace(connection))
        {
            throw new InvalidOperationException(
                "MongoOutboxFixture 初始化失败:appsettings.Test.json 缺少 ConnectionStrings:mongodb"
            );
        }

        var services = new ServiceCollection();
        services.AddSingleton<IConfiguration>(configuration);
        services.AddScoped(typeof(IRepository<>), typeof(Repository<>));
        services.AddScoped<IMongoMessageOutboxStore, MongoMessageOutboxStore>();
        services.AddSingleton<Microsoft.Extensions.Logging.ILogger<MongoMessageOutboxStore>>(
            NullLogger<MongoMessageOutboxStore>.Instance
        );
        _provider = services.BuildServiceProvider();

        // 用根作用域获取一个仓储实例用于测试外部读写(清理 + 强制刷新 NextRetryAt)。
        // BuildStore() 走每次新作用域,模拟生产侧 ScopedOutboxStoreAdapter 的语义。
        _rootScope = _provider.CreateScope();
        Repository = _rootScope.ServiceProvider.GetRequiredService<
            IRepository<MessageOutboxRecord>
        >();
        // 主动触发一次 ping,及早暴露连接问题
        Repository.Database.RunCommand<MongoDB.Bson.BsonDocument>("{ping:1}");
    }

    public IRepository<MessageOutboxRecord> Repository { get; }

    public IMessageOutboxStore BuildStore()
    {
        // 每次构建都打开独立 scope,与生产 ScopedOutboxStoreAdapter 行为对齐
        var scope = _provider.CreateScope();
        // 注意:scope 在 fixture 销毁前不会显式释放,但 ServiceProvider.Dispose 会兜底清理
        return scope.ServiceProvider.GetRequiredService<IMongoMessageOutboxStore>();
    }

    public void Dispose()
    {
        _rootScope.Dispose();
        _provider.Dispose();
        GC.SuppressFinalize(this);
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个用于测试MongoDB消息发件箱(Message Outbox)功能的集成测试类。让我详细解释这个代码的结构和功能:

主要组成部分

1. 测试类 MongoMessageOutboxStoreIntegrationTests

这是主要的集成测试类,用于验证MongoMessageOutboxStore的状态机和查询功能:

特点:

  • 使用真实的MongoDB数据库进行集成测试
  • 每个测试用例使用独立的消息ID前缀(_testRunId)避免相互干扰
  • 实现了IAsyncLifetime接口,在测试结束时自动清理数据

主要测试场景:

  1. 消息创建测试 (CreateAsync_ShouldPersistRecordWithPendingStatus)

    • 验证创建消息记录时状态为Pending
    • 检查各字段正确存储
  2. 消息发送标记 (MarkSentAsync_ShouldFlipStatusToSentAndStampSentAt)

    • 验证消息状态从Pending变为Sent
    • 检查发送时间戳被正确设置
  3. 发布失败处理 (MarkPublishFailedAsync_ShouldRecordAttemptsAndScheduleNextRetry)

    • 验证失败重试机制
    • 检查指数退避算法(2^n分钟)
    • 验证重试次数累加
  4. 消费状态管理

    • 消费成功标记 (MarkConsumedAsync)
    • 消费失败重试 (MarkConsumeFailedAsync)
  5. 批量查询功能

    • 获取待重试发布的消息 (GetPendingPublishRetryAsync)
    • 获取待重试消费的消息 (GetPendingConsumeRetryAsync)
    • 验证批量大小限制

2. 测试固件 MongoOutboxFixture

这是一个共享的测试固件,为所有测试提供MongoDB连接和服务:

功能:

  • 从配置文件读取MongoDB连接字符串
  • 配置依赖注入容器
  • 提供Repository实例用于数据访问
  • 为每个测试构建独立的Store实例

关键设计:

public IMessageOutboxStore BuildStore()
{
    // 每次构建都打开独立 scope,模拟生产环境的作用域管理
    var scope = _provider.CreateScope();
    return scope.ServiceProvider.GetRequiredService<IMongoMessageOutboxStore>();
}

3. 测试集合定义 MongoOutboxTestCollection

使用xUnit的集合固件模式,确保所有测试共享同一个fixture实例,避免重复初始化数据库连接。

技术特点

状态机验证

测试覆盖了消息的完整生命周期:

Pending → Sent → Consumed
    ↓       ↓
PublishFailed → 重试 → Sent
                ↓
            ConsumeFailed → 重试 → Consumed

重试机制

  • 指数退避:第n次失败后等待2^n分钟
  • 重试计数:累计失败次数
  • 时间戳管理:记录最后重试时间和下次重试时间

数据隔离

  • 每个测试运行使用唯一ID前缀
  • 测试结束后自动清理数据
  • 避免测试间相互影响

配置管理

  • 支持从appsettings.Test.json读取配置
  • 支持环境变量覆盖
  • 连接失败时提供清晰的错误信息

测试价值

这个集成测试确保了:

  1. 数据持久化正确性
  2. 状态转换逻辑准确性
  3. 重试机制的可靠性
  4. 查询语义的正确性
  5. 并发安全(通过独立作用域)

这是一个设计良好的集成测试,既验证了核心业务逻辑,又确保了与真实数据库的兼容性。

评论加载中...