0%

Spring SseEmitter 消息推送,并实现大模型流式接口

最近在处理项目中,处理关于大模型流式接口的对接,并将处理后的数据,按照流式方式发送给前端。在了解实现方案中,发现基于 OkHttp Sse + Spring SseEmitter 的技术方案。OkHttp Sse 实现对大模型输出的流式结果进行监控处理,并通过 Spring SseEmitter 将处理后的结果输出到前端。

Spring SseEmitter

服务端主动推送:SSE (Server Send Event)。html5新标准,用来从服务端实时推送数据到浏览器端,直接建立在当前http连接上,本质上是保持一个http长连接,轻量协议简单的服务器数据推送的场景,使用服务器推送事件, SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,再向服务端发送请求,周而复始。

SseEmitter 是SpringMVC(4.2+)提供的一种技术,它是基于Http协议的,相比WebSocket,它更轻量,但是它只能从服务端向客户端单向发送信息。

案例

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 模拟流式结果输出
*/
@GetMapping(value = "stream_query", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter streamQuery() {
SseEmitter emitter = new SseEmitter(60000L);
Flux.interval(Duration.ofSeconds(1))
.map(data -> "Server Message # " + data)
.doOnCancel(emitter::complete)
.subscribe(data -> {
try {
emitter.send(SseEmitter.event().data(data));
} catch (Exception e) {
log.error("发送数据异常", e);
}
});
return emitter;
}

与 WebSocket 比较

Spring SseEmitter 和 WebSocket 都实现了服务端向客户端推送数据,不过 WebSocket 是 全双工通信,客户端和服务端可以互相通信。而 SseEmitter 只能从服务端向 客户端推送(该功能以及包含在 spring boot jar 中)。

OkHttp + Spring SseEmitter 实现大模型数据处理推送

不多说,直接先上代码,先看 OkHttp 相关的两块代码。第一部分是 OkHttp 建立 Http Sse 监听连接,第二部分是 OkHttp 关于事件监听的具体代码实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Component
public class OkHttpStreamClient {

private static final OkHttpClient HTTP_CLIENT;

static {
HTTP_CLIENT = new OkHttpClient.Builder()
.connectTimeout(60, TimeUnit.SECONDS)
.readTimeout(60, TimeUnit.SECONDS)
.writeTimeout(60, TimeUnit.SECONDS)
.retryOnConnectionFailure(true)
.build();
}

/**
* ok http 实现 sse 监听
*
* @param emitter SseEmitter
* @param url 请求地址
*/
public void stream(SseEmitter emitter, String url) {
Request request = new Request.Builder()
.url(url)
.get()
.build();
ChatEventSourceListener listener = new ChatEventSourceListener(emitter);
RealEventSource realEventSource = new RealEventSource(request, listener);
realEventSource.connect(HTTP_CLIENT);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Slf4j
public class ChatEventSourceListener extends EventSourceListener {

private final SseEmitter emitter;

public ChatEventSourceListener(SseEmitter emitter) {
this.emitter = emitter;
}

@Override
public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) {
log.info("建立sse连接...");
}

@Override
public void onEvent(@NotNull EventSource eventSource, String id, final String type, @NotNull String data) {
log.info("{}: {}", id, data);
// do something,对接收到的数据进行处理,并向客户端推送
try {
emitter.send(SseEmitter.event().data(data));
} catch (IOException e) {
log.error("数据发送异常");
}
}

@Override
public void onClosed(@NotNull EventSource eventSource) {
log.info("sse连接关闭...");
emitter.complete();
}

@Override
public void onFailure(@NotNull EventSource eventSource, Throwable t, Response response) {
log.error("出现异常,返回结果:{}", response, t);
}
}

OkHttp 的 EventSourceListener 有几种方法,入代码所示,可以在相关里面实现对代码的数据处理,最后将处理后的数据发送到客户端。

最终需要将接收到的数据,向客户端推送,推送的具体代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
@RestController
@RequestMapping(value = "/chat")
public class ChatController {

private final OkHttpStreamClient okHttpStreamClient;

@Autowired
public ChatController(OkHttpStreamClient okHttpStreamClient) {
this.okHttpStreamClient = okHttpStreamClient;
}

/**
* 模拟流式结果输出
*/
@GetMapping(value = "chat", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter chat() {
SseEmitter emitter = new SseEmitter(60000L);
okHttpStreamClient.stream(emitter, "http://localhost:8888/chat/stream_query");
return emitter;
}
}

上面的代码简单的展示了,如果通过 okhttp sse 建立监听,并将处理后的数据推送到客户端的完整连接。

客官,赏一杯coffee嘛~~~~