简介

Server-Sent Events 服务器推送事件,简称 SSE,是一种服务端实时主动向浏览器推送消息的技术。

SSE运行在HTTP协议之上,它允许服务器以事件流(Event Stream)的形式将数据发送给客户端。客户端通过建立持久化的HTTP连接,并监听这个事件流,从而可以实时接收到服务器推送的数据。

SSE用途

ChatGPT 是一个基于深度学习的大型语言模型,处理自然语言需要大量的计算资源和时间,响应速度肯定比普通的读数据库要慢的多,普通 http 接口等待时间过长,显然并不合适。对于这种单项对话场景,ChagtGPT 将先计算出的数据“推送”给用户,边计算边返回,避免用户因为等待时间过长关闭页面。而这,可以采用 SSE 技术。

SSE 可以在 Web 应用程序中实现诸如股票在线数据、日志推送、聊天室实时人数等即时数据推送功能。股票📈k线的实时变化,彩票趋势走向,以及工业数据实时监控(受限于页面的数据获取范围面)和日志推送和CI/CD工作流进度的推送等相关单向推送需求的场景。

需要注意的是,SSE 并不是适用于所有的实时推送场景。在需要高并发、高吞吐量和低延迟的场景下,WebSockets 可能更加适合。而在需要更轻量级的推送场景下,SSE 可能更加适合。因此,在选择即时更新方案时,需要根据具体的需求和场景进行选择。

SSE与 WebSocket 比较

WebSocket SSE
基于TCP长连接通讯 基于HTTP协议
全双工,可以同时发送和接收消息 单工,只能服务端单向发送信息
相对复杂 轻量级,使用简单
不在协议范围内,需手动实现 内置断线重连和消息追踪功能
类型广泛 文本或使用 Base64 编码和 gzip 压缩的二进制消息
不支持自定义事件类型 支持自定义事件类型
连接数无限制 连接数 HTTP/1.1 6个,HTTP/2 可协商(默认100)

SSE的实现原理

以下是SSE(Server-Sent Events)的实现原理:

  • 连接建立:通常情况下,客户端(如浏览器)通过发送HTTP GET请求到服务器来请求建立一个SSE连接。
  • 服务器响应:一旦服务器接收到请求,它将返回一个HTTP响应,该响应的状态码为200,内容类型(Content-Type)设置为”text/event-stream”。
  • 数据推送:服务器可以通过已经建立的连接向客户端推送数据。每次推送的数据被称作一个事件(Event)。每个事件由一个或多个以”\n\n”分隔的数据块组成。每个数据块都是一行文本,可能包含一个以”:”开头的注释行、以”data:”开头的数据行,或者以”id:”和”event:”开头的行来指定事件ID和事件类型。
  • 客户端处理:当客户端接收到服务器推送的事件后,它会触发相应的JavaScript事件处理器来处理这些事件。
  • 重连:如果连接断开,客户端会自动尝试重新连接。如果服务器在事件中指定了ID,那么在重新连接时,客户端会发送一个”Last-Event-ID”的HTTP头部信息到服务器,告诉服务器客户端接收到的最后一个事件的ID。根据这个信息,服务器可以决定从哪个事件开始重新发送数据。

总结起来,SSE使用了基于文本和HTTP协议的简单机制,使得服务器能够实时地将数据推送到客户端,而无需客户端频繁地发起新的请求。

使用SSE的注意事项

以下是在使用SSE(Server-Sent Events)技术进行实时数据推送时需要注意的几个关键点:

  • 异步处理:由于SSE基于长连接的机制,因此数据推送过程可能会持续较长时间。为了防止服务器线程被阻塞,建议采用异步方式处理SSE请求。例如,可以在控制器方法中使用@Async注解或利用CompletableFuture等异步编程方式。
  • 超时处理:SSE连接可能会因网络中断、客户端关闭等原因而超时。为了避免无效连接占据服务器资源,建议设置超时时间并处理超时情况。例如,可以利用SseEmitter对象的setTimeout()方法设定超时时间,并通过onTimeout()方法处理超时逻辑。
  • 异常处理:在实际应用中,可能会遇到网络异常、数据推送失败等问题。这种情况下,可以使用SseEmitter对象的completeWithError()方法将异常信息发送给客户端,并在客户端通过eventSource.onerror事件进行处理。
  • 内存管理:在使用SseEmitter时,需要特别注意内存管理问题,尤其是在大量并发连接的场景下。当客户端断开连接后,务必及时释放SseEmitter对象,以避免资源泄漏和内存溢出。
  • 并发性能:SSE的并发连接数可能对服务器性能产生影响。如果需要处理大量并发连接,可以考虑使用线程池或其他异步处理方式,以最大化服务器资源利用。
  • 客户端兼容性:虽然大多数现代浏览器都支持SSE,但一些旧版本的浏览器可能不支持。因此,在使用SSE时,需要确保目标客户端对其有良好的支持,或者提供备选的实时数据推送机制。

