using System.Text;
using System.Text.Json;
using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Enums;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Dpz.Core.MessageQueue.RabbitMQ;
/// <summary>
/// RabbitMQ消息消费者后台服务
/// </summary>
/// <typeparam name="TMessage">消息类型</typeparam>
public class RabbitMQConsumerBackgroundService<TMessage> : BackgroundService
where TMessage : MessageBase
{
private readonly IRabbitMQConnectionFactory _connectionFactory;
private readonly IMessageRoutingConvention _routingConvention;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<RabbitMQConsumerBackgroundService<TMessage>> _logger;
private readonly string _exchangeName;
private readonly string _queueName;
private readonly string _routingKey;
private IChannel? _channel;
private const int MaxReconnectDelaySeconds = 30;
public RabbitMQConsumerBackgroundService(
IRabbitMQConnectionFactory connectionFactory,
IMessageRoutingConvention routingConvention,
IServiceProvider serviceProvider,
ILogger<RabbitMQConsumerBackgroundService<TMessage>> logger
)
{
_connectionFactory = connectionFactory;
_routingConvention = routingConvention;
_serviceProvider = serviceProvider;
_logger = logger;
_exchangeName = _routingConvention.GetExchangeName<TMessage>();
_queueName = _routingConvention.GetQueueName<TMessage>();
_routingKey = _routingConvention.GetRoutingKey<TMessage>();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var reconnectAttempt = 0;
while (!stoppingToken.IsCancellationRequested)
{
IChannel? channel = null;
var shutdownSignal = new TaskCompletionSource<bool>(
TaskCreationOptions.RunContinuationsAsynchronously
);
try
{
channel = await _connectionFactory.CreateChannelAsync();
_channel = channel;
channel.ChannelShutdownAsync += (_, args) =>
{
if (!stoppingToken.IsCancellationRequested)
{
_logger.LogWarning(
"检测到Channel断开,准备重连: Queue={Queue}, ReplyCode={ReplyCode}, Reason={Reason}",
_queueName,
args.ReplyCode,
args.ReplyText
);
}
shutdownSignal.TrySetResult(true);
return Task.CompletedTask;
};
// 声明Exchange
await channel.ExchangeDeclareAsync(
exchange: _exchangeName,
type: _routingConvention.GetExchangeType<TMessage>().ToRabbitMQString(),
durable: true,
autoDelete: false,
arguments: null,
cancellationToken: stoppingToken
);
// 声明Queue
await channel.QueueDeclareAsync(
queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null,
cancellationToken: stoppingToken
);
// 绑定Queue到Exchange
await channel.QueueBindAsync(
queue: _queueName,
exchange: _exchangeName,
routingKey: _routingKey,
arguments: null,
cancellationToken: stoppingToken
);
// 设置预取数量(QoS)
await channel.BasicQosAsync(
prefetchSize: 0,
prefetchCount: 1,
global: false,
cancellationToken: stoppingToken
);
// 创建消费者
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (_, eventArgs) =>
{
await MessageHandlerAsync(eventArgs, stoppingToken);
};
// 开始消费
await channel.BasicConsumeAsync(
queue: _queueName,
autoAck: false,
consumer: consumer,
cancellationToken: stoppingToken
);
reconnectAttempt = 0;
_logger.LogInformation(
"消息消费者已启动: Queue={Queue}, Exchange={Exchange}, RoutingKey={RoutingKey}",
_queueName,
_exchangeName,
_routingKey
);
await WaitForShutdownOrStopAsync(shutdownSignal.Task, stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("消息消费者已停止: {Queue}", _queueName);
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "消费者运行异常,将尝试重连: {Queue}", _queueName);
}
finally
{
if (ReferenceEquals(_channel, channel))
{
_channel = null;
}
if (channel != null)
{
try
{
await channel.DisposeAsync();
}
catch
{
// 忽略清理异常,继续重连流程
}
}
}
reconnectAttempt++;
var delay = GetReconnectDelay(reconnectAttempt);
_logger.LogWarning(
"消费者将在{DelaySeconds}s后重连: Queue={Queue}, Attempt={Attempt}",
delay.TotalSeconds,
_queueName,
reconnectAttempt
);
try
{
await Task.Delay(delay, stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("消息消费者已停止: {Queue}", _queueName);
break;
}
}
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("正在停止消息消费者: {Queue}", _queueName);
if (_channel != null)
{
try
{
// 使用独立的超时令牌,防止立即取消
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await _channel.CloseAsync(cancellationToken: cts.Token);
_logger.LogInformation("Channel已关闭: {Queue}", _queueName);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "关闭Channel时发生异常: {Queue}", _queueName);
}
}
await base.StopAsync(cancellationToken);
}
private async Task MessageHandlerAsync(
BasicDeliverEventArgs eventArgs,
CancellationToken cancellationToken
)
{
var messageId = eventArgs.BasicProperties.MessageId ?? "Unknown";
if (_channel == null)
{
_logger.LogError("无法处理消息,Channel未初始化: MessageId={MessageId}", messageId);
return;
}
try
{
// 反序列化消息
var body = eventArgs.Body.ToArray();
var json = Encoding.UTF8.GetString(body);
var message = JsonSerializer.Deserialize<TMessage>(json);
if (message == null)
{
_logger.LogWarning("消息反序列化失败: MessageId={MessageId}", messageId);
// 拒绝消息,不重新入队
await _channel.BasicNackAsync(
eventArgs.DeliveryTag,
false,
false,
cancellationToken
);
return;
}
_logger.LogInformation("开始处理消息: MessageId={MessageId}", message.MessageId);
// 使用Scope创建Handler实例
using var scope = _serviceProvider.CreateScope();
var handler = scope.ServiceProvider.GetRequiredService<IMessageHandler<TMessage>>();
// 处理消息
var success = await handler.HandleAsync(message, cancellationToken);
if (!success)
{
var nextRetryCount = message.RetryCount + 1;
_logger.LogWarning(
"消息处理失败: MessageId={MessageId}, RetryCount={RetryCount}",
message.MessageId,
nextRetryCount
);
// 失败重试通过“重新发布新消息体 + ACK原消息”实现,确保RetryCount可持久传递。
if (nextRetryCount <= 3)
{
message.RetryCount = nextRetryCount;
await RepublishForRetryAsync(message, eventArgs, cancellationToken);
await _channel.BasicAckAsync(eventArgs.DeliveryTag, false, cancellationToken);
_logger.LogInformation(
"消息已重新发布用于重试: MessageId={MessageId}, RetryCount={RetryCount}",
message.MessageId,
message.RetryCount
);
}
else
{
await _channel.BasicNackAsync(
eventArgs.DeliveryTag,
false,
false,
cancellationToken
);
_logger.LogError(
"消息已达到最大重试次数,将被丢弃: MessageId={MessageId}",
message.MessageId
);
}
return;
}
// 确认消息
await _channel.BasicAckAsync(eventArgs.DeliveryTag, false, cancellationToken);
_logger.LogInformation("消息处理成功: MessageId={MessageId}", message.MessageId);
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生异常: MessageId={MessageId}", messageId);
if (_channel != null)
{
try
{
// 重新入队
await _channel.BasicNackAsync(
eventArgs.DeliveryTag,
false,
true,
cancellationToken
);
}
catch (Exception nackEx)
{
_logger.LogWarning(
nackEx,
"异常后Nack失败,消息可能由Broker自动重新投递: MessageId={MessageId}",
messageId
);
}
}
}
}
public override void Dispose()
{
// 同步 Dispose 中直接清理,异步关闭在 StopAsync 中完成
_channel?.Dispose();
base.Dispose();
}
private async Task RepublishForRetryAsync(
TMessage message,
BasicDeliverEventArgs eventArgs,
CancellationToken cancellationToken
)
{
if (_channel == null)
{
throw new InvalidOperationException("Channel未初始化,无法重发消息");
}
var json = JsonSerializer.Serialize(message);
var body = Encoding.UTF8.GetBytes(json);
var properties = new BasicProperties
{
Persistent = true,
MessageId = message.MessageId,
Timestamp = new AmqpTimestamp(((DateTimeOffset)DateTime.UtcNow).ToUnixTimeSeconds()),
ContentType = "application/json",
DeliveryMode = DeliveryModes.Persistent,
};
await _channel.BasicPublishAsync(
exchange: eventArgs.Exchange,
routingKey: eventArgs.RoutingKey,
mandatory: false,
basicProperties: properties,
body: body,
cancellationToken: cancellationToken
);
}
private static TimeSpan GetReconnectDelay(int attempt)
{
var seconds = Math.Min(MaxReconnectDelaySeconds, Math.Pow(2, Math.Min(attempt, 5)));
return TimeSpan.FromSeconds(seconds);
}
private static async Task WaitForShutdownOrStopAsync(
Task shutdownTask,
CancellationToken stoppingToken
)
{
var cancellationTask = Task.Delay(Timeout.Infinite, stoppingToken);
await Task.WhenAny(shutdownTask, cancellationTask);
stoppingToken.ThrowIfCancellationRequested();
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个基于 RabbitMQ 的消息消费者后台服务实现,它是一个泛型类,用于消费指定类型的消息。让我详细解释这个代码的各个部分:
主要功能
这个类实现了一个可靠的 RabbitMQ 消息消费者,具有以下特性:
- 自动重连机制
- 消息重试处理
- 优雅关闭
- 完整的错误处理
核心组件
1. 依赖注入
private readonly IRabbitMQConnectionFactory _connectionFactory; // 连接工厂
private readonly IMessageRoutingConvention _routingConvention; // 路由约定
private readonly IServiceProvider _serviceProvider; // 服务提供者
private readonly ILogger<RabbitMQConsumerBackgroundService<TMessage>> _logger; // 日志记录器
2. 核心执行方法 ExecuteAsync
连接和重连逻辑:
- 使用无限循环确保服务持续运行
- 实现指数退避重连策略(延迟时间逐步增加)
- 监听 Channel 断开事件,自动触发重连
RabbitMQ 初始化:
// 声明 Exchange(交换机)
await channel.ExchangeDeclareAsync(/*...*/);
// 声明 Queue(队列)
await channel.QueueDeclareAsync(/*...*/);
// 绑定 Queue 到 Exchange
await channel.QueueBindAsync(/*...*/);
// 设置 QoS(服务质量)
await channel.BasicQosAsync(prefetchCount: 1);
3. 消息处理机制 MessageHandlerAsync
消息反序列化:
- 将消息体从字节数组转换为 UTF-8 字符串
- 使用
JsonSerializer反序列化为强类型对象
处理流程:
- 创建服务范围(Scope)获取消息处理器
- 调用处理器处理消息
- 根据处理结果决定后续操作
重试机制:
if (!success) {
var nextRetryCount = message.RetryCount + 1;
if (nextRetryCount <= 3) {
// 重新发布消息用于重试
message.RetryCount = nextRetryCount;
await RepublishForRetryAsync(message, eventArgs, cancellationToken);
await _channel.BasicAckAsync(/*...*/); // 确认原消息
} else {
// 超过最大重试次数,丢弃消息
await _channel.BasicNackAsync(/*...*/);
}
}
4. 重试策略
重连延迟计算:
private static TimeSpan GetReconnectDelay(int attempt)
{
var seconds = Math.Min(MaxReconnectDelaySeconds, Math.Pow(2, Math.Min(attempt, 5)));
return TimeSpan.FromSeconds(seconds);
}
- 使用指数退避算法
- 最大延迟限制为 30 秒
- 防止过度频繁的重连尝试
5. 优雅关闭 StopAsync
public override async Task StopAsync(CancellationToken cancellationToken)
{
// 优雅关闭 Channel
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await _channel.CloseAsync(cancellationToken: cts.Token);
await base.StopAsync(cancellationToken);
}
设计亮点
- 可靠性:完整的错误处理和重连机制
- 可扩展性:泛型设计支持任意消息类型
- 资源管理:正确的异步资源清理
- 监控友好:详细的日志记录
- 性能优化:使用服务范围避免内存泄漏
使用场景
这个类适用于需要可靠消息处理的场景,如:
- 订单处理系统
- 事件驱动架构
- 微服务间通信
- 异步任务处理
整体上,这是一个生产级别的消息消费者实现,具备了企业级应用所需的可靠性和健壮性特性。
评论加载中...