using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Configuration;
using Dpz.Core.MessageQueue.Models;
using Dpz.Core.MessageQueue.RabbitMQ;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace Dpz.Core.MessageQueue.Extensions;

/// <summary>
/// 消息队列服务扩展方法
/// </summary>
public static class MessageQueueServiceExtensions
{
    /// <summary>
    /// 添加RabbitMQ服务
    /// </summary>
    /// <param name="services">服务集合</param>
    /// <param name="configuration">配置</param>
    /// <param name="configSection">配置节名称,默认为"RabbitMQ"</param>
    /// <returns></returns>
    public static IServiceCollection AddRabbitMQ(
        this IServiceCollection services,
        IConfiguration configuration,
        string configSection = RabbitMQOptions.SectionName
    )
    {
        // 注册配置
        var section = configuration.GetSection(configSection);
        services.Configure<RabbitMQOptions>(section);

        // 注册连接工厂(单例)
        services.AddSingleton<IRabbitMQConnectionFactory, RabbitMQConnectionFactory>();

        // 注册路由约定(单例)
        services.AddSingleton<IMessageRoutingConvention, DefaultMessageRoutingConvention>();

        // 注册发布者(单例)
        services.AddSingleton(typeof(IMessagePublisher<>), typeof(RabbitMQPublisher<>));

        return services;
    }

    /// <summary>
    /// 添加消息消费者
    /// 每个消息类型配置一个消费者和处理器
    /// </summary>
    /// <typeparam name="TMessage">消息类型</typeparam>
    /// <typeparam name="THandler">处理器类型</typeparam>
    /// <param name="services">服务集合</param>
    /// <returns></returns>
    public static IServiceCollection AddMessageConsumer<TMessage, THandler>(
        this IServiceCollection services
    )
        where TMessage : MessageBase
        where THandler : class, IMessageHandler<TMessage>
    {
        // 注册处理器(Scoped,每次处理消息创建新实例)
        services.AddScoped<IMessageHandler<TMessage>, THandler>();

        // 注册消费者后台服务
        services.AddHostedService<RabbitMQConsumerBackgroundService<TMessage>>();

        return services;
    }

    /// <summary>
    /// 添加消息消费者(支持返回结果)
    /// 每个消息类型配置一个消费者和处理器
    /// </summary>
    /// <typeparam name="TMessage">消息类型</typeparam>
    /// <typeparam name="THandler">处理器类型</typeparam>
    /// <typeparam name="TResult">返回结果类型</typeparam>
    /// <param name="services">服务集合</param>
    /// <returns></returns>
    public static IServiceCollection AddMessageConsumer<TMessage, THandler, TResult>(
        this IServiceCollection services
    )
        where TMessage : MessageBase
        where TResult : MessageHandlerResult
        where THandler : class, IMessageHandler<TMessage, TResult>
    {
        // 注册处理器(Scoped,每次处理消息创建新实例)
        services.AddScoped<IMessageHandler<TMessage, TResult>, THandler>();

        // 注册消费者后台服务(支持返回结果)
        services.AddHostedService<RabbitMQConsumerBackgroundServiceWithResult<TMessage, TResult>>();

        return services;
    }

    /// <summary>
    /// 添加自定义路由约定
    /// </summary>
    /// <typeparam name="TConvention">自定义路由约定类型</typeparam>
    /// <param name="services">服务集合</param>
    /// <returns></returns>
    public static IServiceCollection AddCustomRoutingConvention<TConvention>(
        this IServiceCollection services
    )
        where TConvention : class, IMessageRoutingConvention
    {
        services.AddSingleton<IMessageRoutingConvention, TConvention>();
        return services;
    }

    /// <summary>
    /// 添加批次追踪服务
    /// (多实例部署时需要配置分布式缓存如Redis)
    /// </summary>
    /// <param name="services">服务集合</param>
    /// <returns></returns>
    public static IServiceCollection AddBatchTracking(this IServiceCollection services)
    {
        services.AddSingleton<IBatchTracker, DistributedBatchTracker>();
        return services;
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个用于消息队列服务的扩展方法类,主要用于在 .NET 依赖注入容器中注册 RabbitMQ 相关的服务。让我详细解释一下每个方法的功能:

类概述

MessageQueueServiceExtensions 是一个静态扩展类,提供了一系列扩展方法来简化消息队列服务的注册和配置。

主要方法解析

1. AddRabbitMQ 方法

public static IServiceCollection AddRabbitMQ(...)

功能:注册 RabbitMQ 基础服务

  • 注册 RabbitMQOptions 配置(从配置文件读取)
  • 注册连接工厂(单例模式)
  • 注册消息路由约定(单例模式)
  • 注册消息发布者(单例模式,支持泛型)

2. AddMessageConsumer<TMessage, THandler> 方法

public static IServiceCollection AddMessageConsumer<TMessage, THandler>(...)

功能:注册消息消费者(无返回值版本)

  • TMessage 必须继承自 MessageBase
  • THandler 必须实现 IMessageHandler<TMessage>
  • 处理器注册为 Scoped 生命周期(每次处理消息创建新实例)
  • 注册后台服务来持续监听和处理消息

3. AddMessageConsumer<TMessage, THandler, TResult> 方法

public static IServiceCollection AddMessageConsumer<TMessage, THandler, TResult>(...)

功能:注册消息消费者(带返回值版本)

  • 除了基本约束外,还要求 TResult 继承自 MessageHandlerResult
  • THandler 必须实现 IMessageHandler<TMessage, TResult>
  • 支持处理后返回结果的消息处理场景

4. AddCustomRoutingConvention<TConvention> 方法

public static IServiceCollection AddCustomRoutingConvention<TConvention>(...)

功能:注册自定义路由约定

  • 允许用户自定义消息路由规则
  • 替换默认的路由约定实现

5. AddBatchTracking 方法

public static IServiceCollection AddBatchTracking(...)

功能:注册批次追踪服务

  • 用于追踪消息批次处理状态
  • 在多实例部署环境下需要配合分布式缓存(如 Redis)使用

设计特点

  1. 链式调用:所有方法都返回 IServiceCollection,支持链式调用
  2. 泛型约束:使用泛型约束确保类型安全
  3. 生命周期管理:合理设置服务的生命周期(Singleton、Scoped)
  4. 扩展性:支持自定义路由约定和处理器
  5. 配置驱动:通过配置文件管理 RabbitMQ 连接参数

使用示例

services
    .AddRabbitMQ(configuration)
    .AddMessageConsumer<OrderCreatedMessage, OrderCreatedHandler>()
    .AddBatchTracking();

这个扩展类很好地封装了消息队列服务的注册复杂性,提供了简洁的 API 供开发者使用。

评论加载中...