// Site home page / site source code
// website
// All source code related to the site; some server-related information has been hidden.
using System.Buffers;
using System.Security.Cryptography;
using Dpz.Core.Infrastructure;
using Dpz.Core.Infrastructure.PublicStruct;
using Dpz.Core.Shard.Service;
using Microsoft.Extensions.Logging;
using Microsoft.IO;

namespace Dpz.Core.Service.ObjectStorage;

/// <summary>
/// Callback used to upload a file in a single request instead of the chunked protocol.
/// </summary>
/// <param name="pathToFile">Path segments of the target file.</param>
/// <param name="setContent">
/// Factory that produces the HTTP request body. NOTE(review): presumably invoked once per
/// attempt so the body can be re-created on retry — confirm against the implementations.
/// </param>
/// <param name="contentMd5">
/// Optional MD5 of the content; defaults to <c>null</c>. NOTE(review): presumably forwarded
/// to the storage service for integrity checking — confirm.
/// </param>
/// <returns>The upload result reported by the implementation.</returns>
public delegate Task<UploadResult> UploadAsync(
    ICollection<string> pathToFile,
    Func<HttpContent> setContent,
    string? contentMd5 = null
);

/// <summary>
/// Chunked upload to Upyun storage. Files that fit in a single chunk are delegated to a
/// caller-supplied single-request upload; larger files are split into fixed-size parts that
/// are uploaded concurrently through the <c>X-Upyun-Multi-*</c> (multi disorder) protocol.
/// </summary>
/// <param name="logger">Logger used for progress and failure diagnostics.</param>
/// <param name="httpClient">
/// HTTP client used for every multipart request. Requests are built with relative URIs, so
/// the client presumably has a base address configured — confirm at the registration site.
/// </param>
/// <param name="upyunOperator">Supplies the bucket, the public host and the request signature.</param>
public class ParallelChunkBreakpointUpload<TUpyunOperator>(
    ILogger<ParallelChunkBreakpointUpload<TUpyunOperator>> logger,
    HttpClient httpClient,
    TUpyunOperator upyunOperator
)
    where TUpyunOperator : UpyunOperator
{
    /// <summary>
    /// Size of one upload part in bytes (1 MiB). Also the threshold at or below which a file
    /// is uploaded in a single request.
    /// </summary>
    private const int ChunkSize = 1 << 20;

    /// <summary>
    /// Uploads <paramref name="file"/> and returns its address together with its MD5.
    /// </summary>
    /// <param name="file">File to upload; its stream is buffered fully in memory first.</param>
    /// <param name="smallFileUpload">
    /// Single-request upload used when the file is no larger than one chunk.
    /// </param>
    /// <param name="checkMd5">
    /// When <c>true</c>, after a chunked upload the MD5 reported by the server is compared
    /// with the locally computed one. Not applied to the single-request path.
    /// </param>
    /// <returns>
    /// The access URL (from <paramref name="smallFileUpload"/> for small files, otherwise the
    /// operator host plus the escaped path) and the lowercase hexadecimal MD5 of the content.
    /// </returns>
    /// <exception cref="Exception">
    /// <paramref name="checkMd5"/> is <c>true</c> and the server-side MD5 does not match.
    /// </exception>
    public async Task<FileAddress?> UploadFileAsync(
        CloudFile file,
        UploadAsync smallFileUpload,
        bool checkMd5 = false
    )
    {
        // Buffer the whole input so it can be hashed first and then re-read for upload.
        await using var stream = ApplicationTools.MemoryStreamManager.GetStream();
        await file.Stream.CopyToAsync(stream);
        stream.Position = 0;

        using var md5 = MD5.Create();
        var hashBytes = await md5.ComputeHashAsync(stream);
        var md5Value = Convert.ToHexString(hashBytes).ToLowerInvariant();
        logger.LogInformation("file:{@FileToPath},MD5 value:{MD5Value}", file.PathToFile, md5Value);

        stream.Position = 0;

        if (stream.Length <= ChunkSize)
        {
            // Hand out replayable content: the factory may be invoked more than once, and a
            // StreamContent over the shared stream would be consumed (and the stream disposed
            // with it) after the first use. The buffer stays valid until the stream is
            // disposed, which happens only after the awaited upload has returned.
            var payload = stream.GetBuffer();
            var payloadLength = (int)stream.Length;
            var result = await smallFileUpload(
                file.PathToFile,
                () => new ByteArrayContent(payload, 0, payloadLength)
            );
            return new FileAddress(result.AccessUrl, md5Value);
        }

        var pathToFile = string.Join("/", file.PathToFile.Select(Uri.EscapeDataString));

        var taskId = await BeginUploadAsync(pathToFile, stream.Length);

        await SplitChunkAsync(stream, pathToFile, taskId);

        await CompleteUploadAsync(pathToFile, taskId);

        if (checkMd5)
        {
            var match = await CheckMd5Async(pathToFile, md5Value);
            if (!match)
            {
                logger.LogError(
                    "MD5 value are inconsistent,local calculate MD5 value:{MD5Value},path to file:{@PathToFile}",
                    md5Value,
                    pathToFile
                );
                throw new Exception("md5 value are inconsistent");
            }
        }

        // Join host and path with exactly one separator when the host already ends with '/'.
        var host = upyunOperator.Host;
        var url = (host?.EndsWith('/') == true ? host : host + "/") + pathToFile;
        return new FileAddress(url, md5Value);
    }

    /// <summary>
    /// Creates an HTTP/2 request targeting <paramref name="pathToFile"/> inside the bucket.
    /// </summary>
    /// <param name="pathToFile">Already URL-escaped path of the file.</param>
    /// <param name="method">HTTP method; defaults to PUT when omitted.</param>
    private HttpRequestMessage GetRequest(string pathToFile, HttpMethod? method = null)
    {
        return new HttpRequestMessage(
            method ?? HttpMethod.Put,
            $"/{upyunOperator.Bucket}/{pathToFile}"
        )
        {
            Version = new Version(2, 0)
        };
    }

    /// <summary>
    /// Starts a multipart upload ("initiate" stage) and returns the upload id issued by the
    /// server in the <c>X-Upyun-Multi-Uuid</c> response header.
    /// </summary>
    /// <exception cref="BusinessException">
    /// The response is not successful or carries no upload id.
    /// </exception>
    private async Task<string> BeginUploadAsync(string pathToFile, long fileSize)
    {
        return await ApplicationTools.RetryAsync(
            async () =>
            {
                using var request = GetRequest(pathToFile);
                request.Headers.Add("X-Upyun-Multi-Disorder", "true");
                request.Headers.Add("X-Upyun-Multi-Stage", "initiate");
                request.Headers.Add("X-Upyun-Multi-Length", $"{fileSize}");
                request.Headers.Add("X-Upyun-Multi-Part-Size", $"{ChunkSize}");
#if DEBUG
                request.Headers.Add("X-Upyun-Meta-Ttl", "1");
#endif
                await request.SignatureAsync(upyunOperator);
                using var response = await httpClient.SendAsync(request);

                var multiId = response.Headers.GetResponseHeaderValue("X-Upyun-Multi-Uuid");
                if (!response.IsSuccessStatusCode || multiId == null)
                {
                    logger.LogError("upload fail,status code:{StatusCode}", response.StatusCode);
                    throw new BusinessException("upload file initial multi disorder fail");
                }

                return multiId;
            },
            TimeSpan.FromSeconds(2)
        );
    }

    /// <summary>
    /// Reads the rest of <paramref name="stream"/> in parts of <see cref="ChunkSize"/> bytes
    /// (the last part may be shorter), starts one upload per part and waits for all of them.
    /// </summary>
    /// <remarks>
    /// Every part gets its own pooled buffer. The uploads run concurrently with the read
    /// loop, so a single shared buffer would be overwritten while earlier parts are still
    /// being sent. The cost is that the buffers of all in-flight parts are held at once.
    /// </remarks>
    private async Task SplitChunkAsync(
        RecyclableMemoryStream stream,
        string pathToFile,
        string taskId
    )
    {
        var uploadTasks = new List<Task<bool>>();
        try
        {
            for (var index = 0; stream.Position < stream.Length; index++)
            {
                var chunkLength = (int)Math.Min(ChunkSize, stream.Length - stream.Position);
                var buffer = ArrayPool<byte>.Shared.Rent(chunkLength);
                try
                {
                    // Fill the part completely; a short read would produce an undersized part.
                    await stream.ReadExactlyAsync(buffer.AsMemory(0, chunkLength));
                }
                catch
                {
                    // No upload task owns the buffer yet, so hand it back here.
                    ArrayPool<byte>.Shared.Return(buffer);
                    throw;
                }

                // From here on the upload task owns the buffer and returns it when finished.
                uploadTasks.Add(
                    UploadPooledChunkAsync(pathToFile, taskId, index, buffer, chunkLength)
                );
            }

            await Task.WhenAll(uploadTasks);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Failed to upload chunks, path to file: {PathToFile}", pathToFile);

            // Let every started upload settle before propagating, so no task is left running
            // (or faulting unobserved) once the caller disposes the source stream.
            try
            {
                await Task.WhenAll(uploadTasks);
            }
            catch
            {
                // Deliberately ignored: the original failure is rethrown below.
            }

            throw;
        }
    }

    /// <summary>
    /// Uploads one part held in a pooled buffer and returns the buffer to the pool once the
    /// upload has finished, whether it succeeded or failed.
    /// </summary>
    private async Task<bool> UploadPooledChunkAsync(
        string pathToFile,
        string taskId,
        int partId,
        byte[] buffer,
        int length
    )
    {
        try
        {
            var chunk = buffer.AsMemory(0, length);
            // NOTE(review): UploadChunkAsync already retries internally, so this outer retry
            // nests the two policies; kept as-is to preserve the existing retry behaviour.
            return await ApplicationTools.RetryAsync(
                async () => await UploadChunkAsync(pathToFile, taskId, partId, chunk),
                TimeSpan.FromSeconds(2)
            );
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }

    /// <summary>
    /// Uploads a single part ("upload" stage) identified by <paramref name="currentIndex"/>.
    /// </summary>
    /// <param name="pathToFile">Already URL-escaped path of the file.</param>
    /// <param name="taskId">Upload id returned by the initiate stage.</param>
    /// <param name="currentIndex">Zero-based part number sent as <c>X-Upyun-Part-Id</c>.</param>
    /// <param name="chunk">Part content; must stay unmodified until the task completes.</param>
    /// <returns><c>true</c> when the part was accepted.</returns>
    /// <exception cref="BusinessException">
    /// The response is not successful or carries no upload id.
    /// </exception>
    private async Task<bool> UploadChunkAsync(
        string pathToFile,
        string taskId,
        int currentIndex,
        Memory<byte> chunk
    )
    {
        return await ApplicationTools.RetryAsync(
            async () =>
            {
                using var request = GetRequest(pathToFile);
                request.Headers.Add("X-Upyun-Multi-Stage", "upload");
                request.Headers.Add("X-Upyun-Multi-Uuid", taskId);
                request.Headers.Add("X-Upyun-Part-Id", $"{currentIndex}");
                request.Content = new ReadOnlyMemoryContent(chunk);
                await request.SignatureAsync(upyunOperator);
                using var response = await httpClient.SendAsync(request);

                var multiId = response.Headers.GetResponseHeaderValue("X-Upyun-Multi-Uuid");
                if (!response.IsSuccessStatusCode || multiId == null)
                {
                    logger.LogError("upload fail,status code:{StatusCode}", response.StatusCode);
                    throw new BusinessException("upload file chunk multi disorder fail");
                }

                return true;
            },
            TimeSpan.FromSeconds(2)
        );
    }

    /// <summary>
    /// Finishes the multipart upload ("complete" stage) and logs the type and length the
    /// server reports for the assembled file.
    /// </summary>
    /// <exception cref="BusinessException">
    /// The response is not successful or carries no upload id.
    /// </exception>
    private async Task CompleteUploadAsync(string pathToFile, string taskId)
    {
        await ApplicationTools.RetryAsync(
            async () =>
            {
                using var completeRequest = GetRequest(pathToFile);
                completeRequest.Headers.Add("X-Upyun-Multi-Stage", "complete");
                completeRequest.Headers.Add("X-Upyun-Multi-Uuid", taskId);
                await completeRequest.SignatureAsync(upyunOperator);
                using var completeResponse = await httpClient.SendAsync(completeRequest);

                var multiId = completeResponse.Headers.GetResponseHeaderValue("X-Upyun-Multi-Uuid");
                if (!completeResponse.IsSuccessStatusCode || multiId == null)
                {
                    logger.LogError(
                        "upload fail,status code:{StatusCode}",
                        completeResponse.StatusCode
                    );
                    throw new BusinessException("upload file complete multi disorder fail");
                }

                var mime = completeResponse.Headers.GetResponseHeaderValue("X-Upyun-Multi-Type");
                var length = completeResponse.Headers.GetResponseHeaderValue(
                    "X-Upyun-Multi-Length"
                );

                logger.LogInformation(
                    "upyun multi upload complete,multi id:{MultiId},MIME:{MIME},length:{Length}",
                    multiId,
                    mime,
                    length
                );

                return true;
            },
            TimeSpan.FromSeconds(2)
        );
    }

    /// <summary>
    /// Issues a HEAD request for the uploaded file and compares the
    /// <c>X-Upyun-Meta-Multi-MD5</c> response header with <paramref name="md5Value"/>.
    /// </summary>
    /// <returns>
    /// <c>true</c> when the header is present and equals the local MD5; hexadecimal digits
    /// are compared case-insensitively.
    /// </returns>
    private async Task<bool> CheckMd5Async(string pathToFile, string md5Value)
    {
        using var request = GetRequest(pathToFile, HttpMethod.Head);
        await request.SignatureAsync(upyunOperator);
        using var response = await httpClient.SendAsync(request);

        var cloudMd5 = response.Headers.GetResponseHeaderValue("X-Upyun-Meta-Multi-MD5");

        return string.Equals(cloudMd5, md5Value, StringComparison.OrdinalIgnoreCase);
    }
}
// loading