using Dpz.Core.Service.Network;
using Dpz.Core.Service.Network.Models;
using Dpz.Core.Web.Library.Api.Service;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;
namespace Dpz.Core.Web.Library.Hub;
public class RobotChatHub(
ILogger<RobotChatHub> logger,
IAiChatService aiChatService,
IConfiguration configuration,
ChatCompletionService chatCompletionService,
GenerateSessionTitleService generateSessionTitleService
) : Microsoft.AspNetCore.SignalR.Hub
{
[Authorize]
public async Task SendStreamMessageToChatGpt(
string content,
int modelValue = 1,
string? sessionId = null
)
{
if (string.IsNullOrWhiteSpace(content))
{
await Clients.Caller.SendCoreAsync("SystemError", [ChatGptReceiverUser("请输入内容")]);
return;
}
// if (!Context.User.IsLogin())
// {
// await Clients.Caller.SendCoreAsync("SystemError", [ChatGptReceiverUser("尚未登录")]);
// return;
// }
// 创建取消令牌源并存储
var cts = new CancellationTokenSource();
var tokenKey = "";
try
{
var sender = Context.User.RequiredUserInfo;
// 判断是否为新的会话
var isNewSession = false;
if (string.IsNullOrEmpty(sessionId))
{
sessionId = Guid.NewGuid().ToString("N");
isNewSession = true;
}
// 使用 sessionId + ConnectionId 作为唯一标识
tokenKey = $"{sessionId}_{Context.ConnectionId}";
WebToolsExtensions.CancellationTokens[tokenKey] = cts;
// 立即检查是否已被取消(在存储后立即检查)
cts.Token.ThrowIfCancellationRequested();
// 如果是新会话,通知前端新的 SessionId
if (isNewSession)
{
await Clients.Caller.SendCoreAsync("NewSessionCreated", [sessionId], cts.Token);
}
// 解析模型枚举,如果失败则使用默认值 Gpt5Mini
AiModel selectedModel;
try
{
if (Enum.IsDefined(typeof(AiModel), modelValue))
{
selectedModel = (AiModel)modelValue;
}
else
{
selectedModel = AiModel.Gpt5Mini;
logger.LogWarning(
"无效的模型值 {Value},使用默认模型: {Model}",
modelValue,
selectedModel
);
}
}
catch (Exception e)
{
selectedModel = AiModel.Gpt5Mini;
logger.LogError(
e,
"解析模型值 {Value} 失败,使用默认模型: {Model}",
modelValue,
selectedModel
);
}
// 保存用户消息到会话
await aiChatService.SaveUserMessageAsync(sender.Id, sessionId, content, sender);
// 再次检查是否已被取消
cts.Token.ThrowIfCancellationRequested();
// 获取会话的历史记录
var records = await aiChatService.GetSessionMessagesAsync(sessionId, 1, 5);
var messages = records
.OrderBy(x => x.SendTime)
.Select(x => new ChatMessage
{
Role = x.Sender?.Id == sessionId ? "assistant" : "user",
Message = x.Message,
})
.ToList();
// 用于收集最终的完整回复
var fullContent = new StringBuilder();
// 流式调用 AI,每次收到增量内容就推送给客户端
var result = await chatCompletionService.SendStreamChatWithCallbackAsync(
messages,
async deltaContent =>
{
// 在发送前立即检查是否已取消
cts.Token.ThrowIfCancellationRequested();
// 累积完整内容
fullContent.Append(deltaContent);
// 实时推送增量内容给客户端
await Clients.Caller.SendCoreAsync("StreamDelta", [deltaContent], cts.Token);
},
opt =>
{
opt.Stream = true;
opt.Model = selectedModel;
},
cts.Token
);
if (!result.Success)
{
await Clients.Caller.SendCoreAsync(
"SystemError",
[ChatGptReceiverUser(ErrorMessage(result.Message))],
cts.Token
);
return;
}
// 流式完成,推送完成标记
await Clients.Caller.SendCoreAsync("StreamCompleted", [], cts.Token);
// 保存完整回复到数据库
var finalContent = fullContent.ToString();
if (string.IsNullOrEmpty(finalContent))
{
return;
}
var aiUser = new VmUserInfo
{
Avatar = $"{configuration["CDNBaseAddress"]}/images/ChatGPT.ico",
Enable = true,
// 使用会话ID作为AI用户ID
Id = sessionId,
Key = "",
LastAccessTime = DateTime.Now,
Name = "AI Assistant",
Permissions = null,
};
await aiChatService.SaveAiMessageAsync(sender.Id, sessionId, finalContent, aiUser);
if (isNewSession)
{
var sessionName = await generateSessionTitleService.GenerateAsync(
sender.Id,
sessionId,
content,
finalContent,
selectedModel.ToString()
);
await Clients.Caller.SendCoreAsync(
"SessionNameGenerated",
[new { sessionId, sessionName }],
cts.Token
);
}
}
catch (OperationCanceledException)
{
logger.LogInformation(
"Stream message cancelled by user, Token Key: {TokenKey}",
tokenKey
);
await Clients.Caller.SendCoreAsync("StreamCancelled", [], cts.Token);
}
catch (Exception e)
{
logger.LogError(e, "Stream message to ChatGpt error");
await Clients.Caller.SendCoreAsync(
"SystemError",
[ChatGptReceiverUser(ErrorMessage(e.Message))],
cts.Token
);
}
finally
{
// 清理取消令牌源
WebToolsExtensions.CancellationTokens.TryRemove(tokenKey, out _);
cts.Dispose();
}
}
private object ChatGptReceiverUser(string message)
{
return new
{
id = ApplicationTools.ChatGptUserId,
avatar = $"{configuration["CDNBaseAddress"]}/images/ChatGPT.ico",
username = "ChatGPT",
type = "friend",
content = message,
mine = false,
};
}
private static string ErrorMessage(string? message)
{
return $"调用API出错,错误信息:<span style='color:red;'>{(message ?? "Call API fail")}</span>";
}
public override Task OnConnectedAsync()
{
var sessionId = Context
.GetHttpContext()
?.Request.Cookies[Program.AuthorizeCookieName + ".SessionId"];
Clients.Caller.SendCoreAsync("Connected", ["websocket connected.", sessionId]);
return base.OnConnectedAsync();
}
public override Task OnDisconnectedAsync(Exception? exception)
{
var sessionId = Context
.GetHttpContext()
?.Request.Cookies[Program.AuthorizeCookieName + ".SessionId"];
Clients.Caller.SendCoreAsync("Disconnected", ["websocket disconnected.", sessionId]);
if (exception != null)
{
logger.LogError(exception, "signalR disconnected");
}
// 清理当前连接的所有取消令牌
var connectionId = Context.ConnectionId;
var keysToRemove = WebToolsExtensions
.CancellationTokens.Keys.Where(k => k.EndsWith($"_{connectionId}"))
.ToList();
foreach (var key in keysToRemove)
{
if (WebToolsExtensions.CancellationTokens.TryRemove(key, out var cts))
{
cts.Dispose();
logger.LogInformation(
"Cleaned up cancellation token on disconnect, Token Key: {TokenKey}",
key
);
}
}
return base.OnDisconnectedAsync(exception);
}
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
上述代码定义了一个 SignalR Hub 类 RobotChatHub,用于处理与聊天机器人(如讯飞和 ChatGPT)之间的实时消息交互。以下是代码的主要功能和结构的详细解释:
1. 类的构造函数
RobotChatHub 类的构造函数接受多个依赖项,包括日志记录器、聊天记录服务、配置、讯飞服务和分析服务。这些依赖项用于处理聊天请求、记录聊天记录和与外部服务交互。
2. 发送消息
SendMessage 方法
- 功能:处理用户发送的消息,调用讯飞的聊天服务,并将响应返回给用户。
- 流程:
- 生成一个会话 ID,优先使用用户的 ID。
- 调用
GetXunFeiRequestAsync方法构建请求。 - 调用
xunFeiService.ChatAsync方法与讯飞 API 交互。 - 如果成功,提取响应内容并发送给用户;如果失败,发送错误信息。
- 如果用户已登录,记录聊天记录。
SendMessageToChatGpt 方法
- 功能:处理用户发送给 ChatGPT 的消息。
- 流程:
- 检查内容是否为空和用户是否已登录。
- 记录用户发送的消息。
- 获取与 ChatGPT 的聊天记录,并构建消息列表。
- 调用
analyzeService.ChatAsync方法与 ChatGPT 交互。 - 处理响应并将结果发送给用户;记录 ChatGPT 的响应。
3. 构建请求
GetXunFeiRequestAsync 方法
- 功能:构建发送给讯飞 API 的请求对象。
- 流程:
- 获取用户的聊天记录,并构建消息列表。
- 将用户的问题添加到消息列表中。
- 返回构建好的
XunFeiRequest对象。
4. 用户信息和消息格式化
XunFeiReceiverUser和ChatGptReceiverUser方法用于格式化发送给客户端的消息,包含用户的头像、用户名和消息内容。XunFeiUser和ChatGptUser方法返回代表讯飞和 ChatGPT 的用户信息对象。
5. 错误处理
ErrorMessage方法用于格式化错误信息,以便在客户端显示。
6. 连接和断开连接
OnConnectedAsync和OnDisconnectedAsync方法用于处理用户连接和断开连接事件,记录连接状态并向客户端发送相应的消息。
总结
RobotChatHub 类实现了一个实时聊天系统的核心功能,允许用户与讯飞和 ChatGPT 进行交互。它处理消息的发送和接收,记录聊天记录,并在出现错误时提供反馈。通过 SignalR,用户可以实时接收来自聊天机器人的响应。
评论加载中...