// ChatCompletionService — unified chat-completion client for multiple AI providers.
using Dpz.Core.Service.Network.Models;
namespace Dpz.Core.Service.Network;
/// <summary>
/// Chat Completion API service for calling a generic AI ChatCompletion endpoint.
/// Supports multiple AI providers such as ChatGPT, Claude, Gemini and DeepSeek.
/// </summary>
public class ChatCompletionService(
    IConfiguration configuration,
    ILogger<ChatCompletionService> logger,
    HttpClient httpClient
)
{
    // Cached serializer options (CA1869): allocating JsonSerializerOptions per call
    // defeats System.Text.Json's internal metadata caching.
    private static readonly JsonSerializerOptions RequestSerializerOptions = new()
    {
        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
    };

    private static readonly JsonSerializerOptions ResponseDeserializerOptions = new()
    {
        PropertyNameCaseInsensitive = true,
    };

    // Resolved lazily so a missing configuration key only throws when the service
    // is first used, not when the DI container constructs it.
    private readonly Lazy<string> _host = new(() =>
        configuration["ChatCompletionHost"]
        ?? throw new InvalidConfigurationException("ChatCompletionHost configuration is missing")
    );

    private readonly Lazy<string> _apiKey = new(() =>
        configuration["ChatCompletionApiKey"]
        ?? throw new InvalidConfigurationException("ChatCompletionApiKey configuration is missing")
    );

    /// <summary>
    /// Sends a chat request. Whether the call streams depends on
    /// <c>ChatCompletionOption.Stream</c>: a streaming call buffers the raw SSE
    /// payload and merges it into one response; a non-streaming call deserializes
    /// the body directly into a <c>ChatCompletionResponse</c>.
    /// </summary>
    /// <param name="messages">Conversation messages to send; must be non-empty.</param>
    /// <param name="options">Optional callback that mutates the resolved request options.</param>
    /// <returns>The API result; failure details are carried in the result message.</returns>
    public async Task<ResponseResult<ChatCompletionResponse?>> SendChatAsync(
        List<ChatMessage> messages,
        Action<ChatCompletionOption>? options = null
    )
    {
        if (messages is not { Count: > 0 })
        {
            logger.LogWarning("Message list is empty");
            return new ResponseResult<ChatCompletionResponse?>().FailResult(
                "Messages cannot be empty"
            );
        }
        logger.LogInformation("Starting chat request with {MessageCount} messages", messages.Count);
        var callOption = ResolveOptions(options);
        // The configured Stream flag selects the transport strategy.
        if (callOption.Stream)
        {
            return await SendStreamChatAsync(messages, callOption);
        }
        return await SendRequestAsync(messages, callOption);
    }

    /// <summary>
    /// Loads default options from the "DefaultChatCompletionOption" configuration
    /// section (falling back to defaults) and applies the caller's overrides.
    /// </summary>
    private ChatCompletionOption ResolveOptions(Action<ChatCompletionOption>? options)
    {
        var callOption =
            configuration.GetSection("DefaultChatCompletionOption").Get<ChatCompletionOption>()
            ?? new ChatCompletionOption();
        options?.Invoke(callOption);
        return callOption;
    }

    /// <summary>
    /// Sends a streaming chat request and returns the parsed response
    /// (the full body is read first, then split into SSE chunks).
    /// </summary>
    private async Task<ResponseResult<ChatCompletionResponse?>> SendStreamChatAsync(
        List<ChatMessage> messages,
        ChatCompletionOption option
    )
    {
        var result = new ResponseResult<ChatCompletionResponse?>();
        try
        {
            var rawResponse = await SendRawRequestAsync(messages, option);
            if (!rawResponse.Success)
            {
                logger.LogWarning("Stream chat request failed: {Message}", rawResponse.Message);
                return result.FailResult(rawResponse.Message ?? "Request failed");
            }
            var content = rawResponse.Data;
            if (string.IsNullOrEmpty(content))
            {
                logger.LogWarning("Stream response content is empty");
                return result.FailResult("Response content is empty");
            }
            // Merge the buffered SSE chunks into a single response.
            var parsedResponse = ParseStreamResponse(content);
            if (parsedResponse == null)
            {
                logger.LogWarning("Failed to parse stream response");
                return result.FailResult("Failed to parse stream response");
            }
            logger.LogInformation(
                "Stream chat completed - ID: {ResponseId}, Model: {Model}",
                parsedResponse.Id,
                parsedResponse.Model
            );
            return result.SuccessResult(parsedResponse);
        }
        catch (Exception e)
        {
            logger.LogError(e, "Stream chat request exception");
            return result.FailResult($"Exception: {e.Message}");
        }
    }

    /// <summary>
    /// Parses a buffered streaming response, merging all chunks into one response.
    /// Returns null when parsing fails (the failure is logged).
    /// </summary>
    private ChatCompletionResponse? ParseStreamResponse(string rawContent)
    {
        try
        {
            var lines = rawContent.Split(["\n", "\r\n"], StringSplitOptions.RemoveEmptyEntries);
            var parser = new StreamResponseParser(logger);
            return parser.Parse(lines);
        }
        catch (Exception e)
        {
            logger.LogError(e, "Parse stream response exception");
            return null;
        }
    }

    /// <summary>
    /// Sends a non-streaming request and deserializes the JSON body.
    /// </summary>
    private async Task<ResponseResult<ChatCompletionResponse?>> SendRequestAsync(
        List<ChatMessage> messages,
        ChatCompletionOption option
    )
    {
        var result = new ResponseResult<ChatCompletionResponse?>();
        try
        {
            var rawResponse = await SendRawRequestAsync(messages, option);
            if (!rawResponse.Success)
            {
                logger.LogWarning("Non-stream chat request failed: {Message}", rawResponse.Message);
                return result.FailResult(rawResponse.Message ?? "Request failed");
            }
            var content = rawResponse.Data;
            if (string.IsNullOrEmpty(content))
            {
                logger.LogWarning("Non-stream response content is empty");
                return result.FailResult("Response content is empty");
            }
            var response = JsonSerializer.Deserialize<ChatCompletionResponse>(
                content,
                ResponseDeserializerOptions
            );
            if (response == null)
            {
                // Only log a prefix: a malformed body could be arbitrarily large.
                var truncatedContent = content[..Math.Min(200, content.Length)];
                logger.LogWarning(
                    "Failed to deserialize response. Content (first 200 chars): {Content}",
                    truncatedContent
                );
                return result.FailResult("Failed to deserialize response");
            }
            logger.LogInformation(
                "Non-stream chat completed - ID: {ResponseId}, Model: {Model}, Tokens: {PromptTokens}/{CompletionTokens}/{TotalTokens}",
                response.Id,
                response.Model,
                response.Usage?.PromptTokens ?? 0,
                response.Usage?.CompletionTokens ?? 0,
                response.Usage?.TotalTokens ?? 0
            );
            return result.SuccessResult(response);
        }
        catch (Exception e)
        {
            logger.LogError(e, "Non-stream chat request exception");
            return result.FailResult($"Exception: {e.Message}");
        }
    }

    /// <summary>
    /// Builds the HTTP POST request (URL, auth header, serialized JSON body)
    /// shared by the buffered and incremental code paths. The caller owns and
    /// must dispose the returned request.
    /// </summary>
    private HttpRequestMessage CreateChatRequest(
        List<ChatMessage> messages,
        ChatCompletionOption option
    )
    {
        var request = new HttpRequestMessage(HttpMethod.Post, $"{_host.Value}/v2/chat/completions");
        // NOTE(review): the upstream endpoint apparently expects a Basic scheme
        // carrying the API key — confirm against the provider's docs.
        request.Headers.Add("Authorization", $"Basic {_apiKey.Value}");
        var requestBody = BuildRequestBody(messages, option);
        logger.LogDebug(
            "Sending API request - URL: {Url}, Model: {Model}, Stream: {Stream}",
            request.RequestUri,
            ((Dictionary<string, object?>)requestBody)["model"],
            ((Dictionary<string, object?>)requestBody)["stream"]
        );
        var jsonContent = JsonSerializer.Serialize(requestBody, RequestSerializerOptions);
        request.Content = new StringContent(jsonContent, Encoding.UTF8, "application/json");
        return request;
    }

    /// <summary>
    /// Sends the request and returns the raw, unparsed response body (buffered mode).
    /// </summary>
    private async Task<ResponseResult<string?>> SendRawRequestAsync(
        List<ChatMessage> messages,
        ChatCompletionOption option
    )
    {
        var result = new ResponseResult<string?>();
        using var request = CreateChatRequest(messages, option);
        HttpResponseMessage response;
        try
        {
            response = await httpClient.SendAsync(request);
            if (!response.IsSuccessStatusCode)
            {
                var errorContent = await response.Content.ReadAsStringAsync();
                var truncatedError = errorContent[..Math.Min(500, errorContent.Length)];
                logger.LogWarning(
                    "API call failed - Status: {StatusCode}, Error: {ErrorContent}",
                    response.StatusCode,
                    truncatedError
                );
                return result.FailResult(
                    $"API call failed with status {response.StatusCode}: {errorContent}"
                );
            }
        }
        catch (Exception e)
        {
            logger.LogError(e, "HTTP request exception");
            return result.FailResult($"HTTP request exception: {e.Message}");
        }
        var content = await response.Content.ReadAsStringAsync();
        logger.LogDebug(
            "API response received - Content length: {ContentLength} chars",
            content.Length
        );
        return result.SuccessResult(content);
    }

    /// <summary>
    /// Builds the request body, including only fields that have a value.
    /// </summary>
    private static object BuildRequestBody(List<ChatMessage> messages, ChatCompletionOption option)
    {
        // When "thinking" is enabled the provider requires temperature to be 1.0.
        var temperature = option.Temperature;
        if (option.Thinking != null)
        {
            temperature = 1.0;
        }
        var body = new Dictionary<string, object?>
        {
            { "messages", messages },
            { "model", AiModelMapper.GetModelString(option.Model) },
            { "stream", option.Stream },
            { "max_tokens", option.MaxTokens },
            { "temperature", temperature },
        };
        if (option.FrequencyPenalty.HasValue)
        {
            body["frequency_penalty"] = option.FrequencyPenalty;
        }
        if (option.TopP.HasValue)
        {
            body["top_p"] = option.TopP;
        }
        if (option.Thinking != null)
        {
            body["thinking"] = new
            {
                type = option.Thinking.Type,
                budget_tokens = option.Thinking.BudgetTokens,
            };
        }
        return body;
    }

    /// <summary>
    /// Sends a streaming chat request, pushing content deltas through a callback
    /// as they arrive.
    /// </summary>
    /// <param name="messages">Conversation messages to send; must be non-empty.</param>
    /// <param name="onDeltaContent">Invoked for each non-empty content delta.</param>
    /// <param name="options">Optional callback that mutates the resolved request options.</param>
    /// <param name="cancellationToken">Cancellation token; cancellation propagates as OperationCanceledException.</param>
    /// <returns>The complete, merged API response.</returns>
    public async Task<ResponseResult<ChatCompletionResponse?>> SendStreamChatWithCallbackAsync(
        List<ChatMessage> messages,
        Func<string, Task> onDeltaContent,
        Action<ChatCompletionOption>? options = null,
        CancellationToken cancellationToken = default
    )
    {
        if (messages is not { Count: > 0 })
        {
            logger.LogWarning("Message list is empty");
            return new ResponseResult<ChatCompletionResponse?>().FailResult(
                "Messages cannot be empty"
            );
        }
        logger.LogInformation("Starting stream chat request with callback");
        var callOption = ResolveOptions(options);
        // This entry point is streaming by definition; force the flag on.
        callOption.Stream = true;
        return await SendStreamChatHttpIncrementalAsync(messages, callOption, onDeltaContent, cancellationToken);
    }

    /// <summary>
    /// Incremental HTTP implementation: uses ResponseHeadersRead and parses the
    /// SSE stream line by line, invoking the delta callback as chunks arrive.
    /// </summary>
    private async Task<ResponseResult<ChatCompletionResponse?>> SendStreamChatHttpIncrementalAsync(
        List<ChatMessage> messages,
        ChatCompletionOption option,
        Func<string, Task> onDeltaContent,
        CancellationToken cancellationToken = default
    )
    {
        var result = new ResponseResult<ChatCompletionResponse?>();
        // Bail out before doing any work if already cancelled.
        cancellationToken.ThrowIfCancellationRequested();
        using var request = CreateChatRequest(messages, option);
        HttpResponseMessage response;
        try
        {
            // ResponseHeadersRead lets us start consuming the body before the
            // server finishes sending it.
            response = await httpClient.SendAsync(
                request,
                HttpCompletionOption.ResponseHeadersRead,
                cancellationToken
            );
            if (!response.IsSuccessStatusCode)
            {
                var errorContent = await response.Content.ReadAsStringAsync(cancellationToken);
                var truncatedError = errorContent[..Math.Min(300, errorContent.Length)];
                logger.LogWarning(
                    "Streaming API call failed - Status: {StatusCode}, Error: {Error}",
                    response.StatusCode,
                    truncatedError
                );
                return result.FailResult(
                    $"API call failed with status {response.StatusCode}: {truncatedError}"
                );
            }
        }
        catch (OperationCanceledException)
        {
            logger.LogInformation("Streaming HTTP request was cancelled");
            throw;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Streaming HTTP request exception");
            return result.FailResult($"HTTP request exception: {ex.Message}");
        }
        await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
        using var reader = new StreamReader(stream, Encoding.UTF8);
        var accumulator = new StreamAccumulator();
        // Accumulates the data: lines belonging to the current SSE event.
        var sbEvent = new StringBuilder();
        async Task ProcessEventAsync()
        {
            // Check for cancellation before handling each event.
            cancellationToken.ThrowIfCancellationRequested();
            if (sbEvent.Length == 0)
            {
                return;
            }
            var jsonStr = sbEvent.ToString().Trim();
            sbEvent.Clear();
            if (string.IsNullOrEmpty(jsonStr))
            {
                return;
            }
            if (jsonStr == "[DONE]")
            {
                // Normal end-of-stream marker.
                return;
            }
            StreamCompletionChunk? chunk = null;
            try
            {
                chunk = JsonSerializer.Deserialize<StreamCompletionChunk>(
                    jsonStr,
                    ResponseDeserializerOptions
                );
            }
            catch (JsonException je)
            {
                // A single malformed chunk is logged and skipped; the stream continues.
                var truncated = jsonStr[..Math.Min(120, jsonStr.Length)];
                logger.LogWarning(je, "Failed to deserialize streaming chunk: {Chunk}", truncated);
            }
            if (chunk == null)
            {
                return;
            }
            if (!accumulator.IsInitialized)
            {
                accumulator.Initialize(chunk);
            }
            var delta = accumulator.AccumulateChunk(chunk);
            if (!string.IsNullOrEmpty(delta))
            {
                // Re-check cancellation before invoking the callback.
                cancellationToken.ThrowIfCancellationRequested();
                try
                {
                    await onDeltaContent(delta);
                }
                catch (OperationCanceledException)
                {
                    // Cancellation raised inside the callback must propagate.
                    throw;
                }
                catch (Exception callbackEx)
                {
                    // Callback failures must not abort the stream.
                    logger.LogWarning(callbackEx, "Delta callback threw an exception");
                }
            }
        }
        try
        {
            string? line;
            while ((line = await reader.ReadLineAsync(cancellationToken)) != null)
            {
                // Check the token at the top of every loop iteration.
                cancellationToken.ThrowIfCancellationRequested();
                if (string.IsNullOrEmpty(line))
                {
                    // Blank line terminates the current SSE event.
                    await ProcessEventAsync();
                    continue;
                }
                // SSE comment lines start with ':'.
                if (line.StartsWith(':'))
                {
                    continue;
                }
                // Ordinal comparison: SSE field names are protocol tokens, not text.
                if (!line.StartsWith("data:", StringComparison.Ordinal))
                {
                    // Ignore non-data fields (event:, id:, retry:).
                    continue;
                }
                var payload = line["data:".Length..].TrimStart();
                // [DONE] can be handled immediately: flush and stop reading.
                if (payload == "[DONE]")
                {
                    await ProcessEventAsync();
                    break;
                }
                // Accumulate this event's data lines (there may be several).
                // NOTE(review): the SSE spec joins multi-line data with "\n";
                // concatenation is kept here — verify the provider never splits
                // a JSON payload across data lines in a way that needs the newline.
                sbEvent.Append(payload);
            }
            // Flush any event left in the buffer (stream ended without a blank line).
            await ProcessEventAsync();
        }
        catch (OperationCanceledException)
        {
            logger.LogInformation("Stream processing was cancelled");
            throw;
        }
        var final = accumulator.BuildFinalResponse();
        if (final == null)
        {
            return result.FailResult("Failed to build final streaming response");
        }
        logger.LogInformation(
            "Streaming chat completed - ID: {Id}, Model: {Model}",
            final.Id,
            final.Model
        );
        return result.SuccessResult(final);
    }
}
这个 ChatCompletionService 是一个用于与多种AI聊天模型(如ChatGPT、Claude、Gemini、DeepSeek等)交互的服务类,提供了标准化的聊天完成(chat completion)API接口。
统一接口支持多种AI模型:
核心方法:
SendChatAsync: 发送聊天消息,根据配置决定使用流式或非流式响应
SendStreamChatWithCallbackAsync: 发送流式聊天请求并通过回调函数实时处理增量内容
配置管理:
请求构建:
响应处理:
错误处理:
性能优化:
这个服务适用于需要与多种AI聊天模型交互的应用,特别是需要支持以下功能的场景:
服务通过统一的接口封装了不同AI提供商的细节,使上层应用可以专注于业务逻辑而非底层通信细节。
