using Dpz.Core.Service.Network.Models;
namespace Dpz.Core.Service.Network;
/// <summary>
/// 流式响应解析器
/// 负责将SSE格式的流式数据解析为完整的ChatCompletionResponse
/// </summary>
internal class StreamResponseParser(ILogger<ChatCompletionService> logger)
{
private readonly JsonSerializerOptions _jsonOptions = new()
{
PropertyNameCaseInsensitive = true,
};
/// <summary>
/// 解析SSE格式的流式响应行
/// </summary>
/// <param name="lines">响应行集合</param>
/// <returns>解析后的ChatCompletionResponse,如果解析失败返回null</returns>
public ChatCompletionResponse? Parse(string[] lines)
{
var accumulator = new StreamAccumulator();
foreach (var line in lines)
{
if (!TryParseLine(line, accumulator)) { }
}
return accumulator.BuildFinalResponse();
}
/// <summary>
/// 解析SSE格式的流式响应行,并通过回调推送增量内容
/// </summary>
/// <param name="lines">响应行集合</param>
/// <param name="onDeltaContent">当收到新增内容时的回调,参数为新增的内容片段</param>
/// <returns>解析后的ChatCompletionResponse,如果解析失败返回null</returns>
public ChatCompletionResponse? ParseWithCallback(string[] lines, Action<string> onDeltaContent)
{
var accumulator = new StreamAccumulator();
foreach (var line in lines)
{
if (!TryParseLineWithCallback(line, accumulator, onDeltaContent)) { }
}
return accumulator.BuildFinalResponse();
}
/// <summary>
/// 解析SSE格式的流式响应行,并通过异步回调推送增量内容
/// </summary>
public async Task<ChatCompletionResponse?> ParseWithCallbackAsync(
string[] lines,
Func<string, Task> onDeltaContent
)
{
var accumulator = new StreamAccumulator();
foreach (var line in lines)
{
if (!await TryParseLineWithCallbackAsync(line, accumulator, onDeltaContent)) { }
}
return accumulator.BuildFinalResponse();
}
/// <summary>
/// 尝试解析单行数据
/// </summary>
private bool TryParseLine(string line, StreamAccumulator accumulator)
{
if (string.IsNullOrWhiteSpace(line) || !line.StartsWith("data:"))
{
return false;
}
var jsonStr = line["data:".Length..].Trim();
// SSE规范中的结束标记
if (jsonStr == "[DONE]")
{
return false;
}
if (string.IsNullOrEmpty(jsonStr))
{
return false;
}
return TryDeserializeChunk(jsonStr, accumulator, null);
}
/// <summary>
/// 尝试解析单行数据并推送增量内容
/// </summary>
private bool TryParseLineWithCallback(
string line,
StreamAccumulator accumulator,
Action<string> onDeltaContent
)
{
if (string.IsNullOrWhiteSpace(line) || !line.StartsWith("data:"))
{
return false;
}
var jsonStr = line["data:".Length..].Trim();
// SSE规范中的结束标记
if (jsonStr == "[DONE]")
{
return false;
}
if (string.IsNullOrEmpty(jsonStr))
{
return false;
}
return TryDeserializeChunk(jsonStr, accumulator, onDeltaContent);
}
/// <summary>
/// 尝试解析单行数据并推送增量内容(异步版本)
/// </summary>
private async Task<bool> TryParseLineWithCallbackAsync(
string line,
StreamAccumulator accumulator,
Func<string, Task> onDeltaContent
)
{
if (string.IsNullOrWhiteSpace(line) || !line.StartsWith("data:"))
{
return false;
}
var jsonStr = line["data:".Length..].Trim();
// SSE规范中的结束标记
if (jsonStr == "[DONE]")
{
return false;
}
if (string.IsNullOrEmpty(jsonStr))
{
return false;
}
return await TryDeserializeChunkAsync(jsonStr, accumulator, onDeltaContent);
}
/// <summary>
/// 尝试反序列化流式块
/// </summary>
private bool TryDeserializeChunk(
string jsonStr,
StreamAccumulator accumulator,
Action<string>? onDeltaContent = null
)
{
try
{
var chunk = JsonSerializer.Deserialize<StreamCompletionChunk>(jsonStr, _jsonOptions);
if (chunk == null)
{
return false;
}
// 初始化响应(使用第一个块的元数据)
if (!accumulator.IsInitialized)
{
accumulator.Initialize(chunk);
}
// 累积内容和使用情况,并提取增量内容
var deltaContent = accumulator.AccumulateChunk(chunk);
// 如果有新增内容且提供了回调,则调用回调
if (!string.IsNullOrEmpty(deltaContent) && onDeltaContent != null)
{
onDeltaContent(deltaContent);
}
return true;
}
catch (JsonException ex)
{
var truncatedJson = jsonStr[..Math.Min(100, jsonStr.Length)];
logger.LogWarning(ex, "Failed to deserialize stream chunk: {JsonStr}", truncatedJson);
return false;
}
}
/// <summary>
/// 尝试反序列化流式块(异步版本)
/// </summary>
private async Task<bool> TryDeserializeChunkAsync(
string jsonStr,
StreamAccumulator accumulator,
Func<string, Task> onDeltaContent
)
{
try
{
var chunk = JsonSerializer.Deserialize<StreamCompletionChunk>(jsonStr, _jsonOptions);
if (chunk == null)
{
return false;
}
// 初始化响应(使用第一个块的元数据)
if (!accumulator.IsInitialized)
{
accumulator.Initialize(chunk);
}
// 累积内容和使用情况,并提取增量内容
var deltaContent = accumulator.AccumulateChunk(chunk);
// 如果有新增内容,则异步调用回调
if (!string.IsNullOrEmpty(deltaContent))
{
await onDeltaContent(deltaContent);
}
return true;
}
catch (JsonException ex)
{
var truncatedJson = jsonStr[..Math.Min(100, jsonStr.Length)];
logger.LogWarning(ex, "Failed to deserialize stream chunk: {JsonStr}", truncatedJson);
return false;
}
}
}
/// <summary>
/// 流式响应累积器
/// 负责累积多个流式块的数据,最终组合为完整响应
/// </summary>
internal class StreamAccumulator
{
private ChatCompletionResponse? _response;
private readonly StringBuilder _contentBuilder = new();
private readonly StringBuilder _reasoningBuilder = new();
private bool _initialized;
public bool IsInitialized => _initialized;
/// <summary>
/// 使用第一个块的元数据初始化响应
/// </summary>
public void Initialize(StreamCompletionChunk chunk)
{
_response = new ChatCompletionResponse
{
Id = chunk.Id,
Object = chunk.Object,
Created = chunk.Created,
Model = chunk.Model,
Provider = chunk.Provider,
Choices = BuildInitialChoices(chunk),
Usage = chunk.Usage,
};
_initialized = true;
}
/// <summary>
/// 从流式块构建初始的选择项
/// </summary>
private static List<CompletionChoice> BuildInitialChoices(StreamCompletionChunk chunk)
{
return chunk
.Choices.Select(c => new CompletionChoice
{
Index = c.Index,
Message = new CompletionMessage
{
Role = c.Delta.Role ?? "assistant",
Content = string.Empty,
},
LogProbs = c.LogProbs,
FinishReason = c.FinishReason ?? string.Empty,
})
.ToList();
}
/// <summary>
/// 累积单个流式块的数据,返回本次新增的内容
/// </summary>
/// <returns>本次累积产生的新增内容(delta)</returns>
public string AccumulateChunk(StreamCompletionChunk chunk)
{
if (_response == null || _response.Choices.Count == 0)
{
return string.Empty;
}
if (chunk.Choices.Count == 0)
{
return string.Empty;
}
var choice = chunk.Choices[0];
var deltaContent = AccumulateContent(choice);
AccumulateFinishReason(choice);
AccumulateUsage(chunk);
return deltaContent;
}
/// <summary>
/// 累积内容和推理数据,返回本次新增内容
/// </summary>
private string AccumulateContent(StreamCompletionChoice choice)
{
var delta = new StringBuilder();
if (choice.Delta.Content != null)
{
_contentBuilder.Append(choice.Delta.Content);
delta.Append(choice.Delta.Content);
}
if (choice.Delta.Reasoning != null)
{
_reasoningBuilder.Append(choice.Delta.Reasoning);
}
return delta.ToString();
}
/// <summary>
/// 更新完成原因
/// </summary>
private void AccumulateFinishReason(StreamCompletionChoice choice)
{
if (!string.IsNullOrEmpty(choice.FinishReason) && _response != null)
{
_response.Choices[choice.Index].FinishReason = choice.FinishReason;
}
}
/// <summary>
/// 更新使用统计信息
/// </summary>
private void AccumulateUsage(StreamCompletionChunk chunk)
{
if (chunk.Usage != null && _response != null)
{
_response.Usage = chunk.Usage;
}
}
/// <summary>
/// 构建最终响应
/// </summary>
public ChatCompletionResponse? BuildFinalResponse()
{
if (_response?.Choices.Count > 0)
{
_response.Choices[0].Message.Content = _contentBuilder.ToString();
}
return _response;
}
}