using System.Text;
using System.Text.Json;
using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Enums;
using Dpz.Core.MessageQueue.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Dpz.Core.MessageQueue.RabbitMQ;
/// <summary>
/// RabbitMQ消息消费者后台服务(支持返回结果)
/// </summary>
/// <typeparam name="TMessage">消息类型</typeparam>
/// <typeparam name="TResult">返回结果类型</typeparam>
public class RabbitMQConsumerBackgroundServiceWithResult<TMessage, TResult> : BackgroundService
where TResult : MessageHandlerResult
where TMessage : MessageBase
{
private readonly IRabbitMQConnectionFactory _connectionFactory;
private readonly IMessageRoutingConvention _routingConvention;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<
RabbitMQConsumerBackgroundServiceWithResult<TMessage, TResult>
> _logger;
private readonly string _exchangeName;
private readonly string _queueName;
private readonly string _routingKey;
private IChannel? _channel;
private const int MaxReconnectDelaySeconds = 30;
/// <summary>
/// Creates the consumer service and resolves the exchange name, queue name and routing key
/// that the routing convention assigns to <typeparamref name="TMessage"/>.
/// </summary>
/// <param name="connectionFactory">Factory used to create the RabbitMQ channel.</param>
/// <param name="routingConvention">Convention that supplies the topology names for the message type.</param>
/// <param name="serviceProvider">Provider from which a scope is created for each handled message.</param>
/// <param name="logger">Logger for this service.</param>
public RabbitMQConsumerBackgroundServiceWithResult(
    IRabbitMQConnectionFactory connectionFactory,
    IMessageRoutingConvention routingConvention,
    IServiceProvider serviceProvider,
    ILogger<RabbitMQConsumerBackgroundServiceWithResult<TMessage, TResult>> logger
)
{
    (_connectionFactory, _routingConvention, _serviceProvider, _logger) = (
        connectionFactory,
        routingConvention,
        serviceProvider,
        logger
    );

    // The topology names are looked up once and cached for the lifetime of the service.
    _exchangeName = routingConvention.GetExchangeName<TMessage>();
    _queueName = routingConvention.GetQueueName<TMessage>();
    _routingKey = routingConvention.GetRoutingKey<TMessage>();
}
/// <summary>
/// Runs the consume loop until <paramref name="stoppingToken"/> is cancelled.
/// Each iteration creates a channel, declares the exchange, queue and binding, limits prefetch
/// to one message, starts a manual-ack consumer and then waits until either the channel shuts
/// down or a stop is requested. After a channel shutdown or an error the channel is disposed
/// and the loop retries after the delay returned by <c>GetReconnectDelay</c>.
/// </summary>
/// <param name="stoppingToken">Cancelled when the service is asked to stop.</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var reconnectAttempt = 0;
    while (!stoppingToken.IsCancellationRequested)
    {
        IChannel? channel = null;
        // Completed by the channel-shutdown callback so the loop can leave its wait and reconnect.
        var shutdownSignal = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously
        );
        try
        {
            channel = await _connectionFactory.CreateChannelAsync();
            _channel = channel;
            channel.ChannelShutdownAsync += (_, args) =>
            {
                if (!stoppingToken.IsCancellationRequested)
                {
                    _logger.LogWarning(
                        "检测到Channel断开,准备重连: Queue={Queue}, ReplyCode={ReplyCode}, Reason={Reason}",
                        _queueName,
                        args.ReplyCode,
                        args.ReplyText
                    );
                }
                shutdownSignal.TrySetResult(true);
                return Task.CompletedTask;
            };

            // Declare the exchange.
            await channel.ExchangeDeclareAsync(
                exchange: _exchangeName,
                type: _routingConvention.GetExchangeType<TMessage>().ToRabbitMQString(),
                durable: true,
                autoDelete: false,
                arguments: null,
                cancellationToken: stoppingToken
            );

            // Declare the queue.
            await channel.QueueDeclareAsync(
                queue: _queueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null,
                cancellationToken: stoppingToken
            );

            // Bind the queue to the exchange.
            await channel.QueueBindAsync(
                queue: _queueName,
                exchange: _exchangeName,
                routingKey: _routingKey,
                arguments: null,
                cancellationToken: stoppingToken
            );

            // Set the prefetch count (QoS): one unacknowledged message at a time.
            await channel.BasicQosAsync(
                prefetchSize: 0,
                prefetchCount: 1,
                global: false,
                cancellationToken: stoppingToken
            );

            // Create the consumer.
            var consumer = new AsyncEventingBasicConsumer(channel);
            consumer.ReceivedAsync += async (_, eventArgs) =>
            {
                await MessageHandlerAsync(eventArgs, stoppingToken);
            };

            // Start consuming with manual acknowledgements.
            await channel.BasicConsumeAsync(
                queue: _queueName,
                autoAck: false,
                consumer: consumer,
                cancellationToken: stoppingToken
            );

            // A successful start resets the back-off sequence.
            reconnectAttempt = 0;
            _logger.LogInformation(
                "消息消费者已启动(带返回结果): Queue={Queue}, Exchange={Exchange}, RoutingKey={RoutingKey}, ResultType={ResultType}",
                _queueName,
                _exchangeName,
                _routingKey,
                typeof(TResult).Name
            );
            await WaitForShutdownOrStopAsync(shutdownSignal.Task, stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            _logger.LogInformation("消息消费者已停止: {Queue}", _queueName);
            break;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "消费者运行异常,将尝试重连: {Queue}", _queueName);
        }
        finally
        {
            // Only clear the shared field if it still points at this iteration's channel.
            if (ReferenceEquals(_channel, channel))
            {
                _channel = null;
            }
            if (channel != null)
            {
                try
                {
                    await channel.DisposeAsync();
                }
                catch (Exception disposeEx)
                {
                    // Cleanup is best-effort and must not stop the reconnect flow,
                    // but the failure is recorded instead of being silently dropped.
                    _logger.LogDebug(
                        disposeEx,
                        "释放Channel时发生异常,已忽略: {Queue}",
                        _queueName
                    );
                }
            }
        }

        reconnectAttempt++;
        var delay = GetReconnectDelay(reconnectAttempt);
        _logger.LogWarning(
            "消费者将在{DelaySeconds}s后重连: Queue={Queue}, Attempt={Attempt}",
            delay.TotalSeconds,
            _queueName,
            reconnectAttempt
        );
        try
        {
            await Task.Delay(delay, stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            _logger.LogInformation("消息消费者已停止: {Queue}", _queueName);
            break;
        }
    }
}
/// <summary>
/// Closes the current channel, if any, and then delegates to the base implementation.
/// The close attempt is bounded by five seconds or by <paramref name="cancellationToken"/>,
/// whichever fires first; a failure to close is logged and does not prevent the base stop.
/// </summary>
/// <param name="cancellationToken">Token supplied by the caller to bound the stop operation.</param>
public override async Task StopAsync(CancellationToken cancellationToken)
{
    _logger.LogInformation("正在优雅停止消息消费者: {Queue}", _queueName);

    // Snapshot the field: ExecuteAsync sets it to null from its finally block, so reading it
    // twice (null check, then use) could dereference null.
    var channel = _channel;
    if (channel != null)
    {
        try
        {
            // Link to the caller's token so the caller's own deadline is honoured
            // in addition to the local five-second cap.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            cts.CancelAfter(TimeSpan.FromSeconds(5));
            await channel.CloseAsync(cancellationToken: cts.Token);
            _logger.LogInformation("Channel已优雅关闭: {Queue}", _queueName);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "关闭Channel时发生异常: {Queue}", _queueName);
        }
    }

    await base.StopAsync(cancellationToken);
}
/// <summary>
/// Handles one delivery: decodes the body as UTF-8 JSON into <typeparamref name="TMessage"/>,
/// resolves an <c>IMessageHandler&lt;TMessage, TResult&gt;</c> from a fresh scope, invokes it and
/// passes the outcome to <c>HandleWrappedResultAsync</c>.
/// A body that cannot be deserialized is rejected without requeue; any other exception is
/// logged and the delivery is nacked with requeue.
/// </summary>
/// <param name="eventArgs">The delivery received from the broker.</param>
/// <param name="cancellationToken">Token passed to the handler and to the channel operations.</param>
private async Task MessageHandlerAsync(
    BasicDeliverEventArgs eventArgs,
    CancellationToken cancellationToken
)
{
    var messageId = eventArgs.BasicProperties.MessageId ?? "Unknown";

    // Snapshot the field so the null check and the nacks below all use the same channel
    // even if the field is cleared or replaced while this delivery is being processed.
    var channel = _channel;
    if (channel == null)
    {
        _logger.LogError("无法处理消息,Channel未初始化: MessageId={MessageId}", messageId);
        return;
    }

    try
    {
        // Deserialize the message. Malformed JSON raises JsonException; it is treated like a
        // null result because redelivering the same bytes can never succeed.
        TMessage? message = null;
        JsonException? parseError = null;
        try
        {
            var json = Encoding.UTF8.GetString(eventArgs.Body.Span);
            message = JsonSerializer.Deserialize<TMessage>(json);
        }
        catch (JsonException jsonEx)
        {
            parseError = jsonEx;
        }

        if (message == null)
        {
            _logger.LogWarning(
                parseError,
                "消息反序列化失败: MessageId={MessageId}",
                messageId
            );
            await channel.BasicNackAsync(
                eventArgs.DeliveryTag,
                false,
                false,
                cancellationToken
            );
            return;
        }

        _logger.LogInformation("开始处理消息: MessageId={MessageId}", message.MessageId);

        // Resolve the handler from its own scope so scoped services are released per message.
        using var scope = _serviceProvider.CreateScope();
        var handler = scope.ServiceProvider.GetRequiredService<
            IMessageHandler<TMessage, TResult>
        >();

        // Run the handler and act on its result.
        var result = await handler.HandleAsync(message, cancellationToken);
        TryExtractWrappedResult(result, out var wrappedResult);
        await HandleWrappedResultAsync(message, wrappedResult, eventArgs, cancellationToken);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "处理消息时发生异常: MessageId={MessageId}", messageId);

        // NOTE(review): this path requeues without counting attempts, so a handler that
        // always throws is redelivered indefinitely — confirm this is intended.
        try
        {
            await channel.BasicNackAsync(
                eventArgs.DeliveryTag,
                false,
                true,
                cancellationToken
            );
        }
        catch (Exception nackEx)
        {
            _logger.LogWarning(
                nackEx,
                "异常后Nack失败,消息可能由Broker自动重新投递: MessageId={MessageId}",
                messageId
            );
        }
    }
}
/// <summary>
/// Acknowledges, retries or rejects a delivery according to the handler's result.
/// Success: the result is logged and the delivery is acked.
/// Failure: while the incremented retry count is within the limit the message is republished
/// with the new count and the original delivery is acked; beyond the limit the delivery is
/// nacked without requeue.
/// </summary>
/// <param name="message">The deserialized message; its <c>RetryCount</c> is updated on retry.</param>
/// <param name="result">The handler outcome extracted by <c>TryExtractWrappedResult</c>.</param>
/// <param name="eventArgs">The delivery being settled.</param>
/// <param name="cancellationToken">Token passed to the channel operations.</param>
private async Task HandleWrappedResultAsync(
    TMessage message,
    WrappedResult result,
    BasicDeliverEventArgs eventArgs,
    CancellationToken cancellationToken
)
{
    const int maxRetryCount = 3;

    // Snapshot the field so the null check and the ack/nack calls use the same channel.
    var channel = _channel;
    if (channel == null)
    {
        _logger.LogError(
            "无法处理消息结果,Channel未初始化: MessageId={MessageId}",
            message.MessageId
        );
        return;
    }

    if (result.Success)
    {
        // Success: log the result and acknowledge the message. The payload is serialized only
        // when the log level is enabled, and a serialization failure is contained so that a
        // logging problem cannot turn a handled message into a requeue.
        if (_logger.IsEnabled(LogLevel.Information))
        {
            _logger.LogInformation(
                "消息处理成功: MessageId={MessageId}, ResultType={ResultType}, Result={Result}",
                message.MessageId,
                typeof(TResult).Name,
                SerializeForLog(result.Data)
            );
        }

        if (result.Metadata is { Count: > 0 })
        {
            _logger.LogDebug(
                "消息处理元数据: MessageId={MessageId}, Metadata={@Metadata}",
                message.MessageId,
                result.Metadata
            );
        }

        await channel.BasicAckAsync(eventArgs.DeliveryTag, false, cancellationToken);
        return;
    }

    // Failure: the retry count travels in the message body, so the message is republished
    // with the incremented count instead of being requeued (a requeue would redeliver the
    // old body and lose the in-memory RetryCount).
    var nextRetryCount = message.RetryCount + 1;
    _logger.LogWarning(
        "消息处理失败: MessageId={MessageId}, RetryCount={RetryCount}, Error={Error}",
        message.MessageId,
        nextRetryCount,
        result.ErrorMessage
    );

    if (nextRetryCount <= maxRetryCount)
    {
        message.RetryCount = nextRetryCount;
        await RepublishForRetryAsync(message, eventArgs, cancellationToken);
        // Ack the original only after the replacement has been published.
        await channel.BasicAckAsync(eventArgs.DeliveryTag, false, cancellationToken);
        _logger.LogInformation(
            "消息已重新发布用于重试: MessageId={MessageId}, RetryCount={RetryCount}",
            message.MessageId,
            message.RetryCount
        );
    }
    else
    {
        await channel.BasicNackAsync(
            eventArgs.DeliveryTag,
            false,
            false,
            cancellationToken
        );
        _logger.LogError(
            "消息已达到最大重试次数,将被丢弃: MessageId={MessageId}, FinalError={Error}",
            message.MessageId,
            result.ErrorMessage
        );
    }

    // Renders the handler payload for the log line without letting serializer errors escape.
    static string SerializeForLog(object? data)
    {
        try
        {
            return JsonSerializer.Serialize(data);
        }
        catch (Exception ex) when (ex is JsonException or NotSupportedException)
        {
            return $"<unserializable {data?.GetType().Name}: {ex.Message}>";
        }
    }
}
/// <summary>
/// Disposes the current channel, if any, and then the base service.
/// A failure while disposing the channel is logged rather than thrown, and the base
/// <c>Dispose</c> always runs.
/// </summary>
public override void Dispose()
{
    // Detach the channel atomically so it is disposed at most once from here.
    var channel = Interlocked.Exchange(ref _channel, null);
    try
    {
        channel?.Dispose();
    }
    catch (Exception ex)
    {
        // Dispose must not throw; record the failure and continue with the base cleanup.
        _logger.LogWarning(ex, "释放Channel时发生异常: {Queue}", _queueName);
    }
    finally
    {
        base.Dispose();
    }
}
/// <summary>
/// Copies the handler result into a <c>WrappedResult</c>.
/// <c>Success</c>, <c>ErrorMessage</c> and <c>Metadata</c> are read directly from
/// <paramref name="result"/>; <c>Data</c> is read through the getter that
/// <c>PropertyAccessor.GetGetter</c> returns for the property named <c>Data</c>, and stays
/// <c>null</c> when no getter is returned.
/// </summary>
/// <param name="result">The result produced by the message handler.</param>
/// <param name="wrappedResult">Receives the extracted values.</param>
private static void TryExtractWrappedResult(TResult result, out WrappedResult wrappedResult)
{
    var dataGetter = PropertyAccessor.GetGetter<TResult>(
        nameof(MessageHandlerResult<>.Data)
    );
    var payload = dataGetter?.Invoke(result);

    wrappedResult = new WrappedResult
    {
        Success = result.Success,
        Data = payload,
        ErrorMessage = result.ErrorMessage,
        Metadata = result.Metadata,
    };
}
/// <summary>
/// Publishes a fresh copy of <paramref name="message"/> (carrying its updated retry count)
/// back to this consumer's own queue so it is delivered again.
/// </summary>
/// <remarks>
/// The copy is sent through the default exchange with the queue name as routing key, which
/// targets only this queue. Republishing through the exchange and routing key of the original
/// delivery would also deliver the retry to every other queue bound to that exchange.
/// The properties of the original delivery (headers, correlation id, ...) are carried over so
/// they are not lost on retry.
/// </remarks>
/// <param name="message">The message to serialize and publish.</param>
/// <param name="eventArgs">The original delivery whose properties are copied.</param>
/// <param name="cancellationToken">Token passed to the publish operation.</param>
/// <exception cref="InvalidOperationException">No channel is currently available.</exception>
private async Task RepublishForRetryAsync(
    TMessage message,
    BasicDeliverEventArgs eventArgs,
    CancellationToken cancellationToken
)
{
    var channel =
        _channel ?? throw new InvalidOperationException("Channel未初始化,无法重发消息");

    var body = JsonSerializer.SerializeToUtf8Bytes(message);

    // Start from the original properties, then overwrite the ones this service controls.
    var properties = new BasicProperties(eventArgs.BasicProperties)
    {
        MessageId = message.MessageId,
        Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()),
        ContentType = "application/json",
        DeliveryMode = DeliveryModes.Persistent,
    };

    await channel.BasicPublishAsync(
        exchange: string.Empty,
        routingKey: _queueName,
        mandatory: false,
        basicProperties: properties,
        body: body,
        cancellationToken: cancellationToken
    );
}
/// <summary>
/// Immutable, type-erased view of a handler result used inside this service.
/// </summary>
private readonly struct WrappedResult
{
    /// <summary>Whether the handler reported success.</summary>
    public bool Success { get; init; }

    /// <summary>The handler's result payload, if any.</summary>
    public object? Data { get; init; }

    /// <summary>The error message reported by the handler, if any.</summary>
    public string? ErrorMessage { get; init; }

    /// <summary>Additional metadata reported by the handler, if any.</summary>
    public Dictionary<string, object>? Metadata { get; init; }
}
/// <summary>
/// Computes the wait before the next reconnect: two raised to the attempt number (the
/// exponent is capped at 5), limited to <c>MaxReconnectDelaySeconds</c> seconds.
/// </summary>
/// <param name="attempt">The reconnect attempt number.</param>
/// <returns>The delay to wait before reconnecting.</returns>
private static TimeSpan GetReconnectDelay(int attempt)
{
    var exponent = attempt < 5 ? attempt : 5;
    var backoffSeconds = Math.Pow(2, exponent);
    if (backoffSeconds > MaxReconnectDelaySeconds)
    {
        backoffSeconds = MaxReconnectDelaySeconds;
    }

    return TimeSpan.FromSeconds(backoffSeconds);
}
/// <summary>
/// Waits until <paramref name="shutdownTask"/> completes or <paramref name="stoppingToken"/>
/// is cancelled, whichever happens first.
/// </summary>
/// <param name="shutdownTask">Task that completes when the channel has shut down.</param>
/// <param name="stoppingToken">Token that signals the service is stopping.</param>
/// <exception cref="OperationCanceledException">
/// <paramref name="stoppingToken"/> is cancelled by the time the wait ends.
/// </exception>
private static async Task WaitForShutdownOrStopAsync(
    Task shutdownTask,
    CancellationToken stoppingToken
)
{
    // A Task.Delay(Timeout.Infinite, token) would stay registered on the token until it is
    // cancelled, adding one lingering registration per reconnect cycle. An explicit
    // registration is removed as soon as this method returns.
    var stopRequested = new TaskCompletionSource(
        TaskCreationOptions.RunContinuationsAsynchronously
    );
    using var registration = stoppingToken.Register(
        static state => ((TaskCompletionSource)state!).TrySetResult(),
        stopRequested
    );

    await Task.WhenAny(shutdownTask, stopRequested.Task);
    stoppingToken.ThrowIfCancellationRequested();
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个基于RabbitMQ的消息消费者后台服务,支持返回处理结果。让我详细解释一下这个代码的功能和实现:
主要功能
这个类实现了一个RabbitMQ消息消费者,具有以下特点:
- 泛型设计:支持自定义消息类型(TMessage)和返回结果类型(TResult)
- 后台服务:继承自 BackgroundService,作为长期运行的后台任务
- 自动重连:当连接断开时自动重连,支持指数退避策略
- 消息重试:支持消息处理失败后的重试机制(最多3次)
- 优雅停机:支持优雅地停止服务
核心组件
1. 依赖注入
private readonly IRabbitMQConnectionFactory _connectionFactory; // RabbitMQ连接工厂
private readonly IMessageRoutingConvention _routingConvention; // 路由约定(决定交换器、队列名等)
private readonly IServiceProvider _serviceProvider; // 服务提供者(用于创建消息处理器)
private readonly ILogger _logger; // 日志记录器
2. 主要执行流程(ExecuteAsync方法)
连接管理:
- 创建RabbitMQ通道
- 监听通道断开事件,触发重连
- 声明交换器(Exchange)、队列(Queue)并建立绑定关系
消息消费设置:
- 设置QoS(每次只预取1条消息)
- 创建异步消费者
- 开始消费消息
错误处理和重连:
- 捕获异常并记录日志
- 实现指数退避重连策略(最长30秒)
3. 消息处理流程(MessageHandlerAsync方法)
消息反序列化:
var body = eventArgs.Body.ToArray();
var json = Encoding.UTF8.GetString(body);
var message = JsonSerializer.Deserialize<TMessage>(json);
处理器调用:
using var scope = _serviceProvider.CreateScope();
var handler = scope.ServiceProvider.GetRequiredService<IMessageHandler<TMessage, TResult>>();
var result = await handler.HandleAsync(message, cancellationToken);
结果处理:
- 成功:记录日志并ACK消息
- 失败:检查重试次数
- 未超过3次:重新发布消息进行重试
- 超过3次:NACK消息并丢弃
4. 重试机制
当消息处理失败时:
- 增加重试计数器
- 重新发布消息到同一队列
- ACK原消息(避免重复处理)
- 新消息包含更新的重试计数
if (nextRetryCount <= 3)
{
message.RetryCount = nextRetryCount;
await RepublishForRetryAsync(message, eventArgs, cancellationToken);
await _channel.BasicAckAsync(eventArgs.DeliveryTag, false, cancellationToken);
}
5. 优雅停机(StopAsync方法)
- 尝试优雅关闭RabbitMQ通道
- 设置5秒超时防止无限等待
- 调用基类的停止方法
设计特点
可靠性:
- 自动重连机制
- 消息持久化
- 错误处理和重试
可扩展性:
- 泛型设计支持不同消息类型
- 依赖注入支持自定义组件
可观测性:
- 详细的日志记录
- 包含消息ID、重试次数等关键信息
性能优化:
- 使用Scope避免内存泄漏
- QoS设置控制并发数量
这个实现为消息队列处理提供了一个健壮、可靠且功能完整的解决方案。
评论加载中...