网站首页 网站源码
website
站点相关全部源代码,隐藏了一些关于服务器的信息
using System.Buffers;
using System.Security.Cryptography;
using Dpz.Core.Infrastructure;
using Dpz.Core.Infrastructure.PublicStruct;
using Dpz.Core.Shard.Service;
using Microsoft.Extensions.Logging;
using Microsoft.IO;

namespace Dpz.Core.Service.ObjectStorage;

public delegate Task<UploadResult> UploadAsync(
    ICollection<string> pathToFile,
    Func<HttpContent> setContent,
    string? contentMd5 = null
);

/// <summary>
/// 分片断点续传上传
/// </summary>
/// <param name="logger"></param>
/// <param name="httpClient"></param>
/// <param name="upyunOperator"></param>
public class ParallelChunkBreakpointUpload<TUpyunOperator>(
    ILogger<ParallelChunkBreakpointUpload<TUpyunOperator>> logger,
    HttpClient httpClient,
    TUpyunOperator upyunOperator
)
    where TUpyunOperator : UpyunOperator
{
    private readonly int _chunkSize = 1 << 20;

    /// <summary>
    ///
    /// </summary>
    /// <param name="file"></param>
    /// <param name="smallFileUpload"></param>
    /// <param name="checkMd5"></param>
    /// <returns></returns>
    /// <exception cref="Exception"></exception>
    public async Task<FileAddress?> UploadFileAsync(
        CloudFile file,
        UploadAsync smallFileUpload,
        bool checkMd5 = false
    )
    {
        await using var stream = ApplicationTools.MemoryStreamManager.GetStream();
        await file.Stream.CopyToAsync(stream);
        stream.Position = 0;

        using var md5 = MD5.Create();
        var hashBytes = await md5.ComputeHashAsync(stream);
        var md5Value = BitConverter.ToString(hashBytes).Replace("-", "").ToLowerInvariant();
        logger.LogInformation("file:{@FileToPath},MD5 value:{MD5Value}", file.PathToFile, md5Value);

        stream.Position = 0;

        if (stream.Length <= _chunkSize)
        {
            var result = await smallFileUpload(file.PathToFile, () => new StreamContent(stream));
            return new FileAddress(result.AccessUrl, md5Value);
        }

        var pathToFile = string.Join("/", file.PathToFile.Select(Uri.EscapeDataString));

        var taskId = await BeginUploadAsync(pathToFile, stream.Length);

        await SplitChunkAsync(stream, pathToFile, taskId);

        await CompleteUploadAsync(pathToFile, taskId);

        if (checkMd5)
        {
            var match = await CheckMd5Async(pathToFile, md5Value);
            if (!match)
            {
                logger.LogError(
                    "MD5 value are inconsistent,local calculate MD5 value:{MD5Value},path to file:{@PathToFile}",
                    md5Value,
                    pathToFile
                );
                throw new Exception("md5 value are inconsistent");
            }
        }

        var url =
            (
                upyunOperator.Host?.LastOrDefault() == '/'
                    ? upyunOperator.Host
                    : upyunOperator.Host + "/"
            ) + pathToFile;
        return new FileAddress(url, md5Value);
    }

    private HttpRequestMessage GetRequest(string pathToFile)
    {
        var request = new HttpRequestMessage(
            HttpMethod.Put,
            $"/{upyunOperator.Bucket}/{pathToFile}"
        )
        {
            Version = new Version(2, 0)
        };
        return request;
    }

    private async Task<string> BeginUploadAsync(string pathToFile, long fileSize)
    {
        return await ApplicationTools.RetryAsync(
            async () =>
            {
                var request = GetRequest(pathToFile);
                request.Headers.Add("X-Upyun-Multi-Disorder", "true");
                request.Headers.Add("X-Upyun-Multi-Stage", "initiate");
                request.Headers.Add("X-Upyun-Multi-Length", $"{fileSize}");
                request.Headers.Add("X-Upyun-Multi-Part-Size", $"{_chunkSize}");
#if DEBUG
                request.Headers.Add("X-Upyun-Meta-Ttl", "1");
#endif
                await request.SignatureAsync(upyunOperator);
                var response = await httpClient.SendAsync(request);

                var multiId = response.Headers.GetResponseHeaderValue("X-Upyun-Multi-Uuid");
                if (!response.IsSuccessStatusCode || multiId == null)
                {
                    logger.LogError("upload fail,status code:{StatusCode}", response.StatusCode);
                    throw new BusinessException("upload file initial multi disorder fail");
                }

                return multiId;
            },
            TimeSpan.FromSeconds(2)
        );
    }

    private async Task SplitChunkAsync(
        RecyclableMemoryStream stream,
        string pathToFile,
        string taskId
    )
    {
        var uploadTasks = new List<Task<bool>>();
        var buffer = ArrayPool<byte>.Shared.Rent(_chunkSize);
        var index = 0;
        try
        {
            int bytesRead;
            while ((bytesRead = await stream.ReadAsync(buffer.AsMemory(0, _chunkSize))) > 0)
            {
                var memory = new Memory<byte>(buffer, 0, bytesRead);
                var currentIndex = index;
                var chunkUploadTask = ApplicationTools.RetryAsync(
                    async () => await UploadChunkAsync(pathToFile, taskId, currentIndex, memory),
                    TimeSpan.FromSeconds(2)
                );
                uploadTasks.Add(chunkUploadTask);
                index++;
            }

            await Task.WhenAll(uploadTasks);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Failed to upload chunks, path to file: {PathToFile}",pathToFile);
            // 清理已上传的任务
            foreach (var task in uploadTasks.Where(task => !task.IsCompletedSuccessfully))
            {
                try
                {
                    await task;
                }
                catch
                {
                    // Ignore exceptions for cleanup tasks
                }
            }
            throw;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(buffer);
        }
        
    }

    private async Task<bool> UploadChunkAsync(
        string pathToFile,
        string taskId,
        int currentIndex,
        Memory<byte> chunk
    )
    {
        return await ApplicationTools.RetryAsync(
            async () =>
            {
                var request = GetRequest(pathToFile);
                request.Headers.Add("X-Upyun-Multi-Stage", "upload");
                request.Headers.Add("X-Upyun-Multi-Uuid", taskId);
                request.Headers.Add("X-Upyun-Part-Id", $"{currentIndex}");
                request.Content = new ReadOnlyMemoryContent(chunk);
                await request.SignatureAsync(upyunOperator);
                var response = await httpClient.SendAsync(request);

                var multiId = response.Headers.GetResponseHeaderValue("X-Upyun-Multi-Uuid");
                if (!response.IsSuccessStatusCode || multiId == null)
                {
                    logger.LogError("upload fail,status code:{StatusCode}", response.StatusCode);
                    throw new BusinessException("upload file initial multi disorder fail");
                }

                return true;
            },
            TimeSpan.FromSeconds(2)
        );
    }

    private async Task CompleteUploadAsync(string pathToFile, string taskId)
    {
        await ApplicationTools.RetryAsync(
            async () =>
            {
                var completeRequest = GetRequest(pathToFile);
                completeRequest.Headers.Add("X-Upyun-Multi-Stage", "complete");
                completeRequest.Headers.Add("X-Upyun-Multi-Uuid", taskId);
                await completeRequest.SignatureAsync(upyunOperator);
                var completeResponse = await httpClient.SendAsync(completeRequest);

                var multiId = completeResponse.Headers.GetResponseHeaderValue("X-Upyun-Multi-Uuid");
                if (!completeResponse.IsSuccessStatusCode || multiId == null)
                {
                    logger.LogError(
                        "upload fail,status code:{StatusCode}",
                        completeResponse.StatusCode
                    );
                    throw new BusinessException("upload file initial multi disorder fail");
                }

                var mime = completeResponse.Headers.GetResponseHeaderValue("X-Upyun-Multi-Type");
                var length = completeResponse.Headers.GetResponseHeaderValue(
                    "X-Upyun-Multi-Length"
                );

                logger.LogInformation(
                    "upyun multi upload complete,multi id:{MultiId},MIME:{MIME},length:{Length}",
                    multiId,
                    mime,
                    length
                );

                return true;
            },
            TimeSpan.FromSeconds(2)
        );
    }

    private async Task<bool> CheckMd5Async(string pathToFile, string md5Value)
    {
        var request = new HttpRequestMessage(
            HttpMethod.Head,
            $"/{upyunOperator.Bucket}/{pathToFile}"
        )
        {
            Version = new Version(2, 0)
        };
        await request.SignatureAsync(upyunOperator);
        var response = await httpClient.SendAsync(request);

        var cloudMd5 = response.Headers.GetResponseHeaderValue("X-Upyun-Meta-Multi-MD5");

        return cloudMd5 == md5Value;
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

上述代码实现了一个用于分片断点续传上传文件的类 ParallelChunkBreakpointUpload<TUpyunOperator>,该类主要用于将大文件分割成多个小块(chunk)并并行上传到一个对象存储服务(如 Upyun)。以下是代码的主要功能和结构的详细解释:

主要功能

  1. 分片上传

    • 当文件大小超过设定的 _chunkSize(1MB)时,文件会被分割成多个小块进行上传。
    • 每个小块会被单独上传,并且上传过程是并行的,以提高上传效率。
  2. MD5 校验

    • 在上传文件之前,计算文件的 MD5 值,并在上传完成后进行校验,以确保文件在传输过程中没有损坏。
  3. 断点续传

    • 通过使用多阶段上传的机制,支持在上传过程中出现错误时的重试和恢复。
  4. 日志记录

    • 使用 ILogger 记录上传过程中的信息和错误,便于后续的调试和监控。

主要方法

  1. UploadFileAsync

    • 这是主要的上传方法,接受一个 CloudFile 对象和一个上传小文件的委托 smallFileUpload
    • 计算文件的 MD5 值,判断文件大小并决定是直接上传还是分片上传。
    • 如果文件需要分片上传,调用 BeginUploadAsyncSplitChunkAsyncCompleteUploadAsync 方法进行分片上传。
  2. BeginUploadAsync

    • 初始化上传,向对象存储服务发送请求以开始多部分上传,并返回一个任务 ID。
  3. SplitChunkAsync

    • 将文件流分割成多个小块并并行上传。使用 Task.WhenAll 等待所有上传任务完成。
  4. UploadChunkAsync

    • 上传单个小块,设置必要的请求头并发送请求。
  5. CompleteUploadAsync

    • 完成上传,向对象存储服务发送请求以完成多部分上传。
  6. CheckMd5Async

    • 上传完成后,发送请求以检查云端文件的 MD5 值是否与本地计算的值一致。

其他细节

  • 异常处理

    • 在上传过程中,如果发生异常,会记录错误并尝试清理已上传的部分。
  • 使用 ArrayPool

    • 通过 ArrayPool<byte> 来管理内存,避免频繁的内存分配和释放,提高性能。
  • HttpClient

    • 使用 HttpClient 发送 HTTP 请求与对象存储服务进行交互。

总结

这个类提供了一种高效、可靠的方式来上传大文件到对象存储服务,支持分片上传、断点续传和 MD5 校验,适合需要处理大文件上传的应用场景。

loading