springboot整合mqtt客户端示例分享
更新时间:2023年07月29日 09:27:29 作者:__FY
这篇文章主要介绍了springboot整合mqtt客户端示例分享的相关资料,需要的朋友可以参考下
用到的工具:
EMQX , mqttx , idea
工具使用都很简单,自己看看就能会。
订阅端config代码:
package com.example.demo.config; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; /** * @Author: xct * @Date: 2021/7/30 17:06 * @Description: */ @Configuration public class MqttConsumerConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; /** * 客户端对象 */ private MqttClient client; /** * 在bean初始化后连接到服务器 * @author xct * @param * @return void * @date 2021/7/30 16:48 */ @PostConstruct public void init(){ connect(); } /** * 客户端连接服务端 * @author xct * @param * @return void * @date 2021/7/30 16:01 */ public void connect(){ try { //创建MQTT客户端对象 client = new MqttClient(hostUrl,clientId,new MemoryPersistence()); //连接设置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 //设置为true表示每次连接到服务端都是以新的身份 options.setCleanSession(true); //设置连接用户名 options.setUserName(username); //设置连接密码 options.setPassword(password.toCharArray()); //设置超时时间,单位为秒 options.setConnectionTimeout(100); //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线 options.setKeepAliveInterval(20); //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false); //设置回调 client.setCallback(new MqttConsumerCallBack()); client.connect(options); //订阅主题 //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息 int[] qos = {1,1}; //主题 String[] topics = {"topic1","topic2"}; //订阅主题 client.subscribe(topics,qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 断开连接 * @author xct * @param * @return void * @date 2021/8/2 09:30 */ public void disConnect(){ try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } /** * 订阅主题 * @author xct * @param topic * @param qos * @return void * @date 2021/7/30 17:12 */ public void subscribe(String topic,int qos){ try { client.subscribe(topic,qos); } catch (MqttException e) { e.printStackTrace(); } } }
订阅端回调代码:
package com.example.demo.config; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; /** * @Author: xct * @Date: 2021/7/30 17:06 * @Description: */ public class MqttConsumerCallBack implements MqttCallback { /** * 客户端断开连接的回调 * @author xct * @param throwable * @return void * @date 2021/7/30 17:14 */ @Override public void connectionLost(Throwable throwable) { System.out.println("与服务器断开连接,可重连"); } /** * 消息到达的回调 * @author xct * @param topic * @param message * @return void * @date 2021/7/30 17:14 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(String.format("接收消息主题 : %s",topic)); System.out.println(String.format("接收消息Qos : %d",message.getQos())); System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload()))); System.out.println(String.format("接收消息retained : %b",message.isRetained())); //TODO 可以将消息持久化到数据库中,然后在进行其他操作。 } /** * 消息发布成功的回调 * @author xct * @param iMqttDeliveryToken * @return void * @date 2021/7/30 17:14 */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }
测试控制器:
package com.example.demo.controller; import com.example.demo.config.MqttConsumerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; /** * @Author: xct * @Date: 2021/7/30 17:20 * @Description: */ @Controller public class TestController { @Autowired private MqttConsumerConfig client; @Value("${spring.mqtt.client.id}") private String clientId; @RequestMapping("connect") @ResponseBody public String connect(){ client.connect(); return clientId + "连接到服务器"; } @RequestMapping("disConnect") @ResponseBody public String disConnect(){ client.disConnect(); return clientId + "与服务器断开连接"; } }
配置文件:
spring: application: name: consumer #MQTT配置信息 mqtt: #MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883 url: tcp://0.0.0.0:1883 #用户名 username: admin #密码 password: public #客户端id(不能重复) client: id: consumer-id #MQTT默认的消息推送主题,实际可在调用接口时指定 default: topic: topic server: port: 8082
启动订阅端代码,将订阅端和mqttx都连接到EMQX
确认主题是否正确 发送即可。
到此这篇关于springboot整合mqtt客户端示例分享的文章就介绍到这了,更多相关springboot整合mqtt内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
SpringBoot项目整合Log4j2实现自定义日志打印失效问题解决
这篇文章主要介绍了SpringBoot项目整合Log4j2实现自定义日志打印失效问题解决,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2024-01-01Java并发容器之ConcurrentLinkedQueue详解
这篇文章主要介绍了Java并发容器之ConcurrentLinkedQueue详解,加锁队列的实现较为简单,这里就略过,我们来重点来解读一下非阻塞队列,2023-12-12
从点到面, 下面我们来看下非阻塞队列经典实现类ConcurrentLinkedQueue,需要的朋友可以参考下解析Java的Spring框架的BeanPostProcessor发布处理器
这篇文章主要介绍了Java的Spring框架的BeanPostProcessor发布处理器,Spring是Java的SSH三大web开发框架之一,需要的朋友可以参考下2015-12-12
最新评论