using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Models;
using Dpz.Core.MessageQueue.RabbitMQ;
using Microsoft.Extensions.Logging.Abstractions;
namespace Dpz.Core.MessageQueue.Test.RabbitMQ;
public class MessageOutboxRetryServiceTests
{
[Fact]
public async Task RetryAsync_ShouldRepublishPendingEntriesAndMarkSent()
{
var outboxStore = new InMemoryOutboxStore
{
PendingPublishEntries = [Entry("publish-1"), Entry("publish-2")],
PendingConsumeEntries = [Entry("consume-1")],
};
var retryPublisher = new InMemoryRetryPublisher();
var sut = new MessageOutboxRetryService(
outboxStore,
retryPublisher,
NullLogger<MessageOutboxRetryService>.Instance
);
var result = await sut.RetryAsync(50);
Assert.Equal(2, result.PublishRetryCount);
Assert.Equal(0, result.PublishFailureCount);
Assert.Equal(1, result.ConsumeRetryCount);
Assert.Equal(0, result.ConsumeFailureCount);
Assert.Equal(["publish-1", "publish-2", "consume-1"], outboxStore.SentMessageIds);
Assert.Equal(["publish-1", "publish-2", "consume-1"], retryPublisher.PublishedMessageIds);
}
[Fact]
public async Task RetryAsync_ShouldMarkPublishFailed_WhenPublishRetryThrows()
{
var outboxStore = new InMemoryOutboxStore { PendingPublishEntries = [Entry("publish-1")] };
var retryPublisher = new InMemoryRetryPublisher { ThrowForMessageIds = ["publish-1"] };
var sut = new MessageOutboxRetryService(
outboxStore,
retryPublisher,
NullLogger<MessageOutboxRetryService>.Instance
);
var result = await sut.RetryAsync(50);
Assert.Equal(1, result.PublishRetryCount);
Assert.Equal(1, result.PublishFailureCount);
Assert.Empty(outboxStore.SentMessageIds);
Assert.Equal(["publish-1"], outboxStore.PublishFailedMessageIds);
}
[Fact]
public async Task RetryAsync_ShouldMarkConsumeFailed_WhenConsumeRetryThrows()
{
var outboxStore = new InMemoryOutboxStore { PendingConsumeEntries = [Entry("consume-1")] };
var retryPublisher = new InMemoryRetryPublisher { ThrowForMessageIds = ["consume-1"] };
var sut = new MessageOutboxRetryService(
outboxStore,
retryPublisher,
NullLogger<MessageOutboxRetryService>.Instance
);
var result = await sut.RetryAsync(50);
Assert.Equal(1, result.ConsumeRetryCount);
Assert.Equal(1, result.ConsumeFailureCount);
Assert.Empty(outboxStore.SentMessageIds);
Assert.Equal(["consume-1"], outboxStore.ConsumeFailedMessageIds);
}
private static MessageOutboxEntry Entry(string messageId) =>
new(messageId, "test.exchange", "test.routing", "{}", 1, 1);
private sealed class InMemoryRetryPublisher : IMessageOutboxRetryPublisher
{
public List<string> PublishedMessageIds { get; } = [];
public HashSet<string> ThrowForMessageIds { get; init; } = [];
public Task PublishRawAsync(
string exchange,
string routingKey,
string messageId,
string jsonPayload,
CancellationToken cancellationToken = default
)
{
if (ThrowForMessageIds.Contains(messageId))
{
throw new InvalidOperationException("retry failed");
}
PublishedMessageIds.Add(messageId);
return Task.CompletedTask;
}
}
private sealed class InMemoryOutboxStore : IMessageOutboxStore
{
public IReadOnlyList<MessageOutboxEntry> PendingPublishEntries { get; init; } = [];
public IReadOnlyList<MessageOutboxEntry> PendingConsumeEntries { get; init; } = [];
public List<string> SentMessageIds { get; } = [];
public List<string> PublishFailedMessageIds { get; } = [];
public List<string> ConsumeFailedMessageIds { get; } = [];
public Task CreateAsync(
string messageId,
string messageType,
string exchange,
string routingKey,
string payload,
string? source,
CancellationToken cancellationToken = default
) => Task.CompletedTask;
public Task MarkSentAsync(string messageId, CancellationToken cancellationToken = default)
{
SentMessageIds.Add(messageId);
return Task.CompletedTask;
}
public Task MarkPublishFailedAsync(
string messageId,
string error,
CancellationToken cancellationToken = default
)
{
PublishFailedMessageIds.Add(messageId);
return Task.CompletedTask;
}
public Task MarkConsumedAsync(
string messageId,
CancellationToken cancellationToken = default
) => Task.CompletedTask;
public Task MarkConsumeFailedAsync(
string messageId,
string error,
CancellationToken cancellationToken = default
)
{
ConsumeFailedMessageIds.Add(messageId);
return Task.CompletedTask;
}
public Task<IReadOnlyList<MessageOutboxEntry>> GetPendingPublishRetryAsync(
int batchSize,
CancellationToken cancellationToken = default
) => Task.FromResult(PendingPublishEntries);
public Task<IReadOnlyList<MessageOutboxEntry>> GetPendingConsumeRetryAsync(
int batchSize,
CancellationToken cancellationToken = default
) => Task.FromResult(PendingConsumeEntries);
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个 消息发件箱重试服务 的单元测试类,主要测试消息发布失败后的重试机制。让我详细解释这个代码:
代码结构概述
这个测试类验证 MessageOutboxRetryService 的功能,该服务用于处理消息队列中失败消息的重试逻辑。
核心测试场景
1. 成功重试测试 (RetryAsync_ShouldRepublishPendingEntriesAndMarkSent)
[Fact]
public async Task RetryAsync_ShouldRepublishPendingEntriesAndMarkSent()
测试目的:验证重试服务能正确处理待发布和待消费的消息
测试设置:
- 2个待发布消息 (
publish-1,publish-2) - 1个待消费消息 (
consume-1) - 使用模拟的存储和发布器
验证结果:
- 发布重试计数:2
- 消费重试计数:1
- 失败计数均为0
- 所有消息都被正确标记为已发送
- 所有消息都通过重试发布器发布
2. 发布失败测试 (RetryAsync_ShouldMarkPublishFailed_WhenPublishRetryThrows)
[Fact]
public async Task RetryAsync_ShouldMarkPublishFailed_WhenPublishRetryThrows()
测试目的:验证发布重试失败时的错误处理
测试场景:
- 模拟发布器对特定消息ID抛出异常
- 验证失败消息被正确标记为发布失败
- 确认消息不会被标记为已发送
3. 消费失败测试 (RetryAsync_ShouldMarkConsumeFailed_WhenConsumeRetryThrows)
[Fact]
public async Task RetryAsync_ShouldMarkConsumeFailed_WhenConsumeRetryThrows()
测试目的:验证消费重试失败时的错误处理
测试逻辑:类似发布失败测试,但针对消费场景
测试辅助类
InMemoryRetryPublisher
private sealed class InMemoryRetryPublisher : IMessageOutboxRetryPublisher
功能:
- 模拟消息重试发布器
- 记录已发布的消息ID
- 支持配置特定消息ID抛出异常(用于测试失败场景)
InMemoryOutboxStore
private sealed class InMemoryOutboxStore : IMessageOutboxStore
功能:
- 模拟消息发件箱存储
- 提供待处理消息列表
- 记录各种状态的消息ID(已发送、发布失败、消费失败)
- 实现所有必需的存储操作方法
测试价值
这个测试类确保了:
- 可靠性:消息重试机制能正确处理各种场景
- 错误处理:失败情况下能正确标记消息状态
- 状态管理:消息状态转换的准确性
- 业务逻辑:重试计数和失败计数的准确统计
设计模式
代码使用了以下良好实践:
- 依赖注入:通过接口抽象依赖关系
- 内存实现:使用内存存储加快测试执行
- 测试隔离:每个测试方法独立,互不影响
- 场景覆盖:涵盖成功和失败的关键路径
这是一个设计良好的单元测试,有效验证了消息发件箱重试服务的核心功能。
评论加载中...