using System.Text.Json;
using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Microsoft.Extensions.Logging;

namespace Dpz.Core.MessageQueue.RabbitMQ;

public sealed class OutboxMessagePublisher<TMessage>(
    RabbitMQPublisher<TMessage> inner,
    IMessageOutboxStore outboxStore,
    IMessageRoutingConvention routingConvention,
    ILogger<OutboxMessagePublisher<TMessage>> logger
) : IMessagePublisher<TMessage>
    where TMessage : MessageBase
{
    private readonly string _exchangeName = routingConvention.GetExchangeName<TMessage>();
    private readonly string _defaultRoutingKey = routingConvention.GetRoutingKey<TMessage>();

    public async Task PublishAsync(
        TMessage message,
        string? routingKey = null,
        CancellationToken cancellationToken = default
    )
    {
        var actualRoutingKey = routingKey ?? _defaultRoutingKey;
        var payload = JsonSerializer.Serialize(message);

        await outboxStore.CreateAsync(
            message.MessageId,
            typeof(TMessage).FullName!,
            _exchangeName,
            actualRoutingKey,
            payload,
            message.Source,
            cancellationToken
        );

        try
        {
            await inner.PublishAsync(message, routingKey, cancellationToken);
            await outboxStore.MarkSentAsync(message.MessageId, cancellationToken);
        }
        catch (Exception ex)
        {
            logger.LogError(
                ex,
                "消息发布失败,已记录至Outbox: MessageId={MessageId}, Exchange={Exchange}",
                message.MessageId,
                _exchangeName
            );
            await outboxStore.MarkPublishFailedAsync(
                message.MessageId,
                ex.Message,
                cancellationToken
            );
            throw;
        }
    }

    public async Task PublishBatchAsync(
        IEnumerable<TMessage> messages,
        string? routingKey = null,
        CancellationToken cancellationToken = default
    )
    {
        var actualRoutingKey = routingKey ?? _defaultRoutingKey;
        var messageList = messages.ToList();

        foreach (var message in messageList)
        {
            var payload = JsonSerializer.Serialize(message);
            await outboxStore.CreateAsync(
                message.MessageId,
                typeof(TMessage).FullName!,
                _exchangeName,
                actualRoutingKey,
                payload,
                message.Source,
                cancellationToken
            );
        }

        try
        {
            await inner.PublishBatchAsync(messageList, routingKey, cancellationToken);
            foreach (var message in messageList)
            {
                await outboxStore.MarkSentAsync(message.MessageId, cancellationToken);
            }
        }
        catch (Exception ex)
        {
            logger.LogError(
                ex,
                "批量消息发布失败,已记录至Outbox: Count={Count}, Exchange={Exchange}",
                messageList.Count,
                _exchangeName
            );
            foreach (var message in messageList)
            {
                await outboxStore.MarkPublishFailedAsync(
                    message.MessageId,
                    ex.Message,
                    cancellationToken
                );
            }
            throw;
        }
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个实现了Outbox模式的RabbitMQ消息发布器,用于确保消息发布的可靠性。让我详细解释这个类的设计和功能:

类概述

OutboxMessagePublisher<TMessage> 是一个泛型类,它装饰了内部的 RabbitMQPublisher,为消息发布添加了Outbox模式的支持。

核心设计模式

1. Outbox模式

这是分布式系统中确保数据一致性的重要模式:

  • 在发布消息前,先将消息保存到本地存储(Outbox表)
  • 然后尝试发布消息
  • 根据发布结果更新Outbox记录的状态

2. 装饰器模式

通过包装 RabbitMQPublisher<TMessage> 来增强其功能,而不修改原有实现。

构造函数参数

public sealed class OutboxMessagePublisher<TMessage>(
    RabbitMQPublisher<TMessage> inner,           // 被装饰的原始发布器
    IMessageOutboxStore outboxStore,             // Outbox存储接口
    IMessageRoutingConvention routingConvention, // 路由约定
    ILogger<OutboxMessagePublisher<TMessage>> logger // 日志记录器
)

核心功能

单消息发布 (PublishAsync)

执行流程:

  1. 预处理:确定路由键,序列化消息
  2. 记录Outbox:将消息信息保存到Outbox存储
  3. 发布消息:调用内部发布器发送消息
  4. 成功处理:标记消息为已发送
  5. 异常处理:记录发布失败状态并重新抛出异常

批量消息发布 (PublishBatchAsync)

类似单消息发布,但针对消息集合:

  1. 先将所有消息记录到Outbox
  2. 批量发布所有消息
  3. 根据结果批量更新状态

关键特性

1. 可靠性保证

// 先记录到Outbox,确保消息不会丢失
await outboxStore.CreateAsync(message.MessageId, ...);

// 然后尝试发布
await inner.PublishAsync(message, routingKey, cancellationToken);

2. 状态跟踪

  • 成功发布:MarkSentAsync
  • 发布失败:MarkPublishFailedAsync

3. 异常透明性

发布失败时,异常会被重新抛出,保持原有的错误处理语义。

使用场景

这种设计特别适用于:

  • 微服务架构:需要确保事务完整性
  • 高可靠性要求:不能丢失重要业务消息
  • 分布式事务:需要最终一致性保证

优势

  1. 消息不丢失:即使发布失败,消息也已保存在Outbox中
  2. 可重试:失败的消息可以通过Outbox重新发布
  3. 可监控:通过Outbox可以监控消息发布状态
  4. 事务一致性:支持与业务操作在同一事务中执行

这是一个非常成熟的企业级消息发布解决方案,很好地解决了分布式系统中消息可靠性的问题。

评论加载中...