using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Dpz.Core.Entity.Base;
using Dpz.Core.Entity.Base.Indexes;
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using MongoDB.Driver;

namespace Dpz.Core.MongodbAccess;

/// <summary>
/// MongoDB 索引初始化器,根据实体上的 <see cref="IIndexedEntity{T}"/> 定义自动创建索引。
/// 每个实体类型在应用生命周期内只执行一次索引检查。
/// </summary>
public sealed class MongoIndexInitializer(ILogger<MongoIndexInitializer> logger)
    : IMongoIndexInitializer
{
    private readonly ConcurrentDictionary<Type, Task> _initializedTypes = new();

    /// <summary>
    /// 确保实体对应的集合存在必要的索引(每个类型只执行一次)
    /// </summary>
    public Task EnsureAsync<T>(
        IMongoCollection<T> collection,
        CancellationToken cancellationToken = default
    )
        where T : IBaseEntity
    {
        return _initializedTypes.GetOrAdd(
            typeof(T),
            _ => EnsureCoreAsync(collection, cancellationToken)
        );
    }

    /// <summary>
    /// 根据运行时类型确保索引存在(用于启动时批量初始化)
    /// </summary>
    public Task EnsureForTypeAsync(
        Type entityType,
        string connectionString,
        CancellationToken cancellationToken = default
    )
    {
        return _initializedTypes.GetOrAdd(
            entityType,
            _ =>
            {
                var method = GetType()
                    .GetMethod(
                        nameof(EnsureForTypeCoreAsync),
                        BindingFlags.NonPublic | BindingFlags.Instance
                    )!
                    .MakeGenericMethod(entityType);
                return (Task)method.Invoke(this, [connectionString, cancellationToken])!;
            }
        );
    }

    public Task ResetAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        _initializedTypes.Clear();
        logger.LogInformation("MongoDB 索引初始化状态已重置");
        return Task.CompletedTask;
    }

    private Task EnsureForTypeCoreAsync<T>(
        string connectionString,
        CancellationToken cancellationToken
    )
        where T : IBaseEntity
    {
        var access = new MongodbAccess<T>(connectionString);
        return EnsureCoreAsync(access.Collection, cancellationToken);
    }

    private async Task EnsureCoreAsync<T>(
        IMongoCollection<T> collection,
        CancellationToken cancellationToken
    )
        where T : IBaseEntity
    {
        try
        {
            var totalStopwatch = Stopwatch.StartNew();
            var definitions = GetIndexDefinitions<T>();
            if (definitions is not { Count: > 0 })
            {
                return;
            }

            var existingIndexes = await (
                await collection.Indexes.ListAsync(cancellationToken)
            ).ToListAsync(cancellationToken);

            var existingIndexMap = existingIndexes
                .Where(static index => index.TryGetValue("name", out _))
                .ToDictionary(
                    static index => index["name"].AsString,
                    static index => index,
                    StringComparer.OrdinalIgnoreCase
                );

            var createdCount = 0;
            var droppedCount = 0;
            var matchedCount = 0;

            foreach (var def in definitions)
            {
                var resolvedName = def.GetResolvedName();
                var model = BuildIndexModel<T>(def, resolvedName);
                if (model == null)
                {
                    continue;
                }

                if (existingIndexMap.TryGetValue(resolvedName, out var existing))
                {
                    if (IsIndexMatched(existing, def))
                    {
                        matchedCount++;
                        continue;
                    }

                    // 同名索引定义发生漂移时,遵循代码优先,删除并重建。
                    var dropStopwatch = Stopwatch.StartNew();
                    await collection.Indexes.DropOneAsync(resolvedName, cancellationToken);
                    dropStopwatch.Stop();
                    droppedCount++;
                    logger.LogInformation(
                        "删除索引完成: {EntityType}.{IndexName}, 耗时 {ElapsedMs}ms",
                        typeof(T).Name,
                        resolvedName,
                        dropStopwatch.ElapsedMilliseconds
                    );
                }

                var createStopwatch = Stopwatch.StartNew();
                await collection.Indexes.CreateOneAsync(
                    model,
                    cancellationToken: cancellationToken
                );
                createStopwatch.Stop();
                createdCount++;
                logger.LogInformation(
                    "创建索引完成: {EntityType}.{IndexName}, 耗时 {ElapsedMs}ms",
                    typeof(T).Name,
                    resolvedName,
                    createStopwatch.ElapsedMilliseconds
                );
            }

            totalStopwatch.Stop();
            logger.LogInformation(
                "索引初始化完成: {EntityType}, 定义={DefinedCount}, 已匹配={MatchedCount}, 删除={DroppedCount}, 创建={CreatedCount}, 总耗时={ElapsedMs}ms",
                typeof(T).Name,
                definitions.Count,
                matchedCount,
                droppedCount,
                createdCount,
                totalStopwatch.ElapsedMilliseconds
            );
        }
        catch (Exception e)
        {
            // 失败时移除已缓存的 Task,允许下次重试
            _initializedTypes.TryRemove(typeof(T), out _);
            logger.LogError(e, "索引确保失败: {EntityType}", typeof(T).Name);
        }
    }

    private static CreateIndexModel<T>? BuildIndexModel<T>(
        EntityIndexDefinition def,
        string resolvedName
    )
        where T : IBaseEntity
    {
        IndexKeysDefinition<T>? keys = null;

        foreach (var field in def.Fields)
        {
            IndexKeysDefinition<T> key = field.KeyType switch
            {
                IndexKeyType.Text => Builders<T>.IndexKeys.Text(field.FieldName),
                _ => field.Direction == IndexSortDirection.Descending
                    ? Builders<T>.IndexKeys.Descending(field.FieldName)
                    : Builders<T>.IndexKeys.Ascending(field.FieldName),
            };
            keys = keys == null ? key : Builders<T>.IndexKeys.Combine(keys, key);
        }

        if (keys == null)
        {
            return null;
        }

        var options = new CreateIndexOptions { Name = resolvedName, Unique = def.Unique };

        // 文本索引权重
        if (def.TextWeights is { Count: > 0 })
        {
            var weightsDoc = new BsonDocument();
            foreach (var (fieldName, weight) in def.TextWeights)
            {
                weightsDoc[fieldName] = weight;
            }

            options.Weights = weightsDoc;
        }

        if (!string.IsNullOrEmpty(def.DefaultLanguage))
        {
            options.DefaultLanguage = def.DefaultLanguage;
        }

        return new CreateIndexModel<T>(keys, options);
    }

    private static bool IsIndexMatched(BsonDocument existing, EntityIndexDefinition def)
    {
        var hasTextField = def.Fields.Any(static field => field.KeyType == IndexKeyType.Text);

        if (!MatchesKeyDefinition(existing, def.Fields, hasTextField))
        {
            return false;
        }

        var existingUnique =
            existing.TryGetValue("unique", out var uniqueValue) && uniqueValue.ToBoolean();
        if (existingUnique != def.Unique)
        {
            return false;
        }

        if (!hasTextField)
        {
            return true;
        }

        if (def.TextWeights is { Count: > 0 } && !MatchesTextWeights(existing, def.TextWeights))
        {
            return false;
        }

        if (!string.IsNullOrWhiteSpace(def.DefaultLanguage))
        {
            if (!existing.TryGetValue("default_language", out var languageValue))
            {
                return false;
            }

            if (
                !string.Equals(
                    languageValue.AsString,
                    def.DefaultLanguage,
                    StringComparison.Ordinal
                )
            )
            {
                return false;
            }
        }

        return true;
    }

    private static bool MatchesKeyDefinition(
        BsonDocument existing,
        IReadOnlyList<EntityIndexField> fields,
        bool hasTextField
    )
    {
        if (!existing.TryGetValue("key", out var keyValue) || keyValue is not BsonDocument keyDoc)
        {
            return false;
        }

        if (hasTextField)
        {
            return keyDoc.TryGetValue("_fts", out var textType) && textType == "text";
        }

        if (keyDoc.ElementCount != fields.Count)
        {
            return false;
        }

        for (var index = 0; index < fields.Count; index++)
        {
            var expectedField = fields[index];
            var element = keyDoc.GetElement(index);

            if (!string.Equals(element.Name, expectedField.FieldName, StringComparison.Ordinal))
            {
                return false;
            }

            var expectedDirection = (int)expectedField.Direction;
            if (!int.TryParse(element.Value.ToString(), out var existingDirection))
            {
                return false;
            }

            if (existingDirection != expectedDirection)
            {
                return false;
            }
        }

        return true;
    }

    private static bool MatchesTextWeights(
        BsonDocument existing,
        IReadOnlyDictionary<string, int> expectedWeights
    )
    {
        if (
            !existing.TryGetValue("weights", out var weightsValue)
            || weightsValue is not BsonDocument weights
        )
        {
            return false;
        }

        if (weights.ElementCount != expectedWeights.Count)
        {
            return false;
        }

        foreach (var (fieldName, expectedWeight) in expectedWeights)
        {
            if (!weights.TryGetValue(fieldName, out var weightValue))
            {
                return false;
            }

            if (!int.TryParse(weightValue.ToString(), out var actualWeight))
            {
                return false;
            }

            if (actualWeight != expectedWeight)
            {
                return false;
            }
        }

        return true;
    }

    private static IReadOnlyList<EntityIndexDefinition>? GetIndexDefinitions<T>()
        where T : IBaseEntity
    {
        if (!typeof(IIndexedEntity<T>).IsAssignableFrom(typeof(T)))
        {
            return null;
        }

        // 通过带完整约束的辅助方法调用 static abstract 接口方法
        var method = typeof(MongoIndexInitializer)
            .GetMethod(nameof(GetDefinitionsCore), BindingFlags.NonPublic | BindingFlags.Static)!
            .MakeGenericMethod(typeof(T));

        return (IReadOnlyList<EntityIndexDefinition>?)method.Invoke(null, null);
    }

    private static IReadOnlyList<EntityIndexDefinition> GetDefinitionsCore<T>()
        where T : IBaseEntity, IIndexedEntity<T>
    {
        return T.GetIndexDefinitions();
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个用于MongoDB的索引初始化器类,它的主要功能是根据实体类上定义的索引配置自动创建和管理MongoDB索引。下面是详细的代码解释:

类的核心功能

这个类实现了IMongoIndexInitializer接口,用于:

  1. 自动检测实体类上定义的索引
  2. 与数据库中现有索引进行比较
  3. 创建缺失的索引,删除不匹配的索引
  4. 确保每个实体类型在应用生命周期内只执行一次索引检查

主要特性

1. 单次执行保证

private readonly ConcurrentDictionary<Type, Task> _initializedTypes = new();

使用ConcurrentDictionary缓存每个类型的初始化任务,确保相同类型不会重复执行索引检查。

2. 泛型方法支持

提供了两个主要的公共方法:

  • EnsureAsync<T>: 针对具体的MongoDB集合确保索引
  • EnsureForTypeAsync: 根据运行时类型确保索引(用于启动时批量初始化)

3. 索引匹配和管理

核心逻辑在EnsureCoreAsync方法中:

private async Task EnsureCoreAsync<T>(IMongoCollection<T> collection, CancellationToken cancellationToken)

该方法执行以下步骤:

  1. 获取索引定义:从实体类获取应该存在的索引定义
  2. 列出现有索引:查询数据库中当前存在的索引
  3. 比较和处理
    • 如果索引匹配,跳过
    • 如果索引定义发生变化,删除旧索引并重新创建
    • 如果索引不存在,创建新索引

4. 索引模型构建

BuildIndexModel方法负责根据索引定义构建MongoDB的索引模型:

private static CreateIndexModel<T>? BuildIndexModel<T>(EntityIndexDefinition def, string resolvedName)

支持多种索引类型:

  • 升序/降序索引
  • 文本索引
  • 复合索引
  • 唯一索引

5. 索引匹配验证

IsIndexMatched方法用于判断现有索引是否与定义匹配:

  • 检查键定义是否一致
  • 检查唯一性约束
  • 对于文本索引,还检查权重和默认语言设置

6. 反射机制

使用反射来处理泛型约束和静态抽象接口方法:

private static IReadOnlyList<EntityIndexDefinition> GetDefinitionsCore<T>()
    where T : IBaseEntity, IIndexedEntity<T>
{
    return T.GetIndexDefinitions();
}

设计模式和最佳实践

  1. 依赖注入:通过构造函数注入ILogger
  2. 异步编程:全面使用async/await模式
  3. 取消令牌支持:支持操作取消
  4. 错误处理:包含完整的异常处理和日志记录
  5. 性能监控:使用Stopwatch记录操作耗时
  6. 线程安全:使用ConcurrentDictionary确保线程安全

使用场景

这个类特别适用于:

  • 微服务架构中需要自动管理MongoDB索引的场景
  • 需要在应用启动时批量初始化数据库索引
  • 支持代码优先的索引管理策略
  • 需要确保索引定义与代码同步的场景

整体来说,这是一个设计良好的MongoDB索引管理工具,提供了完整的索引生命周期管理功能。

评论加载中...