using Dpz.Core.Service.Network;
using Dpz.Core.Service.Network.Models;
using Dpz.Core.Web.Library.Api.Service;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;
namespace Dpz.Core.Web.Library.Hub;
public class RobotChatHub(
ILogger<RobotChatHub> logger,
IAiChatService aiChatService,
IConfiguration configuration,
ChatCompletionService chatCompletionService,
GenerateSessionTitleService generateSessionTitleService
) : Microsoft.AspNetCore.SignalR.Hub
{
[Authorize]
public async Task SendStreamMessageToChatGpt(
string content,
int modelValue = 1,
string? sessionId = null
)
{
if (string.IsNullOrWhiteSpace(content))
{
await Clients.Caller.SendCoreAsync("SystemError", [ChatGptReceiverUser("请输入内容")]);
return;
}
// if (!Context.User.IsLogin())
// {
// await Clients.Caller.SendCoreAsync("SystemError", [ChatGptReceiverUser("尚未登录")]);
// return;
// }
// 创建取消令牌源并存储
var cts = new CancellationTokenSource();
var tokenKey = "";
try
{
var sender = Context.User.RequiredUserInfo;
// 判断是否为新的会话
var isNewSession = false;
if (string.IsNullOrEmpty(sessionId))
{
sessionId = Guid.NewGuid().ToString("N");
isNewSession = true;
}
// 使用 sessionId + ConnectionId 作为唯一标识
tokenKey = $"{sessionId}_{Context.ConnectionId}";
WebToolsExtensions.CancellationTokens[tokenKey] = cts;
// 立即检查是否已被取消(在存储后立即检查)
cts.Token.ThrowIfCancellationRequested();
// 如果是新会话,通知前端新的 SessionId
if (isNewSession)
{
await Clients.Caller.SendCoreAsync("NewSessionCreated", [sessionId], cts.Token);
}
// 解析模型枚举,如果失败则使用默认值 Gpt5Mini
AiModel selectedModel;
try
{
if (Enum.IsDefined(typeof(AiModel), modelValue))
{
selectedModel = (AiModel)modelValue;
}
else
{
selectedModel = AiModel.Gpt5Mini;
logger.LogWarning(
"无效的模型值 {Value},使用默认模型: {Model}",
modelValue,
selectedModel
);
}
}
catch (Exception e)
{
selectedModel = AiModel.Gpt5Mini;
logger.LogError(
e,
"解析模型值 {Value} 失败,使用默认模型: {Model}",
modelValue,
selectedModel
);
}
// 保存用户消息到会话
await aiChatService.SaveUserMessageAsync(sender.Id, sessionId, content, sender);
// 再次检查是否已被取消
cts.Token.ThrowIfCancellationRequested();
// 获取会话的历史记录
var records = await aiChatService.GetSessionMessagesAsync(sessionId, 1, 5);
var messages = records
.OrderBy(x => x.SendTime)
.Select(x => new ChatMessage
{
Role = x.Sender?.Id == sessionId ? "assistant" : "user",
Message = x.Message,
})
.ToList();
// 用于收集最终的完整回复
var fullContent = new StringBuilder();
// 流式调用 AI,每次收到增量内容就推送给客户端
var result = await chatCompletionService.SendStreamChatWithCallbackAsync(
messages,
async deltaContent =>
{
// 在发送前立即检查是否已取消
cts.Token.ThrowIfCancellationRequested();
// 累积完整内容
fullContent.Append(deltaContent);
// 实时推送增量内容给客户端
await Clients.Caller.SendCoreAsync("StreamDelta", [deltaContent], cts.Token);
},
opt =>
{
opt.Stream = true;
opt.Model = selectedModel;
},
cts.Token
);
if (!result.Success)
{
await Clients.Caller.SendCoreAsync(
"SystemError",
[ChatGptReceiverUser(ErrorMessage(result.Message))],
cts.Token
);
return;
}
// 流式完成,推送完成标记
await Clients.Caller.SendCoreAsync("StreamCompleted", [], cts.Token);
// 保存完整回复到数据库
var finalContent = fullContent.ToString();
if (string.IsNullOrEmpty(finalContent))
{
return;
}
var aiUser = new VmUserInfo
{
Avatar = $"{configuration["CDNBaseAddress"]}/images/ChatGPT.ico",
Enable = true,
// 使用会话ID作为AI用户ID
Id = sessionId,
Key = "",
LastAccessTime = DateTime.Now,
Name = "AI Assistant",
Permissions = null,
};
await aiChatService.SaveAiMessageAsync(sender.Id, sessionId, finalContent, aiUser);
if (isNewSession)
{
var sessionName = await generateSessionTitleService.GenerateAsync(
sender.Id,
sessionId,
content,
finalContent,
selectedModel.ToString(),
cts.Token
);
await Clients.Caller.SendCoreAsync(
"SessionNameGenerated",
[new { sessionId, sessionName }],
cts.Token
);
}
}
catch (OperationCanceledException)
{
logger.LogInformation(
"Stream message cancelled by user, Token Key: {TokenKey}",
tokenKey
);
await Clients.Caller.SendCoreAsync("StreamCancelled", [], cts.Token);
}
catch (Exception e)
{
logger.LogError(e, "Stream message to ChatGpt error");
await Clients.Caller.SendCoreAsync(
"SystemError",
[ChatGptReceiverUser(ErrorMessage(e.Message))],
cts.Token
);
}
finally
{
// 清理取消令牌源
WebToolsExtensions.CancellationTokens.TryRemove(tokenKey, out _);
cts.Dispose();
}
}
private object ChatGptReceiverUser(string message)
{
return new
{
id = ApplicationTools.ChatGptUserId,
avatar = $"{configuration["CDNBaseAddress"]}/images/ChatGPT.ico",
username = "ChatGPT",
type = "friend",
content = message,
mine = false,
};
}
private static string ErrorMessage(string? message)
{
return $"调用API出错,错误信息:<span style='color:red;'>{(message ?? "Call API fail")}</span>";
}
public override Task OnConnectedAsync()
{
var sessionId = Context
.GetHttpContext()
?.Request.Cookies[Program.AuthorizeCookieName + ".SessionId"];
Clients.Caller.SendCoreAsync("Connected", ["websocket connected.", sessionId]);
return base.OnConnectedAsync();
}
public override Task OnDisconnectedAsync(Exception? exception)
{
var sessionId = Context
.GetHttpContext()
?.Request.Cookies[Program.AuthorizeCookieName + ".SessionId"];
Clients.Caller.SendCoreAsync("Disconnected", ["websocket disconnected.", sessionId]);
if (exception != null)
{
logger.LogError(exception, "signalR disconnected");
}
// 清理当前连接的所有取消令牌
var connectionId = Context.ConnectionId;
var keysToRemove = WebToolsExtensions
.CancellationTokens.Keys.Where(k => k.EndsWith($"_{connectionId}"))
.ToList();
foreach (var key in keysToRemove)
{
if (WebToolsExtensions.CancellationTokens.TryRemove(key, out var cts))
{
cts.Dispose();
logger.LogInformation(
"Cleaned up cancellation token on disconnect, Token Key: {TokenKey}",
key
);
}
}
return base.OnDisconnectedAsync(exception);
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个基于 ASP.NET Core SignalR 的聊天机器人 Hub 类,用于实现实时的 AI 聊天功能。让我详细解释这个代码的主要功能和结构:
类概述
RobotChatHub 继承自 Microsoft.AspNetCore.SignalR.Hub,通过依赖注入获取所需的服务:
ILogger<RobotChatHub>: 日志记录IAiChatService: AI 聊天服务IConfiguration: 配置信息ChatCompletionService: 聊天完成服务GenerateSessionTitleService: 生成会话标题服务
主要功能
1. 流式 AI 聊天 (SendStreamMessageToChatGpt)
这是核心方法,实现了以下功能:
输入验证:
- 检查消息内容是否为空
- 验证用户登录状态(代码中被注释)
会话管理:
- 如果没有提供
sessionId,自动创建新会话 - 使用 GUID 生成唯一会话标识
取消令牌管理:
- 创建
CancellationTokenSource用于取消长时间运行的操作 - 使用
sessionId + ConnectionId作为唯一键存储取消令牌
AI 模型处理:
- 解析传入的模型值,默认使用
Gpt5Mini - 包含错误处理机制
消息存储和检索:
- 保存用户消息到数据库
- 获取会话历史记录(最近5条)
- 将消息转换为 AI 服务所需的格式
流式响应处理:
- 使用回调函数实时接收 AI 的增量响应
- 立即将增量内容推送给客户端
- 累积完整响应内容
完成处理:
- 保存 AI 的完整回复到数据库
- 为新会话生成标题
- 发送完成通知给客户端
2. 连接管理
连接建立 (OnConnectedAsync):
- 获取会话 ID(从 Cookie 中)
- 向客户端发送连接确认
连接断开 (OnDisconnectedAsync):
- 清理与该连接相关的所有取消令牌
- 记录断开连接的异常信息
- 发送断开通知
3. 辅助方法
ChatGptReceiverUser:
- 创建标准化的 ChatGPT 用户消息格式
- 包含头像、用户名等显示信息
ErrorMessage:
- 格式化错误消息,添加红色样式
关键特性
实时通信:使用 SignalR 实现客户端与服务器的实时双向通信
流式响应:支持 AI 的流式回复,用户可以实时看到 AI 的回答过程
取消机制:用户可以取消正在进行的 AI 请求
会话管理:支持多会话聊天,自动生成会话标题
错误处理:全面的异常处理和用户友好的错误消息
资源管理:正确处理取消令牌的生命周期,避免内存泄漏
这个 Hub 类是一个完整的 AI 聊天解决方案的核心组件,提供了现代 AI 聊天应用所需的主要功能。
评论加载中...