using System.Buffers;
using System.Security.Cryptography;
using Dpz.Core.Infrastructure;
using Dpz.Core.Infrastructure.PublicStruct;
using Dpz.Core.Shard.Service;
using Microsoft.Extensions.Logging;
using Microsoft.IO;
namespace Dpz.Core.Service.ObjectStorage;
public delegate Task<UploadResult> UploadAsync(
ICollection<string> pathToFile,
Func<HttpContent> setContent,
string? contentMd5 = null
);
/// <summary>
/// 分片断点续传上传
/// </summary>
/// <param name="logger"></param>
/// <param name="httpClient"></param>
/// <param name="upyunOperator"></param>
public class ParallelChunkBreakpointUpload<TUpyunOperator>(
ILogger<ParallelChunkBreakpointUpload<TUpyunOperator>> logger,
HttpClient httpClient,
TUpyunOperator upyunOperator
)
where TUpyunOperator : UpyunOperator
{
private readonly int _chunkSize = 1 << 20;
/// <summary>
///
/// </summary>
/// <param name="file"></param>
/// <param name="smallFileUpload"></param>
/// <param name="checkMd5"></param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
public async Task<FileAddress?> UploadFileAsync(
CloudFile file,
UploadAsync smallFileUpload,
bool checkMd5 = false
)
{
await using var stream = ApplicationTools.MemoryStreamManager.GetStream();
await file.Stream.CopyToAsync(stream);
stream.Position = 0;
using var md5 = MD5.Create();
var hashBytes = await md5.ComputeHashAsync(stream);
var md5Value = BitConverter.ToString(hashBytes).Replace("-", "").ToLowerInvariant();
logger.LogInformation("file:{@FileToPath},MD5 value:{MD5Value}", file.PathToFile, md5Value);
stream.Position = 0;
if (stream.Length <= _chunkSize)
{
var result = await smallFileUpload(file.PathToFile, () => new StreamContent(stream));
return new FileAddress(result.AccessUrl, md5Value);
}
var pathToFile = string.Join("/", file.PathToFile.Select(Uri.EscapeDataString));
var taskId = await BeginUploadAsync(pathToFile, stream.Length);
await SplitChunkAsync(stream, pathToFile, taskId);
await CompleteUploadAsync(pathToFile, taskId);
if (checkMd5)
{
var match = await CheckMd5Async(pathToFile, md5Value);
if (!match)
{
logger.LogError(
"MD5 value are inconsistent,local calculate MD5 value:{MD5Value},path to file:{@PathToFile}",
md5Value,
pathToFile
);
throw new Exception("md5 value are inconsistent");
}
}
var url =
(
upyunOperator.Host?.LastOrDefault() == '/'
? upyunOperator.Host
: upyunOperator.Host + "/"
) + pathToFile;
return new FileAddress(url, md5Value);
}
private HttpRequestMessage GetRequest(string pathToFile)
{
var request = new HttpRequestMessage(
HttpMethod.Put,
$"/{upyunOperator.Bucket}/{pathToFile}"
)
{
Version = new Version(2, 0)
};
return request;
}
private async Task<string> BeginUploadAsync(string pathToFile, long fileSize)
{
return await ApplicationTools.RetryAsync(
async () =>
{
var request = GetRequest(pathToFile);
request.Headers.Add("X-Upyun-Multi-Disorder", "true");
request.Headers.Add("X-Upyun-Multi-Stage", "initiate");
request.Headers.Add("X-Upyun-Multi-Length", $"{fileSize}");
request.Headers.Add("X-Upyun-Multi-Part-Size", $"{_chunkSize}");
#if DEBUG
request.Headers.Add("X-Upyun-Meta-Ttl", "1");
#endif
await request.SignatureAsync(upyunOperator);
var response = await httpClient.SendAsync(request);
var multiId = response.Headers.GetResponseHeaderValue("X-Upyun-Multi-Uuid");
if (!response.IsSuccessStatusCode || multiId == null)
{
logger.LogError("upload fail,status code:{StatusCode}", response.StatusCode);
throw new BusinessException("upload file initial multi disorder fail");
}
return multiId;
},
TimeSpan.FromSeconds(2)
);
}
private async Task SplitChunkAsync(
RecyclableMemoryStream stream,
string pathToFile,
string taskId
)
{
var uploadTasks = new List<Task<bool>>();
var buffer = ArrayPool<byte>.Shared.Rent(_chunkSize);
var index = 0;
try
{
int bytesRead;
while ((bytesRead = await stream.ReadAsync(buffer.AsMemory(0, _chunkSize))) > 0)
{
var memory = new Memory<byte>(buffer, 0, bytesRead);
var currentIndex = index;
var chunkUploadTask = ApplicationTools.RetryAsync(
async () => await UploadChunkAsync(pathToFile, taskId, currentIndex, memory),
TimeSpan.FromSeconds(2)
);
uploadTasks.Add(chunkUploadTask);
index++;
}
await Task.WhenAll(uploadTasks);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to upload chunks, path to file: {PathToFile}",pathToFile);
// 清理已上传的任务
foreach (var task in uploadTasks.Where(task => !task.IsCompletedSuccessfully))
{
try
{
await task;
}
catch
{
// Ignore exceptions for cleanup tasks
}
}
throw;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
private async Task<bool> UploadChunkAsync(
string pathToFile,
string taskId,
int currentIndex,
Memory<byte> chunk
)
{
return await ApplicationTools.RetryAsync(
async () =>
{
var request = GetRequest(pathToFile);
request.Headers.Add("X-Upyun-Multi-Stage", "upload");
request.Headers.Add("X-Upyun-Multi-Uuid", taskId);
request.Headers.Add("X-Upyun-Part-Id", $"{currentIndex}");
request.Content = new ReadOnlyMemoryContent(chunk);
await request.SignatureAsync(upyunOperator);
var response = await httpClient.SendAsync(request);
var multiId = response.Headers.GetResponseHeaderValue("X-Upyun-Multi-Uuid");
if (!response.IsSuccessStatusCode || multiId == null)
{
logger.LogError("upload fail,status code:{StatusCode}", response.StatusCode);
throw new BusinessException("upload file initial multi disorder fail");
}
return true;
},
TimeSpan.FromSeconds(2)
);
}
private async Task CompleteUploadAsync(string pathToFile, string taskId)
{
await ApplicationTools.RetryAsync(
async () =>
{
var completeRequest = GetRequest(pathToFile);
completeRequest.Headers.Add("X-Upyun-Multi-Stage", "complete");
completeRequest.Headers.Add("X-Upyun-Multi-Uuid", taskId);
await completeRequest.SignatureAsync(upyunOperator);
var completeResponse = await httpClient.SendAsync(completeRequest);
var multiId = completeResponse.Headers.GetResponseHeaderValue("X-Upyun-Multi-Uuid");
if (!completeResponse.IsSuccessStatusCode || multiId == null)
{
logger.LogError(
"upload fail,status code:{StatusCode}",
completeResponse.StatusCode
);
throw new BusinessException("upload file initial multi disorder fail");
}
var mime = completeResponse.Headers.GetResponseHeaderValue("X-Upyun-Multi-Type");
var length = completeResponse.Headers.GetResponseHeaderValue(
"X-Upyun-Multi-Length"
);
logger.LogInformation(
"upyun multi upload complete,multi id:{MultiId},MIME:{MIME},length:{Length}",
multiId,
mime,
length
);
return true;
},
TimeSpan.FromSeconds(2)
);
}
private async Task<bool> CheckMd5Async(string pathToFile, string md5Value)
{
var request = new HttpRequestMessage(
HttpMethod.Head,
$"/{upyunOperator.Bucket}/{pathToFile}"
)
{
Version = new Version(2, 0)
};
await request.SignatureAsync(upyunOperator);
var response = await httpClient.SendAsync(request);
var cloudMd5 = response.Headers.GetResponseHeaderValue("X-Upyun-Meta-Multi-MD5");
return cloudMd5 == md5Value;
}
}