using Dpz.Core.Infrastructure;
using MongoDB.Bson;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Dpz.Core.Infrastructure.Entity;
namespace Dpz.Core.MongodbAccess;
/// <summary>
/// <see cref="IRepository{T}"/> implementation whose operations are executed through a
/// caller-supplied <see cref="IClientSessionHandle"/>, so that reads and writes issued via
/// this instance all take part in the same client session.
/// </summary>
/// <typeparam name="T">Entity type stored in the underlying collection.</typeparam>
public class RepositoryUnitOfWork<T> : IRepository<T> where T : IBaseEntity, new()
{
    private readonly MongodbAccess<T> _access;
    private readonly IClientSessionHandle _session;

    /// <summary>
    /// Creates a repository that uses <paramref name="access"/> to reach the collection and
    /// passes <paramref name="session"/> to every driver call it makes.
    /// </summary>
    /// <param name="access">Provides the database, collection and collection name.</param>
    /// <param name="session">Client session handed to each driver operation.</param>
    public RepositoryUnitOfWork(MongodbAccess<T> access, IClientSessionHandle session)
    {
        _access = access;
        _session = session;
    }

    /// <summary>The collection exposed by the wrapped <see cref="MongodbAccess{T}"/>.</summary>
    public IMongoCollection<T> Collection => _access.Collection;

    /// <summary>
    /// A LINQ queryable over the collection, bound to the session and created with
    /// <c>AllowDiskUse = true</c> in its aggregate options.
    /// </summary>
    public IMongoQueryable<T> MongodbQueryable =>
        Database.GetCollection<T>(_access.CollectionName)
            .AsQueryable(_session, new AggregateOptions { AllowDiskUse = true });

    /// <summary>Filters <see cref="MongodbQueryable"/> with the given predicate.</summary>
    /// <param name="predicate">Condition the returned entities must satisfy.</param>
    /// <returns>A session-bound queryable restricted by <paramref name="predicate"/>.</returns>
    public IMongoQueryable<T> SearchFor(Expression<Func<T, bool>> predicate)
    {
        return MongodbQueryable.Where(predicate);
    }

    /// <summary>Starts a fluent find over the collection using the given filter.</summary>
    /// <param name="filter">Filter definition selecting the documents.</param>
    /// <returns>A fluent find interface for further composition.</returns>
    public IFindFluent<T, T> SearchFor(FilterDefinition<T> filter)
    {
        // Pass the session so this read agrees with MongodbQueryable and the write
        // operations, which all run through the same session.
        return Collection.Find(_session, filter);
    }

    /// <summary>
    /// Streams the documents matching <paramref name="filter"/>, one cursor batch at a time.
    /// </summary>
    /// <param name="filter">Filter definition selecting the documents.</param>
    /// <returns>An asynchronous sequence of the matching entities.</returns>
    public async IAsyncEnumerable<T> SearchForAsync(FilterDefinition<T> filter)
    {
        // The using declaration disposes the cursor when enumeration finishes or when the
        // consumer stops early and disposes the enumerator.
        using var cursor = await Collection.FindAsync(_session, filter);
        while (await cursor.MoveNextAsync())
        {
            foreach (var item in cursor.Current)
            {
                yield return item;
            }
        }
    }

    /// <summary>
    /// Looks up a single entity using the filter that
    /// <c>MongodbExtensions.GetIdPropertyFilter</c> builds for <paramref name="id"/>.
    /// </summary>
    /// <param name="id">Identifier value handed to the filter builder.</param>
    /// <returns>The matching entity, or the default value when nothing matches.</returns>
    public async Task<T?> FindAsync(object id)
    {
        var filter = MongodbExtensions.GetIdPropertyFilter<T>(id);
        var cursor = await Collection.FindAsync(_session, filter);
        return await cursor.SingleOrDefaultAsync();
    }

