using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Models;
using Dpz.Core.MessageQueue.RabbitMQ;
using Microsoft.Extensions.Logging.Abstractions;

namespace Dpz.Core.MessageQueue.Test.RabbitMQ;

public class MessageOutboxRetryServiceTests
{
    [Fact]
    public async Task RetryAsync_ShouldRepublishPendingEntriesAndMarkSent()
    {
        var outboxStore = new InMemoryOutboxStore
        {
            PendingPublishEntries = [Entry("publish-1"), Entry("publish-2")],
            PendingConsumeEntries = [Entry("consume-1")],
        };
        var retryPublisher = new InMemoryRetryPublisher();
        var sut = new MessageOutboxRetryService(
            outboxStore,
            retryPublisher,
            NullLogger<MessageOutboxRetryService>.Instance
        );

        var result = await sut.RetryAsync(50);

        Assert.Equal(2, result.PublishRetryCount);
        Assert.Equal(0, result.PublishFailureCount);
        Assert.Equal(1, result.ConsumeRetryCount);
        Assert.Equal(0, result.ConsumeFailureCount);
        Assert.Equal(["publish-1", "publish-2", "consume-1"], outboxStore.SentMessageIds);
        Assert.Equal(["publish-1", "publish-2", "consume-1"], retryPublisher.PublishedMessageIds);
    }

    [Fact]
    public async Task RetryAsync_ShouldMarkPublishFailed_WhenPublishRetryThrows()
    {
        var outboxStore = new InMemoryOutboxStore { PendingPublishEntries = [Entry("publish-1")] };
        var retryPublisher = new InMemoryRetryPublisher { ThrowForMessageIds = ["publish-1"] };
        var sut = new MessageOutboxRetryService(
            outboxStore,
            retryPublisher,
            NullLogger<MessageOutboxRetryService>.Instance
        );

        var result = await sut.RetryAsync(50);

        Assert.Equal(1, result.PublishRetryCount);
        Assert.Equal(1, result.PublishFailureCount);
        Assert.Empty(outboxStore.SentMessageIds);
        Assert.Equal(["publish-1"], outboxStore.PublishFailedMessageIds);
    }

    [Fact]
    public async Task RetryAsync_ShouldMarkConsumeFailed_WhenConsumeRetryThrows()
    {
        var outboxStore = new InMemoryOutboxStore { PendingConsumeEntries = [Entry("consume-1")] };
        var retryPublisher = new InMemoryRetryPublisher { ThrowForMessageIds = ["consume-1"] };
        var sut = new MessageOutboxRetryService(
            outboxStore,
            retryPublisher,
            NullLogger<MessageOutboxRetryService>.Instance
        );

        var result = await sut.RetryAsync(50);

        Assert.Equal(1, result.ConsumeRetryCount);
        Assert.Equal(1, result.ConsumeFailureCount);
        Assert.Empty(outboxStore.SentMessageIds);
        Assert.Equal(["consume-1"], outboxStore.ConsumeFailedMessageIds);
    }

    private static MessageOutboxEntry Entry(string messageId) =>
        new(messageId, "test.exchange", "test.routing", "{}", 1, 1);

    private sealed class InMemoryRetryPublisher : IMessageOutboxRetryPublisher
    {
        public List<string> PublishedMessageIds { get; } = [];

        public HashSet<string> ThrowForMessageIds { get; init; } = [];

        public Task PublishRawAsync(
            string exchange,
            string routingKey,
            string messageId,
            string jsonPayload,
            CancellationToken cancellationToken = default
        )
        {
            if (ThrowForMessageIds.Contains(messageId))
            {
                throw new InvalidOperationException("retry failed");
            }

            PublishedMessageIds.Add(messageId);
            return Task.CompletedTask;
        }
    }

    private sealed class InMemoryOutboxStore : IMessageOutboxStore
    {
        public IReadOnlyList<MessageOutboxEntry> PendingPublishEntries { get; init; } = [];

        public IReadOnlyList<MessageOutboxEntry> PendingConsumeEntries { get; init; } = [];

        public List<string> SentMessageIds { get; } = [];

        public List<string> PublishFailedMessageIds { get; } = [];

        public List<string> ConsumeFailedMessageIds { get; } = [];

