using System.Text;
using System.Text.Json;
using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Enums;
using Dpz.Core.MessageQueue.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Dpz.Core.MessageQueue.RabbitMQ;

/// <summary>
/// RabbitMQ消息消费者后台服务(支持返回结果)
/// </summary>
/// <typeparam name="TMessage">消息类型</typeparam>
/// <typeparam name="TResult">返回结果类型</typeparam>
public class RabbitMQConsumerBackgroundServiceWithResult<TMessage, TResult> : BackgroundService
    where TResult : MessageHandlerResult
    where TMessage : MessageBase
{
    private readonly IRabbitMQConnectionFactory _connectionFactory;
    private readonly IMessageRoutingConvention _routingConvention;
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<
        RabbitMQConsumerBackgroundServiceWithResult<TMessage, TResult>
    > _logger;
    private readonly string _exchangeName;
    private readonly string _queueName;
    private readonly string _routingKey;
    private IChannel? _channel;
    private const int MaxReconnectDelaySeconds = 30;

    public RabbitMQConsumerBackgroundServiceWithResult(
        IRabbitMQConnectionFactory connectionFactory,
        IMessageRoutingConvention routingConvention,
        IServiceProvider serviceProvider,
        ILogger<RabbitMQConsumerBackgroundServiceWithResult<TMessage, TResult>> logger
    )
    {
        _connectionFactory = connectionFactory;
        _routingConvention = routingConvention;
        _serviceProvider = serviceProvider;
        _logger = logger;

        _exchangeName = _routingConvention.GetExchangeName<TMessage>();
        _queueName = _routingConvention.GetQueueName<TMessage>();
        _routingKey = _routingConvention.GetRoutingKey<TMessage>();
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var reconnectAttempt = 0;

        while (!stoppingToken.IsCancellationRequested)
        {
            IChannel? channel = null;
            var shutdownSignal = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously
            );

            try
            {
                channel = await _connectionFactory.CreateChannelAsync();
                _channel = channel;

                channel.ChannelShutdownAsync += (_, args) =>
                {
                    if (!stoppingToken.IsCancellationRequested)
                    {
                        _logger.LogWarning(
                            "检测到Channel断开,准备重连: Queue={Queue}, ReplyCode={ReplyCode}, Reason={Reason}",
                            _queueName,
                            args.ReplyCode,
                            args.ReplyText
                        );
                    }

                    shutdownSignal.TrySetResult(true);
                    return Task.CompletedTask;
                };

                // 声明Exchange
                await channel.ExchangeDeclareAsync(
                    exchange: _exchangeName,
                    type: _routingConvention.GetExchangeType<TMessage>().ToRabbitMQString(),
                    durable: true,
                    autoDelete: false,
                    arguments: null,
                    cancellationToken: stoppingToken
                );

                // 声明Queue
                await channel.QueueDeclareAsync(
                    queue: _queueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null,
                    cancellationToken: stoppingToken
                );

                // 绑定Queue到Exchange
                await channel.QueueBindAsync(
                    queue: _queueName,
                    exchange: _exchangeName,
                    routingKey: _routingKey,
                    arguments: null,
                    cancellationToken: stoppingToken
                );

                // 设置预取数量(QoS)
                await channel.BasicQosAsync(
                    prefetchSize: 0,
                    prefetchCount: 1,
                    global: false,
                    cancellationToken: stoppingToken
                );

                // 创建消费者
                var consumer = new AsyncEventingBasicConsumer(channel);
                consumer.ReceivedAsync += async (_, eventArgs) =>
                {
                    await MessageHandlerAsync(eventArgs, stoppingToken);
                };

                // 开始消费
                await channel.BasicConsumeAsync(
                    queue: _queueName,
                    autoAck: false,
                    consumer: consumer,
                    cancellationToken: stoppingToken
                );

                reconnectAttempt = 0;
                _logger.LogInformation(
                    "消息消费者已启动(带返回结果): Queue={Queue}, Exchange={Exchange}, RoutingKey={RoutingKey}, ResultType={ResultType}",
                    _queueName,
                    _exchangeName,
                    _routingKey,
                    typeof(TResult).Name
                );

                await WaitForShutdownOrStopAsync(shutdownSignal.Task, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                _logger.LogInformation("消息消费者已停止: {Queue}", _queueName);
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "消费者运行异常,将尝试重连: {Queue}", _queueName);
            }
            finally
            {
                if (ReferenceEquals(_channel, channel))
                {
                    _channel = null;
                }

                if (channel != null)
                {
                    try
                    {
                        await channel.DisposeAsync();
                    }
                    catch
                    {
                        // 忽略清理异常,继续重连流程
                    }
                }
            }

            reconnectAttempt++;
            var delay = GetReconnectDelay(reconnectAttempt);
            _logger.LogWarning(
                "消费者将在{DelaySeconds}s后重连: Queue={Queue}, Attempt={Attempt}",
                delay.TotalSeconds,
                _queueName,
                reconnectAttempt
            );

            try
            {
                await Task.Delay(delay, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                _logger.LogInformation("消息消费者已停止: {Queue}", _queueName);
                break;
            }
        }
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("正在优雅停止消息消费者: {Queue}", _queueName);

        if (_channel != null)
        {
            try
            {
                using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
                await _channel.CloseAsync(cancellationToken: cts.Token);
                _logger.LogInformation("Channel已优雅关闭: {Queue}", _queueName);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "关闭Channel时发生异常: {Queue}", _queueName);
            }
        }

        await base.StopAsync(cancellationToken);
    }

    private async Task MessageHandlerAsync(
        BasicDeliverEventArgs eventArgs,
        CancellationToken cancellationToken
    )
    {
        var messageId = eventArgs.BasicProperties.MessageId ?? "Unknown";

        if (_channel == null)
        {
            _logger.LogError("无法处理消息,Channel未初始化: MessageId={MessageId}", messageId);
            return;
        }

        try
        {
            // 反序列化消息
            var body = eventArgs.Body.ToArray();
            var json = Encoding.UTF8.GetString(body);
            var message = JsonSerializer.Deserialize<TMessage>(json);

            if (message == null)
            {
                _logger.LogWarning("消息反序列化失败: MessageId={MessageId}", messageId);
                await _channel.BasicNackAsync(
                    eventArgs.DeliveryTag,
                    false,
                    false,
                    cancellationToken
                );
                return;
            }

            _logger.LogInformation("开始处理消息: MessageId={MessageId}", message.MessageId);

            // 使用Scope创建Handler实例
            using var scope = _serviceProvider.CreateScope();
            var handler = scope.ServiceProvider.GetRequiredService<
                IMessageHandler<TMessage, TResult>
            >();

            // 处理消息并获取结果
            var result = await handler.HandleAsync(message, cancellationToken);

            TryExtractWrappedResult(result, out var wrappedResult);
            await HandleWrappedResultAsync(message, wrappedResult, eventArgs, cancellationToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "处理消息时发生异常: MessageId={MessageId}", messageId);

            if (_channel != null)
            {
                try
                {
                    await _channel.BasicNackAsync(
                        eventArgs.DeliveryTag,
                        false,
                        true,
                        cancellationToken
                    );
                }
                catch (Exception nackEx)
                {
                    _logger.LogWarning(
                        nackEx,
                        "异常后Nack失败,消息可能由Broker自动重新投递: MessageId={MessageId}",
                        messageId
                    );
                }
            }
        }
    }

    private async Task HandleWrappedResultAsync(
        TMessage message,
        WrappedResult result,
        BasicDeliverEventArgs eventArgs,
        CancellationToken cancellationToken
    )
    {
        if (_channel == null)
        {
            _logger.LogError(
                "无法处理消息结果,Channel未初始化: MessageId={MessageId}",
                message.MessageId
            );
            return;
        }

        if (result.Success)
        {
            // 成功:记录结果并确认消息
            _logger.LogInformation(
                "消息处理成功: MessageId={MessageId}, ResultType={ResultType}, Result={Result}",
                message.MessageId,
                typeof(TResult).Name,
                JsonSerializer.Serialize(result.Data)
            );

            if (result.Metadata != null && result.Metadata.Count > 0)
            {
                _logger.LogDebug(
                    "消息处理元数据: MessageId={MessageId}, Metadata={@Metadata}",
                    message.MessageId,
                    result.Metadata
                );
            }

            await _channel.BasicAckAsync(eventArgs.DeliveryTag, false, cancellationToken);
        }
        else
        {
            var nextRetryCount = message.RetryCount + 1;
            // 失败:通过重发消息体持久化重试次数,避免requeue丢失内存中的RetryCount
            _logger.LogWarning(
                "消息处理失败: MessageId={MessageId}, RetryCount={RetryCount}, Error={Error}",
                message.MessageId,
                nextRetryCount,
                result.ErrorMessage
            );

            if (nextRetryCount <= 3)
            {
                message.RetryCount = nextRetryCount;
                await RepublishForRetryAsync(message, eventArgs, cancellationToken);
                await _channel.BasicAckAsync(eventArgs.DeliveryTag, false, cancellationToken);
                _logger.LogInformation(
                    "消息已重新发布用于重试: MessageId={MessageId}, RetryCount={RetryCount}",
                    message.MessageId,
                    message.RetryCount
                );
            }
            else
            {
                await _channel.BasicNackAsync(
                    eventArgs.DeliveryTag,
                    false,
                    false,
                    cancellationToken
                );
                _logger.LogError(
                    "消息已达到最大重试次数,将被丢弃: MessageId={MessageId}, FinalError={Error}",
                    message.MessageId,
                    result.ErrorMessage
                );
            }
        }
    }

    public override void Dispose()
    {
        _channel?.Dispose();
        base.Dispose();
    }

    private static void TryExtractWrappedResult(TResult result, out WrappedResult wrappedResult)
    {
        var data = PropertyAccessor
            .GetGetter<TResult>(nameof(MessageHandlerResult<>.Data))
            ?.Invoke(result);

        wrappedResult = new WrappedResult
        {
            Success = result.Success,
            Data = data,
            ErrorMessage = result.ErrorMessage,
            Metadata = result.Metadata,
        };
    }

    private async Task RepublishForRetryAsync(
        TMessage message,
        BasicDeliverEventArgs eventArgs,
        CancellationToken cancellationToken
    )
    {
        if (_channel == null)
        {
            throw new InvalidOperationException("Channel未初始化,无法重发消息");
        }

        var json = JsonSerializer.Serialize(message);
        var body = Encoding.UTF8.GetBytes(json);

        var properties = new BasicProperties
        {
            Persistent = true,
            MessageId = message.MessageId,
            Timestamp = new AmqpTimestamp(((DateTimeOffset)DateTime.UtcNow).ToUnixTimeSeconds()),
            ContentType = "application/json",
            DeliveryMode = DeliveryModes.Persistent,
        };

        await _channel.BasicPublishAsync(
            exchange: eventArgs.Exchange,
            routingKey: eventArgs.RoutingKey,
            mandatory: false,
            basicProperties: properties,
            body: body,
            cancellationToken: cancellationToken
        );
    }

    private struct WrappedResult
    {
        public bool Success { get; set; }

        public object? Data { get; set; }

        public string? ErrorMessage { get; set; }

        public Dictionary<string, object>? Metadata { get; set; }
    }

    private static TimeSpan GetReconnectDelay(int attempt)
    {
        var seconds = Math.Min(MaxReconnectDelaySeconds, Math.Pow(2, Math.Min(attempt, 5)));
        return TimeSpan.FromSeconds(seconds);
    }

    private static async Task WaitForShutdownOrStopAsync(
        Task shutdownTask,
        CancellationToken stoppingToken
    )
    {
        var cancellationTask = Task.Delay(Timeout.Infinite, stoppingToken);
        await Task.WhenAny(shutdownTask, cancellationTask);
        stoppingToken.ThrowIfCancellationRequested();
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个基于RabbitMQ的消息消费者后台服务,支持返回处理结果。让我详细解释一下这个代码的功能和实现:

主要功能

这个类实现了一个RabbitMQ消息消费者,具有以下特点:

  • 泛型设计:支持自定义消息类型(TMessage)和返回结果类型(TResult)
  • 后台服务:继承自BackgroundService,作为长期运行的后台任务
  • 自动重连:当连接断开时自动重连,支持指数退避策略
  • 消息重试:支持消息处理失败后的重试机制(最多3次)
  • 优雅停机:支持优雅地停止服务

核心组件

1. 依赖注入

private readonly IRabbitMQConnectionFactory _connectionFactory;  // RabbitMQ连接工厂
private readonly IMessageRoutingConvention _routingConvention;   // 路由约定(决定交换器、队列名等)
private readonly IServiceProvider _serviceProvider;             // 服务提供者(用于创建消息处理器)
private readonly ILogger _logger;                               // 日志记录器

2. 主要执行流程(ExecuteAsync方法)

连接管理

  • 创建RabbitMQ通道
  • 监听通道断开事件,触发重连
  • 声明交换器(Exchange)、队列(Queue)并建立绑定关系

消息消费设置

  • 设置QoS(每次只预取1条消息)
  • 创建异步消费者
  • 开始消费消息

错误处理和重连

  • 捕获异常并记录日志
  • 实现指数退避重连策略(最长30秒)

3. 消息处理流程(MessageHandlerAsync方法)

消息反序列化

var body = eventArgs.Body.ToArray();
var json = Encoding.UTF8.GetString(body);
var message = JsonSerializer.Deserialize<TMessage>(json);

处理器调用

using var scope = _serviceProvider.CreateScope();
var handler = scope.ServiceProvider.GetRequiredService<IMessageHandler<TMessage, TResult>>();
var result = await handler.HandleAsync(message, cancellationToken);

结果处理

  • 成功:记录日志并ACK消息
  • 失败:检查重试次数
    • 未超过3次:重新发布消息进行重试
    • 超过3次:NACK消息并丢弃

4. 重试机制

当消息处理失败时:

  1. 增加重试计数器
  2. 重新发布消息到同一队列
  3. ACK原消息(避免重复处理)
  4. 新消息包含更新的重试计数
if (nextRetryCount <= 3)
{
    message.RetryCount = nextRetryCount;
    await RepublishForRetryAsync(message, eventArgs, cancellationToken);
    await _channel.BasicAckAsync(eventArgs.DeliveryTag, false, cancellationToken);
}

5. 优雅停机(StopAsync方法)

  • 尝试优雅关闭RabbitMQ通道
  • 设置5秒超时防止无限等待
  • 调用基类的停止方法

设计特点

  1. 可靠性

    • 自动重连机制
    • 消息持久化
    • 错误处理和重试
  2. 可扩展性

    • 泛型设计支持不同消息类型
    • 依赖注入支持自定义组件
  3. 可观测性

    • 详细的日志记录
    • 包含消息ID、重试次数等关键信息
  4. 性能优化

    • 使用Scope避免内存泄漏
    • QoS设置控制并发数量

这个实现为消息队列处理提供了一个健壮、可靠且功能完整的解决方案。

评论加载中...