using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;

namespace Dpz.Core.MessageQueue.RabbitMQ;

/// <summary>
/// RabbitMQ连接工厂实现
/// </summary>
public class RabbitMQConnectionFactory(
    IOptions<RabbitMQOptions> options,
    ILogger<RabbitMQConnectionFactory> logger
) : IRabbitMQConnectionFactory, IDisposable
{
    private readonly RabbitMQOptions _options = options.Value;
    private IConnection? _connection;
    private readonly SemaphoreSlim _lock = new(1, 1);
    private bool _disposed;

    public async Task<IConnection> GetConnectionAsync()
    {
        if (_connection is { IsOpen: true })
        {
            return _connection;
        }

        await _lock.WaitAsync();
        try
        {
            if (_connection is { IsOpen: true })
            {
                return _connection;
            }

            if (_connection != null)
            {
                try
                {
                    // 使用独立的超时令牌,防止立即取消
                    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
                    await _connection.CloseAsync(cts.Token);
                }
                catch
                {
                    // 忽略关闭异常,继续清理资源
                }
                finally
                {
                    _connection.Dispose();
                }
            }

            _connection = await CreateConnectionAsync();
            return _connection;
        }
        finally
        {
            _lock.Release();
        }
    }

    public async Task<IChannel> CreateChannelAsync()
    {
        var connection = await GetConnectionAsync();
        return await connection.CreateChannelAsync();
    }

    private async Task<IConnection> CreateConnectionAsync()
    {
        try
        {
            var factory = new ConnectionFactory
            {
                HostName = _options.HostName,
                Port = _options.Port,
                UserName = _options.UserName,
                Password = _options.Password,
                VirtualHost = _options.VirtualHost,
                RequestedConnectionTimeout = TimeSpan.FromSeconds(
                    _options.RequestedConnectionTimeout
                ),
                RequestedHeartbeat = TimeSpan.FromSeconds(_options.RequestedHeartbeat),
                AutomaticRecoveryEnabled = _options.AutomaticRecoveryEnabled,
                NetworkRecoveryInterval = TimeSpan.FromSeconds(_options.NetworkRecoveryInterval),
            };

            var connection = await factory.CreateConnectionAsync();

            logger.LogInformation(
                "成功连接到RabbitMQ: {HostName}:{Port}",
                _options.HostName,
                _options.Port
            );

            connection.ConnectionShutdownAsync += (_, args) =>
            {
                logger.LogWarning(
                    "RabbitMQ连接已关闭: {HostName}:{Port} -> {Reason}",
                    _options.HostName,
                    _options.Port,
                    args.ReplyText
                );
                return Task.CompletedTask;
            };

            connection.CallbackExceptionAsync += (_, args) =>
            {
                logger.LogError(args.Exception, "RabbitMQ回调异常");
                return Task.CompletedTask;
            };

            return connection;
        }
        catch (Exception ex)
        {
            logger.LogError(
                ex,
                "连接RabbitMQ失败: {HostName}:{Port}",
                _options.HostName,
                _options.Port
            );
            throw;
        }
    }

    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        await _lock.WaitAsync();
        try
        {
            if (_connection != null)
            {
                try
                {
                    // 使用独立的超时令牌
                    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
                    await _connection.CloseAsync(cts.Token);
                }
                catch (Exception ex)
                {
                    logger.LogWarning(ex, "关闭RabbitMQ连接时发生异常");
                }
                finally
                {
                    _connection.Dispose();
                }
            }
        }
        finally
        {
            _lock.Release();
            _lock.Dispose();
            _disposed = true;
        }

        GC.SuppressFinalize(this);
    }

    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        // 同步 Dispose 中不调用异步方法,直接清理资源
        _connection?.Dispose();
        _lock.Dispose();
        _disposed = true;
        GC.SuppressFinalize(this);
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个用于管理 RabbitMQ 连接的工厂类实现,主要功能是创建和维护 RabbitMQ 连接。让我详细解释一下这个代码的各个部分:

类的基本结构

public class RabbitMQConnectionFactory : IRabbitMQConnectionFactory, IDisposable

这个类实现了两个接口:

  • IRabbitMQConnectionFactory:自定义的连接工厂接口
  • IDisposable:用于资源清理

构造函数和字段

public RabbitMQConnectionFactory(
    IOptions<RabbitMQOptions> options,
    ILogger<RabbitMQConnectionFactory> logger
)

使用了依赖注入模式,接收:

  • RabbitMQOptions:RabbitMQ 配置选项
  • ILogger:用于日志记录

关键字段:

  • _connection:缓存的 RabbitMQ 连接
  • _lock:信号量,用于线程安全的连接管理
  • _disposed:标记对象是否已释放

核心方法解析

1. GetConnectionAsync() - 连接获取与管理

public async Task<IConnection> GetConnectionAsync()

这个方法实现了单例模式的连接管理:

  1. 双重检查锁定:先检查连接是否可用,避免不必要的锁等待
  2. 线程安全:使用 SemaphoreSlim 确保同时只有一个线程创建连接
  3. 连接重用:如果连接已存在且处于开放状态,直接返回
  4. 优雅关闭:在创建新连接前,正确关闭旧连接

2. CreateChannelAsync() - 通道创建

public async Task<IChannel> CreateChannelAsync()

通过获取连接来创建新的 RabbitMQ 通道,每次调用都会创建新的通道实例。

3. CreateConnectionAsync() - 连接创建

这是私有方法,负责实际的连接创建:

var factory = new ConnectionFactory
{
    HostName = _options.HostName,
    Port = _options.Port,
    // ... 其他配置
};

配置项包括

  • 基本连接信息(主机名、端口、用户名、密码)
  • 超时设置(连接超时、心跳间隔)
  • 自动恢复机制

事件处理

  • ConnectionShutdownAsync:连接关闭时记录日志
  • CallbackExceptionAsync:回调异常时记录错误日志

资源管理

异步释放 (DisposeAsync)

public async ValueTask DisposeAsync()

实现了 IAsyncDisposable 模式:

  • 优雅关闭连接(5秒超时)
  • 释放信号量
  • 防止重复释放

同步释放 (Dispose)

public void Dispose()

提供同步的资源清理,直接释放连接而不等待异步操作。

设计特点

  1. 线程安全:使用信号量保证多线程环境下的安全性
  2. 连接复用:避免频繁创建连接,提高性能
  3. 异常处理:完善的异常捕获和日志记录
  4. 资源管理:正确实现了 Dispose 模式
  5. 配置驱动:通过 Options 模式支持配置注入

这个实现非常适合在微服务架构中作为 RabbitMQ 连接的统一管理组件,既保证了性能又确保了资源的正确释放。

评论加载中...