using System.Text;
using System.Text.Json;
using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Enums;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using ExchangeType = Dpz.Core.MessageQueue.Enums.ExchangeType;
namespace Dpz.Core.MessageQueue.RabbitMQ;
/// <summary>
/// RabbitMQ消息发布者实现
/// 队列发布频繁可以改为 Channel 池 或者 AsyncLocal<IChannel>线程本地化
/// 不必过早优化
/// </summary>
/// <typeparam name="TMessage">消息类型</typeparam>
public class RabbitMQPublisher<TMessage>(
IRabbitMQConnectionFactory connectionFactory,
IMessageRoutingConvention routingConvention,
ILogger<RabbitMQPublisher<TMessage>> logger
) : IMessagePublisher<TMessage>, IAsyncDisposable
where TMessage : MessageBase
{
private readonly string _exchangeName = routingConvention.GetExchangeName<TMessage>();
private readonly ExchangeType _exchangeType = routingConvention.GetExchangeType<TMessage>();
private readonly string _defaultRoutingKey = routingConvention.GetRoutingKey<TMessage>();
private readonly string _queueName = routingConvention.GetQueueName<TMessage>();
// 并发控制:保护 Channel 的创建和基础设施初始化
private readonly SemaphoreSlim _initLock = new(1, 1);
private bool _infrastructureInitialized;
// Channel 复用(单例模式下安全,通过 SemaphoreSlim 保护并发访问)
private IChannel? _channel;
private readonly SemaphoreSlim _channelLock = new(1, 1);
public async Task PublishAsync(
TMessage message,
string? routingKey = null,
CancellationToken cancellationToken = default
)
{
// 首次发布前确保 Exchange 和 Queue 都已创建(防止消息丢失)
await EnsureInfrastructureAsync();
// 获取复用的 Channel
await _channelLock.WaitAsync(cancellationToken);
try
{
var channel = await GetOrCreateChannelAsync();
// 序列化消息
var json = JsonSerializer.Serialize(message);
var body = Encoding.UTF8.GetBytes(json);
// 设置消息属性
var properties = new BasicProperties
{
Persistent = true,
MessageId = message.MessageId,
Timestamp = new AmqpTimestamp(
((DateTimeOffset)message.Timestamp).ToUnixTimeSeconds()
),
ContentType = "application/json",
DeliveryMode = DeliveryModes.Persistent,
};
// 发布消息(mandatory=true 启用路由失败检测)
var actualRoutingKey = routingKey ?? _defaultRoutingKey;
await channel.BasicPublishAsync(
exchange: _exchangeName,
routingKey: actualRoutingKey,
mandatory: true,
basicProperties: properties,
body: body,
cancellationToken: cancellationToken
);
logger.LogInformation(
"消息已发布: MessageId={MessageId}, Exchange={Exchange}, RoutingKey={RoutingKey}",
message.MessageId,
_exchangeName,
actualRoutingKey
);
}
catch (Exception ex)
{
logger.LogError(
ex,
"发布消息失败: MessageId={MessageId}, Exchange={Exchange}, RoutingKey={RoutingKey}",
message.MessageId,
_exchangeName,
routingKey ?? _defaultRoutingKey
);
// 发生异常时清理 Channel,下次重新创建
await CleanupChannelAsync();
throw;
}
finally
{
_channelLock.Release();
}
}
public async Task PublishBatchAsync(
IEnumerable<TMessage> messages,
string? routingKey = null,
CancellationToken cancellationToken = default
)
{
var messageList = messages.ToList();
if (messageList.Count == 0)
{
return;
}
// 首次发布前确保 Exchange 和 Queue 都已创建(防止消息丢失)
await EnsureInfrastructureAsync();
// 获取复用的 Channel
await _channelLock.WaitAsync(cancellationToken);
try
{
var channel = await GetOrCreateChannelAsync();
var actualRoutingKey = routingKey ?? _defaultRoutingKey;
// 批量发布所有消息
// 1. 复用 Channel,减少连接开销
// 2. 在同一个锁内批量发送,保证顺序性和原子性
// 3. 消息持久化(Persistent=true)保证可靠性
foreach (var message in messageList)
{
var json = JsonSerializer.Serialize(message);
var body = Encoding.UTF8.GetBytes(json);
var properties = new BasicProperties
{
Persistent = true,
MessageId = message.MessageId,
Timestamp = new AmqpTimestamp(
((DateTimeOffset)message.Timestamp).ToUnixTimeSeconds()
),
ContentType = "application/json",
DeliveryMode = DeliveryModes.Persistent,
};
await channel.BasicPublishAsync(
exchange: _exchangeName,
routingKey: actualRoutingKey,
mandatory: true,
basicProperties: properties,
body: body,
cancellationToken: cancellationToken
);
}
logger.LogInformation(
"批量发布 {Count} 条消息成功: Exchange={Exchange}, RoutingKey={RoutingKey}",
messageList.Count,
_exchangeName,
actualRoutingKey
);
}
catch (Exception ex)
{
logger.LogError(ex, "批量发布消息失败: 尝试发布 {Count} 条消息", messageList.Count);
// 发生异常时清理 Channel,下次重新创建
await CleanupChannelAsync();
throw;
}
finally
{
_channelLock.Release();
}
}
/// <summary>
/// 确保 Exchange、Queue 和 Binding 已创建(线程安全,双重检查锁定)
/// 这样可以确保即使消费者未启动,消息也能正确路由到队列
/// </summary>
private async Task EnsureInfrastructureAsync()
{
// 第一次检查:避免不必要的锁竞争
if (_infrastructureInitialized)
{
return;
}
await _initLock.WaitAsync();
try
{
// 第二次检查:防止重复创建
if (_infrastructureInitialized)
{
return;
}
var channel = await connectionFactory.CreateChannelAsync();
await using var _ = channel.ConfigureAwait(false);
// 1. 声明Exchange(幂等操作)
await channel.ExchangeDeclareAsync(
exchange: _exchangeName,
type: _exchangeType.ToRabbitMQString(),
durable: true,
autoDelete: false,
arguments: null
);
// 2. 声明Queue(幂等操作)
await channel.QueueDeclareAsync(
queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null
);
// 3. 绑定Queue到Exchange(幂等操作)
await channel.QueueBindAsync(
queue: _queueName,
exchange: _exchangeName,
routingKey: _defaultRoutingKey,
arguments: null
);
_infrastructureInitialized = true;
logger.LogInformation(
"RabbitMQ基础设施已就绪: Exchange={Exchange}, Queue={Queue}, RoutingKey={RoutingKey}, Type={Type}",
_exchangeName,
_queueName,
_defaultRoutingKey,
_exchangeType
);
}
catch (Exception ex)
{
logger.LogError(
ex,
"创建RabbitMQ基础设施失败: Exchange={Exchange}, Queue={Queue}",
_exchangeName,
_queueName
);
throw;
}
finally
{
_initLock.Release();
}
}
/// <summary>
/// 获取或创建复用的 Channel
/// 注意:调用此方法前必须已获取 _channelLock
/// </summary>
private async Task<IChannel> GetOrCreateChannelAsync()
{
// 如果 Channel 已存在且可用,直接返回
if (_channel is { IsOpen: true })
{
return _channel;
}
// 清理旧的 Channel
if (_channel != null)
{
try
{
await _channel.CloseAsync();
await _channel.DisposeAsync();
}
catch
{
// 忽略清理异常
}
}
// 创建新的 Channel
_channel = await connectionFactory.CreateChannelAsync();
logger.LogDebug("已创建新的 Channel: Exchange={Exchange}", _exchangeName);
return _channel;
}
/// <summary>
/// 清理 Channel(在发生异常时调用)
/// 调用此方法时无需持有 _channelLock
/// </summary>
private async Task CleanupChannelAsync()
{
if (_channel == null)
{
return;
}
try
{
await _channel.CloseAsync();
await _channel.DisposeAsync();
}
catch (Exception ex)
{
logger.LogWarning(ex, "清理 Channel 时发生异常");
}
finally
{
_channel = null;
}
}
/// <summary>
/// 释放资源
/// </summary>
public async ValueTask DisposeAsync()
{
await CleanupChannelAsync();
_initLock.Dispose();
_channelLock.Dispose();
GC.SuppressFinalize(this);
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个基于RabbitMQ的消息发布者实现,采用了泛型设计和现代C#异步模式。让我详细解释一下这个代码的各个部分:
类的整体设计
public class RabbitMQPublisher<TMessage> : IMessagePublisher<TMessage>, IAsyncDisposable
where TMessage : MessageBase
- 泛型约束:
TMessage必须继承自MessageBase,确保类型安全 - 接口实现:实现了
IMessagePublisher<TMessage>和IAsyncDisposable - 依赖注入:通过构造函数注入所需的依赖项
核心字段和初始化
private readonly string _exchangeName = routingConvention.GetExchangeName<TMessage>();
private readonly ExchangeType _exchangeType = routingConvention.GetExchangeType<TMessage>();
private readonly string _defaultRoutingKey = routingConvention.GetRoutingKey<TMessage>();
private readonly string _queueName = routingConvention.GetQueueName<TMessage>();
这些字段在构造时就确定了消息路由的基本配置,避免了运行时的重复计算。
并发控制机制
代码使用了两个SemaphoreSlim来处理并发:
_initLock:保护基础设施(Exchange、Queue)的初始化_channelLock:保护Channel的创建和使用
这种设计确保了线程安全,同时避免了过度的锁竞争。
核心功能分析
1. 单消息发布 (PublishAsync)
public async Task PublishAsync(TMessage message, string? routingKey = null, CancellationToken cancellationToken = default)
关键特性:
- 首次调用时自动创建基础设施
- Channel复用减少连接开销
- 消息持久化保证可靠性
- 异常时自动清理Channel
2. 批量消息发布 (PublishBatchAsync)
public async Task PublishBatchAsync(IEnumerable<TMessage> messages, string? routingKey = null, CancellationToken cancellationToken = default)
优势:
- 在同一个锁内批量发送,保证顺序性
- 复用Channel减少网络开销
- 原子性操作,要么全部成功要么全部失败
3. 基础设施管理 (EnsureInfrastructureAsync)
使用双重检查锁定模式:
if (_infrastructureInitialized) return; // 第一次检查
await _initLock.WaitAsync();
if (_infrastructureInitialized) return; // 第二次检查
这确保了:
- Exchange、Queue和Binding的幂等创建
- 即使消费者未启动,消息也能正确路由
- 避免重复初始化的性能损耗
4. Channel管理
Channel复用策略:
- 单例模式下安全的Channel复用
- 异常时自动重建Channel
- 资源清理和异常处理
可靠性保证
消息持久化:
var properties = new BasicProperties { Persistent = true, DeliveryMode = DeliveryModes.Persistent, };路由失败检测:
await channel.BasicPublishAsync( mandatory: true, // 启用路由失败检测 // ... );异常处理:完整的异常捕获、日志记录和资源清理
性能优化
- Channel复用:避免频繁创建/销毁连接
- 懒加载:基础设施按需创建
- 批量操作:减少网络往返次数
- 并发控制:最小化锁竞争
资源管理
实现了IAsyncDisposable模式:
public async ValueTask DisposeAsync()
{
await CleanupChannelAsync();
_initLock.Dispose();
_channelLock.Dispose();
GC.SuppressFinalize(this);
}
确保所有资源都能正确释放。
总结
这是一个设计良好的消息发布者实现,具有以下优点:
- 高可靠性:消息持久化、异常处理、资源清理
- 高性能:Channel复用、批量操作、并发优化
- 线程安全:完善的并发控制机制
- 易维护:清晰的代码结构和丰富的日志
- 可扩展:泛型设计支持不同消息类型
代码注释中提到"不必过早优化",但实际上已经实现了很多重要的优化,为生产环境使用做好了准备。
评论加载中...