using System.Text;
using System.Text.Json;
using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Enums;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Dpz.Core.MessageQueue.RabbitMQ;
/// <summary>
/// Base class for a hosted background service that consumes <typeparamref name="TMessage"/>
/// messages from a RabbitMQ queue. It creates a channel, declares and binds the exchange and
/// queue, starts a manual-ack consumer, and re-creates the channel with a capped backoff
/// when the channel shuts down or an error occurs.
/// </summary>
/// <typeparam name="TMessage">The message type handled by the derived consumer.</typeparam>
public abstract class RabbitMQConsumerBackgroundServiceBase<TMessage> : BackgroundService
where TMessage : MessageBase
{
// Factory used by ExecuteAsync to create a fresh channel on every (re)connect attempt.
private readonly IRabbitMQConnectionFactory _connectionFactory;
// Supplies the exchange name, exchange type, queue name and routing key for TMessage.
private readonly IMessageRoutingConvention _routingConvention;
// Exchange, queue and routing key resolved once in the constructor.
private readonly string _exchangeName;
private readonly string _queueName;
private readonly string _routingKey;
// Channel currently in use; swapped through ExchangeChannel and read through ReadChannel.
private IChannel? _channel;
// Stop flag: set to 1 by StopAsync and read by IsStopping (0 = running).
private int _isStopping;
// Upper bound, in seconds, applied by GetReconnectDelay.
private const int MaxReconnectDelaySeconds = 30;
private readonly ILogger<RabbitMQConsumerBackgroundServiceBase<TMessage>> _logger;
// Store used by TryMarkConsumedAsync / TryMarkConsumeFailedAsync to record consumption status.
private readonly IMessageOutboxStore _outboxStore;
/// <summary>
/// Stores the injected collaborators, resolves the exchange name, queue name and routing key
/// for <typeparamref name="TMessage"/> from the routing convention, and creates the logger.
/// </summary>
/// <param name="connectionFactory">Factory used later to create RabbitMQ channels.</param>
/// <param name="routingConvention">Convention that names the exchange, queue and routing key.</param>
/// <param name="outboxStore">Store used to record consumption status.</param>
/// <param name="loggerFactory">Factory used to create this service's logger.</param>
protected RabbitMQConsumerBackgroundServiceBase(
    IRabbitMQConnectionFactory connectionFactory,
    IMessageRoutingConvention routingConvention,
    IMessageOutboxStore outboxStore,
    ILoggerFactory loggerFactory
)
{
    // Injected collaborators.
    _outboxStore = outboxStore;
    _connectionFactory = connectionFactory;
    _routingConvention = routingConvention;

    // Routing topology is resolved once, up front, for the closed message type.
    _exchangeName = routingConvention.GetExchangeName<TMessage>();
    _queueName = routingConvention.GetQueueName<TMessage>();
    _routingKey = routingConvention.GetRoutingKey<TMessage>();

    _logger = loggerFactory.CreateLogger<RabbitMQConsumerBackgroundServiceBase<TMessage>>();
}
/// <summary>
/// Best-effort update that marks a message as consumed in the outbox store.
/// Any exception from the store is logged and swallowed, so this method does not throw
/// for store failures.
/// </summary>
/// <param name="messageId">Identifier of the message to mark.</param>
/// <param name="cancellationToken">Token forwarded to the outbox store.</param>
protected async Task TryMarkConsumedAsync(string messageId, CancellationToken cancellationToken)
{
    try
    {
        await _outboxStore.MarkConsumedAsync(messageId, cancellationToken);
    }
    catch (Exception storeFailure)
    {
        // Status tracking must not break message handling; record the failure and move on.
        _logger.LogError(
            storeFailure,
            "更新Outbox消费状态失败: MessageId={MessageId}",
            messageId
        );
    }
}
/// <summary>
/// Best-effort update that records a consumption failure for a message in the outbox store.
/// Any exception from the store is logged and swallowed, so this method does not throw
/// for store failures.
/// </summary>
/// <param name="messageId">Identifier of the message that failed.</param>
/// <param name="error">Error description passed to the outbox store.</param>
/// <param name="cancellationToken">Token forwarded to the outbox store.</param>
protected async Task TryMarkConsumeFailedAsync(
    string messageId,
    string error,
    CancellationToken cancellationToken
)
{
    try
    {
        await _outboxStore.MarkConsumeFailedAsync(messageId, error, cancellationToken);
    }
    catch (Exception storeFailure)
    {
        // Status tracking must not break message handling; record the failure and move on.
        _logger.LogError(
            storeFailure,
            "更新Outbox消费失败状态失败: MessageId={MessageId}",
            messageId
        );
    }
}
/// <summary>
/// Returns the channel currently held by this consumer, or <c>null</c> when none is set.
/// </summary>
/// <remarks>
/// The field is replaced with <see cref="Interlocked.Exchange{T}(ref T, T)"/> from the execute
/// loop and from <c>StopAsync</c>, while consumer callbacks read it here; a volatile read keeps
/// the read side consistent with those writes (and with how the stop flag is read).
/// </remarks>
protected virtual IChannel? ReadChannel() => Volatile.Read(ref _channel);
/// <summary>
/// Atomically replaces the current channel reference with <paramref name="channel"/>.
/// </summary>
/// <param name="channel">The new channel, or <c>null</c> to clear the reference.</param>
/// <returns>The channel that was held before the swap, or <c>null</c> if there was none.</returns>
protected virtual IChannel? ExchangeChannel(IChannel? channel)
{
    var previous = Interlocked.Exchange(ref _channel, channel);
    return previous;
}
/// <summary>
/// Processes one successfully deserialized message. Invoked by <c>MessageHandlerAsync</c>.
/// </summary>
/// <remarks>
/// The consumer is started with <c>autoAck: false</c> and the base class never acknowledges
/// a message after this method returns, so the implementation is expected to ack (or nack)
/// the delivery on <paramref name="channel"/> itself. An exception thrown from here is caught
/// by <c>MessageHandlerAsync</c>, which logs it and Nacks the delivery with requeue.
/// </remarks>
/// <param name="message">The deserialized message.</param>
/// <param name="eventArgs">The raw delivery, including the delivery tag and properties.</param>
/// <param name="channel">The channel read by the base class when the delivery was handled.</param>
/// <param name="cancellationToken">The service's stopping token.</param>
protected abstract Task HandleMessageAsyncCore(
TMessage message,
BasicDeliverEventArgs eventArgs,
IChannel channel,
CancellationToken cancellationToken
);
/// <summary>
/// Main consume loop. Each iteration creates a channel, declares the topology, starts the
/// consumer and then waits until either the channel shuts down or the service is stopping.
/// After a shutdown or an error the channel is released and the loop retries after a capped
/// backoff delay.
/// </summary>
/// <param name="stoppingToken">Token signalled when the host stops the service.</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var reconnectAttempt = 0;
    while (!stoppingToken.IsCancellationRequested && !IsStopping())
    {
        IChannel? channel = null;
        // Completed by the channel's shutdown event so the wait below can end and the
        // loop can reconnect. Continuations run asynchronously to keep the client's
        // callback from running the rest of this loop inline.
        var shutdownSignal = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously
        );
        try
        {
            channel = await _connectionFactory.CreateChannelAsync();
            ExchangeChannel(channel);
            channel.ChannelShutdownAsync += (_, args) =>
            {
                // Only warn for unexpected shutdowns; a shutdown during stop is normal.
                if (!stoppingToken.IsCancellationRequested && !IsStopping())
                {
                    _logger.LogWarning(
                        "检测到Channel断开,准备重连: Queue={Queue}, ReplyCode={ReplyCode}, Reason={Reason}",
                        _queueName,
                        args.ReplyCode,
                        args.ReplyText
                    );
                }
                shutdownSignal.TrySetResult(true);
                return Task.CompletedTask;
            };

            await DeclareTopologyAsync(channel, stoppingToken);
            await StartConsumingAsync(channel, stoppingToken);

            // A successful start resets the backoff.
            reconnectAttempt = 0;
            _logger.LogInformation(
                "消息消费者已启动: Queue={Queue}, Exchange={Exchange}, RoutingKey={RoutingKey}",
                _queueName,
                _exchangeName,
                _routingKey
            );
            await WaitForShutdownOrStopAsync(shutdownSignal.Task, stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            _logger.LogInformation("消息消费者已停止: {Queue}", _queueName);
            break;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "消费者运行异常,将尝试重连: {Queue}", _queueName);
        }
        finally
        {
            await ReleaseChannelAsync(channel);
        }

        // StopAsync may have closed the channel before the stopping token was cancelled.
        if (stoppingToken.IsCancellationRequested || IsStopping())
        {
            _logger.LogInformation("消息消费者已停止: {Queue}", _queueName);
            break;
        }

        reconnectAttempt++;
        var delay = GetReconnectDelay(reconnectAttempt);
        _logger.LogWarning(
            "消费者将在{DelaySeconds}s后重连: Queue={Queue}, Attempt={Attempt}",
            delay.TotalSeconds,
            _queueName,
            reconnectAttempt
        );
        try
        {
            await Task.Delay(delay, stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            _logger.LogInformation("消息消费者已停止: {Queue}", _queueName);
            break;
        }
    }
}

