using System.Text;
using System.Text.Json;
using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Enums;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Dpz.Core.MessageQueue.RabbitMQ;

/// <summary>
/// RabbitMQ消息消费者后台服务
/// </summary>
/// <typeparam name="TMessage">消息类型</typeparam>
public class RabbitMQConsumerBackgroundService<TMessage> : BackgroundService
    where TMessage : MessageBase
{
    private readonly IRabbitMQConnectionFactory _connectionFactory;
    private readonly IMessageRoutingConvention _routingConvention;
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<RabbitMQConsumerBackgroundService<TMessage>> _logger;
    private readonly string _exchangeName;
    private readonly string _queueName;
    private readonly string _routingKey;
    private IChannel? _channel;
    private const int MaxReconnectDelaySeconds = 30;

    public RabbitMQConsumerBackgroundService(
        IRabbitMQConnectionFactory connectionFactory,
        IMessageRoutingConvention routingConvention,
        IServiceProvider serviceProvider,
        ILogger<RabbitMQConsumerBackgroundService<TMessage>> logger
    )
    {
        _connectionFactory = connectionFactory;
        _routingConvention = routingConvention;
        _serviceProvider = serviceProvider;
        _logger = logger;

        _exchangeName = _routingConvention.GetExchangeName<TMessage>();
        _queueName = _routingConvention.GetQueueName<TMessage>();
        _routingKey = _routingConvention.GetRoutingKey<TMessage>();
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var reconnectAttempt = 0;

        while (!stoppingToken.IsCancellationRequested)
        {
            IChannel? channel = null;
            var shutdownSignal = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously
            );

            try
            {
                channel = await _connectionFactory.CreateChannelAsync();
                _channel = channel;

                channel.ChannelShutdownAsync += (_, args) =>
                {
                    if (!stoppingToken.IsCancellationRequested)
                    {
                        _logger.LogWarning(
                            "检测到Channel断开,准备重连: Queue={Queue}, ReplyCode={ReplyCode}, Reason={Reason}",
                            _queueName,
                            args.ReplyCode,
                            args.ReplyText
                        );
                    }

                    shutdownSignal.TrySetResult(true);
                    return Task.CompletedTask;
                };

                // 声明Exchange
                await channel.ExchangeDeclareAsync(
                    exchange: _exchangeName,
                    type: _routingConvention.GetExchangeType<TMessage>().ToRabbitMQString(),
                    durable: true,
                    autoDelete: false,
                    arguments: null,
                    cancellationToken: stoppingToken
                );

                // 声明Queue
                await channel.QueueDeclareAsync(
                    queue: _queueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null,
                    cancellationToken: stoppingToken
                );

                // 绑定Queue到Exchange
                await channel.QueueBindAsync(
                    queue: _queueName,
                    exchange: _exchangeName,
                    routingKey: _routingKey,
                    arguments: null,
                    cancellationToken: stoppingToken
                );

                // 设置预取数量(QoS)
                await channel.BasicQosAsync(
                    prefetchSize: 0,
                    prefetchCount: 1,
                    global: false,
                    cancellationToken: stoppingToken
                );

                // 创建消费者
                var consumer = new AsyncEventingBasicConsumer(channel);
                consumer.ReceivedAsync += async (_, eventArgs) =>
                {
                    await MessageHandlerAsync(eventArgs, stoppingToken);
                };

                // 开始消费
                await channel.BasicConsumeAsync(
                    queue: _queueName,
                    autoAck: false,
                    consumer: consumer,
                    cancellationToken: stoppingToken
                );

                reconnectAttempt = 0;
                _logger.LogInformation(
                    "消息消费者已启动: Queue={Queue}, Exchange={Exchange}, RoutingKey={RoutingKey}",
                    _queueName,
                    _exchangeName,
                    _routingKey
                );

                await WaitForShutdownOrStopAsync(shutdownSignal.Task, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                _logger.LogInformation("消息消费者已停止: {Queue}", _queueName);
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "消费者运行异常,将尝试重连: {Queue}", _queueName);
            }
            finally
            {
                if (ReferenceEquals(_channel, channel))
                {
                    _channel = null;
                }

                if (channel != null)
                {
                    try
                    {
                        await channel.DisposeAsync();
                    }
                    catch
                    {
                        // 忽略清理异常,继续重连流程
                    }
                }
            }

            reconnectAttempt++;
            var delay = GetReconnectDelay(reconnectAttempt);
            _logger.LogWarning(
                "消费者将在{DelaySeconds}s后重连: Queue={Queue}, Attempt={Attempt}",
                delay.TotalSeconds,
                _queueName,
                reconnectAttempt
            );

            try
            {
                await Task.Delay(delay, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                _logger.LogInformation("消息消费者已停止: {Queue}", _queueName);
                break;
            }
        }
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("正在停止消息消费者: {Queue}", _queueName);

        if (_channel != null)
        {
            try
            {
                // 使用独立的超时令牌,防止立即取消
                using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
                await _channel.CloseAsync(cancellationToken: cts.Token);
                _logger.LogInformation("Channel已关闭: {Queue}", _queueName);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "关闭Channel时发生异常: {Queue}", _queueName);
            }
        }

        await base.StopAsync(cancellationToken);
    }

    private async Task MessageHandlerAsync(
        BasicDeliverEventArgs eventArgs,
        CancellationToken cancellationToken
    )
    {
        var messageId = eventArgs.BasicProperties.MessageId ?? "Unknown";

        if (_channel == null)
        {
            _logger.LogError("无法处理消息,Channel未初始化: MessageId={MessageId}", messageId);
            return;
        }

        try
        {
            // 反序列化消息
            var body = eventArgs.Body.ToArray();
            var json = Encoding.UTF8.GetString(body);
            var message = JsonSerializer.Deserialize<TMessage>(json);

            if (message == null)
            {
                _logger.LogWarning("消息反序列化失败: MessageId={MessageId}", messageId);
                // 拒绝消息,不重新入队
                await _channel.BasicNackAsync(
                    eventArgs.DeliveryTag,
                    false,
                    false,
                    cancellationToken
                );
                return;
            }

            _logger.LogInformation("开始处理消息: MessageId={MessageId}", message.MessageId);

            // 使用Scope创建Handler实例
            using var scope = _serviceProvider.CreateScope();
            var handler = scope.ServiceProvider.GetRequiredService<IMessageHandler<TMessage>>();

            // 处理消息
            var success = await handler.HandleAsync(message, cancellationToken);

            if (!success)
            {
                var nextRetryCount = message.RetryCount + 1;
                _logger.LogWarning(
                    "消息处理失败: MessageId={MessageId}, RetryCount={RetryCount}",
                    message.MessageId,
                    nextRetryCount
                );

                // 失败重试通过“重新发布新消息体 + ACK原消息”实现,确保RetryCount可持久传递。
                if (nextRetryCount <= 3)
                {
                    message.RetryCount = nextRetryCount;
                    await RepublishForRetryAsync(message, eventArgs, cancellationToken);
                    await _channel.BasicAckAsync(eventArgs.DeliveryTag, false, cancellationToken);
                    _logger.LogInformation(
                        "消息已重新发布用于重试: MessageId={MessageId}, RetryCount={RetryCount}",
                        message.MessageId,
                        message.RetryCount
                    );
                }
                else
                {
                    await _channel.BasicNackAsync(
                        eventArgs.DeliveryTag,
                        false,
                        false,
                        cancellationToken
                    );
                    _logger.LogError(
                        "消息已达到最大重试次数,将被丢弃: MessageId={MessageId}",
                        message.MessageId
                    );
                }

                return;
            }

            // 确认消息
            await _channel.BasicAckAsync(eventArgs.DeliveryTag, false, cancellationToken);
            _logger.LogInformation("消息处理成功: MessageId={MessageId}", message.MessageId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "处理消息时发生异常: MessageId={MessageId}", messageId);

            if (_channel != null)
            {
                try
                {
                    // 重新入队
                    await _channel.BasicNackAsync(
                        eventArgs.DeliveryTag,
                        false,
                        true,
                        cancellationToken
                    );
                }
                catch (Exception nackEx)
                {
                    _logger.LogWarning(
                        nackEx,
                        "异常后Nack失败,消息可能由Broker自动重新投递: MessageId={MessageId}",
                        messageId
                    );
                }
            }
        }
    }

    public override void Dispose()
    {
        // 同步 Dispose 中直接清理,异步关闭在 StopAsync 中完成
        _channel?.Dispose();
        base.Dispose();
    }

    private async Task RepublishForRetryAsync(
        TMessage message,
        BasicDeliverEventArgs eventArgs,
        CancellationToken cancellationToken
    )
    {
        if (_channel == null)
        {
            throw new InvalidOperationException("Channel未初始化,无法重发消息");
        }

        var json = JsonSerializer.Serialize(message);
        var body = Encoding.UTF8.GetBytes(json);

        var properties = new BasicProperties
        {
            Persistent = true,
            MessageId = message.MessageId,
            Timestamp = new AmqpTimestamp(((DateTimeOffset)DateTime.UtcNow).ToUnixTimeSeconds()),
            ContentType = "application/json",
            DeliveryMode = DeliveryModes.Persistent,
        };

        await _channel.BasicPublishAsync(
            exchange: eventArgs.Exchange,
            routingKey: eventArgs.RoutingKey,
            mandatory: false,
            basicProperties: properties,
            body: body,
            cancellationToken: cancellationToken
        );
    }

    private static TimeSpan GetReconnectDelay(int attempt)
    {
        var seconds = Math.Min(MaxReconnectDelaySeconds, Math.Pow(2, Math.Min(attempt, 5)));
        return TimeSpan.FromSeconds(seconds);
    }

    private static async Task WaitForShutdownOrStopAsync(
        Task shutdownTask,
        CancellationToken stoppingToken
    )
    {
        var cancellationTask = Task.Delay(Timeout.Infinite, stoppingToken);
        await Task.WhenAny(shutdownTask, cancellationTask);
        stoppingToken.ThrowIfCancellationRequested();
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个基于 RabbitMQ 的消息消费者后台服务实现,它是一个泛型类,用于消费指定类型的消息。让我详细解释这个代码的各个部分:

主要功能

这个类实现了一个可靠的 RabbitMQ 消息消费者,具有以下特性:

  • 自动重连机制
  • 消息重试处理
  • 优雅关闭
  • 完整的错误处理

核心组件

1. 依赖注入

private readonly IRabbitMQConnectionFactory _connectionFactory;  // 连接工厂
private readonly IMessageRoutingConvention _routingConvention;   // 路由约定
private readonly IServiceProvider _serviceProvider;             // 服务提供者
private readonly ILogger<RabbitMQConsumerBackgroundService<TMessage>> _logger; // 日志记录器

2. 核心执行方法 ExecuteAsync

连接和重连逻辑:

  • 使用无限循环确保服务持续运行
  • 实现指数退避重连策略(延迟时间逐步增加)
  • 监听 Channel 断开事件,自动触发重连

RabbitMQ 初始化:

// 声明 Exchange(交换机)
await channel.ExchangeDeclareAsync(/*...*/);

// 声明 Queue(队列)
await channel.QueueDeclareAsync(/*...*/);

// 绑定 Queue 到 Exchange
await channel.QueueBindAsync(/*...*/);

// 设置 QoS(服务质量)
await channel.BasicQosAsync(prefetchCount: 1);

3. 消息处理机制 MessageHandlerAsync

消息反序列化:

  • 将消息体从字节数组转换为 UTF-8 字符串
  • 使用 JsonSerializer 反序列化为强类型对象

处理流程:

  1. 创建服务范围(Scope)获取消息处理器
  2. 调用处理器处理消息
  3. 根据处理结果决定后续操作

重试机制:

if (!success) {
    var nextRetryCount = message.RetryCount + 1;
    
    if (nextRetryCount <= 3) {
        // 重新发布消息用于重试
        message.RetryCount = nextRetryCount;
        await RepublishForRetryAsync(message, eventArgs, cancellationToken);
        await _channel.BasicAckAsync(/*...*/); // 确认原消息
    } else {
        // 超过最大重试次数,丢弃消息
        await _channel.BasicNackAsync(/*...*/);
    }
}

4. 重试策略

重连延迟计算:

private static TimeSpan GetReconnectDelay(int attempt)
{
    var seconds = Math.Min(MaxReconnectDelaySeconds, Math.Pow(2, Math.Min(attempt, 5)));
    return TimeSpan.FromSeconds(seconds);
}
  • 使用指数退避算法
  • 最大延迟限制为 30 秒
  • 防止过度频繁的重连尝试

5. 优雅关闭 StopAsync

public override async Task StopAsync(CancellationToken cancellationToken)
{
    // 优雅关闭 Channel
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    await _channel.CloseAsync(cancellationToken: cts.Token);
    
    await base.StopAsync(cancellationToken);
}

设计亮点

  1. 可靠性:完整的错误处理和重连机制
  2. 可扩展性:泛型设计支持任意消息类型
  3. 资源管理:正确的异步资源清理
  4. 监控友好:详细的日志记录
  5. 性能优化:使用服务范围避免内存泄漏

使用场景

这个类适用于需要可靠消息处理的场景,如:

  • 订单处理系统
  • 事件驱动架构
  • 微服务间通信
  • 异步任务处理

整体上,这是一个生产级别的消息消费者实现,具备了企业级应用所需的可靠性和健壮性特性。

评论加载中...