using System.Text;
using System.Text.Json;
using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Enums;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using ExchangeType = Dpz.Core.MessageQueue.Enums.ExchangeType;

namespace Dpz.Core.MessageQueue.RabbitMQ;

/// <summary>
/// RabbitMQ消息发布者实现
/// 队列发布频繁可以改为 Channel 池 或者 AsyncLocal&lt;IChannel&gt;线程本地化
/// 不必过早优化
/// </summary>
/// <typeparam name="TMessage">消息类型</typeparam>
public class RabbitMQPublisher<TMessage>(
    IRabbitMQConnectionFactory connectionFactory,
    IMessageRoutingConvention routingConvention,
    ILogger<RabbitMQPublisher<TMessage>> logger
) : IMessagePublisher<TMessage>, IAsyncDisposable
    where TMessage : MessageBase
{
    private readonly string _exchangeName = routingConvention.GetExchangeName<TMessage>();
    private readonly ExchangeType _exchangeType = routingConvention.GetExchangeType<TMessage>();
    private readonly string _defaultRoutingKey = routingConvention.GetRoutingKey<TMessage>();
    private readonly string _queueName = routingConvention.GetQueueName<TMessage>();

    // 并发控制:保护 Channel 的创建和基础设施初始化
    private readonly SemaphoreSlim _initLock = new(1, 1);
    private bool _infrastructureInitialized;

    // Channel 复用(单例模式下安全,通过 SemaphoreSlim 保护并发访问)
    private IChannel? _channel;
    private readonly SemaphoreSlim _channelLock = new(1, 1);

    public async Task PublishAsync(
        TMessage message,
        string? routingKey = null,
        CancellationToken cancellationToken = default
    )
    {
        // 首次发布前确保 Exchange 和 Queue 都已创建(防止消息丢失)
        await EnsureInfrastructureAsync();

        // 获取复用的 Channel
        await _channelLock.WaitAsync(cancellationToken);
        try
        {
            var channel = await GetOrCreateChannelAsync();

            // 序列化消息
            var json = JsonSerializer.Serialize(message);
            var body = Encoding.UTF8.GetBytes(json);

            // 设置消息属性
            var properties = new BasicProperties
            {
                Persistent = true,
                MessageId = message.MessageId,
                Timestamp = new AmqpTimestamp(
                    ((DateTimeOffset)message.Timestamp).ToUnixTimeSeconds()
                ),
                ContentType = "application/json",
                DeliveryMode = DeliveryModes.Persistent,
            };

            // 发布消息(mandatory=true 启用路由失败检测)
            var actualRoutingKey = routingKey ?? _defaultRoutingKey;
            await channel.BasicPublishAsync(
                exchange: _exchangeName,
                routingKey: actualRoutingKey,
                mandatory: true,
                basicProperties: properties,
                body: body,
                cancellationToken: cancellationToken
            );

            logger.LogInformation(
                "消息已发布: MessageId={MessageId}, Exchange={Exchange}, RoutingKey={RoutingKey}",
                message.MessageId,
                _exchangeName,
                actualRoutingKey
            );
        }
        catch (Exception ex)
        {
            logger.LogError(
                ex,
                "发布消息失败: MessageId={MessageId}, Exchange={Exchange}, RoutingKey={RoutingKey}",
                message.MessageId,
                _exchangeName,
                routingKey ?? _defaultRoutingKey
            );

            // 发生异常时清理 Channel,下次重新创建
            await CleanupChannelAsync();
            throw;
        }
        finally
        {
            _channelLock.Release();
        }
    }

    public async Task PublishBatchAsync(
        IEnumerable<TMessage> messages,
        string? routingKey = null,
        CancellationToken cancellationToken = default
    )
    {
        var messageList = messages.ToList();
        if (messageList.Count == 0)
        {
            return;
        }

        // 首次发布前确保 Exchange 和 Queue 都已创建(防止消息丢失)
        await EnsureInfrastructureAsync();

        // 获取复用的 Channel
        await _channelLock.WaitAsync(cancellationToken);
        try
        {
            var channel = await GetOrCreateChannelAsync();
            var actualRoutingKey = routingKey ?? _defaultRoutingKey;

            // 批量发布所有消息
            // 1. 复用 Channel,减少连接开销
            // 2. 在同一个锁内批量发送,保证顺序性和原子性
            // 3. 消息持久化(Persistent=true)保证可靠性
            foreach (var message in messageList)
            {
                var json = JsonSerializer.Serialize(message);
                var body = Encoding.UTF8.GetBytes(json);

                var properties = new BasicProperties
                {
                    Persistent = true,
                    MessageId = message.MessageId,
                    Timestamp = new AmqpTimestamp(
                        ((DateTimeOffset)message.Timestamp).ToUnixTimeSeconds()
                    ),
                    ContentType = "application/json",
                    DeliveryMode = DeliveryModes.Persistent,
                };

                await channel.BasicPublishAsync(
                    exchange: _exchangeName,
                    routingKey: actualRoutingKey,
                    mandatory: true,
                    basicProperties: properties,
                    body: body,
                    cancellationToken: cancellationToken
                );
            }

            logger.LogInformation(
                "批量发布 {Count} 条消息成功: Exchange={Exchange}, RoutingKey={RoutingKey}",
                messageList.Count,
                _exchangeName,
                actualRoutingKey
            );
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "批量发布消息失败: 尝试发布 {Count} 条消息", messageList.Count);

            // 发生异常时清理 Channel,下次重新创建
            await CleanupChannelAsync();
            throw;
        }
        finally
        {
            _channelLock.Release();
        }
    }

    /// <summary>
    /// 确保 Exchange、Queue 和 Binding 已创建(线程安全,双重检查锁定)
    /// 这样可以确保即使消费者未启动,消息也能正确路由到队列
    /// </summary>
    private async Task EnsureInfrastructureAsync()
    {
        // 第一次检查:避免不必要的锁竞争
        if (_infrastructureInitialized)
        {
            return;
        }

        await _initLock.WaitAsync();
        try
        {
            // 第二次检查:防止重复创建
            if (_infrastructureInitialized)
            {
                return;
            }

            var channel = await connectionFactory.CreateChannelAsync();
            await using var _ = channel.ConfigureAwait(false);

            // 1. 声明Exchange(幂等操作)
            await channel.ExchangeDeclareAsync(
                exchange: _exchangeName,
                type: _exchangeType.ToRabbitMQString(),
                durable: true,
                autoDelete: false,
                arguments: null
            );

            // 2. 声明Queue(幂等操作)
            await channel.QueueDeclareAsync(
                queue: _queueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null
            );

            // 3. 绑定Queue到Exchange(幂等操作)
            await channel.QueueBindAsync(
                queue: _queueName,
                exchange: _exchangeName,
                routingKey: _defaultRoutingKey,
                arguments: null
            );

            _infrastructureInitialized = true;

            logger.LogInformation(
                "RabbitMQ基础设施已就绪: Exchange={Exchange}, Queue={Queue}, RoutingKey={RoutingKey}, Type={Type}",
                _exchangeName,
                _queueName,
                _defaultRoutingKey,
                _exchangeType
            );
        }
        catch (Exception ex)
        {
            logger.LogError(
                ex,
                "创建RabbitMQ基础设施失败: Exchange={Exchange}, Queue={Queue}",
                _exchangeName,
                _queueName
            );
            throw;
        }
        finally
        {
            _initLock.Release();
        }
    }

    /// <summary>
    /// 获取或创建复用的 Channel
    /// 注意:调用此方法前必须已获取 _channelLock
    /// </summary>
    private async Task<IChannel> GetOrCreateChannelAsync()
    {
        // 如果 Channel 已存在且可用,直接返回
        if (_channel is { IsOpen: true })
        {
            return _channel;
        }

        // 清理旧的 Channel
        if (_channel != null)
        {
            try
            {
                await _channel.CloseAsync();
                await _channel.DisposeAsync();
            }
            catch
            {
                // 忽略清理异常
            }
        }

        // 创建新的 Channel
        _channel = await connectionFactory.CreateChannelAsync();

        logger.LogDebug("已创建新的 Channel: Exchange={Exchange}", _exchangeName);

        return _channel;
    }

    /// <summary>
    /// 清理 Channel(在发生异常时调用)
    /// 调用此方法时无需持有 _channelLock
    /// </summary>
    private async Task CleanupChannelAsync()
    {
        if (_channel == null)
        {
            return;
        }

        try
        {
            await _channel.CloseAsync();
            await _channel.DisposeAsync();
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "清理 Channel 时发生异常");
        }
        finally
        {
            _channel = null;
        }
    }

    /// <summary>
    /// 释放资源
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        await CleanupChannelAsync();

        _initLock.Dispose();
        _channelLock.Dispose();

        GC.SuppressFinalize(this);
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个基于RabbitMQ的消息发布者实现,采用了泛型设计和现代C#异步模式。让我详细解释一下这个代码的各个部分:

