SpringBoot手动实现流式输出方案整理以及SSE规范输出详解

article/2025/7/13 5:42:17

背景:

最近做流式输出时,一直使用python实现的,应需求方的要求,需要通过java应用做一次封装并在java侧完成系统鉴权、模型鉴权等功能后才能真正去调用智能体应用,基于此调研java实现流式输出的几种方式,并完成与python服务对接的方案。

方案:

  • 使用Servlet原生API实现流式输出
  • 使用ResponseBodyEmitter实现异步流式输出
  • 使用SseEmitter实现服务器发送事件(SSE)
  • 使用WebFlux实现响应式流式输出
  • 使用Spring MVC的StreamingResponseBody
  • websockt

说一下我的业务场景,我原本的前后端适配已经按照SSE规范完成了功能,因此新写接口时也采用SSE规范,避免同一个系统中前端出现多种方式的调用,而且我的python微服务采用SSE规范,当时第一反应采用Feign去调用接口返回即可,但是使用后发现Openfeign支持这种调用不友好,因此接口对接这里采用的是WebClient。因此本文着重说一下SSE规范调用

一、SSE是什么

SSE (Server-Sent Events) 是一种基于HTTP的服务器向客户端推送数据的Web技术规范,它允许服务器单向地向客户端发送事件流。以下是SSE规范的全面解析:

1.基本概念

SSE是HTML5标准的一部分,主要特点包括:

  • 单向通信:仅服务器→客户端方向

  • 基于HTTP:使用普通HTTP连接

  • 文本协议:事件以纯文本格式传输

  • 自动重连:内置连接恢复机制

  • 简单易用:比WebSocket更轻量级

2. 协议格式

SSE事件流是一个UTF-8编码的文本流,包含以下字段(每个字段以\n结尾):

event: message\n
id: 123\n
retry: 5000\n
data: {\n
data: "name": "John",\n
data: "age": 30\n
data: }\n\n

  • data: 有效载荷内容(可多行,每行需加"data: "前缀)

  • event: 自定义事件类型(默认"message")

  • id: 事件ID(用于断线重连时定位)

  • retry: 重连时间(毫秒)

服务器响为:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive 

3.客户端API

浏览器端JavaScript使用EventSource接口:

const eventSource = new EventSource('/sse-endpoint');// 监听默认事件
eventSource.onmessage = (e) => {console.log('Message:', e.data);
};// 监听自定义事件
eventSource.addEventListener('customEvent', (e) => {console.log('Custom event:', e.data);
});// 错误处理
eventSource.onerror = (e) => {console.error('SSE error:', e);
};

4.与相关技术的对比

特性SSEWebSocketLong Polling
方向单向(服务器→客户端)双向单向(轮询)
协议HTTPWS/WSSHTTP
连接管理自动重连需手动处理每次请求新建连接
数据格式文本二进制/文本文本
复杂度

5. 适用场景

SSE特别适合:

  • 实时通知(新闻、股价、天气)

  • 日志流监控

  • 进度报告(文件处理、任务执行)

  • 社交媒体动态更新

  • 需要简单实时功能但不需要双向通信的场景

虽然WebSocket更强大,但SSE仍有很多优势:

  • 更简单的实现

  • 自动利用HTTP/2的多路复用

  • 不需要额外的协议升级

  • 被所有现代浏览器支持(IE除外)

二、WebClient ‌

1.概念

WebClient ‌是 Spring Framework 5中引入的一个基于响应式编程模型的 HTTP客户端 ,主要用于执行HTTP请求。相比传统的 RestTemplate ,WebClient采用了 Reactor库 ,支持非阻塞式(异步)调用,能够充分利用多核CPU资源,特别适合高并发场景。

2.与OpenFeign比较

推荐方案:优先使用WebClient + Service分层架构
原因:WebClient原生支持响应式流处理,更适合SSE场景,而OpenFeign更适合普通REST调用

备选方案:使用OpenFeign(需要特殊配置)
注意:需要Spring Cloud 2020.0.3+版本和响应式Feign支持

特性WebClient方案OpenFeign方案
响应式支持✅ 原生支持⚠️ 需要特殊配置
代码复杂度简单较复杂
维护性
性能高(非阻塞IO)中等
连接池管理自动需要手动配置
适合场景高并发流式处理简单接口调用

三、代码实现

1.基础实现

@RestController
public class SseController {@GetMapping("/sse-stream")public SseEmitter streamSse() {SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 100; i++) {SseEmitter.SseEventBuilder event = SseEmitter.event().data("SSE Event " + i).id(String.valueOf(i)).name("sse-event");emitter.send(event);Thread.sleep(100);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
}

2.业务进阶

2.1 依赖配置

在pom.xml中添加必要依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>

2.2 WebClient配置

