using System.Buffers;
using System.Collections.Specialized;
using System.Web;
using Dpz.Core.Public.Entity.VideoEntity;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Primitives;
namespace Dpz.Core.Service.Mediator.Features.Storage.Commands;
/// <summary>
/// 处理又拍云回调通知
/// </summary>
public class UpyunNotificationEventHandle(
ILogger<UpyunNotificationEventHandle> logger,
IConfiguration configuration,
IRepository<Notification> repository
) : IRequestHandler<UpyunNotificationRequest, ResponseResult>
{
/// <summary>
/// 验签并处理又拍云回调通知,保存转码元数据并清理源文件。
/// </summary>
public async ValueTask<ResponseResult> Handle(
UpyunNotificationRequest request,
CancellationToken cancellationToken
)
{
if (!await CheckSignature(request.Request, request.UpyunOperator))
{
return ResponseResult.Fail("签名验证失败", 401);
}
var body = await ReadBodyAsync(request.Request);
logger.LogInformation(
"receive upyun notification: {Body},authorization:{Authorization},content-type:{ContentType}",
body,
request.Request.Headers.Authorization,
request.Request.Headers.ContentType
);
var parameters = HttpUtility.ParseQueryString(body);
var bucketName = parameters.Get("bucket_name");
if (string.IsNullOrEmpty(bucketName) || bucketName != request.UpyunOperator.Bucket)
{
return ResponseResult.Fail("非法存储桶", 401);
}
var statusCode = parameters.Get("status_code");
if (string.IsNullOrEmpty(statusCode) || statusCode != "200")
{
return ResponseResult.Fail("回调状态异常", 400);
}
var sourceUrl = parameters.Get("media_uris[0]");
if (!string.IsNullOrEmpty(sourceUrl))
{
await request.DeleteAsync(sourceUrl, cancellationToken);
}
var meta = GetVideoMetaInformation(parameters);
if (meta != null)
{
await repository.InsertAsync(meta, cancellationToken);
}
return new ResponseResult
{
Success = true,
Code = 204,
Message = "NoContent",
};
}
private async Task<bool> CheckSignature(HttpRequest request, UpyunOperator upyunOperator)
{
var authorization = request.Headers.Authorization;
var requestDate = request.Headers.Date;
var contentMd5 = request.Headers.ContentMD5;
if (
StringValues.IsNullOrEmpty(authorization)
|| StringValues.IsNullOrEmpty(requestDate)
|| StringValues.IsNullOrEmpty(contentMd5)
)
{
return false;
}
var notifyUrl = configuration["NotifyUrl"] ?? throw new InvalidConfigurationException();
var uri = new Uri(notifyUrl);
var signatureBody = $"POST&{uri.OriginalString}&{requestDate}&{contentMd5}";
var signatureKeyBytes = Encoding.UTF8.GetBytes(
upyunOperator.Password ?? throw new InvalidConfigurationException()
);
using var hmac = new HMACSHA1(signatureKeyBytes);
var signatureBodyBytes = Encoding.UTF8.GetBytes(signatureBody);
await using var stream = ApplicationTools.MemoryStreamManager.GetStream(signatureBodyBytes);
var signatureBytes = await hmac.ComputeHashAsync(stream);
var signature = Convert.ToBase64String(signatureBytes);
return authorization == $"UPYUN {upyunOperator.Operator}:{signature}";
}
private static async Task<string> ReadBodyAsync(HttpRequest request)
{
var body = new StringBuilder();
var reader = request.BodyReader;
while (true)
{
var readResult = await reader.ReadAsync();
var buffer = readResult.Buffer;
if (readResult.IsCompleted && buffer.Length > 0)
{
AppendString(body, in buffer);
}
reader.AdvanceTo(buffer.Start, buffer.End);
if (readResult.IsCompleted)
{
break;
}
}
return body.ToString();
}
private static void AppendString(
StringBuilder result,
in ReadOnlySequence<byte> readOnlySequence
)
{
var span = readOnlySequence.IsSingleSegment
? readOnlySequence.First.Span
: readOnlySequence.ToArray().AsSpan();
result.Append(Encoding.UTF8.GetString(span));
}
private readonly JsonSerializerOptions _options = new() { PropertyNameCaseInsensitive = true };
private Notification? GetVideoMetaInformation(NameValueCollection parameters)
{
var info = parameters.Get("info");
Notification? notification = null;
if (string.IsNullOrEmpty(info))
{
return notification;
}
var bytes = DpzAppBuilderExtensions.Base64StringToBytes(info);
if (bytes.Length <= 0)
{
return notification;
}
var json = Encoding.UTF8.GetString(bytes);
try
{
notification = JsonSerializer.Deserialize<Notification>(json, _options);
}
catch (Exception e)
{
logger.LogError(e, "parse notification info failed: {Json}", json);
return notification;
}
logger.LogInformation("notification: {@Notification}", notification);
return notification;
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这段代码实现了一个处理又拍云(UPYUN)存储服务回调通知的处理器。让我详细解释其功能和实现:
整体功能
这是一个基于MediatR模式的请求处理器,用于处理又拍云存储服务的回调通知,主要用于视频文件处理完成后的后续操作。
核心处理流程
1. 签名验证 (CheckSignature 方法)
private async Task<bool> CheckSignature(HttpRequest request, UpyunOperator upyunOperator)
- 验证请求是否来自合法的又拍云服务
- 检查必要的HTTP头:
Authorization、Date、Content-MD5 - 使用HMAC-SHA1算法计算签名并与请求中的签名对比
- 签名格式:
UPYUN {operator}:{base64_encoded_signature}
2. 请求体读取 (ReadBodyAsync 方法)
private static async Task<string> ReadBodyAsync(HttpRequest request)
- 使用高性能的
BodyReader异步读取HTTP请求体 - 处理可能的分段数据,使用
ReadOnlySequence<byte> - 将字节数据转换为UTF-8字符串
3. 主处理逻辑 (Handle 方法)
public async ValueTask<ResponseResult> Handle(UpyunNotificationRequest request, CancellationToken cancellationToken)
验证步骤:
- 签名验证
- 存储桶名称验证
- 状态码验证(必须为"200")
业务处理:
- 解析回调参数
- 删除源文件(如果存在
media_uris[0]) - 提取并保存视频元数据信息
4. 视频元数据处理 (GetVideoMetaInformation 方法)
private Notification? GetVideoMetaInformation(NameValueCollection parameters)
- 从参数中获取base64编码的
info字段 - 解码并反序列化为
Notification对象 - 错误处理和日志记录
技术特点
性能优化:
- 使用
ReadOnlySequence<byte>处理大数据流 - 异步操作避免线程阻塞
- 使用内存池管理(
ApplicationTools.MemoryStreamManager)
- 使用
安全性:
- HMAC-SHA1签名验证
- 多重参数验证
- 存储桶权限检查
可靠性:
- 完善的异常处理
- 详细的日志记录
- 取消令牌支持
现代化设计:
- 依赖注入
- MediatR模式
- 配置化管理
使用场景
这个处理器主要用于视频处理工作流中:
- 视频上传到又拍云
- 又拍云进行视频转码/处理
- 处理完成后回调此接口
- 验证回调合法性,保存元数据,清理源文件
这是一个企业级的视频处理回调处理方案,具有良好的安全性、性能和可维护性。
评论加载中...