using Dpz.Core.Entity.Base.Indexes;
using Medallion.Threading;
using Microsoft.Extensions.Hosting;

namespace Dpz.Core.Service;

/// <summary>
/// 应用启动时自动扫描所有 IIndexedEntity 实现并创建 MongoDB 索引
/// </summary>
internal sealed class MongoIndexInitializerService(
    IConfiguration configuration,
    IMongoIndexInitializer mongoIndexInitializer,
    IFusionCache fusionCache,
    IDistributedLockProvider distributedLockProvider,
    ILogger<MongoIndexInitializerService> logger
) : IHostedService
{
    private readonly string _instanceId = Guid.NewGuid().ToString();
    private const string LockKey = "Dpz.Core.MongoIndexInitializer.Startup";
    private const string LastSuccessCacheKey = "Dpz.Core.MongoIndexInitializer.LastSuccessUtc";
    private const int DefaultSkipHours = 6;
    private static readonly TimeSpan LockWaitTimeout = TimeSpan.FromSeconds(3);
    private static readonly TimeSpan LastSuccessCacheTtl = TimeSpan.FromDays(7);

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await using var lockHandle = await TryAcquireStartupLockAsync(cancellationToken);
        if (lockHandle == null)
        {
            logger.LogInformation(
                "实例 {InstanceId} 未获取到索引初始化锁,跳过本次初始化",
                _instanceId
            );
            return;
        }

        var connectionString = configuration.GetConnectionString("mongodb");
        if (string.IsNullOrEmpty(connectionString))
        {
            logger.LogWarning("未配置 mongodb 连接字符串,跳过索引初始化");
            return;
        }

        Assembly entityAssembly;

        try
        {
            entityAssembly = Assembly.Load("Dpz.Core.Public.Entity");
        }
        catch (Exception e)
        {
            logger.LogError(e, "未找到 Dpz.Core.Public.Entity 程序集,忽略初始化索引");
            return;
        }

        logger.LogInformation("实例 {InstanceId} 获取到索引初始化锁,开始初始化", _instanceId);

        var lastSuccess = await fusionCache.TryGetAsync<DateTimeOffset>(
            LastSuccessCacheKey,
            token: cancellationToken
        );
        if (lastSuccess.HasValue)
        {
            var elapsed = DateTimeOffset.UtcNow - lastSuccess.Value;
            var threshold = TimeSpan.FromHours(DefaultSkipHours);
            if (elapsed < threshold)
            {
                logger.LogInformation(
                    "实例 {InstanceId} 跳过索引初始化:距离上次成功仅 {ElapsedMinutes:F1} 分钟,阈值 {SkipHours} 小时",
                    _instanceId,
                    elapsed.TotalMinutes,
                    DefaultSkipHours
                );
                return;
            }
        }

        var stopwatch = Stopwatch.StartNew();

        var indexedTypes = entityAssembly
            .GetExportedTypes()
            .Where(t =>
                t is { IsClass: true, IsAbstract: false }
                && t.GetInterfaces()
                    .Any(i =>
                        i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IIndexedEntity<>)
                    )
            )
            .ToList();

        if (indexedTypes.Count == 0)
        {
            logger.LogInformation("未找到实现 IIndexedEntity 的实体类型");
            return;
        }

        var failedCount = 0;
        foreach (var entityType in indexedTypes)
        {
            try
            {
                await mongoIndexInitializer.EnsureForTypeAsync(
                    entityType,
                    connectionString,
                    cancellationToken
                );
            }
            catch (Exception ex)
            {
                failedCount++;
                logger.LogError(ex, "初始化 {EntityType} 的索引时出错", entityType.Name);
            }
        }

        stopwatch.Stop();

        if (failedCount == 0)
        {
            await fusionCache.SetAsync(
                LastSuccessCacheKey,
                DateTimeOffset.UtcNow,
                LastSuccessCacheTtl,
                token: cancellationToken
            );
        }

        logger.LogInformation(
            "MongoDB 索引初始化完成,共处理 {Count} 个实体类型,失败 {FailedCount},耗时 {ElapsedMs}ms",
            indexedTypes.Count,
            failedCount,
            stopwatch.ElapsedMilliseconds
        );
    }

    private async Task<IDistributedSynchronizationHandle?> TryAcquireStartupLockAsync(
        CancellationToken cancellationToken
    )
    {
        using var lockCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
            cancellationToken
        );
        lockCancellationTokenSource.CancelAfter(LockWaitTimeout);

        try
        {
            return await distributedLockProvider.AcquireLockAsync(
                LockKey,
                timeout: null,
                cancellationToken: lockCancellationTokenSource.Token
            );
        }
        catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
        {
            return null;
        }
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        return mongoIndexInitializer.ResetAsync(cancellationToken);
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个 MongoDB 索引初始化服务,用于在应用程序启动时自动扫描并创建 MongoDB 索引。让我详细解释这个代码的功能和实现:

