spring中websocket定时任务实现实时推送

 更新时间:2023年01月06日 09:06:22   作者:C~LOVER  
本文主要介绍了spring中websocket定时任务实现实时推送,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

有时候业务要求websocket连接后,服务端实时每隔一段时间就将数据推送给客户端进行响应,这时就需要websocket+定时任务一起来实现实时推送数据给客户端了。
使用的定时任务方式为spring的TaskScheduler对象实现任务调度。

TaskScheduler定时任务实现

TaskScheduler接口提供了多种调度方法来实现运行任务的执行。

public interface TaskScheduler {
 
 	//通过触发器来决定task是否执行
    ScheduledFuture schedule(Runnable task, Trigger trigger); 
 
 	//在starttime的时候执行一次
    ScheduledFuture schedule(Runnable task, Date startTime);  
    ScheduledFuture schedule(Runnable task, Instant startTime); 
 
 	//从starttime开始每个period时间段执行一次task
    ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period); 
    ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period); 
 
 	//每隔period执行一次
    ScheduledFuture scheduleAtFixedRate(Runnable task, long period);  
    ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period);  
 
 	//从startTime开始每隔delay长时间执行一次
    ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay); 
 
 	//每隔delay时间执行一次
    ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay); 
}

简单测试一下

import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;

/**
 * The type Task scheduler test.
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -12-28 15:45:17
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class TaskSchedulerTest {

    private final TaskScheduler taskScheduler;

    @Bean
    public void test() {
        //每隔3秒执行一次
        Trigger trigger = new CronTrigger("0/3 * * * * *");
        //每隔1秒执行一次
        //Trigger trigger1 = new PeriodicTrigger(1, TimeUnit.SECONDS);
        taskScheduler.schedule(new MyThread(), trigger);
    }

    private class MyThread implements Runnable {
        @Override
        public void run() {
            log.info("定时执行线程名称=【{}】,执行时间=【{}】", Thread.currentThread().getName(), DateUtil.date());
        }
    }

}

效果就是每个3秒执行一次

在这里插入图片描述

websocket+定时任务实时推送

实现的业务需求如下:客户端连上来以后就每隔3秒向客户端实时推送消息。有关websocket的实现见文章websocket简单实现

TestWebsocket.java

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;

/**
 * 测试websocket
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -12-28 14:55:29
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class TestWebsocket implements WebSocketHandler {

    protected static final CopyOnWriteArrayList<WebSocketSession> WEB_SOCKET_SESSIONS = new CopyOnWriteArrayList<>();

    /**
     * 定时任务集合
     */
    Map<String, ScheduledFuture<?>> stringScheduledFutureMap = new ConcurrentHashMap<>();

    /**
     * taskScheduler
     */
    private final TaskScheduler taskScheduler;

    /**
     * 建立连接后操作
     *
     * @param session 连接session信息
     * @throws Exception exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sendMessage("连接成功~~~~~~,sessionId=" + session.getId());
        WEB_SOCKET_SESSIONS.add(session);
        //设置定时任务,每隔3s执行一次
        Trigger trigger = new CronTrigger("0/3 * * * * *");
        //开启一个定时任务
        ScheduledFuture<?> schedule = taskScheduler.schedule(new CustomizeTask(session.getId()), trigger);
        //根据session连接id定时任务线程存到map中
        stringScheduledFutureMap.put(session.getId(), schedule);
    }

    private class CustomizeTask implements Runnable {
        private final String sessionId;

        CustomizeTask(String sessionId) {
            this.sessionId = sessionId;
        }

        @Override
        public void run() {
            try {
                String message = CharSequenceUtil.format("定时执行线程名称=【{}】,执行时间=【{}】", Thread.currentThread().getName(), DateUtil.date());
                sendMessage(JSONUtil.toJsonStr(message), sessionId);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 接收到消息后的处理
     *
     * @param session 连接session信息
     * @param message 信息
     * @throws Exception exception
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        sendMessage("接收到的消息为=【" + message + "】,sessionId=【" + session.getId() + "】,回复消息=【你好呀!】");
    }

    /**
     * ws连接出错时调用
     *
     * @param session   session连接信息
     * @param exception exception
     * @throws Exception exception
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        if (session.isOpen()) {
            sendMessage("ws连接出错,即将关闭此session,sessionId=【" + session.getId() + "】");
            session.close();
        }
        WEB_SOCKET_SESSIONS.remove(session);
    }

    /**
     * 连接关闭后调用
     *
     * @param session     session连接信息
     * @param closeStatus 关闭状态
     * @throws Exception exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        if (session.isOpen()) {
            sendMessage("ws连接即将关闭此session,sessionId=【" + session.getId() + "】");
            session.close();
        }
        WEB_SOCKET_SESSIONS.remove(session);
        String sessionId = session.getId();
        ScheduledFuture<?> scheduledFuture = MapUtil.get(stringScheduledFutureMap, sessionId, ScheduledFuture.class);
        if (scheduledFuture != null) {
            //暂停对应session的开启的定时任务
            scheduledFuture.cancel(true);
            //集合移除
            stringScheduledFutureMap.remove(sessionId);
        }
    }

    /**
     * 是否支持分片消息
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * 群发发送消息
     *
     * @param message 消息
     * @throws IOException ioException
     */
    public void sendMessage(String message) throws IOException {
        if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
            for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
                webSocketSession.sendMessage(new TextMessage(message));
            }
        }
    }

    /**
     * 发给指定连接消息
     *
     * @param message 消息
     * @throws IOException ioException
     */
    public void sendMessage(String message, String sessionId) throws IOException {
        if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
            for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
                if (sessionId.equals(webSocketSession.getId())) {
                    webSocketSession.sendMessage(new TextMessage(message));
                }
            }
        }
    }
}

