using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO.Compression;
using System.Threading.Channels;
using Dpz.Core.Service.ObjectStorage.Services;
using Dpz.Core.Shard.Service;
using Hangfire;
using Hangfire.Annotations;
namespace Dpz.Core.Web.Jobs.Hangfire;
/// <summary>
/// Hangfire job that packs the directory configured as <c>BackupSettings:BackupProgramPath</c> into a
/// timestamped ZIP archive and uploads it to FTP via <see cref="ISafeFileService"/>.
/// </summary>
/// <remarks>
/// Packing uses a bounded producer/consumer pipeline: producers read files in parallel (small files are
/// buffered in memory) and a single consumer writes them into the archive. The per-run statistics kept in
/// fields mean one instance should only run one backup at a time.
/// </remarks>
[UsedImplicitly]
public class BackupProgramActivator(
    IConfiguration configuration,
    ILogger<BackupProgramActivator> logger,
    ISafeFileService safeFileService
) : JobActivator
{
    /// <summary>
    /// Files smaller than this are read into memory by the producers (faster I/O, fewer seeks while
    /// writing); larger files are passed by path and streamed by the consumer to bound memory use.
    /// </summary>
    private const long InMemoryFileThreshold = 20 * 1024 * 1024;

    /// <summary>A warning is logged when buffered small-file data crosses this total.</summary>
    private const long InMemoryWarningThreshold = 2L * 1024 * 1024 * 1024;

    /// <summary>
    /// Capacity of the producer/consumer channel. In the extreme case where every queued job holds a file
    /// just under <see cref="InMemoryFileThreshold"/>, queued buffers alone approach 500 * 20 MB = 10 GB.
    /// </summary>
    private const int ChannelCapacity = 500;

    private readonly CancellationTokenSource _cts = new();

    // Per-run statistics. _fileCount, _totalSize, _processedCount and _lastLogTime are only touched by the
    // single consumer; _folderCount, _errorCount and the in-memory counters are shared with parallel
    // scanners/producers and therefore always updated with Interlocked.
    private int _fileCount;
    private int _folderCount;
    private int _errorCount;
    private long _totalSize;
    private int _processedCount;
    private DateTime _lastLogTime;
    private long _inMemorySize;
    private long _peakInMemorySize;

    // Ignore paths resolved once per run: absolute, inside the backup root, no trailing separator.
    private string[] _resolvedIgnorePaths = [];

    private readonly BackupJobSettings _settings =
        configuration.GetSection("BackupSettings").Get<BackupJobSettings>()
        ?? new BackupJobSettings("", [], [], []);

    /// <summary>
    /// Job entry point: packs the configured program directory, uploads the archive to FTP and deletes the
    /// local archive only after a successful upload (it is kept when the upload fails).
    /// </summary>
    public async Task StartAsync()
    {
        var sourcePath = _settings.BackupProgramPath;
        if (string.IsNullOrWhiteSpace(sourcePath) || !Directory.Exists(sourcePath))
        {
            logger.LogWarning("未配置程序备份路径或路径无效");
            return;
        }

        var zipFilePath = await PackAsync(sourcePath);
        if (string.IsNullOrWhiteSpace(zipFilePath))
        {
            logger.LogWarning("备份未成功创建");
            return;
        }

        // Upload to FTP; keep the local archive if the upload fails so the backup is not lost.
        if (!await UploadAsync(zipFilePath))
        {
            logger.LogWarning("FTP 上传失败,保留本地备份文件: {Path}", zipFilePath);
            return;
        }

        logger.LogInformation("FTP 上传成功,删除本地备份文件: {Path}", zipFilePath);
        // The backup is already stored remotely; failing the job over a local delete error would only make
        // the scheduler re-run it and upload a whole new backup.
        TryDeleteFile(zipFilePath);
    }

    /// <summary>
    /// Uploads <paramref name="filePath"/> under <c>program_backup/{year}/{month}/{fileName}</c>
    /// (prefixed with <c>Test</c> in DEBUG builds).
    /// </summary>
    /// <returns><c>true</c> when the service returned an address, otherwise <c>false</c>.</returns>
    private async Task<bool> UploadAsync(string filePath)
    {
        var sw = Stopwatch.StartNew();
        var fileName = Path.GetFileName(filePath);
        var date = DateTime.Now;
        List<string> pathToFile =
        [
            "program_backup",
            date.Year.ToString(),
            date.Month.ToString(),
            $"{fileName}",
        ];
#if DEBUG
        pathToFile.Insert(0, "Test");
#endif
        await using var stream = new FileStream(
            filePath,
            FileMode.Open,
            FileAccess.Read,
            FileShare.Read,
            1 << 12
        );
        var cloudFile = new CloudFile { PathToFile = pathToFile, Stream = stream };
        var address = await safeFileService.UploadFileForFtpAsync(cloudFile, _cts.Token);
        sw.Stop();
        if (address == null)
        {
            logger.LogError("FTP 上传失败: 返回地址为空");
            return false;
        }

        logger.LogInformation(
            "FTP 上传成功,耗时:{Elapsed}: {FileName} -> {Address}",
            sw.Elapsed.ToString("g"),
            fileName,
            address
        );
        return true;
    }

    /// <summary>
    /// Packs <paramref name="backupProgramPath"/> into <c>{AppContext.BaseDirectory}/program_backup/backup_{timestamp}.zip</c>.
    /// </summary>
    /// <returns>The archive path, or <c>null</c> when nothing was packed, packing failed, or it was cancelled.</returns>
    private async Task<string?> PackAsync(string backupProgramPath)
    {
        var startTime = DateTime.Now;
        ResetStatistics();
        // Declared outside the try so the failure handlers can remove a partially written archive.
        string? zipFullName = null;
        try
        {
            // Timestamped backup file name.
            var timestamp = DateTime.Now.ToString("yyyyMMdd_HHmmss");
            var backupDirectory = Path.Combine(AppContext.BaseDirectory, "program_backup");
            // No-op when the directory already exists.
            Directory.CreateDirectory(backupDirectory);
            zipFullName = Path.Combine(backupDirectory, $"backup_{timestamp}.zip");
            logger.LogInformation(
                "开始打包备份: {Source} -> {Destination}\n压缩级别: {Level}",
                backupProgramPath,
                zipFullName,
                _settings.CompressionLevel
            );

            // 1. Scan files.
            var sourceDirectory = new DirectoryInfo(backupProgramPath);
            _resolvedIgnorePaths = ResolveIgnorePaths(sourceDirectory.FullName);
            var filesToPack = new ConcurrentBag<FileEntry>();
            await CollectFilesAsync(sourceDirectory, sourceDirectory.FullName, filesToPack);
            var totalFiles = filesToPack.Count;
            logger.LogInformation(
                "已扫描文件: {TotalFiles} 个文件, {TotalFolders} 个目录 (已忽略部分文件)",
                totalFiles,
                _folderCount
            );
            if (totalFiles == 0)
            {
                logger.LogWarning("没有找到需要备份的文件");
                return null;
            }

            // 2. Bounded channel: full slots make producers wait (backpressure), limiting buffered memory.
            var channel = Channel.CreateBounded<FileJob>(
                new BoundedChannelOptions(ChannelCapacity)
                {
                    SingleReader = true,
                    SingleWriter = false,
                    FullMode = BoundedChannelFullMode.Wait,
                }
            );

            // Pipeline-scoped token: fires when the job is cancelled or when the consumer fails. Without the
            // latter, producers would block forever on a full channel that nobody drains any more.
            using var pipelineCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token);

            // 3. Consumer (writes the archive) and 4. producers (read files in parallel). The producer side
            // always completes the channel writer so the consumer can finish.
            var consumeTask = ConsumeOrCancelPipelineAsync(
                channel.Reader,
                zipFullName,
                totalFiles,
                pipelineCts
            );
            var produceTask = ProduceFileJobsAsync(filesToPack, channel.Writer, pipelineCts.Token);

            // WhenAll waits for BOTH sides, so the archive is closed before it is inspected below. When the
            // consumer fails it cancels the producers, and its own exception is the one rethrown here.
            await Task.WhenAll(produceTask, consumeTask);

            // Verify the archive exists before reading its size for the statistics.
            if (!File.Exists(zipFullName))
            {
                logger.LogError("备份文件创建失败: {Path}", zipFullName);
                return null;
            }

            LogCompletionStatistics(zipFullName, startTime);
            return zipFullName;
        }
        catch (OperationCanceledException)
        {
            logger.LogWarning("备份打包操作已取消");
            // A partial archive looks like a valid zip but is missing files; do not leave it around.
            if (zipFullName != null)
            {
                TryDeleteFile(zipFullName);
            }
            return null;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "备份打包失败: {Path}", backupProgramPath);
            if (zipFullName != null)
            {
                TryDeleteFile(zipFullName);
            }
            return null;
        }
    }

    /// <summary>
    /// Producer side: reads files in parallel and writes one <see cref="FileJob"/> per file to
    /// <paramref name="writer"/>. The writer is completed on every exit path (success, failure,
    /// cancellation) so the consumer never waits forever.
    /// </summary>
    private async Task ProduceFileJobsAsync(
        IEnumerable<FileEntry> filesToPack,
        ChannelWriter<FileJob> writer,
        CancellationToken cancellationToken
    )
    {
        try
        {
            await Parallel.ForEachAsync(
                filesToPack,
                new ParallelOptions
                {
                    // I/O bound work: allow more concurrency than there are cores.
                    MaxDegreeOfParallelism = Environment.ProcessorCount * 2,
                    CancellationToken = cancellationToken,
                },
                (fileEntry, ct) => ProduceFileJobAsync(fileEntry, writer, ct)
            );
        }
        finally
        {
            // Tell the consumer that no more jobs are coming.
            writer.TryComplete();
        }
    }

    /// <summary>
    /// Reads one file (into memory when below <see cref="InMemoryFileThreshold"/>) and queues it.
    /// Per-file failures are logged and counted; cancellation is rethrown so the parallel loop stops.
    /// </summary>
    private async ValueTask ProduceFileJobAsync(
        FileEntry fileEntry,
        ChannelWriter<FileJob> writer,
        CancellationToken ct
    )
    {
        MemoryStream? buffer = null;
        long trackedBytes = 0;
        try
        {
            var fileInfo = new FileInfo(fileEntry.FullPath);
            if (!fileInfo.Exists)
            {
                return;
            }

            if (fileInfo.Length < InMemoryFileThreshold)
            {
                buffer = new MemoryStream((int)fileInfo.Length);
                await using (
                    var fs = new FileStream(
                        fileEntry.FullPath,
                        FileMode.Open,
                        FileAccess.Read,
                        FileShare.Read,
                        64 * 1024,
                        FileOptions.SequentialScan | FileOptions.Asynchronous
                    )
                )
                {
                    await fs.CopyToAsync(buffer, ct);
                }

                buffer.Position = 0;
                trackedBytes = fileInfo.Length;
                TrackInMemoryBytes(trackedBytes);
            }

            // Waits while the channel is full (backpressure).
            await writer.WriteAsync(
                new FileJob(fileEntry, buffer, buffer != null, fileInfo.Length),
                ct
            );

            // Ownership of the buffer and its memory accounting now belongs to the consumer.
            buffer = null;
            trackedBytes = 0;
        }
        catch (Exception ex) when (ex is not OperationCanceledException || !ct.IsCancellationRequested)
        {
            Interlocked.Increment(ref _errorCount);
            logger.LogError(ex, "读取文件失败: {Path}", fileEntry.FullPath);
        }
        finally
        {
            // Only reached with a non-null buffer when the hand-off to the channel did not happen.
            if (buffer != null)
            {
                await buffer.DisposeAsync();
                if (trackedBytes != 0)
                {
                    Interlocked.Add(ref _inMemorySize, -trackedBytes);
                }
            }
        }
    }

    /// <summary>
    /// Adds <paramref name="bytes"/> to the buffered-memory counter, raises the recorded peak if needed and
    /// logs a warning when the total crosses <see cref="InMemoryWarningThreshold"/> upwards.
    /// </summary>
    private void TrackInMemoryBytes(long bytes)
    {
        var current = Interlocked.Add(ref _inMemorySize, bytes);

        // Lock-free "max" update of the peak.
        long observedPeak;
        do
        {
            observedPeak = Interlocked.Read(ref _peakInMemorySize);
            if (current <= observedPeak)
            {
                break;
            }
        } while (
            Interlocked.CompareExchange(ref _peakInMemorySize, current, observedPeak) != observedPeak
        );

        // Only the addition that crosses the threshold logs, not every addition above it.
        if (current > InMemoryWarningThreshold && current - bytes <= InMemoryWarningThreshold)
        {
            var memoryGb = current / (1024.0 * 1024.0 * 1024.0);
            logger.LogWarning(
                "小文件内存使用量已超过 2GB 警告阈值: {MemorySize:F2} GB",
                memoryGb
            );
        }
    }

    /// <summary>
    /// Runs <see cref="ConsumeFileJobsAsync"/> and cancels <paramref name="pipelineCts"/> if it fails, so
    /// producers blocked on the bounded channel are released instead of waiting forever.
    /// </summary>
    private async Task ConsumeOrCancelPipelineAsync(
        ChannelReader<FileJob> reader,
        string zipFileName,
        int totalFiles,
        CancellationTokenSource pipelineCts
    )
    {
        try
        {
            await ConsumeFileJobsAsync(reader, zipFileName, totalFiles, pipelineCts.Token);
        }
        catch
        {
            pipelineCts.Cancel();
            throw;
        }
    }

    /// <summary>
    /// Consumer side (single reader): creates the archive at <paramref name="zipFileName"/> and writes every
    /// queued job into it until the channel is completed.
    /// </summary>
    private async Task ConsumeFileJobsAsync(
        ChannelReader<FileJob> reader,
        string zipFileName,
        int totalFiles,
        CancellationToken cancellationToken
    )
    {
        await using var fileStream = new FileStream(
            zipFileName,
            FileMode.Create,
            FileAccess.Write,
            FileShare.None,
            // 1 MB buffer
            1024 * 1024,
            FileOptions.SequentialScan
        );
        await using var archive = new ZipArchive(fileStream, ZipArchiveMode.Create, false);
        _lastLogTime = DateTime.Now;

        await foreach (var job in reader.ReadAllAsync(cancellationToken))
        {
            cancellationToken.ThrowIfCancellationRequested();
            await WriteToArchiveAsync(archive, job, totalFiles, cancellationToken);
        }
    }

    /// <summary>Resets all per-run counters before a new pack.</summary>
    private void ResetStatistics()
    {
        _fileCount = 0;
        _folderCount = 0;
        _errorCount = 0;
        _totalSize = 0;
        _processedCount = 0;
        _lastLogTime = DateTime.Now;
        _inMemorySize = 0;
        _peakInMemorySize = 0;
    }

    /// <summary>
    /// Writes one job into <paramref name="archive"/>: from the in-memory buffer when present, otherwise by
    /// streaming the file from disk. Per-file failures are logged and counted; cancellation propagates.
    /// The in-memory buffer is always released.
    /// </summary>
    private async Task WriteToArchiveAsync(
        ZipArchive archive,
        FileJob job,
        int totalFiles,
        CancellationToken cancellationToken
    )
    {
        var (entryInfo, content, isInMemory, fileLength) = job;
        try
        {
            var entry = archive.CreateEntry(entryInfo.RelativePath, _settings.CompressionLevel);

            // Preserve the last-write time. Failing to read it (file locked, no permission) does not affect
            // the content backup, so it is only logged.
            try
            {
                entry.LastWriteTime = File.GetLastWriteTime(entryInfo.FullPath);
            }
            catch (Exception ex)
            {
                logger.LogInformation(ex, "无法读取文件修改时间: {Path}", entryInfo.FullPath);
            }

            await using var entryStream = await entry.OpenAsync(cancellationToken);
            if (isInMemory && content != null)
            {
                await content.CopyToAsync(entryStream, cancellationToken);
            }
            else
            {
                // Large file: stream directly from disk.
                await using var fs = new FileStream(
                    entryInfo.FullPath,
                    FileMode.Open,
                    FileAccess.Read,
                    FileShare.Read,
                    64 * 1024,
                    FileOptions.SequentialScan
                );
                await fs.CopyToAsync(entryStream, cancellationToken);
            }

            _fileCount++;
            _totalSize += fileLength;
            _processedCount++;
            LogProgressIfNeeded(_processedCount, totalFiles);
        }
        catch (Exception ex)
            when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
        {
            // Producers increment this counter concurrently, so a plain ++ could lose updates.
            Interlocked.Increment(ref _errorCount);
            logger.LogError(ex, "写入压缩包失败: {Path}", entryInfo.FullPath);
        }
        finally
        {
            // Release the buffer and its accounting whether or not the write succeeded.
            if (isInMemory && content != null)
            {
                await content.DisposeAsync();
                Interlocked.Add(ref _inMemorySize, -fileLength);
            }
        }
    }

    /// <summary>Logs progress every 2000 files or at least every 5 seconds.</summary>
    private void LogProgressIfNeeded(int currentProcessed, int totalFiles)
    {
        if (currentProcessed % 2000 == 0 || (DateTime.Now - _lastLogTime).TotalSeconds >= 5)
        {
            _lastLogTime = DateTime.Now;
            var progress = totalFiles > 0 ? (currentProcessed * 100.0) / totalFiles : 0;
            var sizeMb = _totalSize / (1024.0 * 1024.0);
            logger.LogInformation(
                "打包进度: {Progress:F2}% ({Processed}/{Total}), 已处理: {Size:F2} MB",
                progress,
                currentProcessed,
                totalFiles,
                sizeMb
            );
        }
    }

    /// <summary>Logs the final statistics; <paramref name="zipFileName"/> must exist.</summary>
    private void LogCompletionStatistics(string zipFileName, DateTime startTime)
    {
        var elapsed = DateTime.Now - startTime;
        var finalSizeMb = _totalSize / (1024.0 * 1024.0);
        var finalSizeGb = _totalSize / (1024.0 * 1024.0 * 1024.0);
        var peakInMemorySizeGb = _peakInMemorySize / (1024.0 * 1024.0 * 1024.0);
        var zipFileInfo = new FileInfo(zipFileName);
        var compressedSizeMb = zipFileInfo.Length / (1024.0 * 1024.0);
        var compressionRatio =
            _totalSize > 0 ? (1 - (double)zipFileInfo.Length / _totalSize) * 100 : 0;
        logger.LogInformation(
            "备份打包完成: {ZipFile}\n"
                + "统计信息:\n"
                + " - 文件数: {FileCount}\n"
                + " - 目录数: {FolderCount}\n"
                + " - 错误数: {ErrorCount}\n"
                + " - 原始大小: {OriginalSize:F2} GB ({OriginalSizeMB:F2} MB)\n"
                + " - 压缩后大小: {CompressedSize:F2} MB\n"
                + " - 压缩率: {CompressionRatio:F2}%\n"
                + " - 小文件内存峰值: {PeakInMemorySize:F2} GB\n"
                + " - 耗时: {Elapsed}",
            zipFileName,
            _fileCount,
            _folderCount,
            _errorCount,
            finalSizeGb,
            finalSizeMb,
            compressedSizeMb,
            compressionRatio,
            peakInMemorySizeGb,
            elapsed
        );
    }

    /// <summary>
    /// Recursively collects the non-ignored files under <paramref name="directory"/> into
    /// <paramref name="files"/>, scanning sub-directories in parallel. Unreadable directories are logged,
    /// counted as errors and skipped.
    /// </summary>
    private async Task CollectFilesAsync(
        DirectoryInfo directory,
        string rootPath,
        ConcurrentBag<FileEntry> files
    )
    {
        try
        {
            Interlocked.Increment(ref _folderCount);

            foreach (var file in directory.GetFiles())
            {
                if (ShouldIgnore(file))
                {
                    continue;
                }

                try
                {
                    var relativePath = Path.GetRelativePath(rootPath, file.FullName);
                    files.Add(new FileEntry(file.FullName, relativePath));
                }
                catch (Exception ex)
                {
                    Interlocked.Increment(ref _errorCount);
                    logger.LogError(ex, "收集文件信息失败 {FilePath},已跳过", file.FullName);
                }
            }

            await Parallel.ForEachAsync(
                directory.GetDirectories(),
                new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
                async (subDirectory, _) =>
                {
                    if (ShouldIgnoreDirectory(subDirectory))
                    {
                        return;
                    }
                    await ParallelFolderAsync(rootPath, files, subDirectory);
                }
            );
        }
        catch (UnauthorizedAccessException ex)
        {
            Interlocked.Increment(ref _errorCount);
            logger.LogError(
                ex,
                "没有访问目录 {DirectoryPath} 的权限,已跳过该目录",
                directory.FullName
            );
        }
        catch (Exception ex)
        {
            Interlocked.Increment(ref _errorCount);
            logger.LogError(
                ex,
                "处理目录 {DirectoryPath} 时发生错误,已跳过该目录",
                directory.FullName
            );
        }
    }

    /// <summary>
    /// Resolves the configured ignore paths once per run, keeping only those inside
    /// <paramref name="rootPath"/>. Relative entries are resolved by <see cref="Path.GetFullPath(string)"/>
    /// against the process working directory.
    /// </summary>
    private string[] ResolveIgnorePaths(string rootPath)
    {
        var resolved = new List<string>();
        foreach (var ignorePath in _settings.IgnorePaths)
        {
            if (string.IsNullOrWhiteSpace(ignorePath))
            {
                continue;
            }
            if (IsValidIgnorePath(ignorePath, rootPath, out var normalizedIgnorePath))
            {
                resolved.Add(normalizedIgnorePath);
            }
        }
        return [.. resolved];
    }

    /// <summary>
    /// Validates that an ignore path lies inside the backup root (or is the root itself).
    /// </summary>
    /// <param name="normalizedIgnorePath">Absolute path without a trailing separator when valid; empty otherwise.</param>
    private bool IsValidIgnorePath(
        string ignorePath,
        string rootPath,
        out string normalizedIgnorePath
    )
    {
        normalizedIgnorePath = string.Empty;
        try
        {
            // Normalize relative segments such as "..".
            var fullIgnorePath = Path.GetFullPath(ignorePath);
            var fullRootPath = Path.GetFullPath(rootPath);

            // Compare with trailing separators so "/app2" is not treated as inside "/app".
            if (!fullRootPath.EndsWith(Path.DirectorySeparatorChar))
            {
                fullRootPath += Path.DirectorySeparatorChar;
            }
            if (!fullIgnorePath.EndsWith(Path.DirectorySeparatorChar))
            {
                fullIgnorePath += Path.DirectorySeparatorChar;
            }

            if (!fullIgnorePath.StartsWith(fullRootPath, StringComparison.OrdinalIgnoreCase))
            {
                // Paths are resolved once per run, so this warning is logged once per bad entry.
                logger.LogWarning(
                    "忽略路径配置不合理: {IgnorePath} 不在备份路径 {RootPath} 下,已跳过此配置",
                    ignorePath,
                    rootPath
                );
                return false;
            }

            normalizedIgnorePath = fullIgnorePath.TrimEnd(Path.DirectorySeparatorChar);
            return true;
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "忽略路径配置无效: {IgnorePath},已跳过此配置", ignorePath);
            return false;
        }
    }

    /// <summary>
    /// True when the file matches an ignored name or extension (case-sensitive) or lives in an ignored directory.
    /// </summary>
    private bool ShouldIgnore(FileInfo file)
    {
        if (_settings.IgnoreFiles.Any(x => x == file.Name))
        {
            return true;
        }
        if (_settings.IgnoreExtensions.Any(x => x == file.Extension))
        {
            return true;
        }
        return file.Directory != null && ShouldIgnoreDirectory(file.Directory);
    }

    /// <summary>
    /// True when <paramref name="dir"/> equals, or is below, one of the resolved ignore paths
    /// (compared case-insensitively).
    /// </summary>
    private bool ShouldIgnoreDirectory(DirectoryInfo dir)
    {
        if (_resolvedIgnorePaths.Length == 0)
        {
            return false;
        }

        var fullDirPath = Path.GetFullPath(dir.FullName);
        foreach (var ignorePath in _resolvedIgnorePaths)
        {
            if (
                fullDirPath.Equals(ignorePath, StringComparison.OrdinalIgnoreCase)
                || fullDirPath.StartsWith(
                    ignorePath + Path.DirectorySeparatorChar,
                    StringComparison.OrdinalIgnoreCase
                )
            )
            {
                return true;
            }
        }
        return false;
    }

    /// <summary>Scans one sub-directory, logging and counting any unexpected failure.</summary>
    private async Task ParallelFolderAsync(
        string rootPath,
        ConcurrentBag<FileEntry> files,
        DirectoryInfo subDirectory
    )
    {
        try
        {
            await CollectFilesAsync(subDirectory, rootPath, files);
        }
        catch (Exception ex)
        {
            Interlocked.Increment(ref _errorCount);
            logger.LogError(ex, "处理子目录异常: {Path}", subDirectory.FullName);
        }
    }

    /// <summary>Best-effort delete of a local file; I/O and permission failures are logged, not thrown.</summary>
    private void TryDeleteFile(string path)
    {
        try
        {
            File.Delete(path);
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            logger.LogWarning(ex, "删除本地文件失败: {Path}", path);
        }
    }

    /// <summary>A file to back up: absolute path plus its path inside the archive.</summary>
    private readonly record struct FileEntry(string FullPath, string RelativePath);

    /// <summary>
    /// A queued unit of work. <see cref="Content"/> holds the file bytes only when <see cref="IsInMemory"/>
    /// is true; otherwise the consumer reads the file from disk.
    /// </summary>
    private record FileJob(FileEntry FileInfo, Stream? Content, bool IsInMemory, long FileLength);

    /// <summary>Bound from the <c>BackupSettings</c> configuration section.</summary>
    private record BackupJobSettings(
        string? BackupProgramPath,
        string[] IgnorePaths,
        string[] IgnoreFiles,
        string[] IgnoreExtensions,
        CompressionLevel CompressionLevel = CompressionLevel.Fastest
    );
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
代码功能解析:BackupProgramActivator
这段代码实现了一个程序备份功能,主要作用是将指定目录下的程序文件打包成ZIP压缩包,并上传到FTP服务器。以下是详细的功能解析:
主要功能
程序备份打包
- 扫描指定目录下的所有文件和子目录
- 将文件打包成ZIP压缩包
- 支持多线程并行处理提高效率
- 包含详细的进度日志和统计信息
FTP上传
- 将生成的ZIP备份文件上传到FTP服务器
- 上传成功后删除本地备份文件;上传失败时保留本地文件,避免备份丢失
配置管理
- 从配置文件中读取备份设置
- 支持忽略特定路径、文件和扩展名
核心组件和工作流程
StartAsync() - 入口方法
- 检查配置有效性
- 调用PackAsync()打包文件
- 调用UploadAsync()上传备份
PackAsync() - 打包流程
- 创建带时间戳的ZIP文件名
- 扫描目录收集文件(CollectFilesAsync)
- 使用生产者-消费者模式并行处理文件
- 生产者(ProduceFileJobsAsync)并行读取小文件到内存;大文件只传递路径,由消费者写入时再流式读取
- 消费者(ConsumeFileJobsAsync)写入ZIP文件
文件处理策略
- 小文件(<20MB)读入内存加速处理
- 大文件流式处理避免内存溢出
- 内存使用监控和预警
忽略规则
- 忽略特定路径(包括其子目录;忽略路径必须位于备份根目录下,否则该配置被跳过)
- 忽略特定文件名
- 忽略特定扩展名
技术亮点
高效并行处理
- 使用Channel实现生产者-消费者模式
- 并行文件收集和读取
- 内存和IO优化
资源管理
- 内存使用监控(小文件缓存超过 2GB 时告警)以及有界通道(500 个槽位)带来的背压;并没有硬性的内存上限
- 流式处理大文件
- 及时释放资源
详细的日志和统计
- 进度日志
- 错误统计
- 压缩率计算
- 性能指标
容错处理
- 异常捕获和记录
- 跳过无权限目录
- 错误计数
使用场景
这段代码适合用于:
- 定期备份服务器上的应用程序
- 需要将备份文件传输到远程存储的场景
- 需要高效处理大量文件的备份需求
- 需要灵活配置忽略规则的环境
代码结构清晰,功能完善,是一个专业级的备份解决方案。
评论加载中...