using Dpz.Core.Infrastructure;
using Dpz.Core.Infrastructure.PublicStruct;
using Dpz.Core.Shard.Service;
using FluentFTP;
using Microsoft.Extensions.Logging;
namespace Dpz.Core.Service.ObjectStorage.Services.Impl;
public class SafeFileService(
HttpClient httpClient,
SafeFileConfiguration safeFileOperator,
ParallelChunkBreakpointUpload<SafeFileConfiguration> parallelChunkBreakpointUpload,
UpyunUpload upyunUpload,
ILogger<SafeFileService> logger,
IFtpLogger ftpLogger
) : ISafeFileService
{
public async Task<UploadResult> UploadAsync(
Stream stream,
ICollection<string> path,
string filename
)
{
return await UploadAsync(path, filename, () => new StreamContent(stream));
}
public async Task<FileAddress?> UploadFileAsync(CloudFile file)
{
return await parallelChunkBreakpointUpload.UploadFileAsync(file, UploadAsync);
}
public async Task<Stream> DownloadAsync(string pathToFile)
{
// 构建 URI
var uriParts = pathToFile.Split('/').Select(Uri.EscapeDataString).ToList();
uriParts.Insert(0, "");
uriParts.Insert(
1,
safeFileOperator.Bucket ?? throw new BusinessException("bucket is null")
);
var uri = string.Join('/', uriParts);
var request = new HttpRequestMessage(HttpMethod.Get, uri);
await request.SignatureAsync(safeFileOperator);
var response = await httpClient.SendAsync(request);
if (!response.IsSuccessStatusCode)
{
logger.LogError("download fail,status code:{StatusCode}", response.StatusCode);
throw new BusinessException(
$"download fail,response status code:{response.StatusCode}"
);
}
var stream = await response.Content.ReadAsStreamAsync();
return stream;
}
private async Task<UploadResult> UploadAsync(
ICollection<string> pathToFile,
Func<HttpContent> setContent,
string? contentMd5 = null
)
{
return await upyunUpload.UploadAsync(pathToFile, setContent, safeFileOperator, contentMd5);
}
private async Task<UploadResult> UploadAsync(
IEnumerable<string> path,
string filename,
Func<HttpContent> setContent,
string? contentMd5 = null
)
{
if (string.IsNullOrEmpty(filename))
throw new ArgumentNullException(nameof(filename));
var pathList = path.ToList();
pathList.Add(filename);
return await UploadAsync(pathList, setContent, contentMd5);
}
public async Task<FileAddress?> UploadFileForFtpAsync(CloudFile file)
{
var remotePath = string.Join("/", file.PathToFile);
#region new Fluent FTP Client
await using var client = new AsyncFtpClient(
"v0.ftp.upyun.com",
$"{safeFileOperator.Operator}/{safeFileOperator.Bucket}",
safeFileOperator.Password,
logger: ftpLogger
);
client.Config.ConnectTimeout = 1000 * 60 * 3;
client.Config.ReadTimeout = 1000 * 60 * 60;
client.Config.DataConnectionConnectTimeout = 1000 * 60 * 60;
client.Config.DataConnectionReadTimeout = 1000 * 60 * 60;
#endregion
var status = FtpStatus.Skipped;
try
{
await client.Connect();
status = await ApplicationTools.RetryAsync(
async () => await client.UploadStream(file.Stream, remotePath),
TimeSpan.FromSeconds(3)
);
}
catch (Exception ex) when (status != FtpStatus.Success)
{
logger.LogError(ex, "upload fail");
return null;
}
finally
{
await client.Disconnect();
try
{
await file.Stream.DisposeAsync();
}
catch (Exception e)
{
logger.LogError(e,"dispose stream fail");
}
}
return new FileAddress(remotePath, "");
}
}