using System.Net.Sockets;
using Dpz.Core.Public.ViewModel.Response;
using ProtocolType = Dpz.Core.EnumLibrary.ProtocolType;
namespace Dpz.Core.Service.Mediator.Features.Health.Commands;
/// <summary>
/// 处理健康检查任务,按配置周期执行端点探测并更新检查结果。
/// </summary>
public class HealthCheckHandler(
IRepository<HealthCheck> repository,
IMapper mapper,
IHttpClientFactory httpClientFactory
) : IRequestHandler<HealthCheckRequest>
{
private readonly HttpClient _httpClient = httpClientFactory.CreateClient("edge");
private static Lazy<List<HealthCheckResponse>>? _lazyHealthChecks;
/// <summary>
/// 执行健康检查轮询。
/// </summary>
public async ValueTask<Unit> Handle(
HealthCheckRequest request,
CancellationToken cancellationToken
)
{
if (_lazyHealthChecks is not { IsValueCreated: true })
{
var entities = await repository.SearchFor(x => true).ToListAsync(cancellationToken);
var healthChecks = mapper.Map<List<HealthCheckResponse>>(entities);
_lazyHealthChecks = new Lazy<List<HealthCheckResponse>>(() => healthChecks);
}
foreach (var item in _lazyHealthChecks.Value)
{
var lastCheckTime = item.LastCheckTime ?? DateTime.MinValue;
var currentInterval = DateTime.Now - lastCheckTime;
if (currentInterval >= item.Interval)
{
var result = await CheckAsync(item);
item.LastCheckDuration = result.LastCheckDuration;
item.LastCheckTime = result.LastCheckTime;
}
}
return Unit.Value;
}
private async Task<HealthCheckResultResponse> CheckAsync(HealthCheckResponse healthCheck)
{
var checkTimer = Stopwatch.StartNew();
var retries = 0;
Exception? lastError = null;
while (retries < healthCheck.RetryCount)
{
try
{
using var cts = new CancellationTokenSource(healthCheck.Timeout);
var isHealthy = await CheckEndpointAsync(healthCheck, cts.Token);
return new HealthCheckResultResponse()
{
Status = isHealthy ? HealthStatus.Healthy : HealthStatus.Unhealthy,
LastCheckDuration = checkTimer.Elapsed,
LastCheckTime = DateTime.Now,
ErrorMessage = lastError?.Message,
};
}
catch (Exception ex)
{
lastError = ex;
retries++;
await Task.Delay(1000);
}
}
return new HealthCheckResultResponse()
{
Status = HealthStatus.Unhealthy,
LastCheckDuration = checkTimer.Elapsed,
LastCheckTime = DateTime.Now,
ErrorMessage = lastError?.Message,
};
}
private async Task<bool> CheckEndpointAsync(
HealthCheckResponse healthCheck,
CancellationToken cancellationToken
)
{
return healthCheck.Protocol switch
{
ProtocolType.Http or ProtocolType.Https => await CheckHttpAsync(
healthCheck,
cancellationToken
),
ProtocolType.Tcp => CheckTcp(healthCheck),
_ => throw new NotSupportedException($"协议 {healthCheck.Protocol} 不支持"),
};
}
private async Task<bool> CheckHttpAsync(
HealthCheckResponse healthCheck,
CancellationToken cancellationToken
)
{
var response = await _httpClient.GetAsync(healthCheck.Endpoint, cancellationToken);
return response.IsSuccessStatusCode;
}
private static bool CheckTcp(HealthCheckResponse healthCheck)
{
var parts = healthCheck.Endpoint.Split(':');
using var tcpClient = new TcpClient();
return tcpClient.ConnectAsync(parts[0], int.Parse(parts[1])).Wait(healthCheck.Timeout);
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个基于MediatR模式的健康检查处理器,用于定期监控各种端点的可用性。让我详细解释代码的结构和功能:
类的职责
HealthCheckHandler 实现了 IRequestHandler<HealthCheckRequest>,负责处理健康检查任务,支持HTTP/HTTPS和TCP协议的端点监控。
构造函数依赖
IRepository<HealthCheck>: 用于访问健康检查配置数据IMapper: 用于对象映射(Entity到DTO)IHttpClientFactory: 创建HTTP客户端进行HTTP检查
核心方法分析
1. Handle 方法 - 主要处理逻辑
public async ValueTask<Unit> Handle(HealthCheckRequest request, CancellationToken cancellationToken)
- 使用懒加载模式缓存健康检查配置列表
- 遍历每个配置项,检查是否到了执行检查的时间
- 如果满足时间间隔要求,执行检查并更新结果
2. CheckAsync 方法 - 执行具体检查
private async Task<HealthCheckResultResponse> CheckAsync(HealthCheckResponse healthCheck)
- 实现重试机制:失败时最多重试指定次数
- 使用
Stopwatch测量检查耗时 - 捕获异常并在重试间隔添加延迟(1秒)
- 返回包含状态、耗时、时间和错误信息的结果
3. CheckEndpointAsync 方法 - 协议路由
private async Task<bool> CheckEndpointAsync(HealthCheckResponse healthCheck, CancellationToken cancellationToken)
使用模式匹配根据协议类型分发到具体检查方法:
- HTTP/HTTPS →
CheckHttpAsync - TCP →
CheckTcp
4. 协议特定检查方法
HTTP检查:
private async Task<bool> CheckHttpAsync(HealthCheckResponse healthCheck, CancellationToken cancellationToken)
- 发送GET请求到指定端点
- 通过
IsSuccessStatusCode判断健康状态
TCP检查:
private static bool CheckTcp(HealthCheckResponse healthCheck)
- 解析端点地址(host:port格式)
- 创建TCP连接并在超时时间内等待连接结果
设计特点
- 懒加载缓存:使用
Lazy<>避免重复从数据库加载配置 - 超时控制:每次检查都有独立的超时设置
- 重试机制:支持配置重试次数,提高检查可靠性
- 协议扩展性:通过模式匹配易于添加新协议支持
- 性能监控:记录每次检查的执行时间
潜在改进点
- TCP检查使用了阻塞式
Wait(),可能影响异步性能 - 缓存更新机制可以更智能,支持配置变更时的自动刷新
- 可以考虑添加更多协议支持(如UDP、ICMP等)
这个实现提供了一个完整的健康检查框架,适用于微服务架构中的服务监控场景。
评论加载中...