using Dpz.Core.Service.Network;
using Dpz.Core.Service.Network.Models;
using Dpz.Core.Web.Library.Api.Service;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;
namespace Dpz.Core.Web.Library.Hub;
public class RobotChatHub(
ILogger<RobotChatHub> logger,
IAiChatService aiChatService,
IConfiguration configuration,
ChatCompletionService chatCompletionService,
GenerateSessionTitleService generateSessionTitleService
) : Microsoft.AspNetCore.SignalR.Hub
{
[Authorize]
public async Task SendStreamMessageToChatGpt(
string content,
int modelValue = 1,
string? sessionId = null
)
{
if (string.IsNullOrWhiteSpace(content))
{
await Clients.Caller.SendCoreAsync("SystemError", [ChatGptReceiverUser("请输入内容")]);
return;
}
// if (!Context.User.IsLogin())
// {
// await Clients.Caller.SendCoreAsync("SystemError", [ChatGptReceiverUser("尚未登录")]);
// return;
// }
// 创建取消令牌源并存储
var cts = new CancellationTokenSource();
var tokenKey = "";
try
{
var sender = Context.User.RequiredUserInfo;
// 判断是否为新的会话
var isNewSession = false;
if (string.IsNullOrEmpty(sessionId))
{
sessionId = Guid.NewGuid().ToString("N");
isNewSession = true;
}
// 使用 sessionId + ConnectionId 作为唯一标识
tokenKey = $"{sessionId}_{Context.ConnectionId}";
WebToolsExtensions.CancellationTokens[tokenKey] = cts;
// 立即检查是否已被取消(在存储后立即检查)
cts.Token.ThrowIfCancellationRequested();
// 如果是新会话,通知前端新的 SessionId
if (isNewSession)
{
await Clients.Caller.SendCoreAsync("NewSessionCreated", [sessionId], cts.Token);
}
// 解析模型枚举,如果失败则使用默认值 Gpt5Mini
AiModel selectedModel;
try
{
if (Enum.IsDefined(typeof(AiModel), modelValue))
{
selectedModel = (AiModel)modelValue;
}
else
{
selectedModel = AiModel.Gpt5Mini;
logger.LogWarning(
"无效的模型值 {Value},使用默认模型: {Model}",
modelValue,
selectedModel
);
}
}
catch (Exception e)
{
selectedModel = AiModel.Gpt5Mini;
logger.LogError(
e,
"解析模型值 {Value} 失败,使用默认模型: {Model}",
modelValue,
selectedModel
);
}
// 保存用户消息到会话
await aiChatService.SaveUserMessageAsync(sender.Id, sessionId, content, sender);
// 再次检查是否已被取消
cts.Token.ThrowIfCancellationRequested();
// 获取会话的历史记录
var records = await aiChatService.GetSessionMessagesAsync(sessionId, 1, 5);
var messages = records
.OrderBy(x => x.SendTime)
.Select(x => new ChatMessage
{
Role = x.Sender?.Id == sessionId ? "assistant" : "user",
Message = x.Message,
})
.ToList();
// 用于收集最终的完整回复
var fullContent = new StringBuilder();
// 流式调用 AI,每次收到增量内容就推送给客户端
var result = await chatCompletionService.SendStreamChatWithCallbackAsync(
messages,
async deltaContent =>
{
// 在发送前立即检查是否已取消
cts.Token.ThrowIfCancellationRequested();
// 累积完整内容
fullContent.Append(deltaContent);
// 实时推送增量内容给客户端
await Clients.Caller.SendCoreAsync("StreamDelta", [deltaContent], cts.Token);
},
opt =>
{
opt.Stream = true;
opt.Model = selectedModel;
},
cts.Token
);
if (!result.Success)
{
await Clients.Caller.SendCoreAsync(
"SystemError",
[ChatGptReceiverUser(ErrorMessage(result.Message))],
cts.Token
);
return;
}
// 流式完成,推送完成标记
await Clients.Caller.SendCoreAsync("StreamCompleted", [], cts.Token);
// 保存完整回复到数据库
var finalContent = fullContent.ToString();
if (string.IsNullOrEmpty(finalContent))
{
return;
}
var aiUser = new VmUserInfo
{
Avatar = $"{configuration["CDNBaseAddress"]}/images/ChatGPT.ico",
Enable = true,
// 使用会话ID作为AI用户ID
Id = sessionId,
Key = "",
LastAccessTime = DateTime.Now,
Name = "AI Assistant",
Permissions = null,
};
await aiChatService.SaveAiMessageAsync(sender.Id, sessionId, finalContent, aiUser);
if (isNewSession)
{
var sessionName = await generateSessionTitleService.GenerateAsync(
sender.Id,
sessionId,
content,
finalContent,
selectedModel.ToString()
);
await Clients.Caller.SendCoreAsync(
"SessionNameGenerated",
[new { sessionId, sessionName }],
cts.Token
);
}
}
catch (OperationCanceledException)
{
logger.LogInformation(
"Stream message cancelled by user, Token Key: {TokenKey}",
tokenKey
);
await Clients.Caller.SendCoreAsync("StreamCancelled", [], cts.Token);
}
catch (Exception e)
{
logger.LogError(e, "Stream message to ChatGpt error");
await Clients.Caller.SendCoreAsync(
"SystemError",
[ChatGptReceiverUser(ErrorMessage(e.Message))],
cts.Token
);
}
finally
{
// 清理取消令牌源
WebToolsExtensions.CancellationTokens.TryRemove(tokenKey, out _);
cts.Dispose();
}
}
private object ChatGptReceiverUser(string message)
{
return new
{
id = ApplicationTools.ChatGptUserId,
avatar = $"{configuration["CDNBaseAddress"]}/images/ChatGPT.ico",
username = "ChatGPT",
type = "friend",
content = message,
mine = false,
};
}
private static string ErrorMessage(string? message)
{
return $"调用API出错,错误信息:<span style='color:red;'>{(message ?? "Call API fail")}</span>";
}
public override Task OnConnectedAsync()
{
var sessionId = Context
.GetHttpContext()
?.Request.Cookies[Program.AuthorizeCookieName + ".SessionId"];
Clients.Caller.SendCoreAsync("Connected", ["websocket connected.", sessionId]);
return base.OnConnectedAsync();
}
public override Task OnDisconnectedAsync(Exception? exception)
{
var sessionId = Context
.GetHttpContext()
?.Request.Cookies[Program.AuthorizeCookieName + ".SessionId"];
Clients.Caller.SendCoreAsync("Disconnected", ["websocket disconnected.", sessionId]);
if (exception != null)
{
logger.LogError(exception, "signalR disconnected");
}
// 清理当前连接的所有取消令牌
var connectionId = Context.ConnectionId;
var keysToRemove = WebToolsExtensions
.CancellationTokens.Keys.Where(k => k.EndsWith($"_{connectionId}"))
.ToList();
foreach (var key in keysToRemove)
{
if (WebToolsExtensions.CancellationTokens.TryRemove(key, out var cts))
{
cts.Dispose();
logger.LogInformation(
"Cleaned up cancellation token on disconnect, Token Key: {TokenKey}",
key
);
}
}
return base.OnDisconnectedAsync(exception);
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
下面是对你这段 SignalR Hub 代码的逐步解释、关键行为说明以及一些可能的注意点和改进建议。整体上这段代码的目的是:接收前端对话请求,向 AI(ChatGPT 风格的服务)发起流式对话请求,把增量响应实时推送给客户端,并在会话完成后把最终回复保存到数据库,按需生成会话标题,同时支持客户端中途取消(通过取消令牌)以及在连接断开时清理相关资源。
总体结构
- 类名:RobotChatHub,继承自 SignalR 的 Hub。
- 构造函数注入:ILogger、IAiChatService、IConfiguration、ChatCompletionService、GenerateSessionTitleService。
- 核心方法:
- SendStreamMessageToChatGpt:处理来自客户端的消息并以流式方式将 AI 的增量回复推给客户端。
- OnConnectedAsync / OnDisconnectedAsync:处理连接/断开事件并做一些通知与清理。
- 辅助方法:ChatGptReceiverUser(构造一个表示 AI 机器人的响应对象)、ErrorMessage(格式化错误信息)。
SendStreamMessageToChatGpt 的执行流程(逐步):
参数与校验
- 接收 content、modelValue、可选 sessionId。
- 若 content 为空,则立即向调用端推送 "SystemError" 事件并返回。
- 方法标注了 [Authorize],要求用户已认证(此外代码中有注释掉的 IsLogin 校验)。
创建并注册取消令牌源
- 创建 CancellationTokenSource cts。
- 用 sessionId + ConnectionId 作为键(tokenKey)把 cts 存入 WebToolsExtensions.CancellationTokens(这是一个全局并发字典,用于后续取消/清理)。
- 在存入后立即检查是否已被取消(cts.Token.ThrowIfCancellationRequested)。
会话 ID 处理
- 如果没有传入 sessionId,就生成新的 GUID(作为新会话),并通过 Clients.Caller.SendCoreAsync("NewSessionCreated", [sessionId], cts.Token) 通知前端新的 sessionId。
- isNewSession 标记用于后续是否需要生成会话标题。
模型选择
- 根据传入的 modelValue 解析到 AiModel 枚举,若解析失败则回退到默认 AiModel.Gpt5Mini,并写日志(Try-Catch + Enum.IsDefined)。
将用户消息保存到会话(持久化)
- 调用 aiChatService.SaveUserMessageAsync(sender.Id, sessionId, content, sender)。
获取会话历史并构造 messages
- 读取最近的会话消息(GetSessionMessagesAsync(sessionId, 1, 5)),按时间升序排列,映射成 ChatMessage 列表,Role 字段用来标识是 assistant 还是 user(判断逻辑是 x.Sender?.Id == sessionId 则认为是 assistant)。
流式调用 AI(核心)
- 调用 chatCompletionService.SendStreamChatWithCallbackAsync,传入历史 messages、一个回调来接收增量 deltaContent、并设置 opt.Stream = true 和指定模型,以及传入 cts.Token 以支持取消。
- 回调中:每次收到增量内容都执行 cts.Token.ThrowIfCancellationRequested()、把增量追加到 fullContent(StringBuilder),并通过 Clients.Caller.SendCoreAsync("StreamDelta", [deltaContent], cts.Token) 实时推送给客户端。
处理调用结果与结束
- 若 result.Success 为 false,发送 "SystemError" 给客户端并返回。
- 若成功:发送 "StreamCompleted" 标记,表示流式结束。
- 将 fullContent(最终的完整 AI 回复)保存到数据库(aiChatService.SaveAiMessageAsync),AI 用户信息(VmUserInfo)使用 sessionId 作为 AI 的 Id 并设置 Avatar、Name 等。
- 如果这是新会话,调用 generateSessionTitleService.GenerateAsync 生成会话名称,并通过 "SessionNameGenerated" 事件将 { sessionId, sessionName } 发给客户端。
异常与取消处理
- OperationCanceledException 捕获:记录日志并发送 "StreamCancelled" 事件给客户端。
- 其它 Exception 捕获:记录错误日志并通过 "SystemError" 发送格式化的错误信息。
- finally 中:从 WebToolsExtensions.CancellationTokens 移除 tokenKey,并 Dispose cts。
OnConnectedAsync / OnDisconnectedAsync 行为
- OnConnectedAsync:读取 Cookie 中可能的 SessionId,向调用方发送 "Connected" 事件(payload 包含连接信息与 sessionId)。
- OnDisconnectedAsync:同样发送 "Disconnected" 事件;若有异常则写 log。
- 在断开时,会寻找并清理与当前 ConnectionId 相关的所有取消令牌条目(键以 _{ConnectionId} 结尾),TryRemove 后 Dispose,并记录日志。
向前端发送的事件(主要)
- "SystemError":系统级错误(包含 ChatGptReceiverUser 格式的对象)。
- "NewSessionCreated":当服务端生成新的会话 ID 时发送。
- "StreamDelta":每次收到 AI 的增量内容时推送(payload:增量文本)。
- "StreamCompleted":流式结束标志。
- "SessionNameGenerated":为新会话生成的会话名和 sessionId。
- "StreamCancelled":用户取消流式请求。
- "Connected" / "Disconnected":连接建立与断开事件。
重要的实现细节与假设
- Context.User.RequiredUserInfo:代码假设存在用于获取当前用户信息的扩展(RequiredUserInfo),该对象用于获取用户 Id、昵称、头像等信息。
- WebToolsExtensions.CancellationTokens:这是一个共享的并发字典(例如 ConcurrentDictionary<string, CancellationTokenSource>),用于跨连接、跨请求管理取消令牌,使得客户端可以在其他地方基于 sessionId+connectionId 发起取消。
- chatCompletionService.SendStreamChatWithCallbackAsync:假定这是一个支持流式回调的自定义方法,能够在接收 AI 的增量输出时调用回调并最终返回一个包含 Success 标志的结果对象。
潜在问题与改进建议
C# 语法注意
- 代码中使用的类似 [sessionId]、[ChatGptReceiverUser(...)] 的写法不是标准 C# 数组字面量(C# 要用 new object[]{ ... })。如果真实代码中也是这样,会编译失败;可能是示例中为简洁写法,但需确保实际代码使用正确的数组/集合语法。
SendCoreAsync 的 await 问题
- 在 OnConnectedAsync / OnDisconnectedAsync 中对 Clients.Caller.SendCoreAsync 没有 await,可能导致未观察的异常或异步推送未完成。建议 await 这些调用或显式忽略返回 Task(并处理异常)。
取消令牌的生命周期
- 将 cts 存入全局字典后,必须确保在所有可能的结束路径(包括异常)都会移除并 Dispose,目前 finally 有处理,这很好。但要确保外部用于取消的逻辑(例如前端发送取消请求)能找到并调用相应的 CancellationTokenSource.Cancel()。
并发与资源清理
- WebToolsExtensions.CancellationTokens 的实现需要是线程安全的(比如 ConcurrentDictionary)。
- OnDisconnectedAsync 在清理 token 时查找后按键逐个 TryRemove 并 Dispose,这很合适。
Role 判定逻辑
- messages 中 Role 的判断:Role = x.Sender?.Id == sessionId ? "assistant" : "user"。这里假设 AI 的消息的 Sender.Id == sessionId(因为保存 AI 消息时使用了 sessionId 作为 AI 用户 Id),这在你当前实现下成立,但需要注意与 aiChatService.SaveAiMessageAsync 的行为保持一致。
错误信息泄露与格式
- ErrorMessage 将原始异常消息嵌入前端 HTML(含 ...)。需注意不要把敏感异常信息直接返回给前端以免泄露内部信息,或在生产环境中限制错误内容。
日志与监控
- 已有若干日志点(解析模型失败、异常、取消、清理 token 等),可以考虑在关键路径增加更多上下文以便排查(比如 sessionId、userId、modelValue 等,注意个人信息保护)。
可靠性
- 如果 chatCompletionService 的回调抛出异常或 Clients.Caller.SendCoreAsync 失败(客户端断开),需要确认这里的异常如何传播以及是否会导致 finally 被执行并正确清理令牌(一般会的,但要测试断连时的行为)。
如果你希望,我可以:
- 帮你检查并改写成可编译的 C# 版本(修复数组字面量等问题)。
- 推荐更完善的取消/回收策略或如何从前端触发取消(例如暴露一个 CancelStream 方法来取消 token)。
- 优化错误处理策略(比如统一错误码与本地化消息)。
- 添加更多日志点或度量(例如每次流的长度、耗时、失败率等)。
需要我从哪个方向继续帮助?例如:修复编译问题、增加取消接口示例、或改进错误处理。
评论加载中...