网站首页 网站源码
website
站点相关全部源代码,隐藏了一些关于服务器的信息
using Dpz.Core.Service.Network.Models;

namespace Dpz.Core.Service.Network;

/// <summary>
/// Chat Completion API service for calling a generic AI chat-completion endpoint.
/// Supports multiple AI providers such as ChatGPT, Claude, Gemini and DeepSeek.
/// </summary>
public class ChatCompletionService(
    IConfiguration configuration,
    ILogger<ChatCompletionService> logger,
    HttpClient httpClient
)
{
    // Cached serializer options (CA1869): allocating JsonSerializerOptions per call
    // defeats the serializer's metadata cache and is a known performance pitfall.
    private static readonly JsonSerializerOptions SerializeOptions = new()
    {
        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
    };

    private static readonly JsonSerializerOptions DeserializeOptions = new()
    {
        PropertyNameCaseInsensitive = true,
    };

    // Host and API key are resolved lazily so that a missing configuration value
    // only throws when the service is actually used, not at construction time.
    private readonly Lazy<string> _host = new(() =>
        configuration["ChatCompletionHost"]
        ?? throw new InvalidConfigurationException("ChatCompletionHost configuration is missing")
    );

    private readonly Lazy<string> _apiKey = new(() =>
        configuration["ChatCompletionApiKey"]
        ?? throw new InvalidConfigurationException("ChatCompletionApiKey configuration is missing")
    );

    /// <summary>
    /// Sends a chat request. Whether the call is streamed is decided by the
    /// <c>Stream</c> flag on <see cref="ChatCompletionOption"/> (defaults come from the
    /// <c>DefaultChatCompletionOption</c> configuration section, then <paramref name="options"/>
    /// is applied on top). Streamed responses are buffered and merged into a single
    /// <see cref="ChatCompletionResponse"/>.
    /// </summary>
    /// <param name="messages">The conversation messages; must be non-empty.</param>
    /// <param name="options">Optional mutator applied to the resolved request options.</param>
    /// <returns>The parsed API response wrapped in a <see cref="ResponseResult{T}"/>.</returns>
    public async Task<ResponseResult<ChatCompletionResponse?>> SendChatAsync(
        List<ChatMessage> messages,
        Action<ChatCompletionOption>? options = null
    )
    {
        if (messages is not { Count: > 0 })
        {
            logger.LogWarning("Message list is empty");
            return new ResponseResult<ChatCompletionResponse?>().FailResult(
                "Messages cannot be empty"
            );
        }

        logger.LogInformation("Starting chat request with {MessageCount} messages", messages.Count);

        // Resolve defaults from configuration, then let the caller override them.
        var callOption =
            configuration.GetSection("DefaultChatCompletionOption").Get<ChatCompletionOption>()
            ?? new ChatCompletionOption();

        options?.Invoke(callOption);

        // Pick the streaming or buffered code path based on the resolved option.
        if (callOption.Stream)
        {
            return await SendStreamChatAsync(messages, callOption);
        }

        return await SendRequestAsync(messages, callOption);
    }

    /// <summary>
    /// Sends a streaming chat request, reads the whole SSE payload at once, and then
    /// parses the accumulated chunks into a single response.
    /// </summary>
    private async Task<ResponseResult<ChatCompletionResponse?>> SendStreamChatAsync(
        List<ChatMessage> messages,
        ChatCompletionOption option
    )
    {
        var result = new ResponseResult<ChatCompletionResponse?>();

        try
        {
            var rawResponse = await SendRawRequestAsync(messages, option);
            if (!rawResponse.Success)
            {
                logger.LogWarning("Stream chat request failed: {Message}", rawResponse.Message);
                return result.FailResult(rawResponse.Message ?? "Request failed");
            }

            var content = rawResponse.Data;
            if (string.IsNullOrEmpty(content))
            {
                logger.LogWarning("Stream response content is empty");
                return result.FailResult("Response content is empty");
            }

            // Merge the buffered SSE chunks into one ChatCompletionResponse.
            var parsedResponse = ParseStreamResponse(content);

            if (parsedResponse == null)
            {
                logger.LogWarning("Failed to parse stream response");
                return result.FailResult("Failed to parse stream response");
            }

            logger.LogInformation(
                "Stream chat completed - ID: {ResponseId}, Model: {Model}",
                parsedResponse.Id,
                parsedResponse.Model
            );
            return result.SuccessResult(parsedResponse);
        }
        catch (Exception e)
        {
            logger.LogError(e, "Stream chat request exception");
            return result.FailResult($"Exception: {e.Message}");
        }
    }

    /// <summary>
    /// Parses a buffered streaming response, merging all chunks into a single response.
    /// Returns <c>null</c> when parsing fails (the failure is logged, not thrown).
    /// </summary>
    private ChatCompletionResponse? ParseStreamResponse(string rawContent)
    {
        try
        {
            var lines = rawContent.Split(["\n", "\r\n"], StringSplitOptions.RemoveEmptyEntries);
            var parser = new StreamResponseParser(logger);
            return parser.Parse(lines);
        }
        catch (Exception e)
        {
            logger.LogError(e, "Parse stream response exception");
            return null;
        }
    }

    /// <summary>
    /// Sends a non-streaming request and deserializes the JSON body into a
    /// <see cref="ChatCompletionResponse"/>.
    /// </summary>
    private async Task<ResponseResult<ChatCompletionResponse?>> SendRequestAsync(
        List<ChatMessage> messages,
        ChatCompletionOption option
    )
    {
        var result = new ResponseResult<ChatCompletionResponse?>();

        try
        {
            var rawResponse = await SendRawRequestAsync(messages, option);
            if (!rawResponse.Success)
            {
                logger.LogWarning("Non-stream chat request failed: {Message}", rawResponse.Message);
                return result.FailResult(rawResponse.Message ?? "Request failed");
            }

            var content = rawResponse.Data;
            if (string.IsNullOrEmpty(content))
            {
                logger.LogWarning("Non-stream response content is empty");
                return result.FailResult("Response content is empty");
            }

            var response = JsonSerializer.Deserialize<ChatCompletionResponse>(
                content,
                DeserializeOptions
            );

            if (response == null)
            {
                // Log only a prefix of the body so a huge payload cannot flood the log.
                var truncatedContent = content[..Math.Min(200, content.Length)];
                logger.LogWarning(
                    "Failed to deserialize response. Content (first 200 chars): {Content}",
                    truncatedContent
                );
                return result.FailResult("Failed to deserialize response");
            }

            logger.LogInformation(
                "Non-stream chat completed - ID: {ResponseId}, Model: {Model}, Tokens: {PromptTokens}/{CompletionTokens}/{TotalTokens}",
                response.Id,
                response.Model,
                response.Usage?.PromptTokens ?? 0,
                response.Usage?.CompletionTokens ?? 0,
                response.Usage?.TotalTokens ?? 0
            );
            return result.SuccessResult(response);
        }
        catch (Exception e)
        {
            logger.LogError(e, "Non-stream chat request exception");
            return result.FailResult($"Exception: {e.Message}");
        }
    }

    /// <summary>
    /// Sends the raw HTTP request and returns the unparsed response body (buffered mode).
    /// Both the request and the response message are disposed before returning.
    /// </summary>
    private async Task<ResponseResult<string?>> SendRawRequestAsync(
        List<ChatMessage> messages,
        ChatCompletionOption option
    )
    {
        var result = new ResponseResult<string?>();
        using var request = new HttpRequestMessage(
            HttpMethod.Post,
            $"{_host.Value}/v2/chat/completions"
        );

        request.Headers.Add("Authorization", $"Basic {_apiKey.Value}");

        var requestBody = BuildRequestBody(messages, option);

        logger.LogDebug(
            "Sending API request - URL: {Url}, Model: {Model}, Stream: {Stream}",
            request.RequestUri,
            requestBody["model"],
            requestBody["stream"]
        );

        var jsonContent = JsonSerializer.Serialize(requestBody, SerializeOptions);
        request.Content = new StringContent(jsonContent, Encoding.UTF8, "application/json");

        HttpResponseMessage? response = null;
        try
        {
            response = await httpClient.SendAsync(request);
            if (!response.IsSuccessStatusCode)
            {
                var errorContent = await response.Content.ReadAsStringAsync();
                var truncatedError = errorContent[..Math.Min(500, errorContent.Length)];
                logger.LogWarning(
                    "API call failed - Status: {StatusCode}, Error: {ErrorContent}",
                    response.StatusCode,
                    truncatedError
                );
                return result.FailResult(
                    $"API call failed with status {response.StatusCode}: {errorContent}"
                );
            }

            var content = await response.Content.ReadAsStringAsync();
            logger.LogDebug(
                "API response received - Content length: {ContentLength} chars",
                content.Length
            );

            return result.SuccessResult(content);
        }
        catch (Exception e)
        {
            logger.LogError(e, "HTTP request exception");
            return result.FailResult($"HTTP request exception: {e.Message}");
        }
        finally
        {
            // Ensure the response is disposed on success, failure and exception paths alike.
            response?.Dispose();
        }
    }

    /// <summary>
    /// Builds the request body, including only the optional fields that have a value.
    /// </summary>
    private static Dictionary<string, object?> BuildRequestBody(
        List<ChatMessage> messages,
        ChatCompletionOption option
    )
    {
        // When the "thinking" feature is enabled the provider requires temperature = 1.0.
        var temperature = option.Temperature;
        if (option.Thinking != null)
        {
            temperature = 1.0;
        }

        var body = new Dictionary<string, object?>
        {
            { "messages", messages },
            { "model", AiModelMapper.GetModelString(option.Model) },
            { "stream", option.Stream },
            { "max_tokens", option.MaxTokens },
            { "temperature", temperature },
        };

        if (option.FrequencyPenalty.HasValue)
        {
            body["frequency_penalty"] = option.FrequencyPenalty;
        }

        if (option.TopP.HasValue)
        {
            body["top_p"] = option.TopP;
        }

        if (option.Thinking != null)
        {
            body["thinking"] = new
            {
                type = option.Thinking.Type,
                budget_tokens = option.Thinking.BudgetTokens,
            };
        }

        return body;
    }

    /// <summary>
    /// Sends a streaming chat request and pushes content deltas to a callback as they arrive.
    /// </summary>
    /// <param name="messages">The conversation messages; must be non-empty.</param>
    /// <param name="onDeltaContent">Invoked with each incremental piece of content.</param>
    /// <param name="options">Optional mutator applied to the resolved request options.</param>
    /// <param name="cancellationToken">Token used to cancel the request and stream processing.</param>
    /// <returns>The complete, accumulated API response.</returns>
    public async Task<ResponseResult<ChatCompletionResponse?>> SendStreamChatWithCallbackAsync(
        List<ChatMessage> messages,
        Func<string, Task> onDeltaContent,
        Action<ChatCompletionOption>? options = null,
        CancellationToken cancellationToken = default
    )
    {
        if (messages is not { Count: > 0 })
        {
            logger.LogWarning("Message list is empty");
            return new ResponseResult<ChatCompletionResponse?>().FailResult(
                "Messages cannot be empty"
            );
        }

        logger.LogInformation("Starting stream chat request with callback");

        var callOption =
            configuration.GetSection("DefaultChatCompletionOption").Get<ChatCompletionOption>()
            ?? new ChatCompletionOption();

        options?.Invoke(callOption);

        // Streaming is mandatory for the callback-based API.
        callOption.Stream = true;

        return await SendStreamChatHttpIncrementalAsync(messages, callOption, onDeltaContent, cancellationToken);
    }

    /// <summary>
    /// Incremental HTTP implementation: uses <see cref="HttpCompletionOption.ResponseHeadersRead"/>
    /// and parses the SSE stream line by line, pushing deltas to the callback as they arrive.
    /// </summary>
    private async Task<ResponseResult<ChatCompletionResponse?>> SendStreamChatHttpIncrementalAsync(
        List<ChatMessage> messages,
        ChatCompletionOption option,
        Func<string, Task> onDeltaContent,
        CancellationToken cancellationToken = default
    )
    {
        var result = new ResponseResult<ChatCompletionResponse?>();

        // Honor cancellation before doing any work.
        cancellationToken.ThrowIfCancellationRequested();

        using var request = new HttpRequestMessage(
            HttpMethod.Post,
            $"{_host.Value}/v2/chat/completions"
        );
        request.Headers.Add("Authorization", $"Basic {_apiKey.Value}");

        var requestBody = BuildRequestBody(messages, option);
        var jsonContent = JsonSerializer.Serialize(requestBody, SerializeOptions);
        request.Content = new StringContent(jsonContent, Encoding.UTF8, "application/json");

        HttpResponseMessage? response = null;
        try
        {
            // ResponseHeadersRead lets us start consuming the body before it completes.
            response = await httpClient.SendAsync(
                request,
                HttpCompletionOption.ResponseHeadersRead,
                cancellationToken
            );
            if (!response.IsSuccessStatusCode)
            {
                var errorContent = await response.Content.ReadAsStringAsync(cancellationToken);
                var truncatedError = errorContent[..Math.Min(300, errorContent.Length)];
                logger.LogWarning(
                    "Streaming API call failed - Status: {StatusCode}, Error: {Error}",
                    response.StatusCode,
                    truncatedError
                );
                return result.FailResult(
                    $"API call failed with status {response.StatusCode}: {truncatedError}"
                );
            }
        }
        catch (OperationCanceledException)
        {
            logger.LogInformation("Streaming HTTP request was cancelled");
            response?.Dispose();
            throw;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Streaming HTTP request exception");
            response?.Dispose();
            return result.FailResult($"HTTP request exception: {ex.Message}");
        }

        // From here on the response is owned by this scope and disposed on exit.
        using var ownedResponse = response;
        await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
        using var reader = new StreamReader(stream, Encoding.UTF8);

        var accumulator = new StreamAccumulator();
        // Accumulates the data: lines of the current SSE event (an event may span several lines).
        var sbEvent = new StringBuilder();

        async Task ProcessEventAsync()
        {
            // Honor cancellation before handling the event.
            cancellationToken.ThrowIfCancellationRequested();

            if (sbEvent.Length == 0)
            {
                return;
            }

            var jsonStr = sbEvent.ToString().Trim();
            sbEvent.Clear();
            if (string.IsNullOrEmpty(jsonStr))
            {
                return;
            }

            if (jsonStr == "[DONE]")
            {
                // Normal end-of-stream marker.
                return;
            }

            StreamCompletionChunk? chunk = null;
            try
            {
                chunk = JsonSerializer.Deserialize<StreamCompletionChunk>(
                    jsonStr,
                    DeserializeOptions
                );
            }
            catch (JsonException je)
            {
                var truncated = jsonStr[..Math.Min(120, jsonStr.Length)];
                logger.LogWarning(je, "Failed to deserialize streaming chunk: {Chunk}", truncated);
            }

            if (chunk == null)
            {
                return;
            }

            if (!accumulator.IsInitialized)
            {
                accumulator.Initialize(chunk);
            }

            var delta = accumulator.AccumulateChunk(chunk);
            if (!string.IsNullOrEmpty(delta))
            {
                // Re-check cancellation right before invoking the callback.
                cancellationToken.ThrowIfCancellationRequested();

                try
                {
                    await onDeltaContent(delta);
                }
                catch (OperationCanceledException)
                {
                    // Cancellation raised inside the callback must propagate.
                    throw;
                }
                catch (Exception callbackEx)
                {
                    // A faulty callback must not abort the stream; log and continue.
                    logger.LogWarning(callbackEx, "Delta callback threw an exception");
                }
            }
        }

        try
        {
            string? line;
            while ((line = await reader.ReadLineAsync(cancellationToken)) != null)
            {
                // Check the cancellation token at the top of every iteration.
                cancellationToken.ThrowIfCancellationRequested();

                if (string.IsNullOrEmpty(line))
                {
                    // A blank line terminates the current SSE event.
                    await ProcessEventAsync();
                    continue;
                }

                // Skip SSE comment lines.
                if (line.StartsWith(":"))
                {
                    continue;
                }

                if (!line.StartsWith("data:"))
                {
                    // Ignore non-data fields (event:, id:, retry:, ...).
                    continue;
                }

                var payload = line["data:".Length..].TrimStart();

                // The end marker can be handled immediately (flush and stop reading).
                if (payload == "[DONE]")
                {
                    // Flush any chunk accumulated so far first.
                    await ProcessEventAsync();
                    break;
                }

                // Per the SSE spec, multiple data: lines of one event are joined with LF.
                if (sbEvent.Length > 0)
                {
                    sbEvent.Append('\n');
                }
                sbEvent.Append(payload);
            }

            // Flush a trailing event that was not followed by a blank line.
            await ProcessEventAsync();
        }
        catch (OperationCanceledException)
        {
            logger.LogInformation("Stream processing was cancelled");
            throw;
        }

        var final = accumulator.BuildFinalResponse();
        if (final == null)
        {
            return result.FailResult("Failed to build final streaming response");
        }

        logger.LogInformation(
            "Streaming chat completed - ID: {Id}, Model: {Model}",
            final.Id,
            final.Model
        );
        return result.SuccessResult(final);
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

ChatCompletionService 代码分析

这个 ChatCompletionService 是一个用于与多种AI聊天模型(如ChatGPT、Claude、Gemini、DeepSeek等)交互的服务类,提供了标准化的聊天完成(chat completion)API接口。

主要功能

  1. 统一接口支持多种AI模型

    • 通过配置可以连接不同的AI提供商
    • 支持流式和非流式两种响应模式
  2. 核心方法

    • SendChatAsync: 发送聊天消息,根据配置决定使用流式或非流式响应
    • SendStreamChatWithCallbackAsync: 发送流式聊天请求并通过回调函数实时处理增量内容
  3. 配置管理

    • 从配置中读取API主机地址和密钥
    • 支持默认选项配置

技术实现细节

  1. 请求构建

    • 动态构建请求体,只包含有值的字段
    • 支持多种参数:温度(temperature)、top_p、最大令牌数(max_tokens)等
    • 特殊支持"thinking"功能(需要temperature=1.0)
  2. 响应处理

    • 非流式响应:一次性接收完整响应并反序列化
    • 流式响应
      • 一次性读取模式:完整读取后解析
      • 增量回调模式:实时处理SSE(Server-Sent Events)流
  3. 错误处理

    • 完善的日志记录
    • 统一的错误响应格式
    • 异常捕获和处理
  4. 性能优化

    • 使用HttpCompletionOption.ResponseHeadersRead实现真正的流式处理
    • 使用StringBuilder累积事件数据
    • 支持取消令牌(CancellationToken)

使用场景

这个服务适用于需要与多种AI聊天模型交互的应用,特别是需要支持以下功能的场景:

  • 实时聊天体验(使用流式响应)
  • 多模型切换
  • 复杂的聊天参数配置
  • 需要处理大量聊天交互的高性能应用

服务通过统一的接口封装了不同AI提供商的细节,使上层应用可以专注于业务逻辑而非底层通信细节。

loading