/// <summary>
/// Declares the durable exchange and queue, binds them with the routing key, and limits
/// the channel to one unacknowledged delivery at a time.
/// </summary>
private async Task DeclareTopologyAsync(IChannel channel, CancellationToken stoppingToken)
{
    await channel.ExchangeDeclareAsync(
        exchange: _exchangeName,
        type: _routingConvention.GetExchangeType<TMessage>().ToRabbitMQString(),
        durable: true,
        autoDelete: false,
        arguments: null,
        cancellationToken: stoppingToken
    );
    await channel.QueueDeclareAsync(
        queue: _queueName,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null,
        cancellationToken: stoppingToken
    );
    await channel.QueueBindAsync(
        queue: _queueName,
        exchange: _exchangeName,
        routingKey: _routingKey,
        arguments: null,
        cancellationToken: stoppingToken
    );
    await channel.BasicQosAsync(
        prefetchSize: 0,
        prefetchCount: 1,
        global: false,
        cancellationToken: stoppingToken
    );
}

/// <summary>
/// Registers a manual-ack consumer on the queue that forwards every delivery to
/// <c>MessageHandlerAsync</c>.
/// </summary>
private async Task StartConsumingAsync(IChannel channel, CancellationToken stoppingToken)
{
    var consumer = new AsyncEventingBasicConsumer(channel);
    consumer.ReceivedAsync += async (_, eventArgs) =>
    {
        await MessageHandlerAsync(eventArgs, stoppingToken);
    };
    await channel.BasicConsumeAsync(
        queue: _queueName,
        autoAck: false,
        consumer: consumer,
        cancellationToken: stoppingToken
    );
}

