最近在处理项目中,处理关于大模型流式接口的对接,并将处理后的数据,按照流式方式发送给前端。在了解实现方案中,发现基于 OkHttp Sse + Spring SseEmitter 的技术方案。OkHttp Sse 实现对大模型输出的流式结果进行监控处理,并通过 Spring SseEmitter 将处理后的结果输出到前端。
Spring SseEmitter
服务端主动推送:SSE (Server Send Event)。html5新标准,用来从服务端实时推送数据到浏览器端,直接建立在当前http连接上,本质上是保持一个http长连接,轻量协议简单的服务器数据推送的场景,使用服务器推送事件, SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,再向服务端发送请求,周而复始。
SseEmitter 是SpringMVC(4.2+)提供的一种技术,它是基于Http协议的,相比WebSocket,它更轻量,但是它只能从服务端向客户端单向发送信息。
案例
代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
@GetMapping(value = "stream_query", produces = {MediaType.TEXT_EVENT_STREAM_VALUE}) public SseEmitter streamQuery() { SseEmitter emitter = new SseEmitter(60000L); Flux.interval(Duration.ofSeconds(1)) .map(data -> "Server Message # " + data) .doOnCancel(emitter::complete) .subscribe(data -> { try { emitter.send(SseEmitter.event().data(data)); } catch (Exception e) { log.error("发送数据异常", e); } }); return emitter; }
|
与 WebSocket 比较
Spring SseEmitter 和 WebSocket 都实现了服务端向客户端推送数据,不过 WebSocket 是 全双工通信,客户端和服务端可以互相通信。而 SseEmitter 只能从服务端向 客户端推送(该功能以及包含在 spring boot jar 中)。
OkHttp + Spring SseEmitter 实现大模型数据处理推送
不多说,直接先上代码,先看 OkHttp 相关的两块代码。第一部分是 OkHttp 建立 Http Sse 监听连接,第二部分是 OkHttp 关于事件监听的具体代码实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Component public class OkHttpStreamClient {
private static final OkHttpClient HTTP_CLIENT;
static { HTTP_CLIENT = new OkHttpClient.Builder() .connectTimeout(60, TimeUnit.SECONDS) .readTimeout(60, TimeUnit.SECONDS) .writeTimeout(60, TimeUnit.SECONDS) .retryOnConnectionFailure(true) .build(); }
public void stream(SseEmitter emitter, String url) { Request request = new Request.Builder() .url(url) .get() .build(); ChatEventSourceListener listener = new ChatEventSourceListener(emitter); RealEventSource realEventSource = new RealEventSource(request, listener); realEventSource.connect(HTTP_CLIENT); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Slf4j public class ChatEventSourceListener extends EventSourceListener {
private final SseEmitter emitter;
public ChatEventSourceListener(SseEmitter emitter) { this.emitter = emitter; }
@Override public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) { log.info("建立sse连接..."); }
@Override public void onEvent(@NotNull EventSource eventSource, String id, final String type, @NotNull String data) { log.info("{}: {}", id, data); try { emitter.send(SseEmitter.event().data(data)); } catch (IOException e) { log.error("数据发送异常"); } }
@Override public void onClosed(@NotNull EventSource eventSource) { log.info("sse连接关闭..."); emitter.complete(); }
@Override public void onFailure(@NotNull EventSource eventSource, Throwable t, Response response) { log.error("出现异常,返回结果:{}", response, t); } }
|
OkHttp 的 EventSourceListener 有几种方法,入代码所示,可以在相关里面实现对代码的数据处理,最后将处理后的数据发送到客户端。
最终需要将接收到的数据,向客户端推送,推送的具体代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Slf4j @RestController @RequestMapping(value = "/chat") public class ChatController {
private final OkHttpStreamClient okHttpStreamClient;
@Autowired public ChatController(OkHttpStreamClient okHttpStreamClient) { this.okHttpStreamClient = okHttpStreamClient; }
/** * 模拟流式结果输出 */ @GetMapping(value = "chat", produces = {MediaType.TEXT_EVENT_STREAM_VALUE}) public SseEmitter chat() { SseEmitter emitter = new SseEmitter(60000L); okHttpStreamClient.stream(emitter, "http://localhost:8888/chat/stream_query"); return emitter; } }
|
上面的代码简单的展示了,如果通过 okhttp sse 建立监听,并将处理后的数据推送到客户端的完整连接。