using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MongodbAccess;
using Dpz.Core.Public.Entity;
using Dpz.Core.Service.RepositoryService;
using Dpz.Core.Service.RepositoryServiceImpl;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
namespace Dpz.Core.MessageQueue.Test.Integration;
/// <summary>
/// 真实 MongoDB 集成测试,验证 <see cref="MongoMessageOutboxStore"/> 的状态机与查询语义。
/// 依赖 appsettings.Test.json 中 ConnectionStrings:mongodb 指向可用 Mongo(与其他 *.Test 项目共用)。
/// 每个用例使用独立 MessageId 前缀避免相互干扰,并在结束时清理。
/// </summary>
[Trait("Category", "Integration")]
[Collection(nameof(MongoOutboxTestCollection))]
public class MongoMessageOutboxStoreIntegrationTests(MongoOutboxFixture fixture) : IAsyncLifetime
{
private readonly string _testRunId = $"test-{Guid.NewGuid():N}";
public Task InitializeAsync() => Task.CompletedTask;
public async Task DisposeAsync()
{
var filter = Builders<MessageOutboxRecord>.Filter.Where(x =>
x.MessageId.StartsWith(_testRunId)
);
await fixture.Repository.Collection.DeleteManyAsync(filter);
}
[Fact]
public async Task CreateAsync_ShouldPersistRecordWithPendingStatus()
{
var sut = fixture.BuildStore();
var messageId = Id("create");
await sut.CreateAsync(
messageId,
"Some.Test.Type",
"ex.create",
"rk.create",
"{\"foo\":\"bar\"}",
"src-create"
);
var record = await GetRecordAsync(messageId);
Assert.NotNull(record);
Assert.Equal("Some.Test.Type", record!.MessageType);
Assert.Equal("ex.create", record.Exchange);
Assert.Equal("rk.create", record.RoutingKey);
Assert.Equal("{\"foo\":\"bar\"}", record.Payload);
Assert.Equal("src-create", record.Source);
Assert.Equal(OutboxMessageStatus.Pending, record.Status);
Assert.Equal(0, record.PublishAttempts);
Assert.Null(record.SentAt);
Assert.Null(record.LastPublishAttemptAt);
}
[Fact]
public async Task MarkSentAsync_ShouldFlipStatusToSentAndStampSentAt()
{
var sut = fixture.BuildStore();
var messageId = Id("sent");
await sut.CreateAsync(messageId, "T", "ex", "rk", "{}", null);
await sut.MarkSentAsync(messageId);
var record = await GetRecordAsync(messageId);
Assert.NotNull(record);
Assert.Equal(OutboxMessageStatus.Sent, record!.Status);
Assert.NotNull(record.SentAt);
}
[Fact]
public async Task MarkPublishFailedAsync_ShouldRecordAttemptsAndScheduleNextRetry()
{
var sut = fixture.BuildStore();
var messageId = Id("publish-failed");
await sut.CreateAsync(messageId, "T", "ex", "rk", "{}", null);
await sut.MarkPublishFailedAsync(messageId, "boom-1");
var record = await GetRecordAsync(messageId);
Assert.NotNull(record);
Assert.Equal(OutboxMessageStatus.PublishFailed, record!.Status);
Assert.Equal(1, record.PublishAttempts);
Assert.Equal("boom-1", record.LastPublishError);
Assert.NotNull(record.LastPublishAttemptAt);
Assert.NotNull(record.NextPublishRetryAt);
// 第一次失败:退避 = 2^1 = 2 分钟
var expectedDelay = TimeSpan.FromMinutes(2);
var actualDelay = record.NextPublishRetryAt!.Value - record.LastPublishAttemptAt!.Value;
Assert.InRange(
actualDelay,
expectedDelay - TimeSpan.FromSeconds(2),
expectedDelay + TimeSpan.FromSeconds(2)
);
// 第二次失败:attempts=2,退避 = 2^2 = 4 分钟,PublishAttempts 累加
await sut.MarkPublishFailedAsync(messageId, "boom-2");
record = await GetRecordAsync(messageId);
Assert.Equal(2, record!.PublishAttempts);
Assert.Equal("boom-2", record.LastPublishError);
Assert.Equal(OutboxMessageStatus.PublishFailed, record.Status);
}
[Fact]
public async Task MarkPublishFailedAsync_ShouldNotThrow_WhenRecordMissing()
{
var sut = fixture.BuildStore();
// 未先 Create 直接 MarkFail - 不应抛异常,仅警告日志(实现内会 return 早出)
await sut.MarkPublishFailedAsync(Id("missing"), "no record");
}
[Fact]
public async Task MarkConsumedAsync_ShouldFlipStatusToConsumed()
{
var sut = fixture.BuildStore();
var messageId = Id("consumed");
await sut.CreateAsync(messageId, "T", "ex", "rk", "{}", null);
await sut.MarkSentAsync(messageId);
await sut.MarkConsumedAsync(messageId);
var record = await GetRecordAsync(messageId);
Assert.Equal(OutboxMessageStatus.Consumed, record!.Status);
Assert.NotNull(record.ConsumedAt);
}
[Fact]
public async Task MarkConsumeFailedAsync_ShouldAccumulateAttemptsWithBackoff()
{
var sut = fixture.BuildStore();
var messageId = Id("consume-failed");
await sut.CreateAsync(messageId, "T", "ex", "rk", "{}", null);
await sut.MarkConsumeFailedAsync(messageId, "consume err 1");
var record = await GetRecordAsync(messageId);
Assert.Equal(OutboxMessageStatus.ConsumeFailed, record!.Status);
Assert.Equal(1, record.ConsumeAttempts);
Assert.Equal("consume err 1", record.LastConsumeError);
Assert.NotNull(record.NextConsumeRetryAt);
await sut.MarkConsumeFailedAsync(messageId, "consume err 2");
record = await GetRecordAsync(messageId);
Assert.Equal(2, record!.ConsumeAttempts);
Assert.Equal("consume err 2", record.LastConsumeError);
}
[Fact]
public async Task GetPendingPublishRetryAsync_ShouldReturnDueFailedRecordsOnly()
{
var sut = fixture.BuildStore();
var dueId = Id("pub-due");
var notDueId = Id("pub-not-due");
var sentId = Id("pub-sent");
await sut.CreateAsync(dueId, "T", "ex.due", "rk.due", "{\"a\":1}", null);
await sut.CreateAsync(notDueId, "T", "ex.nd", "rk.nd", "{}", null);
await sut.CreateAsync(sentId, "T", "ex.sent", "rk.sent", "{}", null);
// dueId: 标记为发布失败 + 强制让 NextPublishRetryAt 已到期
await sut.MarkPublishFailedAsync(dueId, "err");
await ForcePublishRetryAtAsync(dueId, DateTime.Now.AddMinutes(-1));
// notDueId: 标记失败但保持默认 NextPublishRetryAt(未来)
await sut.MarkPublishFailedAsync(notDueId, "err");
// sentId: 已发送 - 不应被返回
await sut.MarkSentAsync(sentId);
var entries = await sut.GetPendingPublishRetryAsync(50);
var entryIds = entries.Select(x => x.MessageId).ToList();
Assert.Contains(dueId, entryIds);
Assert.DoesNotContain(notDueId, entryIds);
Assert.DoesNotContain(sentId, entryIds);
var entry = entries.Single(x => x.MessageId == dueId);
Assert.Equal("ex.due", entry.Exchange);
Assert.Equal("rk.due", entry.RoutingKey);
Assert.Equal("{\"a\":1}", entry.Payload);
Assert.Equal(1, entry.PublishAttempts);
}
[Fact]
public async Task GetPendingConsumeRetryAsync_ShouldReturnDueFailedRecordsOnly()
{
var sut = fixture.BuildStore();
var dueId = Id("con-due");
var notDueId = Id("con-not-due");
await sut.CreateAsync(dueId, "T", "ex.cdue", "rk.cdue", "{}", null);
await sut.CreateAsync(notDueId, "T", "ex.cnd", "rk.cnd", "{}", null);
await sut.MarkConsumeFailedAsync(dueId, "err");
await ForceConsumeRetryAtAsync(dueId, DateTime.Now.AddMinutes(-1));
await sut.MarkConsumeFailedAsync(notDueId, "err");
var entries = await sut.GetPendingConsumeRetryAsync(50);
Assert.Contains(entries, x => x.MessageId == dueId);
Assert.DoesNotContain(entries, x => x.MessageId == notDueId);
}
[Fact]
public async Task GetPendingPublishRetryAsync_ShouldRespectBatchSize()
{
var sut = fixture.BuildStore();
for (var i = 0; i < 5; i++)
{
var id = Id($"batch-{i}");
await sut.CreateAsync(id, "T", "ex", "rk", "{}", null);
await sut.MarkPublishFailedAsync(id, "err");
await ForcePublishRetryAtAsync(id, DateTime.Now.AddMinutes(-1));
}
var top2 = await sut.GetPendingPublishRetryAsync(2);
// batchSize 限制返回数量
Assert.True(top2.Count <= 2);
}
private string Id(string suffix) => $"{_testRunId}-{suffix}";
private Task<MessageOutboxRecord?> GetRecordAsync(string messageId) =>
fixture.Repository.SearchFor(x => x.MessageId == messageId).FirstOrDefaultAsync()!;
private Task ForcePublishRetryAtAsync(string messageId, DateTime when) =>
fixture.Repository.UpdateAsync(
x => x.MessageId == messageId,
Builders<MessageOutboxRecord>.Update.Set(x => x.NextPublishRetryAt, when)
);
private Task ForceConsumeRetryAtAsync(string messageId, DateTime when) =>
fixture.Repository.UpdateAsync(
x => x.MessageId == messageId,
Builders<MessageOutboxRecord>.Update.Set(x => x.NextConsumeRetryAt, when)
);
}
[CollectionDefinition(nameof(MongoOutboxTestCollection))]
public class MongoOutboxTestCollection : ICollectionFixture<MongoOutboxFixture> { }
public class MongoOutboxFixture : IDisposable
{
private readonly ServiceProvider _provider;
private readonly IServiceScope _rootScope;
public MongoOutboxFixture()
{
var configuration = new ConfigurationBuilder()
.AddJsonFile("appsettings.Test.json", optional: true)
.AddEnvironmentVariables()
.Build();
var connection = configuration.GetConnectionString("mongodb");
if (string.IsNullOrWhiteSpace(connection))
{
throw new InvalidOperationException(
"MongoOutboxFixture 初始化失败:appsettings.Test.json 缺少 ConnectionStrings:mongodb"
);
}
var services = new ServiceCollection();
services.AddSingleton<IConfiguration>(configuration);
services.AddScoped(typeof(IRepository<>), typeof(Repository<>));
services.AddScoped<IMongoMessageOutboxStore, MongoMessageOutboxStore>();
services.AddSingleton<Microsoft.Extensions.Logging.ILogger<MongoMessageOutboxStore>>(
NullLogger<MongoMessageOutboxStore>.Instance
);
_provider = services.BuildServiceProvider();
// 用根作用域获取一个仓储实例用于测试外部读写(清理 + 强制刷新 NextRetryAt)。
// BuildStore() 走每次新作用域,模拟生产侧 ScopedOutboxStoreAdapter 的语义。
_rootScope = _provider.CreateScope();
Repository = _rootScope.ServiceProvider.GetRequiredService<
IRepository<MessageOutboxRecord>
>();
// 主动触发一次 ping,及早暴露连接问题
Repository.Database.RunCommand<MongoDB.Bson.BsonDocument>("{ping:1}");
}
public IRepository<MessageOutboxRecord> Repository { get; }
public IMessageOutboxStore BuildStore()
{
// 每次构建都打开独立 scope,与生产 ScopedOutboxStoreAdapter 行为对齐
var scope = _provider.CreateScope();
// 注意:scope 在 fixture 销毁前不会显式释放,但 ServiceProvider.Dispose 会兜底清理
return scope.ServiceProvider.GetRequiredService<IMongoMessageOutboxStore>();
}
public void Dispose()
{
_rootScope.Dispose();
_provider.Dispose();
GC.SuppressFinalize(this);
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个用于测试MongoDB消息发件箱(Message Outbox)功能的集成测试类。让我详细解释这个代码的结构和功能:
主要组成部分
1. 测试类 MongoMessageOutboxStoreIntegrationTests
这是主要的集成测试类,用于验证MongoMessageOutboxStore的状态机和查询功能:
特点:
- 使用真实的MongoDB数据库进行集成测试
- 每个测试用例使用独立的消息ID前缀(
_testRunId)避免相互干扰 - 实现了
IAsyncLifetime接口,在测试结束时自动清理数据
主要测试场景:
消息创建测试 (
CreateAsync_ShouldPersistRecordWithPendingStatus)- 验证创建消息记录时状态为Pending
- 检查各字段正确存储
消息发送标记 (
MarkSentAsync_ShouldFlipStatusToSentAndStampSentAt)- 验证消息状态从Pending变为Sent
- 检查发送时间戳被正确设置
发布失败处理 (
MarkPublishFailedAsync_ShouldRecordAttemptsAndScheduleNextRetry)- 验证失败重试机制
- 检查指数退避算法(2^n分钟)
- 验证重试次数累加
消费状态管理
- 消费成功标记 (
MarkConsumedAsync) - 消费失败重试 (
MarkConsumeFailedAsync)
- 消费成功标记 (
批量查询功能
- 获取待重试发布的消息 (
GetPendingPublishRetryAsync) - 获取待重试消费的消息 (
GetPendingConsumeRetryAsync) - 验证批量大小限制
- 获取待重试发布的消息 (
2. 测试固件 MongoOutboxFixture
这是一个共享的测试固件,为所有测试提供MongoDB连接和服务:
功能:
- 从配置文件读取MongoDB连接字符串
- 配置依赖注入容器
- 提供Repository实例用于数据访问
- 为每个测试构建独立的Store实例
关键设计:
public IMessageOutboxStore BuildStore()
{
// 每次构建都打开独立 scope,模拟生产环境的作用域管理
var scope = _provider.CreateScope();
return scope.ServiceProvider.GetRequiredService<IMongoMessageOutboxStore>();
}
3. 测试集合定义 MongoOutboxTestCollection
使用xUnit的集合固件模式,确保所有测试共享同一个fixture实例,避免重复初始化数据库连接。
技术特点
状态机验证
测试覆盖了消息的完整生命周期:
Pending → Sent → Consumed
↓ ↓
PublishFailed → 重试 → Sent
↓
ConsumeFailed → 重试 → Consumed
重试机制
- 指数退避:第n次失败后等待2^n分钟
- 重试计数:累计失败次数
- 时间戳管理:记录最后重试时间和下次重试时间
数据隔离
- 每个测试运行使用唯一ID前缀
- 测试结束后自动清理数据
- 避免测试间相互影响
配置管理
- 支持从
appsettings.Test.json读取配置 - 支持环境变量覆盖
- 连接失败时提供清晰的错误信息
测试价值
这个集成测试确保了:
- 数据持久化正确性
- 状态转换逻辑准确性
- 重试机制的可靠性
- 查询语义的正确性
- 并发安全(通过独立作用域)
这是一个设计良好的集成测试,既验证了核心业务逻辑,又确保了与真实数据库的兼容性。
评论加载中...