using Dpz.Core.Entity.Base.Indexes;
using Medallion.Threading;
using Microsoft.Extensions.Hosting;
namespace Dpz.Core.Service;
/// <summary>
/// 应用启动时自动扫描所有 IIndexedEntity 实现并创建 MongoDB 索引
/// </summary>
internal sealed class MongoIndexInitializerService(
IConfiguration configuration,
IMongoIndexInitializer mongoIndexInitializer,
IFusionCache fusionCache,
IDistributedLockProvider distributedLockProvider,
ILogger<MongoIndexInitializerService> logger
) : IHostedService
{
private readonly string _instanceId = Guid.NewGuid().ToString();
private const string LockKey = "Dpz.Core.MongoIndexInitializer.Startup";
private const string LastSuccessCacheKey = "Dpz.Core.MongoIndexInitializer.LastSuccessUtc";
private const int DefaultSkipHours = 6;
private static readonly TimeSpan LockWaitTimeout = TimeSpan.FromSeconds(3);
private static readonly TimeSpan LastSuccessCacheTtl = TimeSpan.FromDays(7);
public async Task StartAsync(CancellationToken cancellationToken)
{
await using var lockHandle = await TryAcquireStartupLockAsync(cancellationToken);
if (lockHandle == null)
{
logger.LogInformation(
"实例 {InstanceId} 未获取到索引初始化锁,跳过本次初始化",
_instanceId
);
return;
}
var connectionString = configuration.GetConnectionString("mongodb");
if (string.IsNullOrEmpty(connectionString))
{
logger.LogWarning("未配置 mongodb 连接字符串,跳过索引初始化");
return;
}
Assembly entityAssembly;
try
{
entityAssembly = Assembly.Load("Dpz.Core.Public.Entity");
}
catch (Exception e)
{
logger.LogError(e, "未找到 Dpz.Core.Public.Entity 程序集,忽略初始化索引");
return;
}
logger.LogInformation("实例 {InstanceId} 获取到索引初始化锁,开始初始化", _instanceId);
var lastSuccess = await fusionCache.TryGetAsync<DateTimeOffset>(
LastSuccessCacheKey,
token: cancellationToken
);
if (lastSuccess.HasValue)
{
var elapsed = DateTimeOffset.UtcNow - lastSuccess.Value;
var threshold = TimeSpan.FromHours(DefaultSkipHours);
if (elapsed < threshold)
{
logger.LogInformation(
"实例 {InstanceId} 跳过索引初始化:距离上次成功仅 {ElapsedMinutes:F1} 分钟,阈值 {SkipHours} 小时",
_instanceId,
elapsed.TotalMinutes,
DefaultSkipHours
);
return;
}
}
var stopwatch = Stopwatch.StartNew();
var indexedTypes = entityAssembly
.GetExportedTypes()
.Where(t =>
t is { IsClass: true, IsAbstract: false }
&& t.GetInterfaces()
.Any(i =>
i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IIndexedEntity<>)
)
)
.ToList();
if (indexedTypes.Count == 0)
{
logger.LogInformation("未找到实现 IIndexedEntity 的实体类型");
return;
}
var failedCount = 0;
foreach (var entityType in indexedTypes)
{
try
{
await mongoIndexInitializer.EnsureForTypeAsync(
entityType,
connectionString,
cancellationToken
);
}
catch (Exception ex)
{
failedCount++;
logger.LogError(ex, "初始化 {EntityType} 的索引时出错", entityType.Name);
}
}
stopwatch.Stop();
if (failedCount == 0)
{
await fusionCache.SetAsync(
LastSuccessCacheKey,
DateTimeOffset.UtcNow,
LastSuccessCacheTtl,
token: cancellationToken
);
}
logger.LogInformation(
"MongoDB 索引初始化完成,共处理 {Count} 个实体类型,失败 {FailedCount},耗时 {ElapsedMs}ms",
indexedTypes.Count,
failedCount,
stopwatch.ElapsedMilliseconds
);
}
private async Task<IDistributedSynchronizationHandle?> TryAcquireStartupLockAsync(
CancellationToken cancellationToken
)
{
using var lockCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
cancellationToken
);
lockCancellationTokenSource.CancelAfter(LockWaitTimeout);
try
{
return await distributedLockProvider.AcquireLockAsync(
LockKey,
timeout: null,
cancellationToken: lockCancellationTokenSource.Token
);
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
return null;
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
return mongoIndexInitializer.ResetAsync(cancellationToken);
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个 MongoDB 索引初始化服务,用于在应用程序启动时自动扫描并创建 MongoDB 索引。让我详细解释这个代码的功能和实现:
核心功能
这个服务实现了 IHostedService 接口,作为后台服务在应用启动时运行,主要负责:
- 自动发现实现了
IIndexedEntity<>接口的实体类 - 为这些实体类创建相应的 MongoDB 索引
关键特性
1. 分布式锁机制
private const string LockKey = "Dpz.Core.MongoIndexInitializer.Startup";
- 使用分布式锁确保在多实例部署环境中,只有一个实例执行索引初始化
- 避免并发创建索引导致的冲突和资源浪费
2. 智能跳过机制
private const int DefaultSkipHours = 6;
private const string LastSuccessCacheKey = "Dpz.Core.MongoIndexInitializer.LastSuccessUtc";
- 通过缓存记录上次成功初始化的时间
- 如果距离上次成功初始化不足 6 小时,则跳过本次初始化
- 避免频繁重启应用时重复创建索引
3. 自动类型发现
var indexedTypes = entityAssembly
.GetExportedTypes()
.Where(t =>
t is { IsClass: true, IsAbstract: false }
&& t.GetInterfaces()
.Any(i =>
i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IIndexedEntity<>)
)
)
.ToList();
- 动态扫描
Dpz.Core.Public.Entity程序集 - 查找所有实现了
IIndexedEntity<>接口的具体类 - 无需手动配置,自动发现需要创建索引的实体
执行流程
StartAsync 方法流程:
获取分布式锁
- 尝试获取启动锁,超时时间 3 秒
- 如果获取失败,跳过初始化
检查配置
- 验证 MongoDB 连接字符串是否配置
- 加载实体程序集
检查执行频率
- 从缓存中获取上次成功时间
- 如果距离上次成功不足 6 小时,跳过执行
扫描和初始化
- 扫描所有实现
IIndexedEntity<>的类型 - 逐个调用
mongoIndexInitializer.EnsureForTypeAsync创建索引 - 记录失败次数和执行时间
- 扫描所有实现
更新缓存
- 如果全部成功,更新最后成功时间到缓存
- 缓存 TTL 为 7 天
StopAsync 方法:
- 应用停止时调用
mongoIndexInitializer.ResetAsync进行清理
错误处理和日志
- 完善的日志记录,包括实例 ID、执行状态、耗时等信息
- 对单个实体索引创建失败进行容错处理,不影响其他实体
- 统计失败数量并记录到日志
设计优势
- 高可用性:分布式锁确保多实例安全
- 性能优化:智能跳过机制避免重复执行
- 自动化:无需手动配置,自动发现实体
- 容错性:单个失败不影响整体流程
- 可观测性:详细的日志记录便于监控和调试
这是一个设计良好的基础设施服务,特别适合微服务架构中的 MongoDB 索引管理需求。
评论加载中...