类的整体设计

public class RabbitMQPublisher<TMessage> : IMessagePublisher<TMessage>, IAsyncDisposable
    where TMessage : MessageBase
  • 泛型约束TMessage必须继承自MessageBase,确保类型安全
  • 接口实现:实现了IMessagePublisher<TMessage>IAsyncDisposable
  • 依赖注入:通过构造函数注入所需的依赖项

核心字段和初始化

private readonly string _exchangeName = routingConvention.GetExchangeName<TMessage>();
private readonly ExchangeType _exchangeType = routingConvention.GetExchangeType<TMessage>();
private readonly string _defaultRoutingKey = routingConvention.GetRoutingKey<TMessage>();
private readonly string _queueName = routingConvention.GetQueueName<TMessage>();

这些字段在构造时就确定了消息路由的基本配置,避免了运行时的重复计算。

并发控制机制

代码使用了两个SemaphoreSlim来处理并发:

  1. _initLock:保护基础设施(Exchange、Queue)的初始化
  2. _channelLock:保护Channel的创建和使用

这种设计确保了线程安全,同时避免了过度的锁竞争。

核心功能分析

1. 单消息发布 (PublishAsync)

public async Task PublishAsync(TMessage message, string? routingKey = null, CancellationToken cancellationToken = default)

关键特性:

  • 首次调用时自动创建基础设施
  • Channel复用减少连接开销
  • 消息持久化保证可靠性
  • 异常时自动清理Channel