核心功能

这个服务实现了 IHostedService 接口,作为后台服务在应用启动时运行,主要负责:

  1. 自动发现实现了 IIndexedEntity<> 接口的实体类
  2. 为这些实体类创建相应的 MongoDB 索引

关键特性

1. 分布式锁机制

private const string LockKey = "Dpz.Core.MongoIndexInitializer.Startup";
  • 使用分布式锁确保在多实例部署环境中,只有一个实例执行索引初始化
  • 避免并发创建索引导致的冲突和资源浪费

2. 智能跳过机制

private const int DefaultSkipHours = 6;
private const string LastSuccessCacheKey = "Dpz.Core.MongoIndexInitializer.LastSuccessUtc";
  • 通过缓存记录上次成功初始化的时间
  • 如果距离上次成功初始化不足 6 小时,则跳过本次初始化
  • 避免频繁重启应用时重复创建索引

3. 自动类型发现

var indexedTypes = entityAssembly
    .GetExportedTypes()
    .Where(t =>
        t is { IsClass: true, IsAbstract: false }
        && t.GetInterfaces()
            .Any(i =>
                i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IIndexedEntity<>)
            )
    )
    .ToList();
  • 动态扫描 Dpz.Core.Public.Entity 程序集
  • 查找所有实现了 IIndexedEntity<> 接口的具体类
  • 无需手动配置,自动发现需要创建索引的实体

执行流程

StartAsync 方法流程:

  1. 获取分布式锁

    • 尝试获取启动锁,超时时间 3 秒
    • 如果获取失败,跳过初始化
  2. 检查配置

    • 验证 MongoDB 连接字符串是否配置
    • 加载实体程序集
  3. 检查执行频率

    • 从缓存中获取上次成功时间
    • 如果距离上次成功不足 6 小时,跳过执行
  4. 扫描和初始化

    • 扫描所有实现 IIndexedEntity<> 的类型
    • 逐个调用 mongoIndexInitializer.EnsureForTypeAsync 创建索引
    • 记录失败次数和执行时间
  5. 更新缓存

    • 如果全部成功,更新最后成功时间到缓存
    • 缓存 TTL 为 7 天

StopAsync 方法:

  • 应用停止时调用 mongoIndexInitializer.ResetAsync 进行清理

错误处理和日志

  • 完善的日志记录,包括实例 ID、执行状态、耗时等信息
  • 对单个实体索引创建失败进行容错处理,不影响其他实体
  • 统计失败数量并记录到日志

设计优势

  1. 高可用性:分布式锁确保多实例安全
  2. 性能优化:智能跳过机制避免重复执行
  3. 自动化:无需手动配置,自动发现实体
  4. 容错性:单个失败不影响整体流程
  5. 可观测性:详细的日志记录便于监控和调试

这是一个设计良好的基础设施服务,特别适合微服务架构中的 MongoDB 索引管理需求。

评论加载中...