/// <summary>
/// Clears the shared channel reference if it still points at <paramref name="channel"/>
/// and disposes the channel. Disposal failures are logged and never rethrown so the
/// reconnect flow can continue.
/// </summary>
private async Task ReleaseChannelAsync(IChannel? channel)
{
    // StopAsync may already have taken (and nulled) the reference; only clear our own.
    if (ReferenceEquals(ReadChannel(), channel))
    {
        ExchangeChannel(null);
    }
    if (channel == null)
    {
        return;
    }
    try
    {
        await channel.DisposeAsync();
    }
    catch (Exception ex)
    {
        // Cleanup errors are not actionable here; keep a trace and carry on reconnecting.
        _logger.LogDebug(ex, "释放Channel时发生异常,已忽略: {Queue}", _queueName);
    }
}
/// <summary>
/// Stops the consumer: raises the stop flag, takes ownership of the current channel,
/// closes and disposes it, and then delegates to <c>base.StopAsync</c>.
/// </summary>
/// <remarks>
/// Closing the channel is bounded by a 5-second timeout that is also linked to
/// <paramref name="cancellationToken"/>, so a host that runs out of shutdown time is not
/// kept waiting for the full timeout. Close and dispose failures are logged or ignored so
/// they cannot block host shutdown.
/// </remarks>
/// <param name="cancellationToken">Token signalled when shutdown should no longer be graceful.</param>
public override async Task StopAsync(CancellationToken cancellationToken)
{
    // Raise the flag first so the execute loop does not try to reconnect once the
    // channel below is closed.
    Interlocked.Exchange(ref _isStopping, 1);
    _logger.LogInformation("正在停止消息消费者: {Queue}", _queueName);
    var channel = ExchangeChannel(null);
    if (channel != null)
    {
        try
        {
            // Honour the host's token as well as the local close timeout.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            cts.CancelAfter(TimeSpan.FromSeconds(5));
            await channel.CloseAsync(cancellationToken: cts.Token);
            _logger.LogInformation("Channel已关闭: {Queue}", _queueName);
        }
        catch (ObjectDisposedException)
        {
            _logger.LogDebug("Channel已被释放,跳过关闭: {Queue}", _queueName);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "关闭Channel时发生异常: {Queue}", _queueName);
        }
        finally
        {
            try
            {
                await channel.DisposeAsync();
            }
            catch
            {
                // Cleanup errors are ignored during stop so they cannot affect host shutdown.
            }
        }
    }
    await base.StopAsync(cancellationToken);
}
/// <summary>
/// Publishes <paramref name="message"/> again, as persistent JSON, to the same exchange and
/// routing key the original delivery arrived with.
/// </summary>
/// <remarks>
/// The republished message gets new properties (message id, current UTC timestamp, JSON
/// content type, persistent delivery mode); the original delivery's properties are not copied.
/// This method does not acknowledge the original delivery.
/// </remarks>
/// <param name="message">The message to serialize and publish.</param>
/// <param name="eventArgs">The original delivery, used for its exchange and routing key.</param>
/// <param name="channel">The channel to publish on.</param>
/// <param name="cancellationToken">Token forwarded to the publish call.</param>
protected async Task RepublishForRetryAsync(
    TMessage message,
    BasicDeliverEventArgs eventArgs,
    IChannel channel,
    CancellationToken cancellationToken
)
{
    var payload = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));

    var utcNow = (DateTimeOffset)DateTime.UtcNow;
    var retryProperties = new BasicProperties
    {
        Persistent = true,
        MessageId = message.MessageId,
        Timestamp = new AmqpTimestamp(utcNow.ToUnixTimeSeconds()),
        ContentType = "application/json",
        DeliveryMode = DeliveryModes.Persistent,
    };

    await channel.BasicPublishAsync(
        exchange: eventArgs.Exchange,
        routingKey: eventArgs.RoutingKey,
        mandatory: false,
        basicProperties: retryProperties,
        body: payload,
        cancellationToken: cancellationToken
    );
}
/// <summary>
/// Disposes the channel still held by this consumer, if any, and then disposes the base
/// service.
/// </summary>
/// <remarks>
/// A failure while disposing the channel is logged instead of thrown, and
/// <c>base.Dispose()</c> always runs, so a broken channel cannot prevent the base service
/// from being cleaned up.
/// </remarks>
public override void Dispose()
{
    try
    {
        ExchangeChannel(null)?.Dispose();
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "释放Channel时发生异常: {Queue}", _queueName);
    }
    finally
    {
        base.Dispose();
    }
}
/// <summary>
/// Handles one delivery: decodes the body as UTF-8 JSON, deserializes it to
/// <typeparamref name="TMessage"/> and passes it to <c>HandleMessageAsyncCore</c>.
/// </summary>
/// <remarks>
/// <para>
/// A payload that cannot be deserialized (malformed JSON, or JSON that deserializes to
/// <c>null</c>) is Nacked without requeue, because redelivering it can never succeed and,
/// with a prefetch of one, would block the queue in an endless redelivery loop.
/// </para>
/// <para>
/// Any other exception is logged and the delivery is Nacked with requeue. This method does
/// not acknowledge successful deliveries; that is left to <c>HandleMessageAsyncCore</c>.
/// </para>
/// </remarks>
/// <param name="eventArgs">The delivery received from the broker.</param>
/// <param name="cancellationToken">The service's stopping token.</param>
protected virtual async Task MessageHandlerAsync(
    BasicDeliverEventArgs eventArgs,
    CancellationToken cancellationToken
)
{
    var messageId = eventArgs.BasicProperties.MessageId ?? "Unknown";
    var channel = ReadChannel();
    if (channel == null)
    {
        _logger.LogError("无法处理消息,Channel未初始化: MessageId={MessageId}", messageId);
        return;
    }
    try
    {
        var message = DeserializeMessage(eventArgs, messageId);
        if (message == null)
        {
            // Undeserializable payload: reject without requeue so it is not redelivered.
            await channel.BasicNackAsync(
                eventArgs.DeliveryTag,
                false,
                false,
                cancellationToken
            );
            return;
        }
        _logger.LogInformation("开始处理消息: MessageId={MessageId}", message.MessageId);
        await HandleMessageAsyncCore(message, eventArgs, channel, cancellationToken);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "处理消息时发生异常: MessageId={MessageId}", messageId);
        try
        {
            await channel.BasicNackAsync(eventArgs.DeliveryTag, false, true, cancellationToken);
        }
        catch (Exception nackEx)
        {
            _logger.LogWarning(
                nackEx,
                "异常后Nack失败,消息可能由Broker自动重新投递: MessageId={MessageId}",
                messageId
            );
        }
    }
}

