简介

Server-Sent Events 服务器推送事件,简称 SSE,是一种服务端实时主动向浏览器推送消息的技术。

SSE运行在HTTP协议之上,它允许服务器以事件流(Event Stream)的形式将数据发送给客户端。客户端通过建立持久化的HTTP连接,并监听这个事件流,从而可以实时接收到服务器推送的数据。

SSE用途

ChatGPT 是一个基于深度学习的大型语言模型,处理自然语言需要大量的计算资源和时间,响应速度肯定比普通的读数据库要慢的多,普通 http 接口等待时间过长,显然并不合适。对于这种单项对话场景,ChagtGPT 将先计算出的数据“推送”给用户,边计算边返回,避免用户因为等待时间过长关闭页面。而这,可以采用 SSE 技术。

SSE 可以在 Web 应用程序中实现诸如股票在线数据、日志推送、聊天室实时人数等即时数据推送功能。股票📈k线的实时变化,彩票趋势走向,以及工业数据实时监控(受限于页面的数据获取范围面)和日志推送和CI/CD工作流进度的推送等相关单向推送需求的场景。

需要注意的是,SSE 并不是适用于所有的实时推送场景。在需要高并发、高吞吐量和低延迟的场景下,WebSockets 可能更加适合。而在需要更轻量级的推送场景下,SSE 可能更加适合。因此,在选择即时更新方案时,需要根据具体的需求和场景进行选择。

SSE与 WebSocket 比较

WebSocket SSE
基于TCP长连接通讯 基于HTTP协议
全双工,可以同时发送和接收消息 单工,只能服务端单向发送信息
相对复杂 轻量级,使用简单
不在协议范围内,需手动实现 内置断线重连和消息追踪功能
类型广泛 文本或使用 Base64 编码和 gzip 压缩的二进制消息
不支持自定义事件类型 支持自定义事件类型
连接数无限制 连接数 HTTP/1.1 6个,HTTP/2 可协商(默认100)

SSE的实现原理

以下是SSE(Server-Sent Events)的实现原理:

  • 连接建立:通常情况下,客户端(如浏览器)通过发送HTTP GET请求到服务器来请求建立一个SSE连接。
  • 服务器响应:一旦服务器接收到请求,它将返回一个HTTP响应,该响应的状态码为200,内容类型(Content-Type)设置为”text/event-stream”。
  • 数据推送:服务器可以通过已经建立的连接向客户端推送数据。每次推送的数据被称作一个事件(Event)。每个事件由一个或多个以”\n\n”分隔的数据块组成。每个数据块都是一行文本,可能包含一个以”:”开头的注释行、以”data:”开头的数据行,或者以”id:”和”event:”开头的行来指定事件ID和事件类型。
  • 客户端处理:当客户端接收到服务器推送的事件后,它会触发相应的JavaScript事件处理器来处理这些事件。
  • 重连:如果连接断开,客户端会自动尝试重新连接。如果服务器在事件中指定了ID,那么在重新连接时,客户端会发送一个”Last-Event-ID”的HTTP头部信息到服务器,告诉服务器客户端接收到的最后一个事件的ID。根据这个信息,服务器可以决定从哪个事件开始重新发送数据。

总结起来,SSE使用了基于文本和HTTP协议的简单机制,使得服务器能够实时地将数据推送到客户端,而无需客户端频繁地发起新的请求。

使用SSE的注意事项

以下是在使用SSE(Server-Sent Events)技术进行实时数据推送时需要注意的几个关键点:

  • 异步处理:由于SSE基于长连接的机制,因此数据推送过程可能会持续较长时间。为了防止服务器线程被阻塞,建议采用异步方式处理SSE请求。例如,可以在控制器方法中使用@Async注解或利用CompletableFuture等异步编程方式。
  • 超时处理:SSE连接可能会因网络中断、客户端关闭等原因而超时。为了避免无效连接占据服务器资源,建议设置超时时间并处理超时情况。例如,可以利用SseEmitter对象的setTimeout()方法设定超时时间,并通过onTimeout()方法处理超时逻辑。
  • 异常处理:在实际应用中,可能会遇到网络异常、数据推送失败等问题。这种情况下,可以使用SseEmitter对象的completeWithError()方法将异常信息发送给客户端,并在客户端通过eventSource.onerror事件进行处理。
  • 内存管理:在使用SseEmitter时,需要特别注意内存管理问题,尤其是在大量并发连接的场景下。当客户端断开连接后,务必及时释放SseEmitter对象,以避免资源泄漏和内存溢出。
  • 并发性能:SSE的并发连接数可能对服务器性能产生影响。如果需要处理大量并发连接,可以考虑使用线程池或其他异步处理方式,以最大化服务器资源利用。
  • 客户端兼容性:虽然大多数现代浏览器都支持SSE,但一些旧版本的浏览器可能不支持。因此,在使用SSE时,需要确保目标客户端对其有良好的支持,或者提供备选的实时数据推送机制。

SpringBoot 集成 SSE

下面给出一个 SpringBoot 集成 SSE 的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Override
public SseEmitter streamChatCompletions(ChatCompletionRequest completionRequest, String tenantToken) {
// 发起OpenaiStream请求,并使用sse发送给客户端
List<ChatCompletionChunk> chunks = Lists.newArrayList();
Flowable<ChatCompletionChunk> flowable = service.streamChatCompletion(completionRequest);
SseEmitter sseEmitter = new SseEmitter(120000L);
AtomicBoolean isCompleted = new AtomicBoolean(false);
// CAS 处理超时情况,避免无效连接持续占据服务器资源
sseEmitter.onTimeout(() -> {
if (isCompleted.compareAndSet(false, true)) {
sseEmitter.complete();
}
});

// 流数据处理
flowable.subscribe(chatCompletionChunk -> {
// 监听数据流
if (!Objects.isNull(chatCompletionChunk.getId())) {
boolean isStop = false;
if (chatCompletionChunk != null && chatCompletionChunk.getChoices() != null) {
isStop = chatCompletionChunk.getChoices().stream().anyMatch(choice -> choice != null && "stop".equals(choice.getFinishReason()));
}
if (isStop) {
String content = chunks.stream().map(item -> item.getChoices().stream().map(choice -> choice.getMessage().getContent()).filter(Objects::nonNull).collect(Collectors.joining())).collect(Collectors.joining());
List<ChatMessage> messages = completionRequest.getMessages();
}
chunks.add(chatCompletionChunk);
sseEmitter.send(chatCompletionChunk);
} else {
Thread.sleep(100);
sseEmitter.send(JROpenaiConstant.SSE_DONE);
if (isCompleted.compareAndSet(false, true)) {
sseEmitter.complete();
}
}
},throwable -> {
// 异常处理
if (throwable instanceof OpenAiHttpException) {
log.error("OpenAi异常");
sseEmitter.send(Result.failed(throwable.getMessage()));
} else {
// other throwable error
log.error(throwable.getMessage(), throwable);
sseEmitter.send(Result.failed(throwable.getMessage()));
}
Thread.sleep(100);
sseEmitter.send(JROpenaiConstant.SSE_DONE);
if (isCompleted.compareAndSet(false, true)) {
sseEmitter.complete();
}
});
// shutdown okhttp3.Dispatcher
service.shutdownExecutor();

return sseEmitter;
}