  • 使用WebClient创建HTTP客户端,支持响应式流处理

  • 配置第三方SSE接口地址和必要的请求头(如认证信息)

    WebClient配置类

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;@Configuration
public class WebClientConfig {@Beanpublic WebClient webClient() {return WebClient.builder().baseUrl("https://api.example.com").build();}
}

2.3Service层实现

@Service
public class WebClientSseService {@Autowiredprivate WebClient webClient;public Flux<String> streamEvents() {System.out.println("前置校验。。。。");Flux<String> resFlux = null;try{resFlux = webClient.get().uri("/stream").accept(MediaType.TEXT_EVENT_STREAM).retrieve().bodyToFlux(String.class).map(data -> {// 处理原始SSE数据#if (data.startsWith("data:")) {#return data.substring(5).trim();#}return data;});}catch (Exception exception){resFlux = Flux.just("{'status': 'Error', 'message': '"+exception.getMessage()+"'}");}return resFlux;}
}

2.4 Controller

// application-web模块
@RestController
public class DataStreamController {@PostMapping(value = "/stream",consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> provideStream(@RequestBody StreamRequest request) {return dataProcessor.streamEvents(request);}@PostMapping(value = "/stream",produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> provideStream(@RequestParam(name = "file", required = false) MultipartFile file, @RequestParam Map<String, Object> jsonObject) {return dataProcessor.streamEvents(file, jsonObject);}
}

解释一下这两个参数:

consumes = MULTIPART_FORM_DATA_VALUE,

produces = TEXT_EVENT_STREAM_VALUE

consumes、produces 两个参数的作用与区别
参数作用示例值
consumes = MULTIPART_FORM_DATA_VALUE声明接口接收的请求内容类型(客户端→服务端)multipart/form-data
produces = TEXT_EVENT_STREAM_VALUE声明接口返回的响应内容类型(服务端→客户端)text/event-stream
为什么需要同时声明?
  1. 输入输出分离原则

    • 输入(consumes):处理文件上传需要 multipart/form-data

    • 输出(produces):SSE流式响应需要 text/event-stream

  2. HTTP协议规范

POST /upload HTTP/1.1
Content-Type: multipart/form-data  ← 对应consumes
Accept: text/event-stream         ← 对应produces
内容类型对照速查表
场景客户端设置服务端声明
文件上传+JSON响应Content-Type: multipart/form-dataconsumes = MULTIPART_FORM_DATA_VALUE
文件上传+SSE流响应Accept: text/event-streamproduces = TEXT_EVENT_STREAM_VALUE
JSON上传+SSE流响应Content-Type: application/jsonconsumes = APPLICATION_JSON_VALUE

根据需要自由选择。

2.5 这里对webclient做个扩展

如果上传的是文件可以用这个方式写body的内容

.contentType(MediaType.MULTIPART_FORM_DATA)

.body(BodyInserters.fromMultipartData(formData))

如果不同的json类型的body请求体可以这么写

.body(BodyInserters.fromValue(res)) 

注意这块的细节,我就是在这里写绕了很多

四、其他方案实现

1. 使用Servlet原生API实现流式输出

@RestController
public class StreamingController {@GetMapping("/stream1")public void stream1(HttpServletResponse response) throws IOException {response.setContentType("text/plain;charset=UTF-8");try (PrintWriter writer = response.getWriter()) {for (int i = 0; i < 100; i++) {writer.write("Data line " + i + "\n");writer.flush(); // 手动刷新缓冲区Thread.sleep(100); // 模拟延迟}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

2. 使用ResponseBodyEmitter实现异步流式输出

@RestController
public class StreamingController {@GetMapping("/stream2")public ResponseBodyEmitter stream2() {ResponseBodyEmitter emitter = new ResponseBodyEmitter();CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 100; i++) {emitter.send("Data line " + i + "\n");Thread.sleep(100);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
}

3. 使用SseEmitter实现服务器发送事件(SSE)

@RestController
public class SseController {@GetMapping("/sse-stream")public SseEmitter streamSse() {SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 100; i++) {SseEmitter.SseEventBuilder event = SseEmitter.event().data("SSE Event " + i).id(String.valueOf(i)).name("sse-event");emitter.send(event);Thread.sleep(100);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
}

4. 使用WebFlux实现响应式流式输出

@RestController
@RequestMapping("/reactive")
public class ReactiveStreamingController {@GetMapping("/stream")public Flux<String> streamData() {return Flux.interval(Duration.ofMillis(100)).map(sequence -> "Reactive data " + sequence + "\n").take(100); // 限制输出数量}@GetMapping(value = "/stream-file", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> streamLargeFile() {return Flux.using(() -> Files.lines(Paths.get("large-file.txt")),Flux::fromStream,Stream::close);}
}

5. 使用Spring MVC的StreamingResponseBody

@RestController
public class StreamingResponseBodyController {@GetMapping("/stream3")public StreamingResponseBody stream3() {return outputStream -> {Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream));for (int i = 0; i < 100; i++) {writer.write("Streaming line " + i + "\n");writer.flush();Thread.sleep(100);}};}
}


http://www.hkcw.cn/article/AxlIjceADM.shtml

相关文章

vuex的使用

❀ ❀ ❀ ❀ ❀ ❀ ❀ vuex的官网 ❀ ❀ ❀ ❀ ❀ ❀ ❀ ❀ 这里用法不纯粹。用户toolbar页面切换时的传参。若后期有更好的方式&#xff0c;会更改。因vuex用于全局&#xff0c;在这个场景下使用有点大材小用了 其中需要注意的点就是更新、获取状态 更新状态。updateProjec…

Calendar和Datepicker

Displaystart Displayend "2024-10-8" selectedDate属性 设定选择的日期 在 C# 中&#xff0c;DateTime? date1 表示 **一个可空的 DateTime 类型变量**。 &#xff1f;.是不为零 ?是可以为零0️⃣ 多选 selectionmode none不让选 singlerange shift …

赛事获奖|TsingtaoAI荣获“雄才杯”2025创新创业大赛总决赛奖项

5月16-18日&#xff0c;由雄安新区党工委人才工作领导小组办公室主办的“雄才杯”2025创新创业大赛总决赛在雄安新区成功举办。TsingtaoAI凭借“基于DeepSeek的具身智能实训”项目荣获优胜奖&#xff0c;本项目为参赛项目中唯一的教育科技服务类获奖项目。 大赛背景 本次总决…

初识vue3(vue简介,环境配置,setup语法糖)

一&#xff0c;前言 今天学习vue3 二&#xff0c;vue简介及如何创建vue工程 Vue 3 简介 Vue.js&#xff08;读音 /vjuː/&#xff0c;类似 “view”&#xff09;是一款流行的渐进式 JavaScript 框架&#xff0c;用于构建用户界面。Vue 3 是其第三代主要版本&#xff0c;于 …

C++面向对象(二)

面向对象基础内容参考&#xff1a; C面向对象&#xff08;一&#xff09;-CSDN博客 友元函数 类的友元函数是定义在类外部&#xff0c;但有权访问类的所有私有&#xff08;private&#xff09;成员和保护&#xff08;protected&#xff09;成员。尽管友元函数的原型有在类的定…

基于AIS的海洋观测应用

知识星球&#xff1a;数据书局。打算通过知识星球将这些年积累的知识、经验分享出来&#xff0c;让各位在数据治理、数据分析的路上少走弯路&#xff0c;另外星球也方便动态更新最近的资料&#xff0c;提供各位一起讨论数据的小圈子 1.背景 船舶自动识别系统&#xff08;Aut…

imx6ull(0):烧录、启动

参考内容&#xff1a; i.MX6ULL Applications Processors for Industrial Products i.MX6ULLApplicationsProcessorReferenceManual 正点原子 I.MX6U嵌入式Linux驱动开发指南 以及 广大工程师们在互联网上分享的学习笔记(一样东西学的人多的时候所带来的优势) 例如这里我用…

CloudCompare——使用CSF算法进行点云高程归一化

目录 1.算法原理2.软件操作2.1 CSF算法2.2 生成CSF网格2.3 平滑网格(可选)2.4 计算点云到网格的距离2.5 将计算得到的距离赋值给高程 3.结果展示3.1 原始点云3.2 归一化结果 1.算法原理 点云高程归一化的关键在于获取原始点云地面数据的DEM。可选取CSF算法提取样地点云地面DEM。…

【C语言】C语言经典小游戏:贪吃蛇(下)

文章目录 一、游戏前准备二、游戏开始1、游戏开始函数&#xff08;GameStart&#xff09;1&#xff09;打印欢迎界⾯&#xff08;WelcomeToGame&#xff09;2&#xff09;创建地图&#xff08;CreateMap&#xff09;3&#xff09;初始化蛇⾝&#xff08;InitSnake&#xff09;4…

循序渐进 Android Binder(一):IPC 基本概念和 AIDL 跨进程通信的简单实例

Binder 给人的第一印象是”捆绑者“&#xff0c;即将两个需要建立关系的事物用某些工具束缚在一起。在 Android 中&#xff0c;Binder 是一种高效的跨进程通信&#xff08;IPC&#xff09;机制&#xff0c;它将可以将运行在不同进程中的组件进行绑定&#xff0c;以实现彼此通信…

ISBN书号查询接口如何用PHP实现调用?

一、什么是ISBN书号查询接口 ISBN数据查询接口是一项图书信息查询服务。它基于全球通用的ISBN编码系统&#xff0c;帮助用户快速获取图书的详细信息&#xff0c;包括书名、作者、出版社、出版时间、价格、封面等关键字段。 该接口广泛应用于电商平台、图书馆管理系统、二手书…

Linux(信号)

目录 一 什么是信号 二 Linux中的信号 1. 查看信号&#xff1a;kill -l 2. 自定义信号的处理方式 2.1 API 2.2 demo 3. 理解信号的发送 4. 信号产生的方式 三 信号保存 四 捕捉信号 1. 先来说说硬件中断&#xff1a; 1. 谁调度操作系统&#xff1f; 2. 理解时间片…

[Windows] Simple Live v1.8.3 开源聚合直播 :支持哔哩哔哩 虎牙 斗鱼 抖音

Simple Live 是一款基于 AllLive 项目 开发的开源聚合直播 APP&#xff0c;支持 哔哩哔哩、虎牙、斗鱼、抖音 等主流平台&#xff0c;具备 无广告、低占用、弹幕互动 等核心优势。其核心功能包括&#xff1a;全平台覆盖&#xff1a;一站式聚合多平台直播资源&#xff0c;无需切…

第十天:Java反射

反射 反射就是&#xff1a;加载类&#xff0c;并编写代码获取类中的成员变量&#xff0c;方法&#xff0c;构造器等。 注意&#xff1a;反射&#xff0c;注解&#xff0c;动态代理就是用来学习框架做框架的&#xff0c;在平时业务开发需求上很少用到。 1 反射学什么&#xf…

整数有约 | 刘乾专访:继续预训练策略与数据优化之道

人工智能多语言处理近年来得到了极大的关注&#xff0c;尤其是在以东南亚为代表的小语种环境中&#xff0c;其特殊的语言多样性和语料库稀缺性使得研究挑战和机遇并存。在现有的自然语言处理模型中&#xff0c;英语和中文因为有海量高质量数据的支持&#xff0c;常被作为核心语…

Google 发布的全新导航库:Jetpack Navigation 3

前言 多年来&#xff0c;Jetpack Navigation 库一直是开发者的重要工具&#xff0c;但随着 Android 用户界面领域的发展&#xff0c;特别是大屏设备的出现和 Jetpack Compose 的兴起&#xff0c;Navigation 的功能也需要与时俱进。 今年的 Google I/O 上重点介绍了 Jetpack Na…

抖音商城抓包 分析

声明 本文章中所有内容仅供学习交流使用&#xff0c;不用于其他任何目的&#xff0c;抓包内容、敏感网址、数据接口等均已做脱敏处理&#xff0c;严禁用于商业用途和非法用途&#xff0c;否则由此产生的一切后果均与作者无关&#xff01; 抓包展示 总结 1.出于安全考虑,本章未…

uniapp-商城-77-shop(8.2-商品列表,地址信息添加,级联选择器picker)

地址信息,在我们支付订单上有这样一个接口,就是物流方式,一个自提,我们就显示商家地址。一个是外送,就是用户自己填写的地址。 这里先说说用户的地址添加。需要使用到的一些方式方法,主要有关于地址选择器,就是uni-data-picker级联选择。 该文介绍了电商应用中地址信息处…

AlmaLinux OS 10 正式发布:兼容 RHEL 10 带来多项技术革新

AlmaLinux OS 基金会日前宣布推出 AlmaLinux OS 10&#xff0c;该版本代号代号紫色的狮子 (Purple Lion)&#xff0c;新版本带来多项新功能和技术更新&#xff0c;旨在为用户提供更强大的企业级 Linux 体验。 该系统使用与 RHEL 10 相同的源代码构建并于 RHEL 10 保持完全兼…

深入理解C# MVVM模式:从理论到实践

在现代软件开发中&#xff0c;良好的架构设计对于构建可维护、可测试和可扩展的应用程序至关重要。Model-View-ViewModel (MVVM) 是一种特别适合XAML-based应用程序&#xff08;如WPF、Xamarin和UWP&#xff09;的架构模式。本文将全面探讨MVVM模式的概念、实现细节、最佳实践以…