using Dpz.Core.Service.Network;
using Dpz.Core.Service.Network.Models;
using Dpz.Core.Web.Library.Api.Service;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;

namespace Dpz.Core.Web.Library.Hub;

public class RobotChatHub(
    ILogger<RobotChatHub> logger,
    IAiChatService aiChatService,
    IConfiguration configuration,
    ChatCompletionService chatCompletionService,
    GenerateSessionTitleService generateSessionTitleService
) : Microsoft.AspNetCore.SignalR.Hub
{
    [Authorize]
    public async Task SendStreamMessageToChatGpt(
        string content,
        int modelValue = 1,
        string? sessionId = null
    )
    {
        if (string.IsNullOrWhiteSpace(content))
        {
            await Clients.Caller.SendCoreAsync("SystemError", [ChatGptReceiverUser("请输入内容")]);
            return;
        }

        // if (!Context.User.IsLogin())
        // {
        //     await Clients.Caller.SendCoreAsync("SystemError", [ChatGptReceiverUser("尚未登录")]);
        //     return;
        // }

        // 创建取消令牌源并存储
        var cts = new CancellationTokenSource();
        var tokenKey = "";

        try
        {
            var sender = Context.User.RequiredUserInfo;

            // 判断是否为新的会话
            var isNewSession = false;

            if (string.IsNullOrEmpty(sessionId))
            {
                sessionId = Guid.NewGuid().ToString("N");
                isNewSession = true;
            }

            // 使用 sessionId + ConnectionId 作为唯一标识
            tokenKey = $"{sessionId}_{Context.ConnectionId}";
            WebToolsExtensions.CancellationTokens[tokenKey] = cts;

            // 立即检查是否已被取消(在存储后立即检查)
            cts.Token.ThrowIfCancellationRequested();

            // 如果是新会话,通知前端新的 SessionId
            if (isNewSession)
            {
                await Clients.Caller.SendCoreAsync("NewSessionCreated", [sessionId], cts.Token);
            }

            // 解析模型枚举,如果失败则使用默认值 Gpt5Mini
            AiModel selectedModel;
            try
            {
                if (Enum.IsDefined(typeof(AiModel), modelValue))
                {
                    selectedModel = (AiModel)modelValue;
                }
                else
                {
                    selectedModel = AiModel.Gpt5Mini;
                    logger.LogWarning(
                        "无效的模型值 {Value},使用默认模型: {Model}",
                        modelValue,
                        selectedModel
                    );
                }
            }
            catch (Exception e)
            {
                selectedModel = AiModel.Gpt5Mini;
                logger.LogError(
                    e,
                    "解析模型值 {Value} 失败,使用默认模型: {Model}",
                    modelValue,
                    selectedModel
                );
            }

            // 保存用户消息到会话
            await aiChatService.SaveUserMessageAsync(sender.Id, sessionId, content, sender);

            // 再次检查是否已被取消
            cts.Token.ThrowIfCancellationRequested();

            // 获取会话的历史记录
            var records = await aiChatService.GetSessionMessagesAsync(sessionId, 1, 5);
            var messages = records
                .OrderBy(x => x.SendTime)
                .Select(x => new ChatMessage
                {
                    Role = x.Sender?.Id == sessionId ? "assistant" : "user",
                    Message = x.Message,
                })
                .ToList();

            // 用于收集最终的完整回复
            var fullContent = new StringBuilder();

            // 流式调用 AI,每次收到增量内容就推送给客户端
            var result = await chatCompletionService.SendStreamChatWithCallbackAsync(
                messages,
                async deltaContent =>
                {
                    // 在发送前立即检查是否已取消
                    cts.Token.ThrowIfCancellationRequested();

                    // 累积完整内容
                    fullContent.Append(deltaContent);

                    // 实时推送增量内容给客户端
                    await Clients.Caller.SendCoreAsync("StreamDelta", [deltaContent], cts.Token);
                },
                opt =>
                {
                    opt.Stream = true;
                    opt.Model = selectedModel;
                },
                cts.Token
            );

            if (!result.Success)
            {
                await Clients.Caller.SendCoreAsync(
                    "SystemError",
                    [ChatGptReceiverUser(ErrorMessage(result.Message))],
                    cts.Token
                );
                return;
            }

            // 流式完成,推送完成标记
            await Clients.Caller.SendCoreAsync("StreamCompleted", [], cts.Token);

            // 保存完整回复到数据库
            var finalContent = fullContent.ToString();
            if (string.IsNullOrEmpty(finalContent))
            {
                return;
            }

            var aiUser = new VmUserInfo
            {
                Avatar = $"{configuration["CDNBaseAddress"]}/images/ChatGPT.ico",
                Enable = true,
                // 使用会话ID作为AI用户ID
                Id = sessionId,
                Key = "",
                LastAccessTime = DateTime.Now,
                Name = "AI Assistant",
                Permissions = null,
            };
            await aiChatService.SaveAiMessageAsync(sender.Id, sessionId, finalContent, aiUser);

            if (isNewSession)
            {
                var sessionName = await generateSessionTitleService.GenerateAsync(
                    sender.Id,
                    sessionId,
                    content,
                    finalContent,
                    selectedModel.ToString(),
                    cts.Token
                );
                await Clients.Caller.SendCoreAsync(
                    "SessionNameGenerated",
                    [new { sessionId, sessionName }],
                    cts.Token
                );
            }
        }
        catch (OperationCanceledException)
        {
            logger.LogInformation(
                "Stream message cancelled by user, Token Key: {TokenKey}",
                tokenKey
            );
            await Clients.Caller.SendCoreAsync("StreamCancelled", [], cts.Token);
        }
        catch (Exception e)
        {
            logger.LogError(e, "Stream message to ChatGpt error");
            await Clients.Caller.SendCoreAsync(
                "SystemError",
                [ChatGptReceiverUser(ErrorMessage(e.Message))],
                cts.Token
            );
        }
        finally
        {
            // 清理取消令牌源
            WebToolsExtensions.CancellationTokens.TryRemove(tokenKey, out _);
            cts.Dispose();
        }
    }

    private object ChatGptReceiverUser(string message)
    {
        return new
        {
            id = ApplicationTools.ChatGptUserId,
            avatar = $"{configuration["CDNBaseAddress"]}/images/ChatGPT.ico",
            username = "ChatGPT",
            type = "friend",
            content = message,
            mine = false,
        };
    }

    private static string ErrorMessage(string? message)
    {
        return $"调用API出错,错误信息:<span style='color:red;'>{(message ?? "Call API fail")}</span>";
    }

    public override Task OnConnectedAsync()
    {
        var sessionId = Context
            .GetHttpContext()
            ?.Request.Cookies[Program.AuthorizeCookieName + ".SessionId"];
        Clients.Caller.SendCoreAsync("Connected", ["websocket connected.", sessionId]);
        return base.OnConnectedAsync();
    }

    public override Task OnDisconnectedAsync(Exception? exception)
    {
        var sessionId = Context
            .GetHttpContext()
            ?.Request.Cookies[Program.AuthorizeCookieName + ".SessionId"];
        Clients.Caller.SendCoreAsync("Disconnected", ["websocket disconnected.", sessionId]);
        if (exception != null)
        {
            logger.LogError(exception, "signalR disconnected");
        }

        // 清理当前连接的所有取消令牌
        var connectionId = Context.ConnectionId;
        var keysToRemove = WebToolsExtensions
            .CancellationTokens.Keys.Where(k => k.EndsWith($"_{connectionId}"))
            .ToList();

        foreach (var key in keysToRemove)
        {
            if (WebToolsExtensions.CancellationTokens.TryRemove(key, out var cts))
            {
                cts.Dispose();
                logger.LogInformation(
                    "Cleaned up cancellation token on disconnect, Token Key: {TokenKey}",
                    key
                );
            }
        }

        return base.OnDisconnectedAsync(exception);
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个基于 ASP.NET Core SignalR 的聊天机器人 Hub 类,用于实现实时的 AI 聊天功能。让我详细解释这个代码的主要功能和结构:

类概述

RobotChatHub 继承自 Microsoft.AspNetCore.SignalR.Hub,通过依赖注入获取所需的服务:

  • ILogger<RobotChatHub>: 日志记录
  • IAiChatService: AI 聊天服务
  • IConfiguration: 配置信息
  • ChatCompletionService: 聊天完成服务
  • GenerateSessionTitleService: 生成会话标题服务

主要功能

1. 流式 AI 聊天 (SendStreamMessageToChatGpt)

这是核心方法,实现了以下功能:

输入验证

  • 检查消息内容是否为空
  • 验证用户登录状态(代码中被注释)

会话管理

  • 如果没有提供 sessionId,自动创建新会话
  • 使用 GUID 生成唯一会话标识

取消令牌管理

  • 创建 CancellationTokenSource 用于取消长时间运行的操作
  • 使用 sessionId + ConnectionId 作为唯一键存储取消令牌

AI 模型处理

  • 解析传入的模型值,默认使用 Gpt5Mini
  • 包含错误处理机制

消息存储和检索

  • 保存用户消息到数据库
  • 获取会话历史记录(最近5条)
  • 将消息转换为 AI 服务所需的格式

流式响应处理

  • 使用回调函数实时接收 AI 的增量响应
  • 立即将增量内容推送给客户端
  • 累积完整响应内容

完成处理

  • 保存 AI 的完整回复到数据库
  • 为新会话生成标题
  • 发送完成通知给客户端

2. 连接管理

连接建立 (OnConnectedAsync):

  • 获取会话 ID(从 Cookie 中)
  • 向客户端发送连接确认

连接断开 (OnDisconnectedAsync):

  • 清理与该连接相关的所有取消令牌
  • 记录断开连接的异常信息
  • 发送断开通知

3. 辅助方法

ChatGptReceiverUser

  • 创建标准化的 ChatGPT 用户消息格式
  • 包含头像、用户名等显示信息

ErrorMessage

  • 格式化错误消息,添加红色样式

关键特性

  1. 实时通信:使用 SignalR 实现客户端与服务器的实时双向通信

  2. 流式响应:支持 AI 的流式回复,用户可以实时看到 AI 的回答过程

  3. 取消机制:用户可以取消正在进行的 AI 请求

  4. 会话管理:支持多会话聊天,自动生成会话标题

  5. 错误处理:全面的异常处理和用户友好的错误消息

  6. 资源管理:正确处理取消令牌的生命周期,避免内存泄漏

这个 Hub 类是一个完整的 AI 聊天解决方案的核心组件,提供了现代 AI 聊天应用所需的主要功能。

评论加载中...