using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Dpz.Core.Entity.Base;
using Dpz.Core.Entity.Base.Indexes;
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using MongoDB.Driver;
namespace Dpz.Core.MongodbAccess;
/// <summary>
/// MongoDB 索引初始化器,根据实体上的 <see cref="IIndexedEntity{T}"/> 定义自动创建索引。
/// 每个实体类型在应用生命周期内只执行一次索引检查。
/// </summary>
public sealed class MongoIndexInitializer(ILogger<MongoIndexInitializer> logger)
: IMongoIndexInitializer
{
private readonly ConcurrentDictionary<Type, Task> _initializedTypes = new();
/// <summary>
/// 确保实体对应的集合存在必要的索引(每个类型只执行一次)
/// </summary>
public Task EnsureAsync<T>(
IMongoCollection<T> collection,
CancellationToken cancellationToken = default
)
where T : IBaseEntity
{
return _initializedTypes.GetOrAdd(
typeof(T),
_ => EnsureCoreAsync(collection, cancellationToken)
);
}
/// <summary>
/// 根据运行时类型确保索引存在(用于启动时批量初始化)
/// </summary>
public Task EnsureForTypeAsync(
Type entityType,
string connectionString,
CancellationToken cancellationToken = default
)
{
return _initializedTypes.GetOrAdd(
entityType,
_ =>
{
var method = GetType()
.GetMethod(
nameof(EnsureForTypeCoreAsync),
BindingFlags.NonPublic | BindingFlags.Instance
)!
.MakeGenericMethod(entityType);
return (Task)method.Invoke(this, [connectionString, cancellationToken])!;
}
);
}
public Task ResetAsync(CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
_initializedTypes.Clear();
logger.LogInformation("MongoDB 索引初始化状态已重置");
return Task.CompletedTask;
}
private Task EnsureForTypeCoreAsync<T>(
string connectionString,
CancellationToken cancellationToken
)
where T : IBaseEntity
{
var access = new MongodbAccess<T>(connectionString);
return EnsureCoreAsync(access.Collection, cancellationToken);
}
private async Task EnsureCoreAsync<T>(
IMongoCollection<T> collection,
CancellationToken cancellationToken
)
where T : IBaseEntity
{
try
{
var totalStopwatch = Stopwatch.StartNew();
var definitions = GetIndexDefinitions<T>();
if (definitions is not { Count: > 0 })
{
return;
}
var existingIndexes = await (
await collection.Indexes.ListAsync(cancellationToken)
).ToListAsync(cancellationToken);
var existingIndexMap = existingIndexes
.Where(static index => index.TryGetValue("name", out _))
.ToDictionary(
static index => index["name"].AsString,
static index => index,
StringComparer.OrdinalIgnoreCase
);
var createdCount = 0;
var droppedCount = 0;
var matchedCount = 0;
foreach (var def in definitions)
{
var resolvedName = def.GetResolvedName();
var model = BuildIndexModel<T>(def, resolvedName);
if (model == null)
{
continue;
}
if (existingIndexMap.TryGetValue(resolvedName, out var existing))
{
if (IsIndexMatched(existing, def))
{
matchedCount++;
continue;
}
// 同名索引定义发生漂移时,遵循代码优先,删除并重建。
var dropStopwatch = Stopwatch.StartNew();
await collection.Indexes.DropOneAsync(resolvedName, cancellationToken);
dropStopwatch.Stop();
droppedCount++;
logger.LogInformation(
"删除索引完成: {EntityType}.{IndexName}, 耗时 {ElapsedMs}ms",
typeof(T).Name,
resolvedName,
dropStopwatch.ElapsedMilliseconds
);
}
var createStopwatch = Stopwatch.StartNew();
await collection.Indexes.CreateOneAsync(
model,
cancellationToken: cancellationToken
);
createStopwatch.Stop();
createdCount++;
logger.LogInformation(
"创建索引完成: {EntityType}.{IndexName}, 耗时 {ElapsedMs}ms",
typeof(T).Name,
resolvedName,
createStopwatch.ElapsedMilliseconds
);
}
totalStopwatch.Stop();
logger.LogInformation(
"索引初始化完成: {EntityType}, 定义={DefinedCount}, 已匹配={MatchedCount}, 删除={DroppedCount}, 创建={CreatedCount}, 总耗时={ElapsedMs}ms",
typeof(T).Name,
definitions.Count,
matchedCount,
droppedCount,
createdCount,
totalStopwatch.ElapsedMilliseconds
);
}
catch (Exception e)
{
// 失败时移除已缓存的 Task,允许下次重试
_initializedTypes.TryRemove(typeof(T), out _);
logger.LogError(e, "索引确保失败: {EntityType}", typeof(T).Name);
}
}
private static CreateIndexModel<T>? BuildIndexModel<T>(
EntityIndexDefinition def,
string resolvedName
)
where T : IBaseEntity
{
IndexKeysDefinition<T>? keys = null;
foreach (var field in def.Fields)
{
IndexKeysDefinition<T> key = field.KeyType switch
{
IndexKeyType.Text => Builders<T>.IndexKeys.Text(field.FieldName),
_ => field.Direction == IndexSortDirection.Descending
? Builders<T>.IndexKeys.Descending(field.FieldName)
: Builders<T>.IndexKeys.Ascending(field.FieldName),
};
keys = keys == null ? key : Builders<T>.IndexKeys.Combine(keys, key);
}
if (keys == null)
{
return null;
}
var options = new CreateIndexOptions { Name = resolvedName, Unique = def.Unique };
// 文本索引权重
if (def.TextWeights is { Count: > 0 })
{
var weightsDoc = new BsonDocument();
foreach (var (fieldName, weight) in def.TextWeights)
{
weightsDoc[fieldName] = weight;
}
options.Weights = weightsDoc;
}
if (!string.IsNullOrEmpty(def.DefaultLanguage))
{
options.DefaultLanguage = def.DefaultLanguage;
}
return new CreateIndexModel<T>(keys, options);
}
private static bool IsIndexMatched(BsonDocument existing, EntityIndexDefinition def)
{
var hasTextField = def.Fields.Any(static field => field.KeyType == IndexKeyType.Text);
if (!MatchesKeyDefinition(existing, def.Fields, hasTextField))
{
return false;
}
var existingUnique =
existing.TryGetValue("unique", out var uniqueValue) && uniqueValue.ToBoolean();
if (existingUnique != def.Unique)
{
return false;
}
if (!hasTextField)
{
return true;
}
if (def.TextWeights is { Count: > 0 } && !MatchesTextWeights(existing, def.TextWeights))
{
return false;
}
if (!string.IsNullOrWhiteSpace(def.DefaultLanguage))
{
if (!existing.TryGetValue("default_language", out var languageValue))
{
return false;
}
if (
!string.Equals(
languageValue.AsString,
def.DefaultLanguage,
StringComparison.Ordinal
)
)
{
return false;
}
}
return true;
}
private static bool MatchesKeyDefinition(
BsonDocument existing,
IReadOnlyList<EntityIndexField> fields,
bool hasTextField
)
{
if (!existing.TryGetValue("key", out var keyValue) || keyValue is not BsonDocument keyDoc)
{
return false;
}
if (hasTextField)
{
return keyDoc.TryGetValue("_fts", out var textType) && textType == "text";
}
if (keyDoc.ElementCount != fields.Count)
{
return false;
}
for (var index = 0; index < fields.Count; index++)
{
var expectedField = fields[index];
var element = keyDoc.GetElement(index);
if (!string.Equals(element.Name, expectedField.FieldName, StringComparison.Ordinal))
{
return false;
}
var expectedDirection = (int)expectedField.Direction;
if (!int.TryParse(element.Value.ToString(), out var existingDirection))
{
return false;
}
if (existingDirection != expectedDirection)
{
return false;
}
}
return true;
}
private static bool MatchesTextWeights(
BsonDocument existing,
IReadOnlyDictionary<string, int> expectedWeights
)
{
if (
!existing.TryGetValue("weights", out var weightsValue)
|| weightsValue is not BsonDocument weights
)
{
return false;
}
if (weights.ElementCount != expectedWeights.Count)
{
return false;
}
foreach (var (fieldName, expectedWeight) in expectedWeights)
{
if (!weights.TryGetValue(fieldName, out var weightValue))
{
return false;
}
if (!int.TryParse(weightValue.ToString(), out var actualWeight))
{
return false;
}
if (actualWeight != expectedWeight)
{
return false;
}
}
return true;
}
private static IReadOnlyList<EntityIndexDefinition>? GetIndexDefinitions<T>()
where T : IBaseEntity
{
if (!typeof(IIndexedEntity<T>).IsAssignableFrom(typeof(T)))
{
return null;
}
// 通过带完整约束的辅助方法调用 static abstract 接口方法
var method = typeof(MongoIndexInitializer)
.GetMethod(nameof(GetDefinitionsCore), BindingFlags.NonPublic | BindingFlags.Static)!
.MakeGenericMethod(typeof(T));
return (IReadOnlyList<EntityIndexDefinition>?)method.Invoke(null, null);
}
private static IReadOnlyList<EntityIndexDefinition> GetDefinitionsCore<T>()
where T : IBaseEntity, IIndexedEntity<T>
{
return T.GetIndexDefinitions();
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个用于MongoDB的索引初始化器类,它的主要功能是根据实体类上定义的索引配置自动创建和管理MongoDB索引。下面是详细的代码解释:
类的核心功能
这个类实现了IMongoIndexInitializer接口,用于:
- 自动检测实体类上定义的索引
- 与数据库中现有索引进行比较
- 创建缺失的索引,删除不匹配的索引
- 确保每个实体类型在应用生命周期内只执行一次索引检查
主要特性
1. 单次执行保证
private readonly ConcurrentDictionary<Type, Task> _initializedTypes = new();
使用ConcurrentDictionary缓存每个类型的初始化任务,确保相同类型不会重复执行索引检查。
2. 泛型方法支持
提供了两个主要的公共方法:
EnsureAsync<T>: 针对具体的MongoDB集合确保索引EnsureForTypeAsync: 根据运行时类型确保索引(用于启动时批量初始化)
3. 索引匹配和管理
核心逻辑在EnsureCoreAsync方法中:
private async Task EnsureCoreAsync<T>(IMongoCollection<T> collection, CancellationToken cancellationToken)
该方法执行以下步骤:
- 获取索引定义:从实体类获取应该存在的索引定义
- 列出现有索引:查询数据库中当前存在的索引
- 比较和处理:
- 如果索引匹配,跳过
- 如果索引定义发生变化,删除旧索引并重新创建
- 如果索引不存在,创建新索引
4. 索引模型构建
BuildIndexModel方法负责根据索引定义构建MongoDB的索引模型:
private static CreateIndexModel<T>? BuildIndexModel<T>(EntityIndexDefinition def, string resolvedName)
支持多种索引类型:
- 升序/降序索引
- 文本索引
- 复合索引
- 唯一索引
5. 索引匹配验证
IsIndexMatched方法用于判断现有索引是否与定义匹配:
- 检查键定义是否一致
- 检查唯一性约束
- 对于文本索引,还检查权重和默认语言设置
6. 反射机制
使用反射来处理泛型约束和静态抽象接口方法:
private static IReadOnlyList<EntityIndexDefinition> GetDefinitionsCore<T>()
where T : IBaseEntity, IIndexedEntity<T>
{
return T.GetIndexDefinitions();
}
设计模式和最佳实践
- 依赖注入:通过构造函数注入
ILogger - 异步编程:全面使用async/await模式
- 取消令牌支持:支持操作取消
- 错误处理:包含完整的异常处理和日志记录
- 性能监控:使用
Stopwatch记录操作耗时 - 线程安全:使用
ConcurrentDictionary确保线程安全
使用场景
这个类特别适用于:
- 微服务架构中需要自动管理MongoDB索引的场景
- 需要在应用启动时批量初始化数据库索引
- 支持代码优先的索引管理策略
- 需要确保索引定义与代码同步的场景
整体来说,这是一个设计良好的MongoDB索引管理工具,提供了完整的索引生命周期管理功能。
评论加载中...