using System.Buffers;
using System.Security.Cryptography;
using Dpz.Core.Entity.Base.PublicStruct;
using Dpz.Core.Infrastructure;
using Dpz.Core.Shard.Service;
using Microsoft.Extensions.Logging;
using Microsoft.IO;

namespace Dpz.Core.Service.ObjectStorage;

/// <summary>
/// Single-request upload callback, used by <c>ParallelChunkBreakpointUpload</c> for files that fit in one chunk.
/// </summary>
/// <param name="pathToFile">Path segments of the destination object (the multipart path joins them with '/').</param>
/// <param name="setContent">Factory that produces the request body for the upload.</param>
/// <param name="contentMd5">Optional MD5 of the content; expected format (hex vs. base64) is not established here — confirm with implementations.</param>
/// <param name="cancellationToken">Token used to cancel the upload.</param>
/// <returns>The upload result; its <c>AccessUrl</c> is used as the resulting file address.</returns>
public delegate Task<UploadResult> UploadAsync(ICollection<string> pathToFile, Func<HttpContent> setContent, string? contentMd5 = null, CancellationToken cancellationToken = default);

/// <summary>
/// Chunked ("breakpoint") parallel upload to Upyun object storage.
/// Files no larger than one chunk are handed to a caller-supplied single-request upload.
/// Larger files go through the initiate → upload parts → complete multipart flow.
/// </summary>
/// <typeparam name="TUpyunOperator">Operator type supplying bucket, host and request signing.</typeparam>
/// <param name="logger">Logger for progress and failure diagnostics.</param>
/// <param name="httpClient">
/// Client used for the multipart requests. Request URIs are relative (<c>/{bucket}/{path}</c>), so
/// presumably its base address points at the Upyun API endpoint — confirm at registration.
/// </param>
/// <param name="upyunOperator">Provides the bucket, the public host and the signature for each request.</param>
public class ParallelChunkBreakpointUpload<TUpyunOperator>(
    ILogger<ParallelChunkBreakpointUpload<TUpyunOperator>> logger,
    HttpClient httpClient,
    TUpyunOperator upyunOperator
)
    where TUpyunOperator : UpyunOperator
{
    /// <summary>
    /// Part size in bytes, sent as X-Upyun-Multi-Part-Size. Every part except the last is filled
    /// to exactly this size.
    /// </summary>
    private readonly int _chunkSize = 1 << 20;

    /// <summary>
    /// Maximum number of part uploads in flight at once. This also bounds memory: each in-flight
    /// part holds one pooled buffer of <see cref="_chunkSize"/> bytes.
    /// </summary>
    private readonly int _maxParallelUploads = 4;

    /// <summary>
    /// Uploads <paramref name="file"/> and returns its address together with its MD5.
    /// </summary>
    /// <param name="file">File to upload. Its stream is fully buffered in memory first.</param>
    /// <param name="smallFileUpload">
    /// Single-request upload, used when the file fits in one chunk. The content factory it receives
    /// may be invoked more than once (e.g. on retry); each call returns fresh content over the
    /// whole file.
    /// </param>
    /// <param name="checkMd5">
    /// When true, and only on the multipart path, compare the MD5 reported by Upyun with the local MD5.
    /// </param>
    /// <param name="cancellationToken">Cancels buffering, hashing and the multipart HTTP requests.</param>
    /// <returns>The file URL and its lower-case hex MD5.</returns>
    /// <exception cref="InvalidDataException">The MD5 reported by Upyun differs from the local MD5.</exception>
    /// <exception cref="BusinessException">
    /// A multipart stage returned an unsuccessful response. Presumably this surfaces once
    /// ApplicationTools.RetryAsync gives up — confirm.
    /// </exception>
    public async Task<FileAddress?> UploadFileAsync(
        CloudFile file,
        UploadAsync smallFileUpload,
        bool checkMd5 = false,
        CancellationToken cancellationToken = default
    )
    {
        await using var stream = ApplicationTools.MemoryStreamManager.GetStream();
        await file.Stream.CopyToAsync(stream, cancellationToken);
        stream.Position = 0;

        var hashBytes = await MD5.HashDataAsync(stream, cancellationToken);
        var md5Value = Convert.ToHexString(hashBytes).ToLowerInvariant();
        logger.LogInformation("file:{@FileToPath},MD5 value:{MD5Value}", file.PathToFile, md5Value);

        stream.Position = 0;

        if (stream.Length <= _chunkSize)
        {
            // GetBuffer exposes the buffered bytes without copying. ByteArrayContent does not own
            // them, so the factory can safely be invoked repeatedly. A StreamContent would be read
            // to the end, or disposed along with its request, after the first attempt.
            var buffer = stream.GetBuffer();
            var length = (int)stream.Length;
            var result = await smallFileUpload(
                file.PathToFile,
                () => new ByteArrayContent(buffer, 0, length),
                cancellationToken: cancellationToken
            );
            return new FileAddress(result.AccessUrl, md5Value);
        }

        var pathToFile = string.Join("/", file.PathToFile.Select(Uri.EscapeDataString));

        var taskId = await BeginUploadAsync(pathToFile, stream.Length, cancellationToken);
        await SplitChunkAsync(stream, pathToFile, taskId, cancellationToken);
        await CompleteUploadAsync(pathToFile, taskId, cancellationToken);

        if (checkMd5 && !await CheckMd5Async(pathToFile, md5Value, cancellationToken))
        {
            logger.LogError(
                "MD5 value are inconsistent,local calculate MD5 value:{MD5Value},path to file:{@PathToFile}",
                md5Value,
                pathToFile
            );
            throw new InvalidDataException("md5 value are inconsistent");
        }

        var host = upyunOperator.Host;
        var url = (host is not null && host.EndsWith('/') ? host : host + "/") + pathToFile;
        return new FileAddress(url, md5Value);
    }

    /// <summary>
    /// Creates an HTTP/2 request to <c>/{bucket}/{pathToFile}</c>. The default method is PUT.
    /// <paramref name="pathToFile"/> is expected to be URL-escaped already.
    /// </summary>
    private HttpRequestMessage GetRequest(string pathToFile, HttpMethod? method = null) =>
        new(method ?? HttpMethod.Put, $"/{upyunOperator.Bucket}/{pathToFile}")
        {
            Version = new Version(2, 0),
        };

    /// <summary>
    /// Starts an out-of-order multipart upload and returns its X-Upyun-Multi-Uuid.
    /// </summary>
    private async Task<string> BeginUploadAsync(
        string pathToFile,
        long fileSize,
        CancellationToken cancellationToken = default
    )
    {
        return await ApplicationTools.RetryAsync(
            async () =>
            {
                cancellationToken.ThrowIfCancellationRequested();
                using var request = GetRequest(pathToFile);
                request.Headers.Add("X-Upyun-Multi-Disorder", "true");
                request.Headers.Add("X-Upyun-Multi-Stage", "initiate");
                request.Headers.Add("X-Upyun-Multi-Length", $"{fileSize}");
                request.Headers.Add("X-Upyun-Multi-Part-Size", $"{_chunkSize}");
#if DEBUG
                // Debug builds mark uploaded objects with a TTL so test files expire.
                request.Headers.Add("X-Upyun-Meta-Ttl", "1");
#endif
                await request.SignatureAsync(upyunOperator);
                using var response = await httpClient.SendAsync(request, cancellationToken);

                var multiId = response.Headers.GetResponseHeaderValue("X-Upyun-Multi-Uuid");
                if (!response.IsSuccessStatusCode || multiId is null)
                {
                    logger.LogError("upload fail,status code:{StatusCode}", response.StatusCode);
                    throw new BusinessException("upload file initial multi disorder fail");
                }

                return multiId;
            },
            TimeSpan.FromSeconds(2)
        );
    }

    /// <summary>
    /// Reads <paramref name="stream"/> sequentially from its current position in
    /// <see cref="_chunkSize"/> parts and uploads them concurrently, with at most
    /// <see cref="_maxParallelUploads"/> in flight. Part ids are numbered from 0.
    /// </summary>
    private async Task SplitChunkAsync(
        RecyclableMemoryStream stream,
        string pathToFile,
        string taskId,
        CancellationToken cancellationToken = default
    )
    {
        using var throttle = new SemaphoreSlim(_maxParallelUploads);
        var uploadTasks = new List<Task>();
        var partId = 0;
        try
        {
            while (true)
            {
                // Take a slot before renting, so at most _maxParallelUploads buffers are alive.
                await throttle.WaitAsync(cancellationToken);
                var buffer = ArrayPool<byte>.Shared.Rent(_chunkSize);
                var handedOff = false;
                try
                {
                    // ReadAsync may legally return a short read. Fill the whole part so only the
                    // final part can be smaller than the declared part size.
                    var bytesRead = await stream.ReadAtLeastAsync(
                        buffer.AsMemory(0, _chunkSize),
                        _chunkSize,
                        throwOnEndOfStream: false,
                        cancellationToken
                    );
                    if (bytesRead == 0)
                    {
                        break;
                    }

                    uploadTasks.Add(UploadPartAsync(buffer, bytesRead, partId++));
                    handedOff = true;
                }
                finally
                {
                    // If the buffer was not handed to an upload task, release it here.
                    if (!handedOff)
                    {
                        ArrayPool<byte>.Shared.Return(buffer);
                        throttle.Release();
                    }
                }
            }

            await Task.WhenAll(uploadTasks);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Failed to upload chunks, path to file: {PathToFile}", pathToFile);
            // Let in-flight uploads finish. This returns their pooled buffers and keeps them from
            // releasing the semaphore after it is disposed.
            try
            {
                await Task.WhenAll(uploadTasks);
            }
            catch
            {
                // Deliberately ignored: the original failure is rethrown below.
            }
            throw;
        }

        // Uploads one part, then returns its pooled buffer and frees its concurrency slot.
        async Task UploadPartAsync(byte[] partBuffer, int length, int currentIndex)
        {
            try
            {
                await UploadChunkAsync(
                    pathToFile,
                    taskId,
                    currentIndex,
                    partBuffer.AsMemory(0, length),
                    cancellationToken
                );
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(partBuffer);
                throttle.Release();
            }
        }
    }

    /// <summary>
    /// Uploads one part of the multipart task <paramref name="taskId"/>, with retries.
    /// <paramref name="chunk"/> must stay valid until the returned task completes.
    /// </summary>
    private async Task<bool> UploadChunkAsync(
        string pathToFile,
        string taskId,
        int currentIndex,
        Memory<byte> chunk,
        CancellationToken cancellationToken = default
    )
    {
        return await ApplicationTools.RetryAsync(
            async () =>
            {
                cancellationToken.ThrowIfCancellationRequested();
                using var request = GetRequest(pathToFile);
                request.Headers.Add("X-Upyun-Multi-Stage", "upload");
                request.Headers.Add("X-Upyun-Multi-Uuid", taskId);
                request.Headers.Add("X-Upyun-Part-Id", $"{currentIndex}");
                request.Content = new ReadOnlyMemoryContent(chunk);
                await request.SignatureAsync(upyunOperator);
                using var response = await httpClient.SendAsync(request, cancellationToken);

                var multiId = response.Headers.GetResponseHeaderValue("X-Upyun-Multi-Uuid");
                if (!response.IsSuccessStatusCode || multiId is null)
                {
                    logger.LogError(
                        "upload chunk fail,status code:{StatusCode},part id:{PartId},path to file:{PathToFile}",
                        response.StatusCode,
                        currentIndex,
                        pathToFile
                    );
                    throw new BusinessException("upload file chunk fail");
                }

                return true;
            },
            TimeSpan.FromSeconds(2)
        );
    }

    /// <summary>
    /// Completes the multipart task <paramref name="taskId"/>, with retries. Logs the MIME type
    /// and length reported by Upyun.
    /// </summary>
    private async Task CompleteUploadAsync(
        string pathToFile,
        string taskId,
        CancellationToken cancellationToken = default
    )
    {
        await ApplicationTools.RetryAsync(
            async () =>
            {
                cancellationToken.ThrowIfCancellationRequested();
                using var completeRequest = GetRequest(pathToFile);
                completeRequest.Headers.Add("X-Upyun-Multi-Stage", "complete");
                completeRequest.Headers.Add("X-Upyun-Multi-Uuid", taskId);
                await completeRequest.SignatureAsync(upyunOperator);
                using var completeResponse = await httpClient.SendAsync(
                    completeRequest,
                    cancellationToken
                );

                var multiId = completeResponse.Headers.GetResponseHeaderValue("X-Upyun-Multi-Uuid");
                if (!completeResponse.IsSuccessStatusCode || multiId is null)
                {
                    logger.LogError(
                        "complete multi upload fail,status code:{StatusCode},path to file:{PathToFile}",
                        completeResponse.StatusCode,
                        pathToFile
                    );
                    throw new BusinessException("upload file complete multi disorder fail");
                }

                var mime = completeResponse.Headers.GetResponseHeaderValue("X-Upyun-Multi-Type");
                var length = completeResponse.Headers.GetResponseHeaderValue(
                    "X-Upyun-Multi-Length"
                );

                logger.LogInformation(
                    "upyun multi upload complete,multi id:{MultiId},MIME:{MIME},length:{Length}",
                    multiId,
                    mime,
                    length
                );

                return true;
            },
            TimeSpan.FromSeconds(2)
        );
    }

    /// <summary>
    /// Sends a HEAD request for the object and compares its X-Upyun-Meta-Multi-MD5 header with
    /// <paramref name="md5Value"/>, ignoring case.
    /// </summary>
    /// <returns>
    /// True when the hashes match. False on mismatch, on a missing header, or on an unsuccessful response.
    /// </returns>
    private async Task<bool> CheckMd5Async(
        string pathToFile,
        string md5Value,
        CancellationToken cancellationToken = default
    )
    {
        using var request = GetRequest(pathToFile, HttpMethod.Head);
        await request.SignatureAsync(upyunOperator);
        using var response = await httpClient.SendAsync(request, cancellationToken);

        if (!response.IsSuccessStatusCode)
        {
            logger.LogWarning(
                "check md5 fail,status code:{StatusCode},path to file:{PathToFile}",
                response.StatusCode,
                pathToFile
            );
            return false;
        }

        var cloudMd5 = response.Headers.GetResponseHeaderValue("X-Upyun-Meta-Multi-MD5");
        return string.Equals(cloudMd5, md5Value, StringComparison.OrdinalIgnoreCase);
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个用于实现分片断点续传上传功能的C#类,主要用于向又拍云(Upyun)对象存储服务上传大文件。让我详细解释一下这个代码的结构和功能:

类概述

ParallelChunkBreakpointUpload<TUpyunOperator> 是一个泛型类,用于处理大文件的分片并行上传。注意:类名中虽有"断点续传",但当前实现并不持久化上传任务 ID(X-Upyun-Multi-Uuid)。进程中断后需要从头重新上传,实际提供的只是单个请求失败时的重试能力。

主要组件

1. 构造函数和依赖注入

public class ParallelChunkBreakpointUpload<TUpyunOperator>(
    ILogger<ParallelChunkBreakpointUpload<TUpyunOperator>> logger,
    HttpClient httpClient,
    TUpyunOperator upyunOperator
)
  • 使用了C# 12的主构造函数语法
  • 注入了日志记录器、HTTP客户端和又拍云操作器

2. 核心上传方法 UploadFileAsync

这是主要的上传方法,包含以下步骤:

  1. 文件预处理

    • 将文件流复制到内存流中
    • 计算文件的MD5哈希值用于完整性验证
  2. 小文件处理

    • 如果文件大小小于等于分片大小(1MB),直接使用小文件上传方式
  3. 大文件分片上传流程

    • 初始化分片上传任务
    • 将文件分割成多个分片并并行上传
    • 完成上传并合并分片
    • 可选的MD5验证

3. 分片上传的三个阶段

a) BeginUploadAsync - 初始化上传

request.Headers.Add("X-Upyun-Multi-Stage", "initiate");
  • 向服务器发起分片上传请求
  • 设置文件大小和分片大小
  • 返回上传任务ID

b) SplitChunkAsync - 分片上传

  • 使用 ArrayPool<byte> 优化内存使用
  • 将文件分割成1MB的分片
  • 并行上传所有分片
  • 每个分片都有重试机制

