简介
Server-Sent Events 服务器推送事件,简称 SSE,是一种服务端实时主动向浏览器推送消息的技术。
SSE运行在HTTP协议之上,它允许服务器以事件流(Event Stream)的形式将数据发送给客户端。客户端通过建立持久化的HTTP连接,并监听这个事件流,从而可以实时接收到服务器推送的数据。
SSE用途
ChatGPT 是一个基于深度学习的大型语言模型,处理自然语言需要大量的计算资源和时间,响应速度肯定比普通的读数据库要慢的多,普通 http 接口等待时间过长,显然并不合适。对于这种单项对话场景,ChagtGPT 将先计算出的数据“推送”给用户,边计算边返回,避免用户因为等待时间过长关闭页面。而这,可以采用 SSE 技术。
SSE 可以在 Web 应用程序中实现诸如股票在线数据、日志推送、聊天室实时人数等即时数据推送功能。股票📈k线的实时变化,彩票趋势走向,以及工业数据实时监控(受限于页面的数据获取范围面)和日志推送和CI/CD工作流进度的推送等相关单向推送需求的场景。
需要注意的是,SSE 并不是适用于所有的实时推送场景。在需要高并发、高吞吐量和低延迟的场景下,WebSockets 可能更加适合。而在需要更轻量级的推送场景下,SSE 可能更加适合。因此,在选择即时更新方案时,需要根据具体的需求和场景进行选择。
SSE与 WebSocket 比较
| WebSocket |
SSE |
| 基于TCP长连接通讯 |
基于HTTP协议 |
| 全双工,可以同时发送和接收消息 |
单工,只能服务端单向发送信息 |
| 相对复杂 |
轻量级,使用简单 |
| 不在协议范围内,需手动实现 |
内置断线重连和消息追踪功能 |
| 类型广泛 |
文本或使用 Base64 编码和 gzip 压缩的二进制消息 |
| 不支持自定义事件类型 |
支持自定义事件类型 |
| 连接数无限制 |
连接数 HTTP/1.1 6个,HTTP/2 可协商(默认100) |
SSE的实现原理
以下是SSE(Server-Sent Events)的实现原理:
- 连接建立:通常情况下,客户端(如浏览器)通过发送HTTP GET请求到服务器来请求建立一个SSE连接。
- 服务器响应:一旦服务器接收到请求,它将返回一个HTTP响应,该响应的状态码为200,内容类型(Content-Type)设置为”text/event-stream”。
- 数据推送:服务器可以通过已经建立的连接向客户端推送数据。每次推送的数据被称作一个事件(Event)。每个事件由一个或多个以”\n\n”分隔的数据块组成。每个数据块都是一行文本,可能包含一个以”:”开头的注释行、以”data:”开头的数据行,或者以”id:”和”event:”开头的行来指定事件ID和事件类型。
- 客户端处理:当客户端接收到服务器推送的事件后,它会触发相应的JavaScript事件处理器来处理这些事件。
- 重连:如果连接断开,客户端会自动尝试重新连接。如果服务器在事件中指定了ID,那么在重新连接时,客户端会发送一个”Last-Event-ID”的HTTP头部信息到服务器,告诉服务器客户端接收到的最后一个事件的ID。根据这个信息,服务器可以决定从哪个事件开始重新发送数据。
总结起来,SSE使用了基于文本和HTTP协议的简单机制,使得服务器能够实时地将数据推送到客户端,而无需客户端频繁地发起新的请求。
使用SSE的注意事项
以下是在使用SSE(Server-Sent Events)技术进行实时数据推送时需要注意的几个关键点:
- 异步处理:由于SSE基于长连接的机制,因此数据推送过程可能会持续较长时间。为了防止服务器线程被阻塞,建议采用异步方式处理SSE请求。例如,可以在控制器方法中使用@Async注解或利用CompletableFuture等异步编程方式。
- 超时处理:SSE连接可能会因网络中断、客户端关闭等原因而超时。为了避免无效连接占据服务器资源,建议设置超时时间并处理超时情况。例如,可以利用SseEmitter对象的setTimeout()方法设定超时时间,并通过onTimeout()方法处理超时逻辑。
- 异常处理:在实际应用中,可能会遇到网络异常、数据推送失败等问题。这种情况下,可以使用SseEmitter对象的completeWithError()方法将异常信息发送给客户端,并在客户端通过eventSource.onerror事件进行处理。
- 内存管理:在使用SseEmitter时,需要特别注意内存管理问题,尤其是在大量并发连接的场景下。当客户端断开连接后,务必及时释放SseEmitter对象,以避免资源泄漏和内存溢出。
- 并发性能:SSE的并发连接数可能对服务器性能产生影响。如果需要处理大量并发连接,可以考虑使用线程池或其他异步处理方式,以最大化服务器资源利用。
- 客户端兼容性:虽然大多数现代浏览器都支持SSE,但一些旧版本的浏览器可能不支持。因此,在使用SSE时,需要确保目标客户端对其有良好的支持,或者提供备选的实时数据推送机制。
SpringBoot 集成 SSE
下面给出一个 SpringBoot 集成 SSE 的例子,这是一个 stream 流聊天的实现。
对于AI网关的实现,可以总结以下几个步骤:
找到合适的代理(一致性哈希)
找不到代理——配置兜底模型
调用相应的api,如果各个模型的接口地址不同需及时转换兼容
获取模型输出结果,如果没发生系统的特定异常(超过上下文限制、token限额、触发限流……),自动重试。这个流程需记录调用日志,发mq实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Override public SseEmitter streamChatCompletions(ChatCompletionRequest completionRequest, String tenantToken) { Tenant tenantInfo = getTenant(tenantToken); String modelCode = completionRequest.getModel(); Node node = openaiProxyService.getAppProxy(tenantInfo.getTenantId(), tenantToken, modelCode); completionRequest.setModel(node.getModel()); AigcApiCallLog apiCallLog = buildLog(completionRequest, Constants.GPT_COMPLETIONS_ENDPOINT, tenantInfo, node, modelCode); ChatCompletionResponseFormat responseFormat = completionRequest.getResponseFormat(); if ((completionRequest.getModel().contains("deepseek") || completionRequest.getModel().contains("moonshot")) && responseFormat != null && ResponseFormatType.JSON_SCHEMA.name().equalsIgnoreCase(responseFormat.getType())) { String schema = "\n\n## 你需要使用json格式输出,json_schema如下:\n" + JSON.toJSONString(responseFormat.getJsonSchema().getSchema()); List<ChatMessage> messages = completionRequest.getMessages(); if (CollectionUtils.isNotEmpty(messages)) { for (ChatMessage message : messages) { if (message.getContent() instanceof String) { message.setContent(message.getContent() + schema); } } } responseFormat.setType(ResponseFormatType.JSON_OBJECT.name().toLowerCase()); responseFormat.setJsonSchema(null); log.info("当前模型不支持json_schema模式,已转为json_object模式:{}", JSON.toJSONString(completionRequest)); } SseEmitter sseEmitter = new SseEmitter(300000L); AtomicBoolean isCompleted = new AtomicBoolean(false); sseEmitter.onTimeout(() -> { throw new BusinessException(OpenApiErrorEnum.SYSTEM_TIMEOUT); }); tryExecuteStreamChatCompletion(completionRequest, node, apiCallLog, sseEmitter, isCompleted, 0); return sseEmitter;
|
这里做了一些流数据的处理,监听到流数据停止的时候会计算 token 消耗,并发送mq保存日志。如果发送某些特定异常会进行重试,并保证抛出异常也确保执行器正常关闭。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
|
private void tryExecuteStreamChatCompletion(ChatCompletionRequest completionRequest, Node node, AigcApiCallLog apiCallLog, SseEmitter sseEmitter, AtomicBoolean isCompleted, Integer tryTimes) { Integer nextTryTimes = tryTimes + 1; ForwardOpenAiService service = OpenAiServiceFactory.createForward(node.getToken(), node.getHost());
try { Flowable<ChatCompletionChunk> flowable = service.streamChatCompletion(completionRequest); List<ChatCompletionChunk> chunks = Lists.newArrayList(); String callId = IdUtil.fastSimpleUUID(); flowable.subscribe(chatCompletionChunk -> { if (!Objects.isNull(chatCompletionChunk.getModel())) { if (Objects.isNull(chatCompletionChunk.getId())) { chatCompletionChunk.setId(callId); } chunks.add(chatCompletionChunk); boolean isStop = false; if (chatCompletionChunk.getChoices() != null) { isStop = chatCompletionChunk.getChoices().stream().anyMatch(choice -> choice != null && StringUtils.isNotEmpty(choice.getFinishReason())); for (ChatCompletionChoice choice : chatCompletionChunk.getChoices()) { if (choice != null && StringUtils.isNotEmpty(choice.getFinishReason()) && choice.getMessage() instanceof AssistantMessage) { AssistantMessage message = (AssistantMessage) choice.getMessage(); if (CollectionUtils.isNotEmpty(message.getToolCalls())) { for (ToolCall toolCall : message.getToolCalls()) { if (StrUtil.isEmpty(toolCall.getId())) { toolCall.setId(IdUtil.fastSimpleUUID()); } } } } } } if (isStop) { String content = chunks.stream().map(item -> item.getChoices().stream().map(choice -> choice.getMessage().getContentText()).filter(Objects::nonNull).collect(Collectors.joining())).collect(Collectors.joining()); List<ChatMessage> messages = completionRequest.getMessages(); String contentFromMessages = messages.stream().map(ChatMessage::getContentText).filter(Objects::nonNull).collect(Collectors.joining()); String combinedContent = contentFromMessages + content; int tokens = tokens(combinedContent);
chatCompletionChunk.setTokenCost(new BigDecimal(tokens).setScale(2, BigDecimal.ROUND_HALF_UP)); apiCallLog.setResponse(content); apiCallLog.setCallStatus(true); apiCallLog.setResponseTime(LocalDateTime.now()); apiCallLog.setTokenCost((long) tokens); pushApiCallLogToQueue(apiCallLog); } sseEmitter.send(chatCompletionChunk); } else { Thread.sleep(100); sseEmitter.send(JROpenaiConstant.SSE_DONE); if (isCompleted.compareAndSet(false, true)) { sseEmitter.complete(); } } }, throwable -> { try { if (throwable instanceof OpenAiHttpException) { OpenAiHttpException openAiHttpException = (OpenAiHttpException) throwable; OpenApiErrorEnum errorEnum = handleOpenAiHttpException(openAiHttpException, node, apiCallLog, tryTimes, MAX_TRY_TIMES, "流式聊天");
if (errorEnum == null) { node.setHost(backupProxyIp); tryExecuteStreamChatCompletion(completionRequest, node, apiCallLog, sseEmitter, isCompleted, nextTryTimes); return; }
sseEmitter.completeWithError(new BusinessException(errorEnum));
OpenAiError openAiError = OpenAiError.fromOpenAiException(openAiHttpException); String errMsg = StrUtil.format("{}:{}", openAiError.getError().getType(), openAiError.getError().getMessage()); apiCallLog.setResponse(errMsg); pushApiCallLogToQueue(apiCallLog); } else { if (handleGeneralException((Exception) throwable, node, tryTimes, "流式聊天")) { tryExecuteStreamChatCompletion(completionRequest, node, apiCallLog, sseEmitter, isCompleted, nextTryTimes); return; }
String message = throwable.getMessage(); if (throwable instanceof org.apache.catalina.connector.ClientAbortException || (message != null && message.contains("Broken pipe"))) { log.debug("客户端主动断开连接,无需处理: {}", message); if (isCompleted.compareAndSet(false, true)) { try { sseEmitter.complete(); } catch (Exception ex) { log.debug("关闭SSE连接时发生异常,可以忽略", ex); } } return; }
apiCallLog.setResponse(message); apiCallLog.setCallStatus(false); apiCallLog.setResponseTime(LocalDateTime.now()); pushApiCallLogToQueue(apiCallLog); sseEmitter.completeWithError(new BusinessException(OpenApiErrorEnum.SYSTEM_UNKNOWN_ERROR)); } } finally { service.shutdownExecutor(); } }); } catch (Exception e) { log.error("初始化流式聊天请求时发生异常:", e); apiCallLog.setResponse(e.getMessage()); apiCallLog.setCallStatus(false); apiCallLog.setResponseTime(LocalDateTime.now()); pushApiCallLogToQueue(apiCallLog); sseEmitter.completeWithError(new BusinessException(OpenApiErrorEnum.SYSTEM_UNKNOWN_ERROR)); service.shutdownExecutor(); } }
|
一致性哈希查找代理节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| @Override public Node getAppProxy(Integer tenantId, String account, String modelCode) { ModelConfig modelConfig = selectModelVersion(modelCode); String gptModelVersion = modelConfig.getGptModelVersion(); List<OpenaiProxy> openaiProxies = selectAppProxiesByTenantIdAndModelCode(tenantId, modelConfig);
if (CollectionUtil.isEmpty(openaiProxies)) { if (modelConfig.getModelType() == 1) { log.warn("找不到指定的代理:模型信息{} | {} \n将使用默认模型完成该次请求", modelConfig, gptModelVersion); String fallbackVersion; if (gptModelVersion.startsWith("gpt-")) { fallbackVersion = fallbackModelCode; } else if ("deepseek-chat".equalsIgnoreCase(gptModelVersion)) { fallbackVersion = "gpt-4o-mini"; } else { fallbackVersion = fallbackModelCode; } if (fallbackVersion.equalsIgnoreCase(gptModelVersion)) { throw new BusinessException(OpenApiErrorEnum.SYSTEM_ACCOUNT_ERROR); }
gptModelVersion = fallbackVersion; OpenaiProxy openaiProxy = OpenaiProxy.builder() .host(fallbackModelHost) .token(fallbackModelApiKey) .gptModelVersion(gptModelVersion) .build(); openaiProxies = Stream.of(openaiProxy).collect(Collectors.toList()); } else { throw new BusinessException(OpenApiErrorEnum.SYSTEM_ACCOUNT_ERROR); } }
ConsistentHash<Node> consistentHash = ProxyUtil.makeProxyPool(openaiProxies); Node node = consistentHash.get(account);
List<Node> tokenNodes = openaiProxies.stream() .filter(openaiProxy -> openaiProxy.getHost().equals(node.getHost())) .map(openaiProxy -> new Node(openaiProxy.getHost(), openaiProxy.getToken())) .collect(Collectors.toList());
int randomKey = (int) (Math.random() * tokenNodes.size()); Node proxyNode = tokenNodes.get(randomKey); proxyNode.setModel(gptModelVersion);
return proxyNode; }
|
创建 node 节点代理池方法。
1 2 3 4
| public static ConsistentHash<Node> makeProxyPool(List<OpenaiProxy> openaiProxies) { List<Node> realNodes = openaiProxies.stream().map(item -> new Node(item.getHost(), item.getToken())).collect(Collectors.toList()); return new ConsistentHash<>(500, realNodes); }
|