using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace Dpz.Core.MessageQueue.RabbitMQ;
/// <summary>
/// RabbitMQ连接工厂实现
/// </summary>
public class RabbitMQConnectionFactory(
IOptions<RabbitMQOptions> options,
ILogger<RabbitMQConnectionFactory> logger
) : IRabbitMQConnectionFactory, IDisposable
{
private readonly RabbitMQOptions _options = options.Value;
private IConnection? _connection;
private readonly SemaphoreSlim _lock = new(1, 1);
private bool _disposed;
public async Task<IConnection> GetConnectionAsync()
{
if (_connection is { IsOpen: true })
{
return _connection;
}
await _lock.WaitAsync();
try
{
if (_connection is { IsOpen: true })
{
return _connection;
}
if (_connection != null)
{
try
{
// 使用独立的超时令牌,防止立即取消
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await _connection.CloseAsync(cts.Token);
}
catch
{
// 忽略关闭异常,继续清理资源
}
finally
{
_connection.Dispose();
}
}
_connection = await CreateConnectionAsync();
return _connection;
}
finally
{
_lock.Release();
}
}
public async Task<IChannel> CreateChannelAsync()
{
var connection = await GetConnectionAsync();
return await connection.CreateChannelAsync();
}
private async Task<IConnection> CreateConnectionAsync()
{
try
{
var factory = new ConnectionFactory
{
HostName = _options.HostName,
Port = _options.Port,
UserName = _options.UserName,
Password = _options.Password,
VirtualHost = _options.VirtualHost,
RequestedConnectionTimeout = TimeSpan.FromSeconds(
_options.RequestedConnectionTimeout
),
RequestedHeartbeat = TimeSpan.FromSeconds(_options.RequestedHeartbeat),
AutomaticRecoveryEnabled = _options.AutomaticRecoveryEnabled,
NetworkRecoveryInterval = TimeSpan.FromSeconds(_options.NetworkRecoveryInterval),
};
var connection = await factory.CreateConnectionAsync();
logger.LogInformation(
"成功连接到RabbitMQ: {HostName}:{Port}",
_options.HostName,
_options.Port
);
connection.ConnectionShutdownAsync += (_, args) =>
{
logger.LogWarning(
"RabbitMQ连接已关闭: {HostName}:{Port} -> {Reason}",
_options.HostName,
_options.Port,
args.ReplyText
);
return Task.CompletedTask;
};
connection.CallbackExceptionAsync += (_, args) =>
{
logger.LogError(args.Exception, "RabbitMQ回调异常");
return Task.CompletedTask;
};
return connection;
}
catch (Exception ex)
{
logger.LogError(
ex,
"连接RabbitMQ失败: {HostName}:{Port}",
_options.HostName,
_options.Port
);
throw;
}
}
public async ValueTask DisposeAsync()
{
if (_disposed)
{
return;
}
await _lock.WaitAsync();
try
{
if (_connection != null)
{
try
{
// 使用独立的超时令牌
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await _connection.CloseAsync(cts.Token);
}
catch (Exception ex)
{
logger.LogWarning(ex, "关闭RabbitMQ连接时发生异常");
}
finally
{
_connection.Dispose();
}
}
}
finally
{
_lock.Release();
_lock.Dispose();
_disposed = true;
}
GC.SuppressFinalize(this);
}
public void Dispose()
{
if (_disposed)
{
return;
}
// 同步 Dispose 中不调用异步方法,直接清理资源
_connection?.Dispose();
_lock.Dispose();
_disposed = true;
GC.SuppressFinalize(this);
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个用于管理 RabbitMQ 连接的工厂类实现,主要功能是创建和维护 RabbitMQ 连接。让我详细解释一下这个代码的各个部分:
类的基本结构
public class RabbitMQConnectionFactory : IRabbitMQConnectionFactory, IDisposable
这个类实现了两个接口:
IRabbitMQConnectionFactory:自定义的连接工厂接口IDisposable:用于资源清理
构造函数和字段
public RabbitMQConnectionFactory(
IOptions<RabbitMQOptions> options,
ILogger<RabbitMQConnectionFactory> logger
)
使用了依赖注入模式,接收:
RabbitMQOptions:RabbitMQ 配置选项ILogger:用于日志记录
关键字段:
_connection:缓存的 RabbitMQ 连接_lock:信号量,用于线程安全的连接管理_disposed:标记对象是否已释放
核心方法解析
1. GetConnectionAsync() - 连接获取与管理
public async Task<IConnection> GetConnectionAsync()
这个方法实现了单例模式的连接管理:
- 双重检查锁定:先检查连接是否可用,避免不必要的锁等待
- 线程安全:使用
SemaphoreSlim确保同时只有一个线程创建连接 - 连接重用:如果连接已存在且处于开放状态,直接返回
- 优雅关闭:在创建新连接前,正确关闭旧连接
2. CreateChannelAsync() - 通道创建
public async Task<IChannel> CreateChannelAsync()
通过获取连接来创建新的 RabbitMQ 通道,每次调用都会创建新的通道实例。
3. CreateConnectionAsync() - 连接创建
这是私有方法,负责实际的连接创建:
var factory = new ConnectionFactory
{
HostName = _options.HostName,
Port = _options.Port,
// ... 其他配置
};
配置项包括:
- 基本连接信息(主机名、端口、用户名、密码)
- 超时设置(连接超时、心跳间隔)
- 自动恢复机制
事件处理:
ConnectionShutdownAsync:连接关闭时记录日志CallbackExceptionAsync:回调异常时记录错误日志
资源管理
异步释放 (DisposeAsync)
public async ValueTask DisposeAsync()
实现了 IAsyncDisposable 模式:
- 优雅关闭连接(5秒超时)
- 释放信号量
- 防止重复释放
同步释放 (Dispose)
public void Dispose()
提供同步的资源清理,直接释放连接而不等待异步操作。
设计特点
- 线程安全:使用信号量保证多线程环境下的安全性
- 连接复用:避免频繁创建连接,提高性能
- 异常处理:完善的异常捕获和日志记录
- 资源管理:正确实现了 Dispose 模式
- 配置驱动:通过 Options 模式支持配置注入
这个实现非常适合在微服务架构中作为 RabbitMQ 连接的统一管理组件,既保证了性能又确保了资源的正确释放。
评论加载中...