/// <summary>
/// Decodes and deserializes the delivery body. Returns <c>null</c>, after logging a warning,
/// when the payload is malformed JSON or deserializes to <c>null</c>.
/// </summary>
private TMessage? DeserializeMessage(BasicDeliverEventArgs eventArgs, string messageId)
{
    try
    {
        var body = eventArgs.Body.ToArray();
        var json = Encoding.UTF8.GetString(body);
        var message = JsonSerializer.Deserialize<TMessage>(json);
        if (message == null)
        {
            _logger.LogWarning("消息反序列化失败: MessageId={MessageId}", messageId);
        }
        return message;
    }
    catch (JsonException jsonEx)
    {
        // Malformed JSON is a permanent failure for this payload, not a transient one.
        _logger.LogWarning(jsonEx, "消息反序列化失败: MessageId={MessageId}", messageId);
        return null;
    }
}
/// <summary>
/// Returns <c>true</c> once the stop flag has been set to 1.
/// </summary>
private bool IsStopping()
{
    var flag = Volatile.Read(ref _isStopping);
    return flag == 1;
}
/// <summary>
/// Computes the reconnect delay for the given attempt: 2 raised to the attempt number
/// (with the exponent capped at 5), limited to <c>MaxReconnectDelaySeconds</c>.
/// </summary>
/// <param name="attempt">The reconnect attempt number.</param>
/// <returns>The delay to wait before the next reconnect.</returns>
private static TimeSpan GetReconnectDelay(int attempt)
{
    var exponent = Math.Min(attempt, 5);
    var backoffSeconds = Math.Pow(2, exponent);
    var cappedSeconds = Math.Min(MaxReconnectDelaySeconds, backoffSeconds);
    return TimeSpan.FromSeconds(cappedSeconds);
}
/// <summary>
/// Waits until either <paramref name="shutdownTask"/> completes or
/// <paramref name="stoppingToken"/> is cancelled.
/// </summary>
/// <remarks>
/// The infinite delay is bound to a linked token source that is cancelled as soon as the
/// wait ends. Binding it directly to <paramref name="stoppingToken"/> would leave one
/// pending delay registered on that token for every reconnect cycle until shutdown.
/// </remarks>
/// <param name="shutdownTask">Task that completes when the channel has shut down.</param>
/// <param name="stoppingToken">Token signalled when the service is stopping.</param>
/// <exception cref="OperationCanceledException">
/// Thrown when <paramref name="stoppingToken"/> is cancelled by the time the wait ends.
/// </exception>
private static async Task WaitForShutdownOrStopAsync(
    Task shutdownTask,
    CancellationToken stoppingToken
)
{
    using var waitCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
    var cancellationTask = Task.Delay(Timeout.Infinite, waitCts.Token);
    try
    {
        await Task.WhenAny(shutdownTask, cancellationTask);
    }
    finally
    {
        // Completes the pending delay and removes its registration.
        waitCts.Cancel();
    }
    stoppingToken.ThrowIfCancellationRequested();
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个基于RabbitMQ的消息消费者后台服务的抽象基类,实现了可靠的消息消费和连接管理功能。以下是详细解释:
类的基本结构
public abstract class RabbitMQConsumerBackgroundServiceBase<TMessage> : BackgroundService
where TMessage : MessageBase
- 继承自 BackgroundService,作为后台服务运行
- 使用泛型约束,只接受继承自 MessageBase 的消息类型
- 是抽象类,需要子类实现具体的消息处理逻辑
核心依赖
- IRabbitMQConnectionFactory: RabbitMQ连接工厂
- IMessageRoutingConvention: 消息路由约定(Exchange、Queue、RoutingKey命名规则)
- IMessageOutboxStore: 消息发件箱存储(用于消息状态跟踪)
- ILogger: 日志记录
主要功能模块
1. 连接和重连机制
- 实现指数退避重连策略(最大30秒延迟)
- 自动检测连接断开并重连
- 优雅处理连接异常
2. 消息消费设置
// 声明Exchange、Queue并绑定
await channel.ExchangeDeclareAsync(...);
await channel.QueueDeclareAsync(...);
await channel.QueueBindAsync(...);
// 设置QoS为1,确保一次只处理一条消息
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);
3. 消息处理流程
- 接收消息: 使用 AsyncEventingBasicConsumer 异步接收
- 反序列化: JSON反序列化为强类型消息对象
- 业务处理: 调用抽象方法 HandleMessageAsyncCore
- 确认/拒绝: 根据处理结果进行ACK/NACK操作
4. 状态跟踪
提供了Outbox模式的消息状态管理:
protected async Task TryMarkConsumedAsync(string messageId, CancellationToken cancellationToken)
protected async Task TryMarkConsumeFailedAsync(string messageId, string error, CancellationToken cancellationToken)
5. 重试机制
protected async Task RepublishForRetryAsync(TMessage message, BasicDeliverEventArgs eventArgs, ...)
支持将消息重新发布到队列进行重试。
关键设计特性
线程安全
- 使用 Interlocked.Exchange 安全交换Channel引用
- 使用 Volatile.Read 检查停止状态
优雅关闭
public override async Task StopAsync(CancellationToken cancellationToken)
- 设置停止标志
- 优雅关闭Channel
- 最后调用 base.StopAsync 取消并等待后台执行循环退出(代码中并未显式等待正在处理的消息完成)
错误处理
- 消息处理异常时自动NACK并重新入队
- 反序列化失败时NACK但不重新入队
- 详细的错误日志记录
使用方式
子类需要实现抽象方法:
protected abstract Task HandleMessageAsyncCore(
TMessage message,
BasicDeliverEventArgs eventArgs,
IChannel channel,
CancellationToken cancellationToken
);
这个基类提供了一个健壮、可扩展的RabbitMQ消费者框架,处理了连接管理、错误恢复、消息确认等复杂细节,让开发者只需专注于业务逻辑的实现。
评论加载中...