网站首页 网站源码
website
站点相关全部源代码,隐藏了一些关于服务器的信息
using Dpz.Core.Infrastructure;
using MongoDB.Bson;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Dpz.Core.Infrastructure.Entity;

namespace Dpz.Core.MongodbAccess;

/// <summary>
/// <see cref="IRepository{T}"/> implementation that passes a single
/// <see cref="IClientSessionHandle"/> to every database operation it issues,
/// so that reads and writes made through one instance share the same session.
/// </summary>
/// <typeparam name="T">Entity type stored in the underlying collection.</typeparam>
public class RepositoryUnitOfWork<T> : IRepository<T> where T : IBaseEntity, new()
{
    private readonly MongodbAccess<T> _access;

    private readonly IClientSessionHandle _session;

    /// <summary>
    /// Creates a repository bound to the given collection accessor and session.
    /// </summary>
    /// <param name="access">Accessor that exposes the database and collection for <typeparamref name="T"/>.</param>
    /// <param name="session">Session handed to every operation issued by this instance.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="access"/> or <paramref name="session"/> is <c>null</c>.
    /// </exception>
    public RepositoryUnitOfWork(MongodbAccess<T> access, IClientSessionHandle session)
    {
        // Fail fast: every member below dereferences both dependencies.
        _access = access ?? throw new ArgumentNullException(nameof(access));
        _session = session ?? throw new ArgumentNullException(nameof(session));
    }

    /// <summary>The collection exposed by the underlying accessor.</summary>
    public IMongoCollection<T> Collection => _access.Collection;

    /// <summary>
    /// A LINQ queryable over the collection, bound to this instance's session
    /// and created with <c>AllowDiskUse = true</c>.
    /// </summary>
    public IMongoQueryable<T> MongodbQueryable =>
        Database.GetCollection<T>(_access.CollectionName)
            .AsQueryable(_session, new AggregateOptions { AllowDiskUse = true });

    /// <summary>Returns a session-bound queryable filtered by <paramref name="predicate"/>.</summary>
    /// <param name="predicate">Filter expression applied to the queryable.</param>
    public IMongoQueryable<T> SearchFor(Expression<Func<T, bool>> predicate)
    {
        return MongodbQueryable.Where(predicate);
    }

    /// <summary>Builds a fluent find over the collection for <paramref name="filter"/>.</summary>
    /// <param name="filter">Filter definition selecting the documents.</param>
    public IFindFluent<T, T> SearchFor(FilterDefinition<T> filter)
    {
        // Pass the session so this read runs in the same session as the writes
        // and as MongodbQueryable, instead of outside it.
        return Collection.Find(_session, filter);
    }

    /// <summary>
    /// Streams the documents matching <paramref name="filter"/>, batch by batch.
    /// </summary>
    /// <param name="filter">Filter definition selecting the documents.</param>
    public async IAsyncEnumerable<T> SearchForAsync(FilterDefinition<T> filter)
    {
        // The using declaration releases the cursor when enumeration completes,
        // is abandoned early (the enumerator is disposed), or throws.
        using var cursor = await Collection.FindAsync(_session, filter);
        while (await cursor.MoveNextAsync())
        {
            foreach (var item in cursor.Current)
            {
                yield return item;
            }
        }
    }

    /// <summary>Looks up a single entity by its id.</summary>
    /// <param name="id">Id value handed to <c>MongodbExtensions.GetIdPropertyFilter</c>.</param>
    /// <returns>The result of <c>SingleOrDefaultAsync</c> over the matching documents.</returns>
    public async Task<T?> FindAsync(object id)
    {
        var filter = MongodbExtensions.GetIdPropertyFilter<T>(id);
        // Session-bound for the same reason as SearchFor(FilterDefinition<T>).
        return await (await Collection.FindAsync(_session, filter)).SingleOrDefaultAsync();
    }

    /// <summary>Inserts the given entities; does nothing when none are supplied.</summary>
    /// <param name="source">Entities to insert.</param>
    public async Task InsertAsync(params T[] source)
    {
        if (source.Length != 0)
        {
            await Collection.InsertManyAsync(_session, source);
        }
    }

    /// <summary>Inserts the given entities; does nothing when the collection is empty.</summary>
    /// <param name="source">Entities to insert.</param>
    public async Task InsertAsync(IReadOnlyCollection<T> source)
    {
        if (source.Count != 0)
        {
            await Collection.InsertManyAsync(_session, source);
        }
    }

    /// <summary>Deletes every document matching <paramref name="filter"/>.</summary>
    /// <param name="filter">Predicate selecting the documents to delete.</param>
    /// <returns>The driver's delete result.</returns>
    public async Task<DeleteResult> DeleteAsync(Expression<Func<T, bool>> filter)
    {
        var result = await Collection.DeleteManyAsync(_session, filter);
        return result;
    }

    /// <summary>Deletes every document matching <paramref name="filter"/>.</summary>
    /// <param name="filter">Filter definition selecting the documents to delete.</param>
    /// <returns>The driver's delete result.</returns>
    public async Task<DeleteResult> DeleteAsync(FilterDefinition<T> filter)
    {
        var result = await Collection.DeleteManyAsync(_session, filter);
        return result;
    }

    /// <summary>Deletes one document selected by its id.</summary>
    /// <param name="id">Id value handed to <c>MongodbExtensions.GetIdPropertyFilter</c>.</param>
    /// <returns>The driver's delete result.</returns>
    public async Task<DeleteResult> DeleteAsync(object id)
    {
        var filter = MongodbExtensions.GetIdPropertyFilter<T>(id);
        return await Collection.DeleteOneAsync(_session, filter);
    }

    /// <summary>Applies <paramref name="update"/> to every document matching <paramref name="predicate"/>.</summary>
    /// <param name="predicate">Predicate selecting the documents to update.</param>
    /// <param name="update">Update definition to apply.</param>
    /// <returns>The driver's update result.</returns>
    public async Task<UpdateResult> UpdateAsync(Expression<Func<T, bool>> predicate, UpdateDefinition<T> update)
    {
        var result = await Collection.UpdateManyAsync(_session, predicate, update);
        return result;
    }

    /// <summary>
    /// Replaces the stored document selected by the id filter built from <paramref name="entity"/>.
    /// </summary>
    /// <param name="entity">Replacement document; also the source of the id filter.</param>
    /// <returns>The driver's replace result.</returns>
    public async Task<ReplaceOneResult> UpdateAsync(T entity)
    {
        var filter = MongodbExtensions.GetIdPropertyFilter(entity);
        var result = await Collection.ReplaceOneAsync(_session, filter, entity);
        return result;
    }

    /// <summary>
    /// Replaces each of the given entities in a single bulk write, one replace model per entity.
    /// </summary>
    /// <param name="entities">Replacement documents; each one supplies its own id filter.</param>
    /// <returns>The driver's bulk write result.</returns>
    public async Task<BulkWriteResult<T>> UpdateAsync(IEnumerable<T> entities)
    {
        // Materialize once so the source sequence is enumerated a single time.
        var writes = entities
            .Select(x => new ReplaceOneModel<T>(MongodbExtensions.GetIdPropertyFilter(x), x))
            .ToList();

        return await Collection.BulkWriteAsync(_session, writes);
    }

    /// <summary>The database exposed by the underlying accessor.</summary>
    public IMongoDatabase Database => _access.Database;
}
loading