using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Extensions;
using Dpz.Core.MessageQueue.RabbitMQ;
using Dpz.Core.MongodbAccess;
using Dpz.Core.Public.Entity;
using Dpz.Core.Service;
using Dpz.Core.Service.RepositoryService;
using Dpz.Core.Service.RepositoryServiceImpl;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
namespace Dpz.Core.MessageQueue.Test.Integration;
/// <summary>
/// 端到端集成测试:真实 RabbitMQ + 真实 MongoDB,覆盖整条 Outbox 兜底链路。
///
/// 1. <see cref="OutboxMessagePublisher{T}"/> 发布成功 -> Mongo 中状态为 Sent
/// 2. 发布失败(RabbitMQ 不可用)-> Mongo 中状态为 PublishFailed,含退避时间
/// 3. <see cref="RabbitMQRawPublisher"/> 重发已存载荷成功,可被 Outbox Job 配合使用
///
/// 依赖:appsettings.Test.json 中 ConnectionStrings:mongodb 与 RabbitMQ:HostName 等可用。
/// </summary>
[Trait("Category", "Integration")]
[Collection(nameof(OutboxEndToEndCollection))]
public class OutboxEndToEndIntegrationTests(OutboxEndToEndFixture fixture) : IAsyncLifetime
{
private readonly string _testRunId = $"e2e-{Guid.NewGuid():N}";
public Task InitializeAsync() => Task.CompletedTask;
public async Task DisposeAsync()
{
var filter = Builders<MessageOutboxRecord>.Filter.Where(x =>
x.MessageId.StartsWith(_testRunId)
);
await fixture.Repository.Collection.DeleteManyAsync(filter);
}
[Fact]
public async Task PublishViaOutbox_ShouldPersistAsSent_WhenRabbitMQAvailable()
{
var publisher = fixture.GetPublisher<E2EMessage>();
var message = new E2EMessage
{
MessageId = Id("ok"),
Source = "e2e",
Payload = "hello",
};
await publisher.PublishAsync(message);
// 等待 Mongo 中状态变为 Sent(写入是同步的,加少量退避以应对网络抖动)
var record = await PollUntilAsync(
message.MessageId,
x => x?.Status == OutboxMessageStatus.Sent
);
Assert.NotNull(record);
Assert.Equal(OutboxMessageStatus.Sent, record.Status);
Assert.NotNull(record.SentAt);
Assert.Equal(0, record.PublishAttempts);
Assert.Equal("e2e", record.Source);
Assert.Contains("\"Payload\":\"hello\"", record.Payload);
}
[Fact]
public async Task PublishViaOutbox_ShouldPersistAsPublishFailed_WhenRabbitMQUnavailable()
{
// 使用一个绑定到错误端口的发布者,强制发布失败
var failingPublisher = fixture.GetFailingPublisher<E2EMessage>();
var message = new E2EMessage { MessageId = Id("fail"), Source = "e2e" };
await Assert.ThrowsAnyAsync<Exception>(() => failingPublisher.PublishAsync(message));
var record = await PollUntilAsync(
message.MessageId,
x => x?.Status == OutboxMessageStatus.PublishFailed
);
Assert.NotNull(record);
Assert.Equal(OutboxMessageStatus.PublishFailed, record.Status);
Assert.Equal(1, record.PublishAttempts);
Assert.NotNull(record.LastPublishAttemptAt);
Assert.NotNull(record.NextPublishRetryAt);
Assert.False(string.IsNullOrEmpty(record.LastPublishError));
}
[Fact]
public async Task RetryPublisher_ShouldFlipFailedRecordToSent_WhenInvokedWithStoredPayload()
{
var publisher = fixture.GetFailingPublisher<E2EMessage>();
var message = new E2EMessage { MessageId = Id("retry"), Source = "retry-src" };
// 第一阶段:制造一条 PublishFailed 记录
await Assert.ThrowsAnyAsync<Exception>(() => publisher.PublishAsync(message));
var failed = await PollUntilAsync(
message.MessageId,
x => x?.Status == OutboxMessageStatus.PublishFailed
);
Assert.NotNull(failed);
Assert.Equal(OutboxMessageStatus.PublishFailed, failed.Status);
// 第二阶段:模拟 Outbox 后台重试执行器的工作流
// 1) 通过 GetPendingPublishRetry 拉取待重试条目(强制 NextPublishRetryAt 已到期)
await fixture.Repository.UpdateAsync(
x => x.MessageId == message.MessageId,
Builders<MessageOutboxRecord>.Update.Set(
x => x.NextPublishRetryAt,
DateTime.Now.AddMinutes(-1)
)
);
var entries = await fixture.OutboxStore.GetPendingPublishRetryAsync(50);
var entry = entries.FirstOrDefault(x => x.MessageId == message.MessageId);
Assert.NotNull(entry);
// 2) RabbitMQRawPublisher 使用真实可用 RabbitMQ 重发
await fixture.RawPublisher.PublishRawAsync(
entry.Exchange,
entry.RoutingKey,
entry.MessageId,
entry.Payload
);
await fixture.OutboxStore.MarkSentAsync(entry.MessageId);
// 3) 验证状态切换为 Sent
var sent = await PollUntilAsync(
message.MessageId,
x => x?.Status == OutboxMessageStatus.Sent
);
Assert.NotNull(sent);
Assert.Equal(OutboxMessageStatus.Sent, sent.Status);
Assert.NotNull(sent.SentAt);
}
private string Id(string suffix) => $"{_testRunId}-{suffix}";
private async Task<MessageOutboxRecord?> PollUntilAsync(
string messageId,
Func<MessageOutboxRecord?, bool> predicate,
int timeoutMs = 5000
)
{
var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs);
MessageOutboxRecord? current = null;
while (DateTime.UtcNow < deadline)
{
current = await fixture
.Repository.SearchFor(x => x.MessageId == messageId)
.FirstOrDefaultAsync();
if (predicate(current))
{
return current;
}
await Task.Delay(100);
}
return current;
}
}
/// <summary>
/// E2E 测试用消息载荷类型。命名空间和类型独立,避免与生产消息共享 Exchange/Queue。
/// </summary>
public class E2EMessage : MessageBase
{
public string Payload { get; set; } = string.Empty;
}
[CollectionDefinition(nameof(OutboxEndToEndCollection))]
public class OutboxEndToEndCollection : ICollectionFixture<OutboxEndToEndFixture> { }
/// <summary>
/// E2E 集成测试 fixture:
/// - 真实连接到 RabbitMQ(按 appsettings.Test.json 的 RabbitMQ:* 配置)
/// - 真实连接到 Mongo
/// - 提供两个发布者:正常发布者 (host=localhost) 与故意失败的发布者 (HostName 指向无效地址)
///
/// 实现 <see cref="IAsyncDisposable"/>:因 <c>RabbitMQPublisher<T></c> 只实现了
/// <c>IAsyncDisposable</c>,<c>ServiceProvider.Dispose()</c> 会抛 InvalidOperationException,
/// 必须使用 <c>DisposeAsync()</c>。
/// </summary>
public class OutboxEndToEndFixture : IAsyncDisposable
{
private readonly ServiceProvider _provider;
private readonly ServiceProvider _failingProvider;
private readonly IServiceScope _rootScope;
public OutboxEndToEndFixture()
{
var configuration = new ConfigurationBuilder()
.AddJsonFile("appsettings.Test.json", optional: true)
.AddEnvironmentVariables()
.Build();
var mongoConnection = configuration.GetConnectionString("mongodb");
if (string.IsNullOrWhiteSpace(mongoConnection))
{
throw new InvalidOperationException(
"OutboxEndToEndFixture 初始化失败:appsettings.Test.json 缺少 ConnectionStrings:mongodb"
);
}
_provider = BuildProvider(configuration, failing: false);
_rootScope = _provider.CreateScope();
Repository = _rootScope.ServiceProvider.GetRequiredService<
IRepository<MessageOutboxRecord>
>();
Repository.Database.RunCommand<MongoDB.Bson.BsonDocument>("{ping:1}");
OutboxStore = _provider.GetRequiredService<IMessageOutboxStore>();
RawPublisher = _provider.GetRequiredService<IMessageOutboxRetryPublisher>();
// Failing provider:复制配置但 RabbitMQ 指向无效地址
var failingConfig = new ConfigurationBuilder()
.AddConfiguration(configuration)
.AddInMemoryCollection(
new Dictionary<string, string?>
{
["RabbitMQ:HostName"] = "127.0.0.1",
["RabbitMQ:Port"] = "1", // 几乎一定不可用
["RabbitMQ:RequestedConnectionTimeout"] = "1",
}
)
.Build();
_failingProvider = BuildProvider(failingConfig, failing: true);
}
public IRepository<MessageOutboxRecord> Repository { get; }
public IMessageOutboxStore OutboxStore { get; }
public IMessageOutboxRetryPublisher RawPublisher { get; }
public IMessagePublisher<T> GetPublisher<T>()
where T : MessageBase => _provider.GetRequiredService<IMessagePublisher<T>>();
public IMessagePublisher<T> GetFailingPublisher<T>()
where T : MessageBase => _failingProvider.GetRequiredService<IMessagePublisher<T>>();
private static ServiceProvider BuildProvider(IConfiguration configuration, bool failing)
{
var services = new ServiceCollection();
services.AddSingleton(configuration);
services.AddLogging();
// RabbitMQ 基础设施
services.AddRabbitMQ(configuration);
// 注册 Mongo 仓储
services.AddScoped(typeof(IRepository<>), typeof(Repository<>));
// 注册 IMongoMessageOutboxStore(生产实现)+ 启用 Outbox 装饰器
services.AddScoped<IMongoMessageOutboxStore, MongoMessageOutboxStore>();
services.AddSingleton<ILogger<MongoMessageOutboxStore>>(
NullLogger<MongoMessageOutboxStore>.Instance
);
services.AddMessageOutbox();
return services.BuildServiceProvider();
}
public async ValueTask DisposeAsync()
{
_rootScope.Dispose();
await _provider.DisposeAsync();
await _failingProvider.DisposeAsync();
GC.SuppressFinalize(this);
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个用于消息队列 Outbox 模式的端到端集成测试代码,主要测试 RabbitMQ + MongoDB 的完整消息发布和兜底重试机制。让我详细解释各个部分:
整体架构
这个测试实现了 Outbox 模式,这是一种确保分布式系统中消息可靠发送的设计模式:
- 先将消息存储到本地数据库(MongoDB)
- 再发送到消息队列(RabbitMQ)
- 如果发送失败,后台任务会重试
核心测试类:OutboxEndToEndIntegrationTests
主要测试场景
正常发布测试 (
PublishViaOutbox_ShouldPersistAsSent_WhenRabbitMQAvailable)// 当RabbitMQ可用时,消息应该: // - 被成功发送到RabbitMQ // - 在MongoDB中状态标记为 "Sent" // - 记录发送时间发布失败测试 (
PublishViaOutbox_ShouldPersistAsPublishFailed_WhenRabbitMQUnavailable)// 当RabbitMQ不可用时,消息应该: // - 抛出异常 // - 在MongoDB中状态标记为 "PublishFailed" // - 记录重试次数和下次重试时间重试机制测试 (
RetryPublisher_ShouldFlipFailedRecordToSent_WhenInvokedWithStoredPayload)// 模拟后台重试任务: // 1. 先制造一个失败的消息记录 // 2. 模拟后台任务获取待重试的消息 // 3. 使用可用的RabbitMQ重新发送 // 4. 验证状态从 "PublishFailed" 变为 "Sent"
关键辅助方法
PollUntilAsync: 轮询等待数据库状态变化,处理异步操作的时序问题Id(): 生成唯一测试ID,避免测试间相互干扰DisposeAsync(): 清理测试数据
测试基础设施:OutboxEndToEndFixture
这是测试的核心支撑类,提供:
双重发布者设计
public IMessagePublisher<T> GetPublisher<T>() // 正常发布者
public IMessagePublisher<T> GetFailingPublisher<T>() // 故意失败的发布者
- 正常发布者:连接到真实的 RabbitMQ
- 失败发布者:连接到无效端口 (127.0.0.1:1),用于测试失败场景
依赖注入配置
services.AddRabbitMQ(configuration); // RabbitMQ 基础设施
services.AddScoped<IRepository<>>(); // MongoDB 仓储
services.AddScoped<IMongoMessageOutboxStore>(); // Outbox 存储实现
services.AddMessageOutbox(); // 启用 Outbox 装饰器
配置管理
- 从
appsettings.Test.json读取配置 - 对于失败测试,动态覆盖 RabbitMQ 配置指向无效地址
消息模型:E2EMessage
public class E2EMessage : MessageBase
{
public string Payload { get; set; } = string.Empty;
}
继承 MessageBase,包含消息ID、来源等基础字段,专门用于端到端测试。
技术特点
- 真实环境测试:使用真实的 RabbitMQ 和 MongoDB,不是 Mock
- 状态机验证:验证消息在不同场景下的状态转换
- 异步处理:正确处理异步操作和状态轮询
- 资源清理:实现了完整的资源清理机制
- 隔离性:每个测试使用唯一ID,避免相互影响
使用场景
这类测试适合:
- 验证消息可靠性保证机制
- 测试分布式事务的一致性
- 验证故障恢复和重试逻辑
- 端到端的系统集成验证
这是一个非常完整和专业的集成测试实现,体现了现代微服务架构中消息可靠性保证的最佳实践。
评论加载中...