SpringBoot 集成 SSE

下面给出一个 SpringBoot 集成 SSE 的例子,这是一个 stream 流聊天的实现。

对于AI网关的实现,可以总结以下几个步骤:

  1. 找到合适的代理(一致性哈希)

  2. 找不到代理——配置兜底模型

  3. 调用相应的api,如果各个模型的接口地址不同需及时转换兼容

  4. 获取模型输出结果,如果没发生系统的特定异常(超过上下文限制、token限额、触发限流……),自动重试。这个流程需记录调用日志,发mq实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Override
public SseEmitter streamChatCompletions(ChatCompletionRequest completionRequest, String tenantToken) {
Tenant tenantInfo = getTenant(tenantToken);
String modelCode = completionRequest.getModel();
// 查找可用代理(token + proxy)
Node node = openaiProxyService.getAppProxy(tenantInfo.getTenantId(), tenantToken, modelCode);
// 设置模型
completionRequest.setModel(node.getModel());
//日志初始化
AigcApiCallLog apiCallLog = buildLog(completionRequest, Constants.GPT_COMPLETIONS_ENDPOINT, tenantInfo, node, modelCode);
// 如果当前模型是:deepseek、moonshot等不支持json_schema模式的模型,需要将json_schema模式改为json模式
ChatCompletionResponseFormat responseFormat = completionRequest.getResponseFormat();
if ((completionRequest.getModel().contains("deepseek") || completionRequest.getModel().contains("moonshot"))
&& responseFormat != null && ResponseFormatType.JSON_SCHEMA.name().equalsIgnoreCase(responseFormat.getType())) {
String schema = "\n\n## 你需要使用json格式输出,json_schema如下:\n" + JSON.toJSONString(responseFormat.getJsonSchema().getSchema());
List<ChatMessage> messages = completionRequest.getMessages();
if (CollectionUtils.isNotEmpty(messages)) {
for (ChatMessage message : messages) {
if (message.getContent() instanceof String) {
message.setContent(message.getContent() + schema);
}
}
}
responseFormat.setType(ResponseFormatType.JSON_OBJECT.name().toLowerCase());
responseFormat.setJsonSchema(null);
log.info("当前模型不支持json_schema模式,已转为json_object模式:{}", JSON.toJSONString(completionRequest));
}
// 发起OpenaiStream请求,并使用sse发送给客户端
SseEmitter sseEmitter = new SseEmitter(300000L);
AtomicBoolean isCompleted = new AtomicBoolean(false);
sseEmitter.onTimeout(() -> {
throw new BusinessException(OpenApiErrorEnum.SYSTEM_TIMEOUT);
});
// 尝试执行:特定已知异常会进行重试
tryExecuteStreamChatCompletion(completionRequest, node, apiCallLog, sseEmitter, isCompleted, 0);
return sseEmitter;

这里做了一些流数据的处理,监听到流数据停止的时候会计算 token 消耗,并发送mq保存日志。如果发送某些特定异常会进行重试,并保证抛出异常也确保执行器正常关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/**
* 尝试请求streamChatCompletions接口
*/
private void tryExecuteStreamChatCompletion(ChatCompletionRequest completionRequest, Node node, AigcApiCallLog apiCallLog, SseEmitter sseEmitter, AtomicBoolean isCompleted, Integer tryTimes) {
Integer nextTryTimes = tryTimes + 1;
ForwardOpenAiService service = OpenAiServiceFactory.createForward(node.getToken(), node.getHost());

try {
Flowable<ChatCompletionChunk> flowable = service.streamChatCompletion(completionRequest);
List<ChatCompletionChunk> chunks = Lists.newArrayList();
String callId = IdUtil.fastSimpleUUID();
// 流数据处理
flowable.subscribe(chatCompletionChunk -> {
// 监听数据流
if (!Objects.isNull(chatCompletionChunk.getModel())) {
if (Objects.isNull(chatCompletionChunk.getId())) {
chatCompletionChunk.setId(callId);
}
chunks.add(chatCompletionChunk);
boolean isStop = false;
if (chatCompletionChunk.getChoices() != null) {
isStop = chatCompletionChunk.getChoices().stream().anyMatch(choice -> choice != null && StringUtils.isNotEmpty(choice.getFinishReason()));
// Gemini补充tool call id
for (ChatCompletionChoice choice : chatCompletionChunk.getChoices()) {
if (choice != null && StringUtils.isNotEmpty(choice.getFinishReason()) && choice.getMessage() instanceof AssistantMessage) {
AssistantMessage message = (AssistantMessage) choice.getMessage();
if (CollectionUtils.isNotEmpty(message.getToolCalls())) {
for (ToolCall toolCall : message.getToolCalls()) {
if (StrUtil.isEmpty(toolCall.getId())) {
toolCall.setId(IdUtil.fastSimpleUUID());
}
}
}
}
}
}
if (isStop) {
String content = chunks.stream().map(item -> item.getChoices().stream().map(choice -> choice.getMessage().getContentText()).filter(Objects::nonNull).collect(Collectors.joining())).collect(Collectors.joining());
List<ChatMessage> messages = completionRequest.getMessages();
//计算token消耗
String contentFromMessages = messages.stream().map(ChatMessage::getContentText).filter(Objects::nonNull).collect(Collectors.joining());
String combinedContent = contentFromMessages + content;
int tokens = tokens(combinedContent);

//返回token消耗
chatCompletionChunk.setTokenCost(new BigDecimal(tokens).setScale(2, BigDecimal.ROUND_HALF_UP));
apiCallLog.setResponse(content);
apiCallLog.setCallStatus(true);
apiCallLog.setResponseTime(LocalDateTime.now());
apiCallLog.setTokenCost((long) tokens);
pushApiCallLogToQueue(apiCallLog);
}
sseEmitter.send(chatCompletionChunk);
} else {
Thread.sleep(100);
sseEmitter.send(JROpenaiConstant.SSE_DONE);
if (isCompleted.compareAndSet(false, true)) {
sseEmitter.complete();
}
}
}, throwable -> {
// 异常处理
try {
if (throwable instanceof OpenAiHttpException) {
OpenAiHttpException openAiHttpException = (OpenAiHttpException) throwable;
OpenApiErrorEnum errorEnum = handleOpenAiHttpException(openAiHttpException, node, apiCallLog, tryTimes, MAX_TRY_TIMES, "流式聊天");

if (errorEnum == null) {
// 可以重试
node.setHost(backupProxyIp);
tryExecuteStreamChatCompletion(completionRequest, node, apiCallLog, sseEmitter, isCompleted, nextTryTimes);
return;
}

sseEmitter.completeWithError(new BusinessException(errorEnum));

// 记录日志并推送
OpenAiError openAiError = OpenAiError.fromOpenAiException(openAiHttpException);
String errMsg = StrUtil.format("{}:{}", openAiError.getError().getType(), openAiError.getError().getMessage());
apiCallLog.setResponse(errMsg);
pushApiCallLogToQueue(apiCallLog);
} else {
if (handleGeneralException((Exception) throwable, node, tryTimes, "流式聊天")) {
tryExecuteStreamChatCompletion(completionRequest, node, apiCallLog, sseEmitter, isCompleted, nextTryTimes);
return;
}

// 客户端断开连接的特殊处理
String message = throwable.getMessage();
if (throwable instanceof org.apache.catalina.connector.ClientAbortException ||
(message != null && message.contains("Broken pipe"))) {
log.debug("客户端主动断开连接,无需处理: {}", message);
if (isCompleted.compareAndSet(false, true)) {
try {
sseEmitter.complete();
} catch (Exception ex) {
log.debug("关闭SSE连接时发生异常,可以忽略", ex);
}
}
return;
}

// 错误日志
apiCallLog.setResponse(message);
apiCallLog.setCallStatus(false);
apiCallLog.setResponseTime(LocalDateTime.now());
pushApiCallLogToQueue(apiCallLog);
sseEmitter.completeWithError(new BusinessException(OpenApiErrorEnum.SYSTEM_UNKNOWN_ERROR));
}
} finally {
// 即使在抛出异常时也确保执行器关闭
service.shutdownExecutor();
}
});
} catch (Exception e) {
// 捕获初始化流程中可能出现的异常
log.error("初始化流式聊天请求时发生异常:", e);
apiCallLog.setResponse(e.getMessage());
apiCallLog.setCallStatus(false);
apiCallLog.setResponseTime(LocalDateTime.now());
pushApiCallLogToQueue(apiCallLog);
sseEmitter.completeWithError(new BusinessException(OpenApiErrorEnum.SYSTEM_UNKNOWN_ERROR));
service.shutdownExecutor();
}
}

一致性哈希查找代理节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Override
public Node getAppProxy(Integer tenantId, String account, String modelCode) {
// 先获取模型配置
ModelConfig modelConfig = selectModelVersion(modelCode);
String gptModelVersion = modelConfig.getGptModelVersion();

// 修改:通过模型和租户id查询代理
List<OpenaiProxy> openaiProxies = selectAppProxiesByTenantIdAndModelCode(tenantId, modelConfig);

// 进行兜底逻辑处理
if (CollectionUtil.isEmpty(openaiProxies)) {
if (modelConfig.getModelType() == 1) {
log.warn("找不到指定的代理:模型信息{} | {} \n将使用默认模型完成该次请求", modelConfig, gptModelVersion);
// 添加兜底逻辑:
// 1.如果当前是gpt-前缀的模型,默认deepseek-chat兜底
// 2.如果目前是deepseek-chat,那么兜底使用gpt-4o-mini
// 3.避免死循环逻辑,如果fallback后与当前模型相同则抛异常
String fallbackVersion;
if (gptModelVersion.startsWith("gpt-")) {
fallbackVersion = fallbackModelCode; // deepseek-chat
} else if ("deepseek-chat".equalsIgnoreCase(gptModelVersion)) {
fallbackVersion = "gpt-4o-mini";
} else {
fallbackVersion = fallbackModelCode; // 默认情况
}
// 避免死循环
if (fallbackVersion.equalsIgnoreCase(gptModelVersion)) {
throw new BusinessException(OpenApiErrorEnum.SYSTEM_ACCOUNT_ERROR);
}

gptModelVersion = fallbackVersion;
OpenaiProxy openaiProxy = OpenaiProxy.builder()
.host(fallbackModelHost)
.token(fallbackModelApiKey)
.gptModelVersion(gptModelVersion)
.build();
openaiProxies = Stream.of(openaiProxy).collect(Collectors.toList());
} else {
// 否则,抛出异常
throw new BusinessException(OpenApiErrorEnum.SYSTEM_ACCOUNT_ERROR);
}
}

// 使用一致性Hash选取一个合适的代理节点
ConsistentHash<Node> consistentHash = ProxyUtil.makeProxyPool(openaiProxies);
Node node = consistentHash.get(account);

// 找到所有与该代理host相同的代理,以实现多token轮询
List<Node> tokenNodes = openaiProxies.stream()
.filter(openaiProxy -> openaiProxy.getHost().equals(node.getHost()))
.map(openaiProxy -> new Node(openaiProxy.getHost(), openaiProxy.getToken()))
.collect(Collectors.toList());

// 从相同host的代理列表中随机选取一个token
int randomKey = (int) (Math.random() * tokenNodes.size());
Node proxyNode = tokenNodes.get(randomKey);
proxyNode.setModel(gptModelVersion);

return proxyNode;
}

创建 node 节点代理池方法。

1
2
3
4
public static ConsistentHash<Node> makeProxyPool(List<OpenaiProxy> openaiProxies) {
List<Node> realNodes = openaiProxies.stream().map(item -> new Node(item.getHost(), item.getToken())).collect(Collectors.toList());
return new ConsistentHash<>(500, realNodes);
}