using System.Text;
using System.Text.Json;
using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Enums;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Dpz.Core.MessageQueue.RabbitMQ;

public abstract class RabbitMQConsumerBackgroundServiceBase<TMessage> : BackgroundService
    where TMessage : MessageBase
{
    private readonly IRabbitMQConnectionFactory _connectionFactory;
    private readonly IMessageRoutingConvention _routingConvention;
    private readonly string _exchangeName;
    private readonly string _queueName;
    private readonly string _routingKey;
    private IChannel? _channel;
    private int _isStopping;
    private const int MaxReconnectDelaySeconds = 30;
    private readonly ILogger<RabbitMQConsumerBackgroundServiceBase<TMessage>> _logger;
    private readonly IMessageOutboxStore _outboxStore;

    protected RabbitMQConsumerBackgroundServiceBase(
        IRabbitMQConnectionFactory connectionFactory,
        IMessageRoutingConvention routingConvention,
        IMessageOutboxStore outboxStore,
        ILoggerFactory loggerFactory
    )
    {
        _connectionFactory = connectionFactory;
        _routingConvention = routingConvention;
        _outboxStore = outboxStore;

        _exchangeName = _routingConvention.GetExchangeName<TMessage>();
        _queueName = _routingConvention.GetQueueName<TMessage>();
        _routingKey = _routingConvention.GetRoutingKey<TMessage>();
        _logger = loggerFactory.CreateLogger<RabbitMQConsumerBackgroundServiceBase<TMessage>>();
    }

    protected async Task TryMarkConsumedAsync(string messageId, CancellationToken cancellationToken)
    {
        try
        {
            await _outboxStore.MarkConsumedAsync(messageId, cancellationToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "更新Outbox消费状态失败: MessageId={MessageId}", messageId);
        }
    }

    protected async Task TryMarkConsumeFailedAsync(
        string messageId,
        string error,
        CancellationToken cancellationToken
    )
    {
        try
        {
            await _outboxStore.MarkConsumeFailedAsync(messageId, error, cancellationToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "更新Outbox消费失败状态失败: MessageId={MessageId}", messageId);
        }
    }

    protected virtual IChannel? ReadChannel() => _channel;

    protected virtual IChannel? ExchangeChannel(IChannel? channel) =>
        Interlocked.Exchange(ref _channel, channel);

    protected abstract Task HandleMessageAsyncCore(
        TMessage message,
        BasicDeliverEventArgs eventArgs,
        IChannel channel,
        CancellationToken cancellationToken
    );

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var reconnectAttempt = 0;

        while (!stoppingToken.IsCancellationRequested && !IsStopping())
        {
            IChannel? channel = null;
            var shutdownSignal = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously
            );

            try
            {
                channel = await _connectionFactory.CreateChannelAsync();
                ExchangeChannel(channel);

                channel.ChannelShutdownAsync += (_, args) =>
                {
                    if (!stoppingToken.IsCancellationRequested && !IsStopping())
                    {
                        _logger.LogWarning(
                            "检测到Channel断开,准备重连: Queue={Queue}, ReplyCode={ReplyCode}, Reason={Reason}",
                            _queueName,
                            args.ReplyCode,
                            args.ReplyText
                        );
                    }

                    shutdownSignal.TrySetResult(true);
                    return Task.CompletedTask;
                };

                await channel.ExchangeDeclareAsync(
                    exchange: _exchangeName,
                    type: _routingConvention.GetExchangeType<TMessage>().ToRabbitMQString(),
                    durable: true,
                    autoDelete: false,
                    arguments: null,
                    cancellationToken: stoppingToken
                );

                await channel.QueueDeclareAsync(
                    queue: _queueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null,
                    cancellationToken: stoppingToken
                );

                await channel.QueueBindAsync(
                    queue: _queueName,
                    exchange: _exchangeName,
                    routingKey: _routingKey,
                    arguments: null,
                    cancellationToken: stoppingToken
                );

                await channel.BasicQosAsync(
                    prefetchSize: 0,
                    prefetchCount: 1,
                    global: false,
                    cancellationToken: stoppingToken
                );

                var consumer = new AsyncEventingBasicConsumer(channel);
                consumer.ReceivedAsync += async (_, eventArgs) =>
                {
                    await MessageHandlerAsync(eventArgs, stoppingToken);
                };

                await channel.BasicConsumeAsync(
                    queue: _queueName,
                    autoAck: false,
                    consumer: consumer,
                    cancellationToken: stoppingToken
                );

                reconnectAttempt = 0;
                _logger.LogInformation(
                    "消息消费者已启动: Queue={Queue}, Exchange={Exchange}, RoutingKey={RoutingKey}",
                    _queueName,
                    _exchangeName,
                    _routingKey
                );

                await WaitForShutdownOrStopAsync(shutdownSignal.Task, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                _logger.LogInformation("消息消费者已停止: {Queue}", _queueName);
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "消费者运行异常,将尝试重连: {Queue}", _queueName);
            }
            finally
            {
                if (ReferenceEquals(ReadChannel(), channel))
                {
                    ExchangeChannel(null);
                }

                if (channel != null)
                {
                    try
                    {
                        await channel.DisposeAsync();
                    }
                    catch
                    {
                        // 忽略清理异常,继续重连流程
                    }
                }
            }

            if (stoppingToken.IsCancellationRequested || IsStopping())
            {
                _logger.LogInformation("消息消费者已停止: {Queue}", _queueName);
                break;
            }

            reconnectAttempt++;
            var delay = GetReconnectDelay(reconnectAttempt);
            _logger.LogWarning(
                "消费者将在{DelaySeconds}s后重连: Queue={Queue}, Attempt={Attempt}",
                delay.TotalSeconds,
                _queueName,
                reconnectAttempt
            );

            try
            {
                await Task.Delay(delay, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                _logger.LogInformation("消息消费者已停止: {Queue}", _queueName);
                break;
            }
        }
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        Interlocked.Exchange(ref _isStopping, 1);
        _logger.LogInformation("正在停止消息消费者: {Queue}", _queueName);

        var channel = ExchangeChannel(null);
        if (channel != null)
        {
            try
            {
                using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
                await channel.CloseAsync(cancellationToken: cts.Token);
                _logger.LogInformation("Channel已关闭: {Queue}", _queueName);
            }
            catch (ObjectDisposedException)
            {
                _logger.LogDebug("Channel已被释放,跳过关闭: {Queue}", _queueName);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "关闭Channel时发生异常: {Queue}", _queueName);
            }
            finally
            {
                try
                {
                    await channel.DisposeAsync();
                }
                catch
                {
                    // Stop阶段忽略清理异常,避免影响宿主退出流程
                }
            }
        }

        await base.StopAsync(cancellationToken);
    }

    protected async Task RepublishForRetryAsync(
        TMessage message,
        BasicDeliverEventArgs eventArgs,
        IChannel channel,
        CancellationToken cancellationToken
    )
    {
        var json = JsonSerializer.Serialize(message);
        var body = Encoding.UTF8.GetBytes(json);

        var properties = new BasicProperties
        {
            Persistent = true,
            MessageId = message.MessageId,
            Timestamp = new AmqpTimestamp(((DateTimeOffset)DateTime.UtcNow).ToUnixTimeSeconds()),
            ContentType = "application/json",
            DeliveryMode = DeliveryModes.Persistent,
        };

        await channel.BasicPublishAsync(
            exchange: eventArgs.Exchange,
            routingKey: eventArgs.RoutingKey,
            mandatory: false,
            basicProperties: properties,
            body: body,
            cancellationToken: cancellationToken
        );
    }

    public override void Dispose()
    {
        ExchangeChannel(null)?.Dispose();
        base.Dispose();
    }

    protected virtual async Task MessageHandlerAsync(
        BasicDeliverEventArgs eventArgs,
        CancellationToken cancellationToken
    )
    {
        var messageId = eventArgs.BasicProperties.MessageId ?? "Unknown";
        var channel = ReadChannel();

        if (channel == null)
        {
            _logger.LogError("无法处理消息,Channel未初始化: MessageId={MessageId}", messageId);
            return;
        }

        try
        {
            var body = eventArgs.Body.ToArray();
            var json = Encoding.UTF8.GetString(body);
            var message = JsonSerializer.Deserialize<TMessage>(json);

            if (message == null)
            {
                _logger.LogWarning("消息反序列化失败: MessageId={MessageId}", messageId);
                await channel.BasicNackAsync(
                    eventArgs.DeliveryTag,
                    false,
                    false,
                    cancellationToken
                );
                return;
            }

            _logger.LogInformation("开始处理消息: MessageId={MessageId}", message.MessageId);
            await HandleMessageAsyncCore(message, eventArgs, channel, cancellationToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "处理消息时发生异常: MessageId={MessageId}", messageId);

            try
            {
                await channel.BasicNackAsync(eventArgs.DeliveryTag, false, true, cancellationToken);
            }
            catch (Exception nackEx)
            {
                _logger.LogWarning(
                    nackEx,
                    "异常后Nack失败,消息可能由Broker自动重新投递: MessageId={MessageId}",
                    messageId
                );
            }
        }
    }

    private bool IsStopping() => Volatile.Read(ref _isStopping) == 1;

    private static TimeSpan GetReconnectDelay(int attempt)
    {
        var seconds = Math.Min(MaxReconnectDelaySeconds, Math.Pow(2, Math.Min(attempt, 5)));
        return TimeSpan.FromSeconds(seconds);
    }

    private static async Task WaitForShutdownOrStopAsync(
        Task shutdownTask,
        CancellationToken stoppingToken
    )
    {
        var cancellationTask = Task.Delay(Timeout.Infinite, stoppingToken);
        await Task.WhenAny(shutdownTask, cancellationTask);
        stoppingToken.ThrowIfCancellationRequested();
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个基于RabbitMQ的消息消费者后台服务的抽象基类,实现了可靠的消息消费和连接管理功能。以下是详细解释:

类的基本结构

public abstract class RabbitMQConsumerBackgroundServiceBase<TMessage> : BackgroundService
    where TMessage : MessageBase
  • 继承自BackgroundService,作为后台服务运行
  • 使用泛型约束,只接受继承自MessageBase的消息类型
  • 是抽象类,需要子类实现具体的消息处理逻辑

核心依赖

  • IRabbitMQConnectionFactory: RabbitMQ连接工厂
  • IMessageRoutingConvention: 消息路由约定(Exchange、Queue、RoutingKey命名规则)
  • IMessageOutboxStore: 消息发件箱存储(用于消息状态跟踪)
  • ILogger: 日志记录

主要功能模块

1. 连接和重连机制

  • 实现指数退避重连策略(最大30秒延迟)
  • 自动检测连接断开并重连
  • 优雅处理连接异常

2. 消息消费设置

// 声明Exchange、Queue并绑定
await channel.ExchangeDeclareAsync(...);
await channel.QueueDeclareAsync(...);
await channel.QueueBindAsync(...);

// 设置QoS为1,确保一次只处理一条消息
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);