2. 批量消息发布 (PublishBatchAsync)

public async Task PublishBatchAsync(IEnumerable<TMessage> messages, string? routingKey = null, CancellationToken cancellationToken = default)

优势:

  • 在同一个锁内批量发送,保证顺序性
  • 复用Channel减少网络开销
  • 原子性操作,要么全部成功要么全部失败

3. 基础设施管理 (EnsureInfrastructureAsync)

使用双重检查锁定模式

if (_infrastructureInitialized) return; // 第一次检查
await _initLock.WaitAsync();
if (_infrastructureInitialized) return; // 第二次检查

这确保了:

  • Exchange、Queue和Binding的幂等创建
  • 即使消费者未启动,消息也能正确路由
  • 避免重复初始化的性能损耗

4. Channel管理

Channel复用策略:

  • 单例模式下安全的Channel复用
  • 异常时自动重建Channel
  • 资源清理和异常处理

可靠性保证

  1. 消息持久化

    var properties = new BasicProperties
    {
        Persistent = true,
        DeliveryMode = DeliveryModes.Persistent,
    };
    
  2. 路由失败检测

    await channel.BasicPublishAsync(
        mandatory: true, // 启用路由失败检测
        // ...
    );
    
  3. 异常处理:完整的异常捕获、日志记录和资源清理

性能优化

  1. Channel复用:避免频繁创建/销毁连接
  2. 懒加载:基础设施按需创建
  3. 批量操作:减少网络往返次数
  4. 并发控制:最小化锁竞争

资源管理

实现了IAsyncDisposable模式:

public async ValueTask DisposeAsync()
{
    await CleanupChannelAsync();
    _initLock.Dispose();
    _channelLock.Dispose();
    GC.SuppressFinalize(this);
}

确保所有资源都能正确释放。

总结

这是一个设计良好的消息发布者实现,具有以下优点:

  • 高可靠性:消息持久化、异常处理、资源清理
  • 高性能:Channel复用、批量操作、并发优化
  • 线程安全:完善的并发控制机制
  • 易维护:清晰的代码结构和丰富的日志
  • 可扩展:泛型设计支持不同消息类型

代码注释中提到"不必过早优化",但实际上已经实现了很多重要的优化,为生产环境使用做好了准备。

评论加载中...