网站首页 网站源码

using Dpz.Core.EnumLibrary;
using Dpz.Core.Public.ViewModel;
using Dpz.Core.Service.ObjectStorage.Services;
using Dpz.Core.Service.RepositoryService;
using Dpz.Core.Web.Jobs.Configuration;
using Hangfire;
using Microsoft.Extensions.Options;
namespace Dpz.Core.Web.Jobs.Hangfire;
public class TaskListHandleActivator : JobActivator
{
private readonly IObjectStorageOperation _objectStorageService;
private readonly IWaitExecutionService _waitExecutionService;
private readonly ILogger<TaskListHandleActivator> _logger;
private readonly string _host;
private readonly CrawlerOptions _crawlerOptions;
public TaskListHandleActivator(
IObjectStorageOperation objectStorageService,
IWaitExecutionService waitExecutionService,
IConfiguration configuration,
ILogger<TaskListHandleActivator> logger,
IOptions<CrawlerOptions> crawlerOptions
)
{
_objectStorageService = objectStorageService;
_waitExecutionService = waitExecutionService;
_logger = logger;
_crawlerOptions = crawlerOptions.Value;
_host =
configuration.GetSection("upyun")["Host"] ?? throw new Exception("upyun Host is null");
}
[ProlongExpirationTime]
public async Task HandleAsync()
{
var taskList = await _waitExecutionService.GetNotExecuteTaskAsync();
// foreach (var item in taskList.Where(x => x.Category == ExecuteCategory.Delete))
// {
// await DeleteAsync(item);
// }
// 限制并发度
var parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = _crawlerOptions.MaxDegreeOfParallelism,
CancellationToken = CancellationToken.None,
};
await Parallel.ForEachAsync(
taskList,
parallelOptions,
async (item, cancellationToken) =>
{
if (item.Category == ExecuteCategory.Delete)
{
await Task.Delay(
TimeSpan.FromMilliseconds(_crawlerOptions.RequestDelayMs),
cancellationToken
);
await DeleteAsync(item);
}
}
);
await _waitExecutionService.ClearExpiredAsync();
}
private async Task DeleteAsync(VmWaitExecution task)
{
switch (task.Scheme)
{
case ExecuteScheme.DeleteUpyun:
try
{
if (
!string.IsNullOrEmpty(task.Relation)
&& task.Relation.StartsWith(
_host,
StringComparison.CurrentCultureIgnoreCase
)
)
{
var relation = task.Relation[_host.Length..];
await _objectStorageService.DeleteAsync(relation);
}
}
catch (Exception e)
{
_logger.LogError(e, "delete fail");
await _waitExecutionService.UpdateStatusAsync(
task.Id,
ExecuteStatus.ExecuteFail
);
}
await _waitExecutionService.UpdateStatusAsync(
task.Id,
ExecuteStatus.ExecuteComplete
);
break;
case ExecuteScheme.None:
default:
_logger.LogInformation("execute scheme:{ExecuteScheme}", task.Scheme.ToString());
break;
}
}
}
这段代码定义了一个名为 TaskListHandleActivator 的类,它是一个 Hangfire 任务处理器,主要用于处理待执行的任务列表。以下是代码的主要功能和结构的详细解释:
IObjectStorageOperation、IWaitExecutionService、IConfiguration 和 ILogger<TaskListHandleActivator>。这些参数通过依赖注入提供,允许类在运行时使用这些服务。upyun 的 Host 值,如果未找到则抛出异常。HandleAsync_waitExecutionService.GetNotExecuteTaskAsync() 方法获取所有未执行的任务列表。Parallel.ForEachAsync 方法并行处理任务列表中的每个任务。对于每个任务,如果其类别为 ExecuteCategory.Delete,则调用 DeleteAsync 方法进行删除操作。DeleteAsyncScheme 属性决定如何处理任务。Scheme 是 DeleteUpyun,则检查 Relation 属性是否以 _host 开头。如果是,则提取出相对路径并调用 _objectStorageService.DeleteAsync(relation) 方法进行删除。ExecuteFail。ExecuteComplete。Scheme 是 None 或其他未定义的情况,记录相关信息。整体来看,TaskListHandleActivator 类的主要功能是从待执行任务列表中获取任务,并根据任务的类别和方案执行相应的操作,特别是删除操作。它使用了异步编程和并行处理来提高性能,并通过日志记录和异常处理来确保操作的可靠性。