        public Task CreateAsync(
            string messageId,
            string messageType,
            string exchange,
            string routingKey,
            string payload,
            string? source,
            CancellationToken cancellationToken = default
        ) => Task.CompletedTask;

        public Task MarkSentAsync(string messageId, CancellationToken cancellationToken = default)
        {
            SentMessageIds.Add(messageId);
            return Task.CompletedTask;
        }

        public Task MarkPublishFailedAsync(
            string messageId,
            string error,
            CancellationToken cancellationToken = default
        )
        {
            PublishFailedMessageIds.Add(messageId);
            return Task.CompletedTask;
        }

        public Task MarkConsumedAsync(
            string messageId,
            CancellationToken cancellationToken = default
        ) => Task.CompletedTask;

        public Task MarkConsumeFailedAsync(
            string messageId,
            string error,
            CancellationToken cancellationToken = default
        )
        {
            ConsumeFailedMessageIds.Add(messageId);
            return Task.CompletedTask;
        }

        public Task<IReadOnlyList<MessageOutboxEntry>> GetPendingPublishRetryAsync(
            int batchSize,
            CancellationToken cancellationToken = default
        ) => Task.FromResult(PendingPublishEntries);

        public Task<IReadOnlyList<MessageOutboxEntry>> GetPendingConsumeRetryAsync(
            int batchSize,
            CancellationToken cancellationToken = default
        ) => Task.FromResult(PendingConsumeEntries);
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个 消息发件箱重试服务 的单元测试类,主要测试消息发布失败后的重试机制。让我详细解释这个代码:

代码结构概述

这个测试类验证 MessageOutboxRetryService 的功能,该服务用于处理消息队列中失败消息的重试逻辑。

核心测试场景

1. 成功重试测试 (RetryAsync_ShouldRepublishPendingEntriesAndMarkSent)

[Fact]
public async Task RetryAsync_ShouldRepublishPendingEntriesAndMarkSent()

测试目的:验证重试服务能正确处理待发布和待消费的消息

测试设置

  • 2个待发布消息 (publish-1, publish-2)
  • 1个待消费消息 (consume-1)
  • 使用模拟的存储和发布器

验证结果

  • 发布重试计数:2
  • 消费重试计数:1
  • 失败计数均为0
  • 所有消息都被正确标记为已发送
  • 所有消息都通过重试发布器发布

2. 发布失败测试 (RetryAsync_ShouldMarkPublishFailed_WhenPublishRetryThrows)

[Fact]
public async Task RetryAsync_ShouldMarkPublishFailed_WhenPublishRetryThrows()

测试目的:验证发布重试失败时的错误处理

测试场景

  • 模拟发布器对特定消息ID抛出异常
  • 验证失败消息被正确标记为发布失败
  • 确认消息不会被标记为已发送

3. 消费失败测试 (RetryAsync_ShouldMarkConsumeFailed_WhenConsumeRetryThrows)

[Fact] 
public async Task RetryAsync_ShouldMarkConsumeFailed_WhenConsumeRetryThrows()

测试目的:验证消费重试失败时的错误处理

测试逻辑:类似发布失败测试,但针对消费场景

测试辅助类

InMemoryRetryPublisher

private sealed class InMemoryRetryPublisher : IMessageOutboxRetryPublisher

功能

  • 模拟消息重试发布器
  • 记录已发布的消息ID
  • 支持配置特定消息ID抛出异常(用于测试失败场景)

InMemoryOutboxStore

private sealed class InMemoryOutboxStore : IMessageOutboxStore

功能

  • 模拟消息发件箱存储
  • 提供待处理消息列表
  • 记录各种状态的消息ID(已发送、发布失败、消费失败)
  • 实现所有必需的存储操作方法

测试价值

这个测试类确保了:

  1. 可靠性:消息重试机制能正确处理各种场景
  2. 错误处理:失败情况下能正确标记消息状态
  3. 状态管理:消息状态转换的准确性
  4. 业务逻辑:重试计数和失败计数的准确统计

设计模式

代码使用了以下良好实践:

  • 依赖注入:通过接口抽象依赖关系
  • 内存实现:使用内存存储加快测试执行
  • 测试隔离:每个测试方法独立,互不影响
  • 场景覆盖:涵盖成功和失败的关键路径

这是一个设计良好的单元测试,有效验证了消息发件箱重试服务的核心功能。

评论加载中...