using Dpz.Core.Entity.Base;
using Dpz.Core.MessageQueue.Abstractions;
using Dpz.Core.MessageQueue.Configuration;
using Dpz.Core.MessageQueue.Models;
using Dpz.Core.MessageQueue.RabbitMQ;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
namespace Dpz.Core.MessageQueue.Extensions;
/// <summary>
/// 消息队列服务扩展方法
/// </summary>
public static class MessageQueueServiceExtensions
{
/// <summary>
/// 添加RabbitMQ服务
/// </summary>
/// <param name="services">服务集合</param>
/// <param name="configuration">配置</param>
/// <param name="configSection">配置节名称,默认为"RabbitMQ"</param>
/// <returns></returns>
public static IServiceCollection AddRabbitMQ(
this IServiceCollection services,
IConfiguration configuration,
string configSection = RabbitMQOptions.SectionName
)
{
// 注册配置
var section = configuration.GetSection(configSection);
services.Configure<RabbitMQOptions>(section);
// 注册连接工厂(单例)
services.AddSingleton<IRabbitMQConnectionFactory, RabbitMQConnectionFactory>();
// 注册路由约定(单例)
services.AddSingleton<IMessageRoutingConvention, DefaultMessageRoutingConvention>();
// 注册发布者(单例)
services.AddSingleton(typeof(IMessagePublisher<>), typeof(RabbitMQPublisher<>));
return services;
}
/// <summary>
/// 添加消息消费者
/// 每个消息类型配置一个消费者和处理器
/// </summary>
/// <typeparam name="TMessage">消息类型</typeparam>
/// <typeparam name="THandler">处理器类型</typeparam>
/// <param name="services">服务集合</param>
/// <returns></returns>
public static IServiceCollection AddMessageConsumer<TMessage, THandler>(
this IServiceCollection services
)
where TMessage : MessageBase
where THandler : class, IMessageHandler<TMessage>
{
// 注册处理器(Scoped,每次处理消息创建新实例)
services.AddScoped<IMessageHandler<TMessage>, THandler>();
// 注册消费者后台服务
services.AddHostedService<RabbitMQConsumerBackgroundService<TMessage>>();
return services;
}
/// <summary>
/// 添加消息消费者(支持返回结果)
/// 每个消息类型配置一个消费者和处理器
/// </summary>
/// <typeparam name="TMessage">消息类型</typeparam>
/// <typeparam name="THandler">处理器类型</typeparam>
/// <typeparam name="TResult">返回结果类型</typeparam>
/// <param name="services">服务集合</param>
/// <returns></returns>
public static IServiceCollection AddMessageConsumer<TMessage, THandler, TResult>(
this IServiceCollection services
)
where TMessage : MessageBase
where TResult : MessageHandlerResult
where THandler : class, IMessageHandler<TMessage, TResult>
{
// 注册处理器(Scoped,每次处理消息创建新实例)
services.AddScoped<IMessageHandler<TMessage, TResult>, THandler>();
// 注册消费者后台服务(支持返回结果)
services.AddHostedService<RabbitMQConsumerBackgroundServiceWithResult<TMessage, TResult>>();
return services;
}
/// <summary>
/// 添加自定义路由约定
/// </summary>
/// <typeparam name="TConvention">自定义路由约定类型</typeparam>
/// <param name="services">服务集合</param>
/// <returns></returns>
public static IServiceCollection AddCustomRoutingConvention<TConvention>(
this IServiceCollection services
)
where TConvention : class, IMessageRoutingConvention
{
services.AddSingleton<IMessageRoutingConvention, TConvention>();
return services;
}
/// <summary>
/// 添加批次追踪服务
/// (多实例部署时需要配置分布式缓存如Redis)
/// </summary>
/// <param name="services">服务集合</param>
/// <returns></returns>
public static IServiceCollection AddBatchTracking(this IServiceCollection services)
{
services.AddSingleton<IBatchTracker, DistributedBatchTracker>();
return services;
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个用于消息队列服务的扩展方法类,主要用于在 .NET 依赖注入容器中注册 RabbitMQ 相关的服务。让我详细解释一下每个方法的功能:
类概述
MessageQueueServiceExtensions 是一个静态扩展类,提供了一系列扩展方法来简化消息队列服务的注册和配置。
主要方法解析
1. AddRabbitMQ 方法
public static IServiceCollection AddRabbitMQ(...)
功能:注册 RabbitMQ 基础服务
- 注册
RabbitMQOptions配置(从配置文件读取) - 注册连接工厂(单例模式)
- 注册消息路由约定(单例模式)
- 注册消息发布者(单例模式,支持泛型)
2. AddMessageConsumer<TMessage, THandler> 方法
public static IServiceCollection AddMessageConsumer<TMessage, THandler>(...)
功能:注册消息消费者(无返回值版本)
TMessage必须继承自MessageBaseTHandler必须实现IMessageHandler<TMessage>- 处理器注册为 Scoped 生命周期(每次处理消息创建新实例)
- 注册后台服务来持续监听和处理消息
3. AddMessageConsumer<TMessage, THandler, TResult> 方法
public static IServiceCollection AddMessageConsumer<TMessage, THandler, TResult>(...)
功能:注册消息消费者(带返回值版本)
- 除了基本约束外,还要求
TResult继承自MessageHandlerResult THandler必须实现IMessageHandler<TMessage, TResult>- 支持处理后返回结果的消息处理场景
4. AddCustomRoutingConvention<TConvention> 方法
public static IServiceCollection AddCustomRoutingConvention<TConvention>(...)
功能:注册自定义路由约定
- 允许用户自定义消息路由规则
- 替换默认的路由约定实现
5. AddBatchTracking 方法
public static IServiceCollection AddBatchTracking(...)
功能:注册批次追踪服务
- 用于追踪消息批次处理状态
- 在多实例部署环境下需要配合分布式缓存(如 Redis)使用
设计特点
- 链式调用:所有方法都返回
IServiceCollection,支持链式调用 - 泛型约束:使用泛型约束确保类型安全
- 生命周期管理:合理设置服务的生命周期(Singleton、Scoped)
- 扩展性:支持自定义路由约定和处理器
- 配置驱动:通过配置文件管理 RabbitMQ 连接参数
使用示例
services
.AddRabbitMQ(configuration)
.AddMessageConsumer<OrderCreatedMessage, OrderCreatedHandler>()
.AddBatchTracking();
这个扩展类很好地封装了消息队列服务的注册复杂性,提供了简洁的 API 供开发者使用。
评论加载中...