using Dpz.Core.Service.Network;
using Dpz.Core.Service.Network.Models;
using Dpz.Core.Web.Library.Api.Service;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;

namespace Dpz.Core.Web.Library.Hub;

public class RobotChatHub(
    ILogger<RobotChatHub> logger,
    IAiChatService aiChatService,
    IConfiguration configuration,
    ChatCompletionService chatCompletionService,
    GenerateSessionTitleService generateSessionTitleService
) : Microsoft.AspNetCore.SignalR.Hub
{
    [Authorize]
    public async Task SendStreamMessageToChatGpt(
        string content,
        int modelValue = 1,
        string? sessionId = null
    )
    {
        if (string.IsNullOrWhiteSpace(content))
        {
            await Clients.Caller.SendCoreAsync("SystemError", [ChatGptReceiverUser("请输入内容")]);
            return;
        }

        // if (!Context.User.IsLogin())
        // {
        //     await Clients.Caller.SendCoreAsync("SystemError", [ChatGptReceiverUser("尚未登录")]);
        //     return;
        // }

        // 创建取消令牌源并存储
        var cts = new CancellationTokenSource();
        var tokenKey = "";

        try
        {
            var sender = Context.User.RequiredUserInfo;

            // 判断是否为新的会话
            var isNewSession = false;

            if (string.IsNullOrEmpty(sessionId))
            {
                sessionId = Guid.NewGuid().ToString("N");
                isNewSession = true;
            }

            // 使用 sessionId + ConnectionId 作为唯一标识
            tokenKey = $"{sessionId}_{Context.ConnectionId}";
            WebToolsExtensions.CancellationTokens[tokenKey] = cts;

            // 立即检查是否已被取消(在存储后立即检查)
            cts.Token.ThrowIfCancellationRequested();

            // 如果是新会话,通知前端新的 SessionId
            if (isNewSession)
            {
                await Clients.Caller.SendCoreAsync("NewSessionCreated", [sessionId], cts.Token);
            }

            // 解析模型枚举,如果失败则使用默认值 Gpt5Mini
            AiModel selectedModel;
            try
            {
                if (Enum.IsDefined(typeof(AiModel), modelValue))
                {
                    selectedModel = (AiModel)modelValue;
                }
                else
                {
                    selectedModel = AiModel.Gpt5Mini;
                    logger.LogWarning(
                        "无效的模型值 {Value},使用默认模型: {Model}",
                        modelValue,
                        selectedModel
                    );
                }
            }
            catch (Exception e)
            {
                selectedModel = AiModel.Gpt5Mini;
                logger.LogError(
                    e,
                    "解析模型值 {Value} 失败,使用默认模型: {Model}",
                    modelValue,
                    selectedModel
                );
            }

            // 保存用户消息到会话
            await aiChatService.SaveUserMessageAsync(sender.Id, sessionId, content, sender);

            // 再次检查是否已被取消
            cts.Token.ThrowIfCancellationRequested();

            // 获取会话的历史记录
            var records = await aiChatService.GetSessionMessagesAsync(sessionId, 1, 5);
            var messages = records
                .OrderBy(x => x.SendTime)
                .Select(x => new ChatMessage
                {
                    Role = x.Sender?.Id == sessionId ? "assistant" : "user",
                    Message = x.Message,
                })
                .ToList();

            // 用于收集最终的完整回复
            var fullContent = new StringBuilder();

            // 流式调用 AI,每次收到增量内容就推送给客户端
            var result = await chatCompletionService.SendStreamChatWithCallbackAsync(
                messages,
                async deltaContent =>
                {
                    // 在发送前立即检查是否已取消
                    cts.Token.ThrowIfCancellationRequested();

                    // 累积完整内容
                    fullContent.Append(deltaContent);

                    // 实时推送增量内容给客户端
                    await Clients.Caller.SendCoreAsync("StreamDelta", [deltaContent], cts.Token);
                },
                opt =>
                {
                    opt.Stream = true;
                    opt.Model = selectedModel;
                },
                cts.Token
            );

            if (!result.Success)
            {
                await Clients.Caller.SendCoreAsync(
                    "SystemError",
                    [ChatGptReceiverUser(ErrorMessage(result.Message))],
                    cts.Token
                );
                return;
            }

            // 流式完成,推送完成标记
            await Clients.Caller.SendCoreAsync("StreamCompleted", [], cts.Token);

            // 保存完整回复到数据库
            var finalContent = fullContent.ToString();
            if (string.IsNullOrEmpty(finalContent))
            {
                return;
            }

            var aiUser = new VmUserInfo
            {
                Avatar = $"{configuration["CDNBaseAddress"]}/images/ChatGPT.ico",
                Enable = true,
                // 使用会话ID作为AI用户ID
                Id = sessionId,
                Key = "",
                LastAccessTime = DateTime.Now,
                Name = "AI Assistant",
                Permissions = null,
            };
            await aiChatService.SaveAiMessageAsync(sender.Id, sessionId, finalContent, aiUser);

            if (isNewSession)
            {
                var sessionName = await generateSessionTitleService.GenerateAsync(
                    sender.Id,
                    sessionId,
                    content,
                    finalContent,
                    selectedModel.ToString()
                );
                await Clients.Caller.SendCoreAsync(
                    "SessionNameGenerated",
                    [new { sessionId, sessionName }],
                    cts.Token
                );
            }
        }
        catch (OperationCanceledException)
        {
            logger.LogInformation(
                "Stream message cancelled by user, Token Key: {TokenKey}",
                tokenKey
            );
            await Clients.Caller.SendCoreAsync("StreamCancelled", [], cts.Token);
        }
        catch (Exception e)
        {
            logger.LogError(e, "Stream message to ChatGpt error");
            await Clients.Caller.SendCoreAsync(
                "SystemError",
                [ChatGptReceiverUser(ErrorMessage(e.Message))],
                cts.Token
            );
        }
        finally
        {
            // 清理取消令牌源
            WebToolsExtensions.CancellationTokens.TryRemove(tokenKey, out _);
            cts.Dispose();
        }
    }

    private object ChatGptReceiverUser(string message)
    {
        return new
        {
            id = ApplicationTools.ChatGptUserId,
            avatar = $"{configuration["CDNBaseAddress"]}/images/ChatGPT.ico",
            username = "ChatGPT",
            type = "friend",
            content = message,
            mine = false,
        };
    }

    private static string ErrorMessage(string? message)
    {
        return $"调用API出错,错误信息:<span style='color:red;'>{(message ?? "Call API fail")}</span>";
    }

    public override Task OnConnectedAsync()
    {
        var sessionId = Context
            .GetHttpContext()
            ?.Request.Cookies[Program.AuthorizeCookieName + ".SessionId"];
        Clients.Caller.SendCoreAsync("Connected", ["websocket connected.", sessionId]);
        return base.OnConnectedAsync();
    }

    public override Task OnDisconnectedAsync(Exception? exception)
    {
        var sessionId = Context
            .GetHttpContext()
            ?.Request.Cookies[Program.AuthorizeCookieName + ".SessionId"];
        Clients.Caller.SendCoreAsync("Disconnected", ["websocket disconnected.", sessionId]);
        if (exception != null)
        {
            logger.LogError(exception, "signalR disconnected");
        }

        // 清理当前连接的所有取消令牌
        var connectionId = Context.ConnectionId;
        var keysToRemove = WebToolsExtensions
            .CancellationTokens.Keys.Where(k => k.EndsWith($"_{connectionId}"))
            .ToList();

        foreach (var key in keysToRemove)
        {
            if (WebToolsExtensions.CancellationTokens.TryRemove(key, out var cts))
            {
                cts.Dispose();
                logger.LogInformation(
                    "Cleaned up cancellation token on disconnect, Token Key: {TokenKey}",
                    key
                );
            }
        }

        return base.OnDisconnectedAsync(exception);
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

下面是对你这段 SignalR Hub 代码的逐步解释、关键行为说明以及一些可能的注意点和改进建议。整体上这段代码的目的是:接收前端对话请求,向 AI(ChatGPT 风格的服务)发起流式对话请求,把增量响应实时推送给客户端,并在会话完成后把最终回复保存到数据库,按需生成会话标题,同时支持客户端中途取消(通过取消令牌)以及在连接断开时清理相关资源。

总体结构

  • 类名:RobotChatHub,继承自 SignalR 的 Hub。
  • 构造函数注入:ILogger、IAiChatService、IConfiguration、ChatCompletionService、GenerateSessionTitleService。
  • 核心方法:
    • SendStreamMessageToChatGpt:处理来自客户端的消息并以流式方式将 AI 的增量回复推给客户端。
    • OnConnectedAsync / OnDisconnectedAsync:处理连接/断开事件并做一些通知与清理。
    • 辅助方法:ChatGptReceiverUser(构造一个表示 AI 机器人的响应对象)、ErrorMessage(格式化错误信息)。

SendStreamMessageToChatGpt 的执行流程(逐步):

  1. 参数与校验

    • 接收 content、modelValue、可选 sessionId。
    • 若 content 为空,则立即向调用端推送 "SystemError" 事件并返回。
    • 方法标注了 [Authorize],要求用户已认证(此外代码中有注释掉的 IsLogin 校验)。
  2. 创建并注册取消令牌源

    • 创建 CancellationTokenSource cts。
    • 用 sessionId + ConnectionId 作为键(tokenKey)把 cts 存入 WebToolsExtensions.CancellationTokens(这是一个全局并发字典,用于后续取消/清理)。
    • 在存入后立即检查是否已被取消(cts.Token.ThrowIfCancellationRequested)。
  3. 会话 ID 处理

    • 如果没有传入 sessionId,就生成新的 GUID(作为新会话),并通过 Clients.Caller.SendCoreAsync("NewSessionCreated", [sessionId], cts.Token) 通知前端新的 sessionId。
    • isNewSession 标记用于后续是否需要生成会话标题。
  4. 模型选择

    • 根据传入的 modelValue 解析到 AiModel 枚举,若解析失败则回退到默认 AiModel.Gpt5Mini,并写日志(Try-Catch + Enum.IsDefined)。
  5. 将用户消息保存到会话(持久化)

    • 调用 aiChatService.SaveUserMessageAsync(sender.Id, sessionId, content, sender)。
  6. 获取会话历史并构造 messages

    • 读取最近的会话消息(GetSessionMessagesAsync(sessionId, 1, 5)),按时间升序排列,映射成 ChatMessage 列表,Role 字段用来标识是 assistant 还是 user(判断逻辑是 x.Sender?.Id == sessionId 则认为是 assistant)。
  7. 流式调用 AI(核心)

    • 调用 chatCompletionService.SendStreamChatWithCallbackAsync,传入历史 messages、一个回调来接收增量 deltaContent、并设置 opt.Stream = true 和指定模型,以及传入 cts.Token 以支持取消。
    • 回调中:每次收到增量内容都执行 cts.Token.ThrowIfCancellationRequested()、把增量追加到 fullContent(StringBuilder),并通过 Clients.Caller.SendCoreAsync("StreamDelta", [deltaContent], cts.Token) 实时推送给客户端。
  8. 处理调用结果与结束

    • 若 result.Success 为 false,发送 "SystemError" 给客户端并返回。
    • 若成功:发送 "StreamCompleted" 标记,表示流式结束。
    • 将 fullContent(最终的完整 AI 回复)保存到数据库(aiChatService.SaveAiMessageAsync),AI 用户信息(VmUserInfo)使用 sessionId 作为 AI 的 Id 并设置 Avatar、Name 等。
    • 如果这是新会话,调用 generateSessionTitleService.GenerateAsync 生成会话名称,并通过 "SessionNameGenerated" 事件将 { sessionId, sessionName } 发给客户端。

异常与取消处理

  • OperationCanceledException 捕获:记录日志并发送 "StreamCancelled" 事件给客户端。
  • 其它 Exception 捕获:记录错误日志并通过 "SystemError" 发送格式化的错误信息。
  • finally 中:从 WebToolsExtensions.CancellationTokens 移除 tokenKey,并 Dispose cts。

OnConnectedAsync / OnDisconnectedAsync 行为

  • OnConnectedAsync:读取 Cookie 中可能的 SessionId,向调用方发送 "Connected" 事件(payload 包含连接信息与 sessionId)。
  • OnDisconnectedAsync:同样发送 "Disconnected" 事件;若有异常则写 log。
  • 在断开时,会寻找并清理与当前 ConnectionId 相关的所有取消令牌条目(键以 _{ConnectionId} 结尾),TryRemove 后 Dispose,并记录日志。

向前端发送的事件(主要)

  • "SystemError":系统级错误(包含 ChatGptReceiverUser 格式的对象)。
  • "NewSessionCreated":当服务端生成新的会话 ID 时发送。
  • "StreamDelta":每次收到 AI 的增量内容时推送(payload:增量文本)。
  • "StreamCompleted":流式结束标志。
  • "SessionNameGenerated":为新会话生成的会话名和 sessionId。
  • "StreamCancelled":用户取消流式请求。
  • "Connected" / "Disconnected":连接建立与断开事件。

重要的实现细节与假设

  • Context.User.RequiredUserInfo:代码假设存在用于获取当前用户信息的扩展(RequiredUserInfo),该对象用于获取用户 Id、昵称、头像等信息。
  • WebToolsExtensions.CancellationTokens:这是一个共享的并发字典(例如 ConcurrentDictionary<string, CancellationTokenSource>),用于跨连接、跨请求管理取消令牌,使得客户端可以在其他地方基于 sessionId+connectionId 发起取消。
  • chatCompletionService.SendStreamChatWithCallbackAsync:假定这是一个支持流式回调的自定义方法,能够在接收 AI 的增量输出时调用回调并最终返回一个包含 Success 标志的结果对象。

潜在问题与改进建议

  1. C# 语法注意

    • 代码中使用的类似 [sessionId]、[ChatGptReceiverUser(...)] 的写法不是标准 C# 数组字面量(C# 要用 new object[]{ ... })。如果真实代码中也是这样,会编译失败;可能是示例中为简洁写法,但需确保实际代码使用正确的数组/集合语法。
  2. SendCoreAsync 的 await 问题

    • 在 OnConnectedAsync / OnDisconnectedAsync 中对 Clients.Caller.SendCoreAsync 没有 await,可能导致未观察的异常或异步推送未完成。建议 await 这些调用或显式忽略返回 Task(并处理异常)。
  3. 取消令牌的生命周期

    • 将 cts 存入全局字典后,必须确保在所有可能的结束路径(包括异常)都会移除并 Dispose,目前 finally 有处理,这很好。但要确保外部用于取消的逻辑(例如前端发送取消请求)能找到并调用相应的 CancellationTokenSource.Cancel()。
  4. 并发与资源清理

    • WebToolsExtensions.CancellationTokens 的实现需要是线程安全的(比如 ConcurrentDictionary)。
    • OnDisconnectedAsync 在清理 token 时查找后按键逐个 TryRemove 并 Dispose,这很合适。
  5. Role 判定逻辑

    • messages 中 Role 的判断:Role = x.Sender?.Id == sessionId ? "assistant" : "user"。这里假设 AI 的消息的 Sender.Id == sessionId(因为保存 AI 消息时使用了 sessionId 作为 AI 用户 Id),这在你当前实现下成立,但需要注意与 aiChatService.SaveAiMessageAsync 的行为保持一致。
  6. 错误信息泄露与格式

    • ErrorMessage 将原始异常消息嵌入前端 HTML(含 ...)。需注意不要把敏感异常信息直接返回给前端以免泄露内部信息,或在生产环境中限制错误内容。
  7. 日志与监控

    • 已有若干日志点(解析模型失败、异常、取消、清理 token 等),可以考虑在关键路径增加更多上下文以便排查(比如 sessionId、userId、modelValue 等,注意个人信息保护)。
  8. 可靠性

    • 如果 chatCompletionService 的回调抛出异常或 Clients.Caller.SendCoreAsync 失败(客户端断开),需要确认这里的异常如何传播以及是否会导致 finally 被执行并正确清理令牌(一般会的,但要测试断连时的行为)。

如果你希望,我可以:

  • 帮你检查并改写成可编译的 C# 版本(修复数组字面量等问题)。
  • 推荐更完善的取消/回收策略或如何从前端触发取消(例如暴露一个 CancelStream 方法来取消 token)。
  • 优化错误处理策略(比如统一错误码与本地化消息)。
  • 添加更多日志点或度量(例如每次流的长度、耗时、失败率等)。

需要我从哪个方向继续帮助?例如:修复编译问题、增加取消接口示例、或改进错误处理。

评论加载中...