3. 消息处理流程

  1. 接收消息: 使用AsyncEventingBasicConsumer异步接收
  2. 反序列化: JSON反序列化为强类型消息对象
  3. 业务处理: 调用抽象方法HandleMessageAsyncCore
  4. 确认/拒绝: 根据处理结果进行ACK/NACK操作

4. 状态跟踪

提供了Outbox模式的消息状态管理:

protected async Task TryMarkConsumedAsync(string messageId, CancellationToken cancellationToken)
protected async Task TryMarkConsumeFailedAsync(string messageId, string error, CancellationToken cancellationToken)

5. 重试机制

protected async Task RepublishForRetryAsync(TMessage message, BasicDeliverEventArgs eventArgs, ...)

支持将消息重新发布到队列进行重试。

关键设计特性

线程安全

  • 使用Interlocked.Exchange安全交换Channel引用
  • 使用Volatile.Read检查停止状态

优雅关闭

public override async Task StopAsync(CancellationToken cancellationToken)
  • 设置停止标志
  • 优雅关闭Channel
  • 等待当前消息处理完成

错误处理

  • 消息处理异常时自动NACK并重新入队
  • 反序列化失败时NACK但不重新入队
  • 详细的错误日志记录

使用方式

子类需要实现抽象方法:

protected abstract Task HandleMessageAsyncCore(
    TMessage message,
    BasicDeliverEventArgs eventArgs,
    IChannel channel,
    CancellationToken cancellationToken
);

这个基类提供了一个健壮、可扩展的RabbitMQ消费者框架,处理了连接管理、错误恢复、消息确认等复杂细节,让开发者只需专注于业务逻辑的实现。

评论加载中...