using System.Net.Sockets;
using Dpz.Core.Public.ViewModel.Response;
using ProtocolType = Dpz.Core.EnumLibrary.ProtocolType;

namespace Dpz.Core.Service.Mediator.Features.Health.Commands;

/// <summary>
/// 处理健康检查任务,按配置周期执行端点探测并更新检查结果。
/// </summary>
public class HealthCheckHandler(
    IRepository<HealthCheck> repository,
    IMapper mapper,
    IHttpClientFactory httpClientFactory
) : IRequestHandler<HealthCheckRequest>
{
    private readonly HttpClient _httpClient = httpClientFactory.CreateClient("edge");
    private static Lazy<List<HealthCheckResponse>>? _lazyHealthChecks;

    /// <summary>
    /// 执行健康检查轮询。
    /// </summary>
    public async ValueTask<Unit> Handle(
        HealthCheckRequest request,
        CancellationToken cancellationToken
    )
    {
        if (_lazyHealthChecks is not { IsValueCreated: true })
        {
            var entities = await repository.SearchFor(x => true).ToListAsync(cancellationToken);
            var healthChecks = mapper.Map<List<HealthCheckResponse>>(entities);
            _lazyHealthChecks = new Lazy<List<HealthCheckResponse>>(() => healthChecks);
        }

        foreach (var item in _lazyHealthChecks.Value)
        {
            var lastCheckTime = item.LastCheckTime ?? DateTime.MinValue;
            var currentInterval = DateTime.Now - lastCheckTime;
            if (currentInterval >= item.Interval)
            {
                var result = await CheckAsync(item);
                item.LastCheckDuration = result.LastCheckDuration;
                item.LastCheckTime = result.LastCheckTime;
            }
        }

        return Unit.Value;
    }

    private async Task<HealthCheckResultResponse> CheckAsync(HealthCheckResponse healthCheck)
    {
        var checkTimer = Stopwatch.StartNew();
        var retries = 0;
        Exception? lastError = null;

        while (retries < healthCheck.RetryCount)
        {
            try
            {
                using var cts = new CancellationTokenSource(healthCheck.Timeout);
                var isHealthy = await CheckEndpointAsync(healthCheck, cts.Token);

                return new HealthCheckResultResponse()
                {
                    Status = isHealthy ? HealthStatus.Healthy : HealthStatus.Unhealthy,
                    LastCheckDuration = checkTimer.Elapsed,
                    LastCheckTime = DateTime.Now,
                    ErrorMessage = lastError?.Message,
                };
            }
            catch (Exception ex)
            {
                lastError = ex;
                retries++;
                await Task.Delay(1000);
            }
        }

        return new HealthCheckResultResponse()
        {
            Status = HealthStatus.Unhealthy,
            LastCheckDuration = checkTimer.Elapsed,
            LastCheckTime = DateTime.Now,
            ErrorMessage = lastError?.Message,
        };
    }

    private async Task<bool> CheckEndpointAsync(
        HealthCheckResponse healthCheck,
        CancellationToken cancellationToken
    )
    {
        return healthCheck.Protocol switch
        {
            ProtocolType.Http or ProtocolType.Https => await CheckHttpAsync(
                healthCheck,
                cancellationToken
            ),
            ProtocolType.Tcp => CheckTcp(healthCheck),
            _ => throw new NotSupportedException($"协议 {healthCheck.Protocol} 不支持"),
        };
    }

    private async Task<bool> CheckHttpAsync(
        HealthCheckResponse healthCheck,
        CancellationToken cancellationToken
    )
    {
        var response = await _httpClient.GetAsync(healthCheck.Endpoint, cancellationToken);
        return response.IsSuccessStatusCode;
    }

    private static bool CheckTcp(HealthCheckResponse healthCheck)
    {
        var parts = healthCheck.Endpoint.Split(':');
        using var tcpClient = new TcpClient();
        return tcpClient.ConnectAsync(parts[0], int.Parse(parts[1])).Wait(healthCheck.Timeout);
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个基于MediatR模式的健康检查处理器,用于定期监控各种端点的可用性。让我详细解释代码的结构和功能:

类的职责

HealthCheckHandler 实现了 IRequestHandler<HealthCheckRequest>,负责处理健康检查任务,支持HTTP/HTTPS和TCP协议的端点监控。

构造函数依赖

  • IRepository<HealthCheck>: 用于访问健康检查配置数据
  • IMapper: 用于对象映射(Entity到DTO)
  • IHttpClientFactory: 创建HTTP客户端进行HTTP检查

核心方法分析

1. Handle 方法 - 主要处理逻辑

public async ValueTask<Unit> Handle(HealthCheckRequest request, CancellationToken cancellationToken)
  • 使用懒加载模式缓存健康检查配置列表
  • 遍历每个配置项,检查是否到了执行检查的时间
  • 如果满足时间间隔要求,执行检查并更新结果

2. CheckAsync 方法 - 执行具体检查

private async Task<HealthCheckResultResponse> CheckAsync(HealthCheckResponse healthCheck)
  • 实现重试机制:失败时最多重试指定次数
  • 使用 Stopwatch 测量检查耗时
  • 捕获异常并在重试间隔添加延迟(1秒)
  • 返回包含状态、耗时、时间和错误信息的结果

3. CheckEndpointAsync 方法 - 协议路由

private async Task<bool> CheckEndpointAsync(HealthCheckResponse healthCheck, CancellationToken cancellationToken)

使用模式匹配根据协议类型分发到具体检查方法:

  • HTTP/HTTPS → CheckHttpAsync
  • TCP → CheckTcp

4. 协议特定检查方法

HTTP检查:

private async Task<bool> CheckHttpAsync(HealthCheckResponse healthCheck, CancellationToken cancellationToken)
  • 发送GET请求到指定端点
  • 通过 IsSuccessStatusCode 判断健康状态

TCP检查:

private static bool CheckTcp(HealthCheckResponse healthCheck)
  • 解析端点地址(host:port格式)
  • 创建TCP连接并在超时时间内等待连接结果

设计特点

  1. 懒加载缓存:使用 Lazy<> 避免重复从数据库加载配置
  2. 超时控制:每次检查都有独立的超时设置
  3. 重试机制:支持配置重试次数,提高检查可靠性
  4. 协议扩展性:通过模式匹配易于添加新协议支持
  5. 性能监控:记录每次检查的执行时间

潜在改进点

  • TCP检查使用了阻塞式 Wait(),可能影响异步性能
  • 缓存更新机制可以更智能,支持配置变更时的自动刷新
  • 可以考虑添加更多协议支持(如UDP、ICMP等)

这个实现提供了一个完整的健康检查框架,适用于微服务架构中的服务监控场景。

评论加载中...