网站首页 网站源码
website
站点相关全部源代码,隐藏了一些关于服务器的信息
using Dpz.Core.Service.Network.Models;

namespace Dpz.Core.Service.Network;

/// <summary>
/// 流式响应解析器
/// 负责将SSE格式的流式数据解析为完整的ChatCompletionResponse
/// </summary>
internal class StreamResponseParser(ILogger<ChatCompletionService> logger)
{
    private readonly JsonSerializerOptions _jsonOptions = new()
    {
        PropertyNameCaseInsensitive = true,
    };

    /// <summary>
    /// 解析SSE格式的流式响应行
    /// </summary>
    /// <param name="lines">响应行集合</param>
    /// <returns>解析后的ChatCompletionResponse,如果解析失败返回null</returns>
    public ChatCompletionResponse? Parse(string[] lines)
    {
        var accumulator = new StreamAccumulator();

        foreach (var line in lines)
        {
            if (!TryParseLine(line, accumulator)) { }
        }

        return accumulator.BuildFinalResponse();
    }

    /// <summary>
    /// 解析SSE格式的流式响应行,并通过回调推送增量内容
    /// </summary>
    /// <param name="lines">响应行集合</param>
    /// <param name="onDeltaContent">当收到新增内容时的回调,参数为新增的内容片段</param>
    /// <returns>解析后的ChatCompletionResponse,如果解析失败返回null</returns>
    public ChatCompletionResponse? ParseWithCallback(string[] lines, Action<string> onDeltaContent)
    {
        var accumulator = new StreamAccumulator();

        foreach (var line in lines)
        {
            if (!TryParseLineWithCallback(line, accumulator, onDeltaContent)) { }
        }

        return accumulator.BuildFinalResponse();
    }

    /// <summary>
    /// 解析SSE格式的流式响应行,并通过异步回调推送增量内容
    /// </summary>
    public async Task<ChatCompletionResponse?> ParseWithCallbackAsync(
        string[] lines,
        Func<string, Task> onDeltaContent
    )
    {
        var accumulator = new StreamAccumulator();

        foreach (var line in lines)
        {
            if (!await TryParseLineWithCallbackAsync(line, accumulator, onDeltaContent)) { }
        }

        return accumulator.BuildFinalResponse();
    }

    /// <summary>
    /// 尝试解析单行数据
    /// </summary>
    private bool TryParseLine(string line, StreamAccumulator accumulator)
    {
        if (string.IsNullOrWhiteSpace(line) || !line.StartsWith("data:"))
        {
            return false;
        }

        var jsonStr = line["data:".Length..].Trim();

        // SSE规范中的结束标记
        if (jsonStr == "[DONE]")
        {
            return false;
        }

        if (string.IsNullOrEmpty(jsonStr))
        {
            return false;
        }

        return TryDeserializeChunk(jsonStr, accumulator, null);
    }

    /// <summary>
    /// 尝试解析单行数据并推送增量内容
    /// </summary>
    private bool TryParseLineWithCallback(
        string line,
        StreamAccumulator accumulator,
        Action<string> onDeltaContent
    )
    {
        if (string.IsNullOrWhiteSpace(line) || !line.StartsWith("data:"))
        {
            return false;
        }

        var jsonStr = line["data:".Length..].Trim();

        // SSE规范中的结束标记
        if (jsonStr == "[DONE]")
        {
            return false;
        }

        if (string.IsNullOrEmpty(jsonStr))
        {
            return false;
        }

        return TryDeserializeChunk(jsonStr, accumulator, onDeltaContent);
    }

    /// <summary>
    /// 尝试解析单行数据并推送增量内容(异步版本)
    /// </summary>
    private async Task<bool> TryParseLineWithCallbackAsync(
        string line,
        StreamAccumulator accumulator,
        Func<string, Task> onDeltaContent
    )
    {
        if (string.IsNullOrWhiteSpace(line) || !line.StartsWith("data:"))
        {
            return false;
        }

        var jsonStr = line["data:".Length..].Trim();

        // SSE规范中的结束标记
        if (jsonStr == "[DONE]")
        {
            return false;
        }

        if (string.IsNullOrEmpty(jsonStr))
        {
            return false;
        }

        return await TryDeserializeChunkAsync(jsonStr, accumulator, onDeltaContent);
    }

    /// <summary>
    /// 尝试反序列化流式块
    /// </summary>
    private bool TryDeserializeChunk(
        string jsonStr,
        StreamAccumulator accumulator,
        Action<string>? onDeltaContent = null
    )
    {
        try
        {
            var chunk = JsonSerializer.Deserialize<StreamCompletionChunk>(jsonStr, _jsonOptions);
            if (chunk == null)
            {
                return false;
            }

            // 初始化响应(使用第一个块的元数据)
            if (!accumulator.IsInitialized)
            {
                accumulator.Initialize(chunk);
            }

            // 累积内容和使用情况,并提取增量内容
            var deltaContent = accumulator.AccumulateChunk(chunk);

            // 如果有新增内容且提供了回调,则调用回调
            if (!string.IsNullOrEmpty(deltaContent) && onDeltaContent != null)
            {
                onDeltaContent(deltaContent);
            }

            return true;
        }
        catch (JsonException ex)
        {
            var truncatedJson = jsonStr[..Math.Min(100, jsonStr.Length)];
            logger.LogWarning(ex, "Failed to deserialize stream chunk: {JsonStr}", truncatedJson);
            return false;
        }
    }

