springboot -sse -flux 服务器推送消息的方法
先说BUG处理,遇到提示异步问题 Async support must be enabled on a servlet and for all filters involved in async request processing. This is done in Java code using the Servlet API or by adding "<async-supported>true</async-supported>" to servlet and filter declarations in web.xml.
springboot在@WebFilter注解处,加入urlPatterns = { "/*" },asyncSupported = true
springmvc在web.xml处理
<?xml version="1.0" encoding="UTF-8"?> <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" version="3.0"> <filter-mapping> <filter-name>shiroFilter</filter-name> <url-pattern>/*</url-pattern> <dispatcher>REQUEST</dispatcher> <dispatcher>ASYNC</dispatcher> </filter-mapping>
demo1,服务器间隔一定时间推送内容
接口方法
@GetMapping(path = "/sse/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE ) public Flux<ServerSentEvent<String>> sse(@PathVariable String userId) { // 每两秒推送一次 return Flux.interval(Duration.ofSeconds(2)).map(seq-> Tuples.of(seq, LocalDateTime.now())).log()//序号和时间 .map(data-> ServerSentEvent.<String>builder().id(userId).data(data.getT1().toString()).build());//推送内容 }
2.前端代码
<!DOCTYPE html> <html xmlns:th="http://www.thymeleaf.org"> <head> <meta charset="UTF-8"/> <title>服务器推送事件</title> </head> <body> <div> <div id="data"></div> <div id="result"></div><br/> </div> <script th:inline="javascript" > //服务器推送事件 if (typeof (EventSource) !== "undefined") { var source1 = new EventSource("http://localhost:9000/api/admin/test/sse/1"); //当抓取到消息时 source1.onmessage = function (evt) { document.getElementById("data").innerHTML = document.getElementById("data").innerHTML+"股票行情:" + evt.data; }; } else { //注意:ie浏览器不支持 document.getElementById("result").innerHTML = "抱歉,你的浏览器不支持 server-sent 事件..."; var xhr; var xhr2; if (window.XMLHttpRequest){ //IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法 xhr=new XMLHttpRequest(); xhr2=new XMLHttpRequest(); }else{ //IE6, IE5 浏览器不支持,使用ActiveXObject方法代替 xhr=new ActiveXObject("Microsoft.XMLHTTP"); xhr2=new ActiveXObject("Microsoft.XMLHTTP"); } console.log(xhr); console.log(xhr2); xhr.open('GET', '/sse/countDown'); xhr.send(null);//发送请求 xhr.onreadystatechange = function() { console.log("s响应状态:" + xhr.readyState); //2是空响应,3是响应一部分,4是响应完成 if (xhr.readyState > 2) { //这儿可以使用response(对应json)与responseText(对应text) var newData = xhr.response.substr(xhr.seenBytes); newData = newData.replace(/\n/g, "#"); newData = newData.substring(0, newData.length - 1); var data = newData.split("#"); console.log("获取到的数据:" + data); document.getElementById("result").innerHTML = data; //长度重新赋值,下次截取时需要使用 xhr.seenBytes = xhr.response.length; } } xhr2.open('GET', '/sse/retrieve'); xhr2.send(null);//发送请求 xhr2.onreadystatechange = function() { console.log("s响应状态:" + xhr2.readyState); //0: 请求未初始化,2 请求已接收,3 请求处理中,4 请求已完成,且响应已就绪 if (xhr2.readyState > 2) { //这儿可以使用response(对应json)与responseText(对应text) var newData1 = xhr2.response.substr(xhr2.seenBytes); newData1 = newData1.replace(/\n/g, "#"); newData1 = newData1.substring(0, newData1.length - 1); var data1 = newData1.split("#"); console.log("获取到的数据:" + data1); document.getElementById("data").innerHTML = data1; //长度重新赋值,下次截取时需要使用 xhr2.seenBytes = xhr2.response.length; } } } </script> </body> </html>
demo2 订阅服务器消息,服务器send推送消息完成后,关闭sse.close
1.接口方法以及工具类
@GetMapping(path = "/sse/sub",produces = MediaType.TEXT_EVENT_STREAM_VALUE ) public SseEmitter subscribe(@RequestParam String questionId,HttpServletResponse response) { // 简单异步发消息 ==== //questionId 订阅id,id对应了sse对象 new Thread(() -> { try { Thread.sleep(1000); for (int i = 0; i < 10; i++) { Thread.sleep(500); SSEUtils.pubMsg(questionId, questionId + " - kingtao come " + i); } } catch (Exception e) { e.printStackTrace(); } finally { // 消息发送完关闭订阅 SSEUtils.closeSub(questionId); } }).start(); // ================= return SSEUtils.addSub(questionId); }
工具类
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class SSEUtils { // timeout private static Long DEFAULT_TIME_OUT = 2*60*1000L; // 订阅表 private static Map<String, SseEmitter> subscribeMap = new ConcurrentHashMap<>(); /** 添加订阅 */ public static SseEmitter addSub(String questionId) { if (null == questionId || "".equals(questionId)) { return null; } SseEmitter emitter = subscribeMap.get(questionId); if (null == emitter) { emitter = new SseEmitter(DEFAULT_TIME_OUT); subscribeMap.put(questionId, emitter); } return emitter; } /** 发消息 */ public static void pubMsg(String questionId, String msg) { SseEmitter emitter = subscribeMap.get(questionId); if (null != emitter) { try { // 更规范的消息结构看源码 emitter.send(SseEmitter.event().data(msg)); } catch (Exception e) { // e.printStackTrace(); } } } /** * 关闭订阅 * @param questionId */ public static void closeSub(String questionId) { SseEmitter emitter = subscribeMap.get(questionId); if (null != emitter) { try { emitter.complete(); subscribeMap.remove(questionId); } catch (Exception e) { e.printStackTrace(); } } } }
2.前端代码
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>sse</title> </head> <body> <div> <label>问题id</label> <input type="text" id="questionId"> <button onclick="subscribe()">订阅</button> <hr> <label>F12-console控制台查看消息</label> </div> <script> function subscribe() { let questionId = document.getElementById('questionId').value; let url = 'http://localhost:9000/api/admin/test/sse/sub?questionId=' + questionId; let eventSource = new EventSource(url); eventSource.onmessage = function (e) { console.log(e.data); }; eventSource.onopen = function (e) { console.log(e,1); // todo }; eventSource.onerror = function (e) { // todo console.log(e,2); eventSource.close() }; } </script> </body> </html>
到此这篇关于springboot -sse -flux 服务器推送消息的文章就介绍到这了,更多相关springboot sse flux 服务器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Java基于SpringBoot和tk.mybatis实现事务读写分离代码实例
这篇文章主要介绍了Java基于SpringBoot和tk.mybatis实现事务读写分离代码实例,读写分离,基本的原理是让主数据库处理事务性增、改、删操作,而从数据库处理SELECT查询操作,数据库复制被用来把事务性操作导致的变更同步到集群中的从数据库,需要的朋友可以参考下2023-10-10springboot @Value实现获取计算机中绝对路径文件的内容
这篇文章主要介绍了springboot @Value实现获取计算机中绝对路径文件的内容,具有很好的参考价值,希望对大家有所帮助。2021-09-09SpringData JPA快速上手之关联查询及JPQL语句书写详解
JPA都有SpringBoot的官方直接提供的starter,而Mybatis没有,直到SpringBoot 3才开始加入到官方模版中,这篇文章主要介绍了SpringData JPA快速上手,关联查询,JPQL语句书写的相关知识,感兴趣的朋友一起看看吧2023-09-09
最新评论