using Dpz.Core.Service.Network;
using Dpz.Core.Service.Network.Models;
using Dpz.Core.Web.Library.Api.Service;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;

namespace Dpz.Core.Web.Library.Hub;

public class RobotChatHub(
    ILogger<RobotChatHub> logger,
    IAiChatService aiChatService,
    IConfiguration configuration,
    ChatCompletionService chatCompletionService,
    GenerateSessionTitleService generateSessionTitleService
) : Microsoft.AspNetCore.SignalR.Hub
{
    [Authorize]
    public async Task SendStreamMessageToChatGpt(
        string content,
        int modelValue = 1,
        string? sessionId = null
    )
    {
        if (string.IsNullOrWhiteSpace(content))
        {
            await Clients.Caller.SendCoreAsync("SystemError", [ChatGptReceiverUser("请输入内容")]);
            return;
        }

        // if (!Context.User.IsLogin())
        // {
        //     await Clients.Caller.SendCoreAsync("SystemError", [ChatGptReceiverUser("尚未登录")]);
        //     return;
        // }

        // 创建取消令牌源并存储
        var cts = new CancellationTokenSource();
        var tokenKey = "";

        try
        {
            var sender = Context.User.RequiredUserInfo;

            // 判断是否为新的会话
            var isNewSession = false;

            if (string.IsNullOrEmpty(sessionId))
            {
                sessionId = Guid.NewGuid().ToString("N");
                isNewSession = true;
            }

            // 使用 sessionId + ConnectionId 作为唯一标识
            tokenKey = $"{sessionId}_{Context.ConnectionId}";
            WebToolsExtensions.CancellationTokens[tokenKey] = cts;

            // 立即检查是否已被取消(在存储后立即检查)
            cts.Token.ThrowIfCancellationRequested();

            // 如果是新会话,通知前端新的 SessionId
            if (isNewSession)
            {
                await Clients.Caller.SendCoreAsync("NewSessionCreated", [sessionId], cts.Token);
            }

            // 解析模型枚举,如果失败则使用默认值 Gpt5Mini
            AiModel selectedModel;
            try
            {
                if (Enum.IsDefined(typeof(AiModel), modelValue))
                {
                    selectedModel = (AiModel)modelValue;
                }
                else
                {
                    selectedModel = AiModel.Gpt5Mini;
                    logger.LogWarning(
                        "无效的模型值 {Value},使用默认模型: {Model}",
                        modelValue,
                        selectedModel
                    );
                }
            }
            catch (Exception e)
            {
                selectedModel = AiModel.Gpt5Mini;
                logger.LogError(
                    e,
                    "解析模型值 {Value} 失败,使用默认模型: {Model}",
                    modelValue,
                    selectedModel
                );
            }

            // 保存用户消息到会话
            await aiChatService.SaveUserMessageAsync(sender.Id, sessionId, content, sender);

            // 再次检查是否已被取消
            cts.Token.ThrowIfCancellationRequested();

            // 获取会话的历史记录
            var records = await aiChatService.GetSessionMessagesAsync(sessionId, 1, 5);
            var messages = records
                .OrderBy(x => x.SendTime)
                .Select(x => new ChatMessage
                {
                    Role = x.Sender?.Id == sessionId ? "assistant" : "user",
                    Message = x.Message,
                })
                .ToList();

            // 用于收集最终的完整回复
            var fullContent = new StringBuilder();

            // 流式调用 AI,每次收到增量内容就推送给客户端
            var result = await chatCompletionService.SendStreamChatWithCallbackAsync(
                messages,
                async deltaContent =>
                {
                    // 在发送前立即检查是否已取消
                    cts.Token.ThrowIfCancellationRequested();

                    // 累积完整内容
                    fullContent.Append(deltaContent);

                    // 实时推送增量内容给客户端
                    await Clients.Caller.SendCoreAsync("StreamDelta", [deltaContent], cts.Token);
                },
                opt =>
                {
                    opt.Stream = true;
                    opt.Model = selectedModel;
                },
                cts.Token
            );

            if (!result.Success)
            {
                await Clients.Caller.SendCoreAsync(
                    "SystemError",
                    [ChatGptReceiverUser(ErrorMessage(result.Message))],
                    cts.Token
                );
                return;
            }

            // 流式完成,推送完成标记
            await Clients.Caller.SendCoreAsync("StreamCompleted", [], cts.Token);

            // 保存完整回复到数据库
            var finalContent = fullContent.ToString();
            if (string.IsNullOrEmpty(finalContent))
            {
                return;
            }

            var aiUser = new VmUserInfo
            {
                Avatar = $"{configuration["CDNBaseAddress"]}/images/ChatGPT.ico",
                Enable = true,
                // 使用会话ID作为AI用户ID
                Id = sessionId,
                Key = "",
                LastAccessTime = DateTime.Now,
                Name = "AI Assistant",
                Permissions = null,
            };
            await aiChatService.SaveAiMessageAsync(sender.Id, sessionId, finalContent, aiUser);

            if (isNewSession)
            {
                var sessionName = await generateSessionTitleService.GenerateAsync(
                    sender.Id,
                    sessionId,
                    content,
                    finalContent,
                    selectedModel.ToString()
                );
                await Clients.Caller.SendCoreAsync(
                    "SessionNameGenerated",
                    [new { sessionId, sessionName }],
                    cts.Token
                );
            }
        }
        catch (OperationCanceledException)
        {
            logger.LogInformation(
                "Stream message cancelled by user, Token Key: {TokenKey}",
                tokenKey
            );
            await Clients.Caller.SendCoreAsync("StreamCancelled", [], cts.Token);
        }
        catch (Exception e)
        {
            logger.LogError(e, "Stream message to ChatGpt error");
            await Clients.Caller.SendCoreAsync(
                "SystemError",
                [ChatGptReceiverUser(ErrorMessage(e.Message))],
                cts.Token
            );
        }
        finally
        {
            // 清理取消令牌源
            WebToolsExtensions.CancellationTokens.TryRemove(tokenKey, out _);
            cts.Dispose();
        }
    }

    private object ChatGptReceiverUser(string message)
    {
        return new
        {
            id = ApplicationTools.ChatGptUserId,
            avatar = $"{configuration["CDNBaseAddress"]}/images/ChatGPT.ico",
            username = "ChatGPT",
            type = "friend",
            content = message,
            mine = false,
        };
    }

    private static string ErrorMessage(string? message)
    {
        return $"调用API出错,错误信息:<span style='color:red;'>{(message ?? "Call API fail")}</span>";
    }

    public override Task OnConnectedAsync()
    {
        var sessionId = Context
            .GetHttpContext()
            ?.Request.Cookies[Program.AuthorizeCookieName + ".SessionId"];
        Clients.Caller.SendCoreAsync("Connected", ["websocket connected.", sessionId]);
        return base.OnConnectedAsync();
    }

    public override Task OnDisconnectedAsync(Exception? exception)
    {
        var sessionId = Context
            .GetHttpContext()
            ?.Request.Cookies[Program.AuthorizeCookieName + ".SessionId"];
        Clients.Caller.SendCoreAsync("Disconnected", ["websocket disconnected.", sessionId]);
        if (exception != null)
        {
            logger.LogError(exception, "signalR disconnected");
        }

        // 清理当前连接的所有取消令牌
        var connectionId = Context.ConnectionId;
        var keysToRemove = WebToolsExtensions
            .CancellationTokens.Keys.Where(k => k.EndsWith($"_{connectionId}"))
            .ToList();

        foreach (var key in keysToRemove)
        {
            if (WebToolsExtensions.CancellationTokens.TryRemove(key, out var cts))
            {
                cts.Dispose();
                logger.LogInformation(
                    "Cleaned up cancellation token on disconnect, Token Key: {TokenKey}",
                    key
                );
            }
        }

        return base.OnDisconnectedAsync(exception);
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

上述代码定义了一个 SignalR Hub 类 RobotChatHub,用于处理与聊天机器人(如讯飞和 ChatGPT)之间的实时消息交互。以下是代码的主要功能和结构的详细解释:

1. 类的构造函数

RobotChatHub 类的构造函数接受多个依赖项,包括日志记录器、聊天记录服务、配置、讯飞服务和分析服务。这些依赖项用于处理聊天请求、记录聊天记录和与外部服务交互。

2. 发送消息

SendMessage 方法

  • 功能:处理用户发送的消息,调用讯飞的聊天服务,并将响应返回给用户。
  • 流程
    • 生成一个会话 ID,优先使用用户的 ID。
    • 调用 GetXunFeiRequestAsync 方法构建请求。
    • 调用 xunFeiService.ChatAsync 方法与讯飞 API 交互。
    • 如果成功,提取响应内容并发送给用户;如果失败,发送错误信息。
    • 如果用户已登录,记录聊天记录。

SendMessageToChatGpt 方法

  • 功能:处理用户发送给 ChatGPT 的消息。
  • 流程
    • 检查内容是否为空和用户是否已登录。
    • 记录用户发送的消息。
    • 获取与 ChatGPT 的聊天记录,并构建消息列表。
    • 调用 analyzeService.ChatAsync 方法与 ChatGPT 交互。
    • 处理响应并将结果发送给用户;记录 ChatGPT 的响应。

3. 构建请求

GetXunFeiRequestAsync 方法

  • 功能:构建发送给讯飞 API 的请求对象。
  • 流程
    • 获取用户的聊天记录,并构建消息列表。
    • 将用户的问题添加到消息列表中。
    • 返回构建好的 XunFeiRequest 对象。

4. 用户信息和消息格式化

  • XunFeiReceiverUserChatGptReceiverUser 方法用于格式化发送给客户端的消息,包含用户的头像、用户名和消息内容。
  • XunFeiUserChatGptUser 方法返回代表讯飞和 ChatGPT 的用户信息对象。

5. 错误处理

  • ErrorMessage 方法用于格式化错误信息,以便在客户端显示。

6. 连接和断开连接

  • OnConnectedAsyncOnDisconnectedAsync 方法用于处理用户连接和断开连接事件,记录连接状态并向客户端发送相应的消息。

总结

RobotChatHub 类实现了一个实时聊天系统的核心功能,允许用户与讯飞和 ChatGPT 进行交互。它处理消息的发送和接收,记录聊天记录,并在出现错误时提供反馈。通过 SignalR,用户可以实时接收来自聊天机器人的响应。

评论加载中...