    /// <summary>
    /// 尝试反序列化流式块(异步版本)
    /// </summary>
    private async Task<bool> TryDeserializeChunkAsync(
        string jsonStr,
        StreamAccumulator accumulator,
        Func<string, Task> onDeltaContent
    )
    {
        try
        {
            var chunk = JsonSerializer.Deserialize<StreamCompletionChunk>(jsonStr, _jsonOptions);
            if (chunk == null)
            {
                return false;
            }

            // 初始化响应(使用第一个块的元数据)
            if (!accumulator.IsInitialized)
            {
                accumulator.Initialize(chunk);
            }

            // 累积内容和使用情况,并提取增量内容
            var deltaContent = accumulator.AccumulateChunk(chunk);

            // 如果有新增内容,则异步调用回调
            if (!string.IsNullOrEmpty(deltaContent))
            {
                await onDeltaContent(deltaContent);
            }

            return true;
        }
        catch (JsonException ex)
        {
            var truncatedJson = jsonStr[..Math.Min(100, jsonStr.Length)];
            logger.LogWarning(ex, "Failed to deserialize stream chunk: {JsonStr}", truncatedJson);
            return false;
        }
    }
}

/// <summary>
/// 流式响应累积器
/// 负责累积多个流式块的数据,最终组合为完整响应
/// </summary>
internal class StreamAccumulator
{
    private ChatCompletionResponse? _response;
    private readonly StringBuilder _contentBuilder = new();
    private readonly StringBuilder _reasoningBuilder = new();
    private bool _initialized;

    public bool IsInitialized => _initialized;

    /// <summary>
    /// 使用第一个块的元数据初始化响应
    /// </summary>
    public void Initialize(StreamCompletionChunk chunk)
    {
        _response = new ChatCompletionResponse
        {
            Id = chunk.Id,
            Object = chunk.Object,
            Created = chunk.Created,
            Model = chunk.Model,
            Provider = chunk.Provider,
            Choices = BuildInitialChoices(chunk),
            Usage = chunk.Usage,
        };
        _initialized = true;
    }

    /// <summary>
    /// 从流式块构建初始的选择项
    /// </summary>
    private static List<CompletionChoice> BuildInitialChoices(StreamCompletionChunk chunk)
    {
        return chunk
            .Choices.Select(c => new CompletionChoice
            {
                Index = c.Index,
                Message = new CompletionMessage
                {
                    Role = c.Delta.Role ?? "assistant",
                    Content = string.Empty,
                },
                LogProbs = c.LogProbs,
                FinishReason = c.FinishReason ?? string.Empty,
            })
            .ToList();
    }

    /// <summary>
    /// 累积单个流式块的数据,返回本次新增的内容
    /// </summary>
    /// <returns>本次累积产生的新增内容(delta)</returns>
    public string AccumulateChunk(StreamCompletionChunk chunk)
    {
        if (_response == null || _response.Choices.Count == 0)
        {
            return string.Empty;
        }

        if (chunk.Choices.Count == 0)
        {
            return string.Empty;
        }

        var choice = chunk.Choices[0];
        var deltaContent = AccumulateContent(choice);
        AccumulateFinishReason(choice);
        AccumulateUsage(chunk);

        return deltaContent;
    }

    /// <summary>
    /// 累积内容和推理数据,返回本次新增内容
    /// </summary>
    private string AccumulateContent(StreamCompletionChoice choice)
    {
        var delta = new StringBuilder();

        if (choice.Delta.Content != null)
        {
            _contentBuilder.Append(choice.Delta.Content);
            delta.Append(choice.Delta.Content);
        }

        if (choice.Delta.Reasoning != null)
        {
            _reasoningBuilder.Append(choice.Delta.Reasoning);
        }

        return delta.ToString();
    }

    /// <summary>
    /// 更新完成原因
    /// </summary>
    private void AccumulateFinishReason(StreamCompletionChoice choice)
    {
        if (!string.IsNullOrEmpty(choice.FinishReason) && _response != null)
        {
            _response.Choices[choice.Index].FinishReason = choice.FinishReason;
        }
    }

    /// <summary>
    /// 更新使用统计信息
    /// </summary>
    private void AccumulateUsage(StreamCompletionChunk chunk)
    {
        if (chunk.Usage != null && _response != null)
        {
            _response.Usage = chunk.Usage;
        }
    }

    /// <summary>
    /// 构建最终响应
    /// </summary>
    public ChatCompletionResponse? BuildFinalResponse()
    {
        if (_response?.Choices.Count > 0)
        {
            _response.Choices[0].Message.Content = _contentBuilder.ToString();
        }

        return _response;
    }
}
loading