using System.Buffers;
using System.Security.Cryptography;
using Dpz.Core.Entity.Base.PublicStruct;
using Dpz.Core.Infrastructure;
using Dpz.Core.Shard.Service;
using Microsoft.Extensions.Logging;
using Microsoft.IO;
namespace Dpz.Core.Service.ObjectStorage;
public delegate Task<UploadResult> UploadAsync(
ICollection<string> pathToFile,
Func<HttpContent> setContent,
string? contentMd5 = null,
CancellationToken cancellationToken = default
);
/// <summary>
/// 分片断点续传上传
/// </summary>
/// <param name="logger"></param>
/// <param name="httpClient"></param>
/// <param name="upyunOperator"></param>
public class ParallelChunkBreakpointUpload<TUpyunOperator>(
ILogger<ParallelChunkBreakpointUpload<TUpyunOperator>> logger,
HttpClient httpClient,
TUpyunOperator upyunOperator
)
where TUpyunOperator : UpyunOperator
{
private readonly int _chunkSize = 1 << 20;
/// <summary>
///
/// </summary>
/// <param name="file"></param>
/// <param name="smallFileUpload"></param>
/// <param name="checkMd5"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
public async Task<FileAddress?> UploadFileAsync(
CloudFile file,
UploadAsync smallFileUpload,
bool checkMd5 = false,
CancellationToken cancellationToken = default
)
{
await using var stream = ApplicationTools.MemoryStreamManager.GetStream();
await file.Stream.CopyToAsync(stream, cancellationToken);
stream.Position = 0;
using var md5 = MD5.Create();
var hashBytes = await md5.ComputeHashAsync(stream, cancellationToken);
var md5Value = BitConverter.ToString(hashBytes).Replace("-", "").ToLowerInvariant();
logger.LogInformation("file:{@FileToPath},MD5 value:{MD5Value}", file.PathToFile, md5Value);
stream.Position = 0;
if (stream.Length <= _chunkSize)
{
var result = await smallFileUpload(
file.PathToFile,
() => new StreamContent(stream),
cancellationToken: cancellationToken
);
return new FileAddress(result.AccessUrl, md5Value);
}
var pathToFile = string.Join("/", file.PathToFile.Select(Uri.EscapeDataString));
var taskId = await BeginUploadAsync(pathToFile, stream.Length);
await SplitChunkAsync(stream, pathToFile, taskId);
await CompleteUploadAsync(pathToFile, taskId);
if (checkMd5)
{
var match = await CheckMd5Async(pathToFile, md5Value);
if (!match)
{
logger.LogError(
"MD5 value are inconsistent,local calculate MD5 value:{MD5Value},path to file:{@PathToFile}",
md5Value,
pathToFile
);
throw new Exception("md5 value are inconsistent");
}
}
var url =
(
upyunOperator.Host?.LastOrDefault() == '/'
? upyunOperator.Host
: upyunOperator.Host + "/"
) + pathToFile;
return new FileAddress(url, md5Value);
}
private HttpRequestMessage GetRequest(string pathToFile)
{
var request = new HttpRequestMessage(
HttpMethod.Put,
$"/{upyunOperator.Bucket}/{pathToFile}"
)
{
Version = new Version(2, 0),
};
return request;
}
private async Task<string> BeginUploadAsync(string pathToFile, long fileSize)
{
return await ApplicationTools.RetryAsync(
async () =>
{
var request = GetRequest(pathToFile);
request.Headers.Add("X-Upyun-Multi-Disorder", "true");
request.Headers.Add("X-Upyun-Multi-Stage", "initiate");
request.Headers.Add("X-Upyun-Multi-Length", $"{fileSize}");
request.Headers.Add("X-Upyun-Multi-Part-Size", $"{_chunkSize}");
#if DEBUG
request.Headers.Add("X-Upyun-Meta-Ttl", "1");
#endif
await request.SignatureAsync(upyunOperator);
var response = await httpClient.SendAsync(request);
var multiId = response.Headers.GetResponseHeaderValue("X-Upyun-Multi-Uuid");
if (!response.IsSuccessStatusCode || multiId == null)
{
logger.LogError("upload fail,status code:{StatusCode}", response.StatusCode);
throw new BusinessException("upload file initial multi disorder fail");
}
return multiId;
},
TimeSpan.FromSeconds(2)
);
}
private async Task SplitChunkAsync(
RecyclableMemoryStream stream,
string pathToFile,
string taskId
)
{
var uploadTasks = new List<Task<bool>>();
var buffer = ArrayPool<byte>.Shared.Rent(_chunkSize);
var index = 0;
try
{
int bytesRead;
while ((bytesRead = await stream.ReadAsync(buffer.AsMemory(0, _chunkSize))) > 0)
{
var chunkCopy = new byte[bytesRead];
buffer.AsSpan(0, bytesRead).CopyTo(chunkCopy);
var memory = new Memory<byte>(chunkCopy, 0, bytesRead);
var currentIndex = index;
var chunkUploadTask = ApplicationTools.RetryAsync(
async () => await UploadChunkAsync(pathToFile, taskId, currentIndex, memory),
TimeSpan.FromSeconds(2)
);
uploadTasks.Add(chunkUploadTask);
index++;
}
await Task.WhenAll(uploadTasks);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to upload chunks, path to file: {PathToFile}", pathToFile);
// 清理已上传的任务
foreach (var task in uploadTasks.Where(task => !task.IsCompletedSuccessfully))
{
try
{
await task;
}
catch
{
// Ignore exceptions for cleanup tasks
}
}
throw;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
private async Task<bool> UploadChunkAsync(
string pathToFile,
string taskId,
int currentIndex,
Memory<byte> chunk
)
{
return await ApplicationTools.RetryAsync(
async () =>
{
var request = GetRequest(pathToFile);
request.Headers.Add("X-Upyun-Multi-Stage", "upload");
request.Headers.Add("X-Upyun-Multi-Uuid", taskId);
request.Headers.Add("X-Upyun-Part-Id", $"{currentIndex}");
request.Content = new ReadOnlyMemoryContent(chunk);
await request.SignatureAsync(upyunOperator);
var response = await httpClient.SendAsync(request);
var multiId = response.Headers.GetResponseHeaderValue("X-Upyun-Multi-Uuid");
if (!response.IsSuccessStatusCode || multiId == null)
{
logger.LogError("upload fail,status code:{StatusCode}", response.StatusCode);
throw new BusinessException("upload file initial multi disorder fail");
}
return true;
},
TimeSpan.FromSeconds(2)
);
}
private async Task CompleteUploadAsync(string pathToFile, string taskId)
{
await ApplicationTools.RetryAsync(
async () =>
{
var completeRequest = GetRequest(pathToFile);
completeRequest.Headers.Add("X-Upyun-Multi-Stage", "complete");
completeRequest.Headers.Add("X-Upyun-Multi-Uuid", taskId);
await completeRequest.SignatureAsync(upyunOperator);
var completeResponse = await httpClient.SendAsync(completeRequest);
var multiId = completeResponse.Headers.GetResponseHeaderValue("X-Upyun-Multi-Uuid");
if (!completeResponse.IsSuccessStatusCode || multiId == null)
{
logger.LogError(
"upload fail,status code:{StatusCode}",
completeResponse.StatusCode
);
throw new BusinessException("upload file initial multi disorder fail");
}
var mime = completeResponse.Headers.GetResponseHeaderValue("X-Upyun-Multi-Type");
var length = completeResponse.Headers.GetResponseHeaderValue(
"X-Upyun-Multi-Length"
);
logger.LogInformation(
"upyun multi upload complete,multi id:{MultiId},MIME:{MIME},length:{Length}",
multiId,
mime,
length
);
return true;
},
TimeSpan.FromSeconds(2)
);
}
private async Task<bool> CheckMd5Async(string pathToFile, string md5Value)
{
var request = new HttpRequestMessage(
HttpMethod.Head,
$"/{upyunOperator.Bucket}/{pathToFile}"
)
{
Version = new Version(2, 0),
};
await request.SignatureAsync(upyunOperator);
var response = await httpClient.SendAsync(request);
var cloudMd5 = response.Headers.GetResponseHeaderValue("X-Upyun-Meta-Multi-MD5");
return cloudMd5 == md5Value;
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个用于实现分片断点续传上传功能的C#类,主要用于向又拍云(Upyun)对象存储服务上传大文件。让我详细解释一下这个代码的结构和功能:
类概述
ParallelChunkBreakpointUpload<TUpyunOperator> 是一个泛型类,用于处理大文件的分片上传,支持断点续传功能。
主要组件
1. 构造函数和依赖注入
public class ParallelChunkBreakpointUpload<TUpyunOperator>(
ILogger<ParallelChunkBreakpointUpload<TUpyunOperator>> logger,
HttpClient httpClient,
TUpyunOperator upyunOperator
)
- 使用了C# 12的主构造函数语法
- 注入了日志记录器、HTTP客户端和又拍云操作器
2. 核心上传方法 UploadFileAsync
这是主要的上传方法,包含以下步骤:
文件预处理:
- 将文件流复制到内存流中
- 计算文件的MD5哈希值用于完整性验证
小文件处理:
- 如果文件大小小于等于分片大小(1MB),直接使用小文件上传方式
大文件分片上传流程:
- 初始化分片上传任务
- 将文件分割成多个分片并并行上传
- 完成上传并合并分片
- 可选的MD5验证
3. 分片上传的三个阶段
a) BeginUploadAsync - 初始化上传
request.Headers.Add("X-Upyun-Multi-Stage", "initiate");
- 向服务器发起分片上传请求
- 设置文件大小和分片大小
- 返回上传任务ID
b) SplitChunkAsync - 分片上传
- 使用
ArrayPool<byte>优化内存使用 - 将文件分割成1MB的分片
- 并行上传所有分片
- 每个分片都有重试机制
c) CompleteUploadAsync - 完成上传
- 通知服务器合并所有分片
- 完成整个上传流程
4. 关键特性
内存优化:
- 使用
ArrayPool<byte>减少GC压力 - 使用
RecyclableMemoryStream管理内存流
错误处理和重试:
- 所有网络请求都包装在重试逻辑中
- 使用
ApplicationTools.RetryAsync处理临时网络错误
并行处理:
- 多个分片可以并行上传,提高上传效率
完整性验证:
- 支持MD5校验确保文件完整性
- 可选的服务器端MD5验证
5. 调试支持
#if DEBUG
request.Headers.Add("X-Upyun-Meta-Ttl", "1");
#endif
在调试模式下设置文件TTL为1秒,便于测试。
使用场景
这个类特别适用于:
- 大文件上传(>1MB)
- 网络不稳定环境下的文件上传
- 需要断点续传功能的应用
- 对上传性能有要求的场景
技术亮点
- 现代C#语法:使用了主构造函数、using声明等新特性
- 资源管理:正确使用了
using和ArrayPool进行资源管理 - 异步编程:全面使用async/await模式
- 错误恢复:包含完善的重试和错误处理机制
- 性能优化:通过并行上传和内存池优化性能
这是一个设计良好的企业级文件上传组件,充分考虑了性能、可靠性和可维护性。
评论加载中...