    /// <summary>Inserts the given entities; does nothing when the array is empty.</summary>
    /// <param name="source">Entities to insert.</param>
    public async Task InsertAsync(params T[] source)
    {
        // Skip the driver call entirely when there is nothing to insert.
        if (source.Length != 0)
        {
            await Collection.InsertManyAsync(_session, source);
        }
    }

    /// <summary>Inserts the given entities; does nothing when the collection is empty.</summary>
    /// <param name="source">Entities to insert.</param>
    public async Task InsertAsync(IReadOnlyCollection<T> source)
    {
        // Skip the driver call entirely when there is nothing to insert.
        if (source.Count != 0)
        {
            await Collection.InsertManyAsync(_session, source);
        }
    }

    /// <summary>Deletes every document matching the predicate.</summary>
    /// <param name="filter">Condition selecting the documents to delete.</param>
    /// <returns>The driver's result for the delete operation.</returns>
    public async Task<DeleteResult> DeleteAsync(Expression<Func<T, bool>> filter)
    {
        return await Collection.DeleteManyAsync(_session, filter);
    }

    /// <summary>Deletes every document matching the filter definition.</summary>
    /// <param name="filter">Filter definition selecting the documents to delete.</param>
    /// <returns>The driver's result for the delete operation.</returns>
    public async Task<DeleteResult> DeleteAsync(FilterDefinition<T> filter)
    {
        return await Collection.DeleteManyAsync(_session, filter);
    }

    /// <summary>
    /// Deletes at most one document, selected by the filter that
    /// <c>MongodbExtensions.GetIdPropertyFilter</c> builds for <paramref name="id"/>.
    /// </summary>
    /// <param name="id">Identifier value handed to the filter builder.</param>
    /// <returns>The driver's result for the delete operation.</returns>
    public async Task<DeleteResult> DeleteAsync(object id)
    {
        var filter = MongodbExtensions.GetIdPropertyFilter<T>(id);
        return await Collection.DeleteOneAsync(_session, filter);
    }

    /// <summary>Applies an update definition to every document matching the predicate.</summary>
    /// <param name="predicate">Condition selecting the documents to update.</param>
    /// <param name="update">Update to apply to each matching document.</param>
    /// <returns>The driver's result for the update operation.</returns>
    public async Task<UpdateResult> UpdateAsync(Expression<Func<T, bool>> predicate, UpdateDefinition<T> update)
    {
        return await Collection.UpdateManyAsync(_session, predicate, update);
    }

    /// <summary>
    /// Replaces the stored document selected by the filter that
    /// <c>MongodbExtensions.GetIdPropertyFilter</c> builds from <paramref name="entity"/>.
    /// </summary>
    /// <param name="entity">Entity whose current state replaces the stored document.</param>
    /// <returns>The driver's result for the replace operation.</returns>
    public async Task<ReplaceOneResult> UpdateAsync(T entity)
    {
        var filter = MongodbExtensions.GetIdPropertyFilter(entity);
        return await Collection.ReplaceOneAsync(_session, filter, entity);
    }

    /// <summary>
    /// Replaces several documents in a single bulk write, one replace request per entity.
    /// </summary>
    /// <param name="entities">Entities whose current state replaces the stored documents.</param>
    /// <returns>The driver's result for the bulk write.</returns>
    public async Task<BulkWriteResult<T>> UpdateAsync(IEnumerable<T> entities)
    {
        // NOTE(review): an empty sequence is forwarded to the driver as an empty request
        // list; confirm whether callers can pass one and how the driver reacts.
        var writes = entities
            .Select(x => new ReplaceOneModel<T>(MongodbExtensions.GetIdPropertyFilter(x), x))
            .ToList();
        return await Collection.BulkWriteAsync(_session, writes);
    }

    /// <summary>The database exposed by the wrapped <see cref="MongodbAccess{T}"/>.</summary>
    public IMongoDatabase Database => _access.Database;
}