websocket绑定URL

import com.yjj.test.websocket.TestWebsocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

import javax.annotation.Resource;

/**
 * websocket配置
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -12-28 15:10:11
 */
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {

    @Resource
    private TestWebsocket testWebsocket;

    /**
     * Register {@link WebSocketHandler WebSocketHandlers} including SockJS fallback options if desired.
     *
     * @param registry
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(testWebsocket, "/test").setAllowedOrigins("*");
    }
}

websocket与定时任务同时存在时,需要加入配置定义线程池进行线程的管理

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
 * 当定时任务和websocket同时存在时报错解决
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -04-28 17:35:54
 */
@Configuration
public class ScheduledConfig {

    /**
     * Schedule本身是单线程执行的
     *
     * @return the task scheduler
     */
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduling = new ThreadPoolTaskScheduler();
        scheduling.setPoolSize(20);
        return scheduling;
    }
}

效果如下
连接上以后服务每隔3秒会向客户端实时推送消息

在这里插入图片描述

 到此这篇关于spring中websocket定时任务实现实时推送的文章就介绍到这了,更多相关spring websocket实时推送内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot接口参数的默认值与必要性最佳实践记录

    SpringBoot接口参数的默认值与必要性最佳实践记录

    这篇文章主要介绍了SpringBoot接口参数的默认值与必要性,通过合理设置接口参数的默认值和必要性,我们可以创建出既健壮又灵活的 RESTful API,需要的朋友可以参考下
    2024-08-08
  • 详解SpringBoot如何优雅的进行全局异常处理

    详解SpringBoot如何优雅的进行全局异常处理

    在SpringBoot的开发中,为了提高程序运行的鲁棒性,我们经常需要对各种程序异常进行处理,但是如果在每个出异常的地方进行单独处理的话,这会引入大量业务不相关的异常处理代码,这篇文章带大家了解一下如何优雅的进行全局异常处理
    2023-07-07
  • Java泛型类型擦除

    Java泛型类型擦除

    这篇文章主要为大家详细介绍了Java泛型类型擦除,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-03-03
  • Spring cloud负载均衡@LoadBalanced & LoadBalancerClient

    Spring cloud负载均衡@LoadBalanced & LoadBalancerClient

    由于Spring cloud2020之后移除了Ribbon,直接使用Spring Cloud LoadBalancer作为客户端负载均衡组件,我们讨论Spring负载均衡以Spring Cloud2020之后版本为主,学习Spring Cloud LoadBalance
    2023-11-11
  • 使用MUI框架构建App请求http接口实例代码

    使用MUI框架构建App请求http接口实例代码

    下面小编就为大家分享一篇使用MUI框架构建App请求http接口实例代码,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-01-01
  • 为什么Java要把字符串设计成不可变的

    为什么Java要把字符串设计成不可变的

    为什么Java要把字符串设计成不可变的,这篇文章给出了Java字符串设计成不可变的原因,感兴趣的小伙伴们可以参考一下
    2017-06-06
  • 浅谈Java线程间通信之wait/notify

    浅谈Java线程间通信之wait/notify

    下面小编就为大家带来一篇浅谈Java线程间通信之wait/notify。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-06-06
  • 使用Java编写一个图片word互转工具

    使用Java编写一个图片word互转工具

    这篇文章主要介绍了使用Java编写一个PDF Word文件转换工具的相关资料,需要的朋友可以参考下
    2023-01-01
  • BeanUtils.copyProperties复制属性失败的原因及解决方案

    BeanUtils.copyProperties复制属性失败的原因及解决方案

    这篇文章主要介绍了BeanUtils.copyProperties复制属性失败的原因及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08
  • Java简易登录注册小程序

    Java简易登录注册小程序

    这篇文章主要为大家详细介绍了Java图形界面开发,简易登录注册小程序,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-10-10

最新评论