using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO.Compression;
using System.Threading.Channels;
using Dpz.Core.Service.ObjectStorage.Services;
using Dpz.Core.Shard.Service;
using Hangfire;
using Hangfire.Annotations;

namespace Dpz.Core.Web.Jobs.Hangfire;

[UsedImplicitly]
/// <summary>
/// Packs the directory configured in <c>BackupSettings:BackupProgramPath</c> into a timestamped
/// zip archive and uploads it through <see cref="ISafeFileService"/>.
/// NOTE(review): derives from Hangfire's <see cref="JobActivator"/> but overrides none of its
/// members here; confirm the base class is intended.
/// </summary>
public class BackupProgramActivator(
    IConfiguration configuration,
    ILogger<BackupProgramActivator> logger,
    ISafeFileService safeFileService
) : JobActivator
{
    /// <summary>
    /// Files smaller than this are read fully into memory by the producers; larger files are
    /// passed as a path and streamed from disk by the consumer.
    /// </summary>
    private const long MemoryThreshold = 20 * 1024 * 1024;

    /// <summary>A warning is logged once when buffered small-file bytes cross this amount.</summary>
    private const long MemoryWarningThreshold = 2L * 1024 * 1024 * 1024;

    /// <summary>Capacity of the producer → consumer channel.</summary>
    private const int ChannelCapacity = 500;

    // Passed to the upload and the packing pipeline. Nothing in this class cancels it.
    private readonly CancellationTokenSource _cts = new();
    private int _fileCount;
    private int _folderCount;
    private int _errorCount;
    private long _totalSize;
    private int _processedCount;
    private DateTime _lastLogTime;
    private long _inMemorySize;
    private long _peakInMemorySize;

    // Ignore directories resolved once per run: absolute paths without a trailing separator.
    private string[] _ignoreDirectories = [];

    private readonly BackupJobSettings _settings =
        configuration.GetSection("BackupSettings").Get<BackupJobSettings>()
        ?? new BackupJobSettings("", [], [], []);

    /// <summary>
    /// Entry point: packs the configured directory, uploads the archive, and deletes the local
    /// copy only when the upload reported success. Returns early (with a warning) when the
    /// source path is missing/invalid or packing produced no archive.
    /// </summary>
    public async Task StartAsync()
    {
        var sourcePath = _settings.BackupProgramPath;
        if (string.IsNullOrWhiteSpace(sourcePath) || !Directory.Exists(sourcePath))
        {
            logger.LogWarning("未配置程序备份路径或路径无效");
            return;
        }

        var zipFilePath = await PackAsync(sourcePath);
        if (string.IsNullOrWhiteSpace(zipFilePath))
        {
            logger.LogWarning("备份未成功创建");
            return;
        }

        // Upload via FTP; exceptions thrown by the storage service propagate to the caller.
        var uploadSuccess = await UploadAsync(zipFilePath);
        if (!uploadSuccess)
        {
            logger.LogWarning("FTP 上传失败,保留本地备份文件: {Path}", zipFilePath);
            return;
        }

        // Only delete the local file once the upload has succeeded.
        logger.LogInformation("FTP 上传成功,删除本地备份文件: {Path}", zipFilePath);
        try
        {
            File.Delete(zipFilePath);
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            // The backup is already uploaded. Letting this escape would mark the whole job as
            // failed (and, if the scheduler retries, re-pack and re-upload a duplicate archive).
            logger.LogWarning(ex, "删除本地备份文件失败: {Path}", zipFilePath);
        }
    }

    /// <summary>
    /// Uploads <paramref name="filePath"/> to <c>program_backup/{year}/{month}/{file name}</c>
    /// (prefixed with <c>Test</c> in DEBUG builds).
    /// </summary>
    /// <returns><c>true</c> when the storage service returned an address; otherwise <c>false</c>.</returns>
    private async Task<bool> UploadAsync(string filePath)
    {
        var sw = Stopwatch.StartNew();
        var fileName = Path.GetFileName(filePath);
        var date = DateTime.Now;
        List<string> pathToFile =
        [
            "program_backup",
            date.Year.ToString(),
            date.Month.ToString(),
            fileName,
        ];
#if DEBUG
        pathToFile.Insert(0, "Test");
#endif
        await using var stream = new FileStream(
            filePath,
            FileMode.Open,
            FileAccess.Read,
            FileShare.Read,
            1 << 12
        );

        var cloudFile = new CloudFile { PathToFile = pathToFile, Stream = stream };

        var address = await safeFileService.UploadFileForFtpAsync(cloudFile, _cts.Token);
        sw.Stop();

        if (address == null)
        {
            logger.LogError("FTP 上传失败: 返回地址为空");
            return false;
        }

        logger.LogInformation(
            "FTP 上传成功,耗时:{Elapsed}: {FileName} -> {Address}",
            sw.Elapsed.ToString("g"),
            fileName,
            address
        );
        return true;
    }

    /// <summary>
    /// Scans <paramref name="backupProgramPath"/> and writes every non-ignored file into
    /// <c>{AppContext.BaseDirectory}/program_backup/backup_{yyyyMMdd_HHmmss}.zip</c> using a
    /// parallel-read / single-writer pipeline.
    /// </summary>
    /// <returns>
    /// The archive path on success; <c>null</c> when no files were found, the run was cancelled,
    /// or it failed. Errors are logged, not thrown, and an incomplete archive is deleted.
    /// </returns>
    private async Task<string?> PackAsync(string backupProgramPath)
    {
        var startTime = DateTime.Now;
        ResetStatistics();

        // Hoisted so the catch blocks can remove an incomplete archive.
        string? zipFullName = null;

        try
        {
            // Backup file name carries a timestamp.
            var timestamp = DateTime.Now.ToString("yyyyMMdd_HHmmss");
            var backupDirectory = Path.Combine(AppContext.BaseDirectory, "program_backup");
            // No-op when the directory already exists.
            Directory.CreateDirectory(backupDirectory);

            var archivePath = Path.Combine(backupDirectory, $"backup_{timestamp}.zip");
            zipFullName = archivePath;

            logger.LogInformation(
                "开始打包备份: {Source} -> {Destination}\n压缩级别: {Level}",
                backupProgramPath,
                archivePath,
                _settings.CompressionLevel
            );

            // 1. Scan files (ignore rules are resolved once, against the same root the scan uses).
            var sourceDirectory = new DirectoryInfo(backupProgramPath);
            _ignoreDirectories = ResolveIgnoreDirectories(sourceDirectory.FullName);
            var filesToPack = new ConcurrentBag<FileEntry>();
            await CollectFilesAsync(sourceDirectory, sourceDirectory.FullName, filesToPack);

            var totalFiles = filesToPack.Count;
            logger.LogInformation(
                "已扫描文件: {TotalFiles} 个文件, {TotalFolders} 个目录 (已忽略部分文件)",
                totalFiles,
                _folderCount
            );

            if (totalFiles == 0)
            {
                logger.LogWarning("没有找到需要备份的文件");
                return null;
            }

            // 2. Bounded channel between the parallel readers and the single zip writer. The bound
            // gives backpressure: worst case is ~500 buffered files just under 20 MB (~10 GB);
            // files >= 20 MB are never buffered, the consumer streams them from disk.
            var channel = Channel.CreateBounded<FileJob>(
                new BoundedChannelOptions(ChannelCapacity)
                {
                    SingleReader = true,
                    SingleWriter = false,
                    FullMode = BoundedChannelFullMode.Wait,
                }
            );

            // Lets a failed consumer stop producers that would otherwise block forever on a full
            // channel nobody reads any more.
            using var pipelineCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token);

            // 3. Start the consumer (zip writer), then 4. the producers (parallel readers).
            var consumeTask = ConsumeOrCancelAsync();
            var produceTask = ProduceThenCompleteAsync();

            // Wait for BOTH sides so no background task still holds the archive open afterwards.
            // If the consumer faults, the producers only end Canceled, so the exception observed
            // here is the consumer's original one.
            await Task.WhenAll(produceTask, consumeTask);

            // Verify the archive exists before reading its size for the statistics.
            if (!File.Exists(archivePath))
            {
                logger.LogError("备份文件创建失败: {Path}", archivePath);
                return null;
            }

            LogCompletionStatistics(archivePath, startTime);
            return archivePath;

            // Runs the consumer; on any failure cancels the producers before rethrowing.
            async Task ConsumeOrCancelAsync()
            {
                try
                {
                    await ConsumeFileJobsAsync(
                        channel.Reader,
                        archivePath,
                        totalFiles,
                        pipelineCts.Token
                    );
                }
                catch
                {
                    await pipelineCts.CancelAsync();
                    throw;
                }
            }

            // Runs the producers and always completes the channel so the consumer can finish.
            async Task ProduceThenCompleteAsync()
            {
                try
                {
                    await ProduceFileJobsAsync(filesToPack, channel.Writer, pipelineCts.Token);
                    channel.Writer.TryComplete();
                }
                catch (Exception ex)
                {
                    // Propagate the failure so the consumer stops instead of waiting forever.
                    channel.Writer.TryComplete(ex);
                    throw;
                }
            }
        }
        catch (OperationCanceledException)
        {
            logger.LogWarning("备份打包操作已取消");
            DeletePartialArchive(zipFullName);
            return null;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "备份打包失败: {Path}", backupProgramPath);
            DeletePartialArchive(zipFullName);
            return null;
        }
    }

    /// <summary>Best-effort removal of an incomplete archive left by a failed or cancelled run.</summary>
    private void DeletePartialArchive(string? zipFullName)
    {
        if (zipFullName is null || !File.Exists(zipFullName))
        {
            return;
        }

        try
        {
            File.Delete(zipFullName);
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            logger.LogWarning(ex, "删除未完成的备份文件失败: {Path}", zipFullName);
        }
    }

    /// <summary>
    /// Producer side: reads small files into memory in parallel and queues one
    /// <see cref="FileJob"/> per file. Per-file failures are logged and counted; cancellation
    /// propagates and stops the loop.
    /// </summary>
    private async Task ProduceFileJobsAsync(
        IEnumerable<FileEntry> filesToPack,
        ChannelWriter<FileJob> writer,
        CancellationToken cancellationToken
    )
    {
        await Parallel.ForEachAsync(
            filesToPack,
            new ParallelOptions
            {
                // I/O bound, so oversubscribe the CPUs.
                MaxDegreeOfParallelism = Environment.ProcessorCount * 2,
                CancellationToken = cancellationToken,
            },
            async (fileEntry, ct) =>
            {
                // Owned by this iteration until handed to the channel; released in finally otherwise.
                MemoryStream? buffer = null;
                long trackedBytes = 0;
                try
                {
                    var fileInfo = new FileInfo(fileEntry.FullPath);
                    if (!fileInfo.Exists)
                    {
                        return;
                    }

                    // Small files are read into memory here to speed up I/O; large ones travel as
                    // a path only and are streamed by the consumer to keep memory bounded.
                    if (fileInfo.Length < MemoryThreshold)
                    {
                        buffer = new MemoryStream((int)fileInfo.Length);
                        await using (
                            var fs = new FileStream(
                                fileEntry.FullPath,
                                FileMode.Open,
                                FileAccess.Read,
                                FileShare.Read,
                                64 * 1024,
                                FileOptions.SequentialScan | FileOptions.Asynchronous
                            )
                        )
                        {
                            await fs.CopyToAsync(buffer, ct);
                        }

                        buffer.Position = 0;
                        trackedBytes = fileInfo.Length;
                        TrackBufferedBytes(trackedBytes);
                    }

                    // Waits while the channel is full (backpressure).
                    await writer.WriteAsync(
                        new FileJob(fileEntry, buffer, buffer != null, fileInfo.Length),
                        ct
                    );

                    // The consumer now owns the buffer and its memory accounting.
                    buffer = null;
                }
                catch (OperationCanceledException) when (ct.IsCancellationRequested)
                {
                    // Cancellation is not a per-file error: let it stop the whole loop.
                    throw;
                }
                catch (Exception ex)
                {
                    Interlocked.Increment(ref _errorCount);
                    logger.LogError(ex, "读取文件失败: {Path}", fileEntry.FullPath);
                }
                finally
                {
                    if (buffer != null)
                    {
                        await buffer.DisposeAsync();
                        Interlocked.Add(ref _inMemorySize, -trackedBytes);
                    }
                }
            }
        );
    }

    /// <summary>
    /// Adds <paramref name="bytes"/> to the buffered total, raises the recorded peak if needed,
    /// and logs once when the total crosses <see cref="MemoryWarningThreshold"/>.
    /// </summary>
    private void TrackBufferedBytes(long bytes)
    {
        var currentMemorySize = Interlocked.Add(ref _inMemorySize, bytes);

        // Lock-free max: retry until the stored peak is >= our value.
        long observedPeak;
        do
        {
            observedPeak = Interlocked.Read(ref _peakInMemorySize);
            if (observedPeak >= currentMemorySize)
            {
                break;
            }
        } while (
            Interlocked.CompareExchange(ref _peakInMemorySize, currentMemorySize, observedPeak)
            != observedPeak
        );

        // Only log on the transition across the threshold, not for every file above it.
        if (
            currentMemorySize > MemoryWarningThreshold
            && currentMemorySize - bytes <= MemoryWarningThreshold
        )
        {
            var memoryGb = currentMemorySize / (1024.0 * 1024.0 * 1024.0);
            logger.LogWarning("小文件内存使用量已超过 2GB 警告阈值: {MemorySize:F2} GB", memoryGb);
        }
    }

    /// <summary>
    /// Consumer side: the single reader of the channel and the only code touching the
    /// <see cref="ZipArchive"/>. Runs until the producers complete the channel.
    /// </summary>
    private async Task ConsumeFileJobsAsync(
        ChannelReader<FileJob> reader,
        string zipFileName,
        int totalFiles,
        CancellationToken cancellationToken
    )
    {
        await using var fileStream = new FileStream(
            zipFileName,
            FileMode.Create,
            FileAccess.Write,
            FileShare.None,
            // 1 MB buffer
            1024 * 1024,
            FileOptions.SequentialScan
        );

        await using var archive = new ZipArchive(fileStream, ZipArchiveMode.Create, false);
        _lastLogTime = DateTime.Now;

        // Drain the jobs prepared by the producers.
        while (await reader.WaitToReadAsync(cancellationToken))
        {
            while (reader.TryRead(out var job))
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    break;
                }
                await WriteToArchiveAsync(archive, job, totalFiles, cancellationToken);
            }
        }
    }

    /// <summary>Clears all per-run counters and the progress-log timestamp.</summary>
    private void ResetStatistics()
    {
        _fileCount = 0;
        _folderCount = 0;
        _errorCount = 0;
        _totalSize = 0;
        _processedCount = 0;
        _lastLogTime = DateTime.Now;
        _inMemorySize = 0;
        _peakInMemorySize = 0;
    }

    /// <summary>
    /// Writes one file into <paramref name="archive"/>, either from its in-memory buffer or by
    /// streaming it from disk. Per-file failures are logged and counted; cancellation is rethrown.
    /// </summary>
    private async Task WriteToArchiveAsync(
        ZipArchive archive,
        FileJob job,
        int totalFiles,
        CancellationToken cancellationToken
    )
    {
        var (fileInfo, stream, isInMemory, fileLength) = job;
        try
        {
            // Zip entry names must use '/' (PKWARE APPNOTE 4.4.17); on Windows the relative path uses '\'.
            var entryName = fileInfo.RelativePath.Replace(Path.DirectorySeparatorChar, '/');
            var entry = archive.CreateEntry(entryName, _settings.CompressionLevel);

            // Best-effort: keep the source file's modification time. Reading or assigning it can
            // throw (e.g. a date outside the 1980-2107 range the zip format supports); the file
            // content is still backed up either way.
            try
            {
                entry.LastWriteTime = File.GetLastWriteTime(fileInfo.FullPath);
            }
            catch (Exception ex)
            {
                logger.LogInformation(ex, "无法读取文件修改时间: {Path}", fileInfo.FullPath);
            }

            // Scoped so the entry is fully flushed before the file is counted as done.
            await using (var entryStream = await entry.OpenAsync(cancellationToken))
            {
                if (isInMemory && stream != null)
                {
                    await stream.CopyToAsync(entryStream, cancellationToken);
                }
                else
                {
                    // Large file: stream it straight from disk.
                    await using var fs = new FileStream(
                        fileInfo.FullPath,
                        FileMode.Open,
                        FileAccess.Read,
                        FileShare.Read,
                        64 * 1024,
                        FileOptions.SequentialScan
                    );
                    await fs.CopyToAsync(entryStream, cancellationToken);
                }
            }

            // Only the single consumer writes these, so plain increments are enough.
            _fileCount++;
            _totalSize += fileLength;
            _processedCount++;

            LogProgressIfNeeded(_processedCount, totalFiles);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Cancellation stops the consumer; it is not a per-file error.
            throw;
        }
        catch (Exception ex)
        {
            // Producers increment _errorCount concurrently, hence Interlocked.
            Interlocked.Increment(ref _errorCount);
            logger.LogError(ex, "写入压缩包失败: {Path}", fileInfo.FullPath);
        }
        finally
        {
            // Release the buffered copy and its memory accounting exactly once, success or failure.
            if (isInMemory && stream != null)
            {
                await stream.DisposeAsync();
                Interlocked.Add(ref _inMemorySize, -fileLength);
            }
        }
    }

    /// <summary>Logs progress every 2000 files or when at least 5 seconds passed since the last log.</summary>
    private void LogProgressIfNeeded(int currentProcessed, int totalFiles)
    {
        if (currentProcessed % 2000 == 0 || (DateTime.Now - _lastLogTime).TotalSeconds >= 5)
        {
            _lastLogTime = DateTime.Now;
            var progress = totalFiles > 0 ? (currentProcessed * 100.0) / totalFiles : 0;
            var sizeMb = _totalSize / (1024.0 * 1024.0);
            logger.LogInformation(
                "打包进度: {Progress:F2}% ({Processed}/{Total}), 已处理: {Size:F2} MB",
                progress,
                currentProcessed,
                totalFiles,
                sizeMb
            );
        }
    }

    /// <summary>Logs the final counters, sizes, compression ratio and elapsed time of a run.</summary>
    private void LogCompletionStatistics(string zipFileName, DateTime startTime)
    {
        var elapsed = DateTime.Now - startTime;

        var finalSizeMb = _totalSize / (1024.0 * 1024.0);
        var finalSizeGb = _totalSize / (1024.0 * 1024.0 * 1024.0);
        var peakInMemorySizeGb = _peakInMemorySize / (1024.0 * 1024.0 * 1024.0);

        var zipFileInfo = new FileInfo(zipFileName);
        var compressedSizeMb = zipFileInfo.Length / (1024.0 * 1024.0);
        var compressionRatio =
            _totalSize > 0 ? (1 - (double)zipFileInfo.Length / _totalSize) * 100 : 0;

        logger.LogInformation(
            "备份打包完成: {ZipFile}\n"
                + "统计信息:\n"
                + "  - 文件数: {FileCount}\n"
                + "  - 目录数: {FolderCount}\n"
                + "  - 错误数: {ErrorCount}\n"
                + "  - 原始大小: {OriginalSize:F2} GB ({OriginalSizeMB:F2} MB)\n"
                + "  - 压缩后大小: {CompressedSize:F2} MB\n"
                + "  - 压缩率: {CompressionRatio:F2}%\n"
                + "  - 小文件内存峰值: {PeakInMemorySize:F2} GB\n"
                + "  - 耗时: {Elapsed}",
            zipFileName,
            _fileCount,
            _folderCount,
            _errorCount,
            finalSizeGb,
            finalSizeMb,
            compressedSizeMb,
            compressionRatio,
            peakInMemorySizeGb,
            elapsed
        );
    }

    /// <summary>
    /// Recursively collects non-ignored files under <paramref name="directory"/> into
    /// <paramref name="files"/>, visiting sub-directories in parallel. Directories that cannot be
    /// read are logged, counted as errors, and skipped.
    /// </summary>
    private async Task CollectFilesAsync(
        DirectoryInfo directory,
        string rootPath,
        ConcurrentBag<FileEntry> files
    )
    {
        try
        {
            Interlocked.Increment(ref _folderCount);

            // Files directly in this directory.
            foreach (var file in directory.GetFiles())
            {
                if (ShouldIgnore(file))
                {
                    continue;
                }

                try
                {
                    var relativePath = Path.GetRelativePath(rootPath, file.FullName);
                    files.Add(new FileEntry(file.FullName, relativePath));
                }
                catch (Exception ex)
                {
                    Interlocked.Increment(ref _errorCount);
                    logger.LogError(ex, "收集文件信息失败 {FilePath},已跳过", file.FullName);
                }
            }

            // Sub-directories in parallel; ignored ones are pruned without being descended into.
            var subDirectories = directory.GetDirectories();
            await Parallel.ForEachAsync(
                subDirectories,
                new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
                async (subDirectory, _) =>
                {
                    if (ShouldIgnoreDirectory(subDirectory))
                    {
                        return;
                    }
                    await ParallelFolderAsync(rootPath, files, subDirectory);
                }
            );
        }
        catch (UnauthorizedAccessException ex)
        {
            Interlocked.Increment(ref _errorCount);
            logger.LogError(
                ex,
                "没有访问目录 {DirectoryPath} 的权限,已跳过该目录",
                directory.FullName
            );
        }
        catch (Exception ex)
        {
            Interlocked.Increment(ref _errorCount);
            logger.LogError(
                ex,
                "处理目录 {DirectoryPath} 时发生错误,已跳过该目录",
                directory.FullName
            );
        }
    }

    /// <summary>
    /// Normalizes the configured <c>IgnorePaths</c> once per run. Blank, invalid, or
    /// out-of-root entries are skipped (and reported once by <see cref="IsValidIgnorePath"/>).
    /// </summary>
    /// <returns>Absolute directory paths without a trailing separator.</returns>
    private string[] ResolveIgnoreDirectories(string rootPath)
    {
        if (_settings.IgnorePaths.Length == 0)
        {
            return [];
        }

        var resolved = new List<string>(_settings.IgnorePaths.Length);
        foreach (var ignorePath in _settings.IgnorePaths)
        {
            if (string.IsNullOrWhiteSpace(ignorePath))
            {
                continue;
            }

            if (IsValidIgnorePath(ignorePath, rootPath, out var normalizedIgnorePath))
            {
                resolved.Add(normalizedIgnorePath);
            }
        }

        return [.. resolved];
    }

    /// <summary>
    /// Validates that an ignore path lies inside the backup root. Relative entries are resolved
    /// against <paramref name="rootPath"/> (not the process working directory), since that is the
    /// only base under which they could pass this check.
    /// </summary>
    /// <param name="normalizedIgnorePath">
    /// Absolute path without trailing separator when valid; otherwise <see cref="string.Empty"/>.
    /// </param>
    /// <returns><c>true</c> if the path is the root itself or lies beneath it.</returns>
    private bool IsValidIgnorePath(
        string ignorePath,
        string rootPath,
        out string normalizedIgnorePath
    )
    {
        normalizedIgnorePath = string.Empty;

        try
        {
            var fullRootPath = Path.GetFullPath(rootPath);
            // Collapses "..", "." and mixed separators; an absolute ignorePath ignores the base.
            var fullIgnorePath = Path.GetFullPath(ignorePath, fullRootPath);

            // Trailing separators on both sides stop "/app/bin" from counting as under "/app/b".
            if (!fullRootPath.EndsWith(Path.DirectorySeparatorChar))
            {
                fullRootPath += Path.DirectorySeparatorChar;
            }

            if (!fullIgnorePath.EndsWith(Path.DirectorySeparatorChar))
            {
                fullIgnorePath += Path.DirectorySeparatorChar;
            }

            if (!fullIgnorePath.StartsWith(fullRootPath, StringComparison.OrdinalIgnoreCase))
            {
                // Logged once per run now that ignore paths are resolved up front.
                logger.LogWarning(
                    "忽略路径配置不合理: {IgnorePath} 不在备份路径 {RootPath} 下,已跳过此配置",
                    ignorePath,
                    rootPath
                );
                return false;
            }

            normalizedIgnorePath = fullIgnorePath.TrimEnd(Path.DirectorySeparatorChar);
            return true;
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "忽略路径配置无效: {IgnorePath},已跳过此配置", ignorePath);
            return false;
        }
    }

    /// <summary>
    /// True when the file's name or extension is in the ignore lists (ordinal, case-sensitive)
    /// or its directory is an ignored directory.
    /// </summary>
    private bool ShouldIgnore(FileInfo file)
    {
        if (
            _settings.IgnoreFiles.Contains(file.Name)
            || _settings.IgnoreExtensions.Contains(file.Extension)
        )
        {
            return true;
        }

        // Ignored sub-directories are already pruned during the scan; this also covers files
        // directly in the root when the root itself is configured as an ignore path.
        return file.Directory != null && ShouldIgnoreDirectory(file.Directory);
    }

    /// <summary>
    /// True when <paramref name="dir"/> equals, or lies beneath, one of the resolved ignore
    /// directories (compared case-insensitively).
    /// </summary>
    private bool ShouldIgnoreDirectory(DirectoryInfo dir)
    {
        if (_ignoreDirectories.Length == 0)
        {
            return false;
        }

        var fullDirPath = Path.GetFullPath(dir.FullName);
        foreach (var ignored in _ignoreDirectories)
        {
            // The appended separator prevents "/a/bin" from matching "/a/binaries".
            if (
                fullDirPath.Equals(ignored, StringComparison.OrdinalIgnoreCase)
                || fullDirPath.StartsWith(
                    ignored + Path.DirectorySeparatorChar,
                    StringComparison.OrdinalIgnoreCase
                )
            )
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Recurses into <paramref name="subDirectory"/>; any exception escaping the recursion is
    /// logged and counted instead of failing the parent's parallel loop.
    /// </summary>
    private async Task ParallelFolderAsync(
        string rootPath,
        ConcurrentBag<FileEntry> files,
        DirectoryInfo subDirectory
    )
    {
        try
        {
            await CollectFilesAsync(subDirectory, rootPath, files);
        }
        catch (Exception ex)
        {
            Interlocked.Increment(ref _errorCount);
            logger.LogError(ex, "处理子目录异常: {Path}", subDirectory.FullName);
        }
    }

    /// <summary>A file to back up: absolute path and path relative to the backup root.</summary>
    private readonly record struct FileEntry(string FullPath, string RelativePath);

    /// <summary>
    /// Unit of work passed from producers to the consumer. <c>Content</c> is the in-memory copy
    /// for small files and <c>null</c> for files the consumer streams from disk.
    /// </summary>
    private record FileJob(FileEntry FileInfo, Stream? Content, bool IsInMemory, long FileLength);

    /// <summary>Settings bound from the <c>BackupSettings</c> configuration section.</summary>
    private record BackupJobSettings(
        string? BackupProgramPath,
        string[] IgnorePaths,
        string[] IgnoreFiles,
        string[] IgnoreExtensions,
        CompressionLevel CompressionLevel = CompressionLevel.Fastest
    );
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

代码功能解析:BackupProgramActivator

这段代码实现了一个程序备份功能,主要作用是将指定目录下的程序文件打包成ZIP压缩包,并上传到FTP服务器。以下是详细的功能解析:

主要功能

  1. 程序备份打包

    • 扫描指定目录下的所有文件和子目录
    • 将文件打包成ZIP压缩包
    • 支持多线程并行处理提高效率
    • 包含详细的进度日志和统计信息
  2. FTP上传

    • 将生成的ZIP备份文件上传到FTP服务器
    • 上传成功后删除本地备份文件;上传失败时保留本地文件
  3. 配置管理

    • 从配置文件中读取备份设置
    • 支持忽略特定路径、文件和扩展名

核心组件和工作流程

  1. StartAsync() - 入口方法

    • 检查配置有效性
    • 调用PackAsync()打包文件
    • 调用UploadAsync()上传备份
  2. PackAsync() - 打包流程

    • 创建带时间戳的ZIP文件名
    • 扫描目录收集文件(CollectFilesAsync)
    • 使用生产者-消费者模式并行处理文件
    • 生产者(ProduceFileJobsAsync)并行将小文件读入内存;大文件只传递路径,由消费者直接从磁盘流式读取
    • 消费者(ConsumeFileJobsAsync)写入ZIP文件
  3. 文件处理策略

    • 小文件(<20MB)读入内存加速处理
    • 大文件流式处理避免内存溢出
    • 内存使用监控和预警
  4. 忽略规则

    • 忽略特定路径(包括子目录)
    • 忽略特定文件名
    • 忽略特定扩展名

技术亮点

  1. 高效并行处理

    • 使用Channel实现生产者-消费者模式
    • 并行文件收集和读取
    • 内存和IO优化
  2. 资源管理

    • 内存使用监控(超过 2GB 时仅记录警告,不做硬性限制),并通过有界通道(500 个槽位)形成背压
    • 流式处理大文件
    • 及时释放资源
  3. 详细的日志和统计

    • 进度日志
    • 错误统计
    • 压缩率计算
    • 性能指标
  4. 容错处理

    • 异常捕获和记录
    • 跳过无权限目录
    • 错误计数

使用场景

这段代码适合用于:

  • 定期备份服务器上的应用程序
  • 需要将备份文件传输到远程存储的场景
  • 需要高效处理大量文件的备份需求
  • 需要灵活配置忽略规则的环境

代码结构清晰,功能完善,是一个专业级的备份解决方案。

评论加载中...