c) CompleteUploadAsync - 完成上传

  • 通知服务器合并所有分片
  • 完成整个上传流程

4. 关键特性

内存优化

  • 使用 ArrayPool<byte> 减少GC压力
  • 使用 RecyclableMemoryStream 管理内存流

错误处理和重试

  • 所有网络请求都包装在重试逻辑中
  • 使用 ApplicationTools.RetryAsync 处理临时网络错误

并行处理

  • 多个分片可以并行上传,提高上传效率

完整性验证

  • 支持MD5校验确保文件完整性
  • 可选的服务器端MD5验证

5. 调试支持

#if DEBUG
request.Headers.Add("X-Upyun-Meta-Ttl", "1");
#endif

在调试模式下为上传的文件添加 X-Upyun-Meta-Ttl: 1 请求头。又拍云该头的单位为"天",即测试文件 1 天后自动过期,避免长期占用存储。

使用场景

这个类特别适用于:

  • 大文件上传(>1MB)
  • 网络不稳定环境下的文件上传
  • 需要断点续传功能的应用
  • 对上传性能有要求的场景

技术亮点

  1. 现代C#语法:使用了主构造函数、using声明等新特性
  2. 资源管理:使用了 using 声明和 ArrayPool 进行资源管理
  3. 异步编程:全面使用async/await模式
  4. 错误恢复:包含完善的重试和错误处理机制
  5. 性能优化:通过并行上传和内存池优化性能

这是一个设计良好的企业级文件上传组件,充分考虑了性能、可靠性和可维护性。

评论加载中...