Spring boot 集成 MQTT详情

 更新时间:2022年04月21日 08:51:29   作者:剑圣无痕   
这篇文章主要介绍了Spring boot 集成 MQTT详情,MQTT是一种基于发布/订阅模式的"轻量级"通讯协议,可以以极少的代码和有限的带宽为连接远程设备提供实时可靠的消息服,下文更多相关介绍,需要的小伙伴可以参考一下

一、简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,可以以极少的代码和有限的带宽为连接远程设备提供实时可靠的消息服务。目前在物联网、小型设备、移动应用等方面有较广泛的应用。

二、主要特性

  • (1)使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
  • (2)对负载内容屏蔽的消息传输。
  • (3)使用TCP/IP提供网络连接。
  • (4)有三种消息发布服务质量:

“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。

“至少一次”,确保消息到达,但消息重复可能会发生。

“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。

  • (5)小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以降低网络流量。
  • (6)使用Last Will和Testament特性通知有关各方客户端异常中断的机制。

Last Will:即遗言机制,用于通知同一主题下的其他设备发送遗言的设备已经断开了连接。

Testament:遗嘱机制,功能类似于Last Will。

三、集成步骤

1.引入相关jar包

  <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-mqtt</artifactId>
</dependency>  

2.核心配置类

@Configuration
public class MqttConfig
{
    @Autowired
    private MqttProperties mqttProperties;
    /**
     * 连接器
     * @return
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions()
    {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        // 设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接
        mqttConnectOptions.setCleanSession(true);
        // 设置超时时间,默认30秒
        mqttConnectOptions.setConnectionTimeout(mqttProperties.getConnectionTimeOut());
        mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
        mqttConnectOptions.setAutomaticReconnect(true);
        // 设置连接的用户名
        mqttConnectOptions.setUserName(mqttProperties.getUsername());
        // 设置连接的密码
        mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
        //服务器地址
        mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getUrl()});
        mqttConnectOptions.setKeepAliveInterval(2);
        return mqttConnectOptions;
    }
    /***
     * MQTT客户端
     * @return
     */
    @Bean("mqttClientFactory")
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }
   
    /*-----------------     消息生产者的配置       ---------------------*/
    /**
     * MQTT生产端发布处理器
     *
     * @return {@link org.springframework.messaging.MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getProducerClientId(), mqttClientFactory());
        messageHandler.setAsync(true);
        return messageHandler;
    }
    /**
     *  MQTT生产端发布通道
     * @return
     */
    @Bean("mqttOutboundChannel")
    public MessageChannel mqttOutboundChannel() 
    {
        return new DirectChannel();
    }
    /*-----------------   消息消费者的配置       ---------------------*/
    
    /**
     * MQTT消费端订阅通道
     *
     * @return {@link org.springframework.messaging.MessageChannel}
     */
    @Bean(name = "mqttInboundChannel")
    public MessageChannel mqttInboundChannel() {
      return new DirectChannel();
    }

    /**
     * MQTT消费端连接配置
     *
     * @param channel {@link org.springframework.messaging.MessageChannel}
     * @param factory {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
     * @return {@link org.springframework.integration.core.MessageProducer}
     */
    @Bean
    public MessageProducer inbound(
        @Qualifier("mqttInboundChannel") MessageChannel channel,
        @Qualifier("mqttClientFactory") MqttPahoClientFactory factory) {
      MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getConsumerClientId(), factory, "test");
      adapter.setCompletionTimeout(30000);
      adapter.setConverter(new DefaultPahoMessageConverter());
      // 0 至多一次,数据可能丢失
      // 1 至少一次,数据可能重复
      // 2 只有一次,且仅有一次,最耗性能
      adapter.setQos(1);
      // 设置订阅通道
      adapter.setOutputChannel(channel);
      return adapter;
    }
}
@ConfigurationProperties("mqtt")
@Component
public class MqttProperties implements Serializable
{
    private static final long serialVersionUID = -1425980007744001158L;
    private String url;
    private String username;
    private String password;
    private int keepAlive;
    private int connectionTimeOut;
    private String producerClientId;
    private String producerQos;
    private String consumerClientId;
    private String consumerQos;
    private String consumerTopic;
    private int completionTimeout;
    private String defaultTopic;
    //get、set方法省略
  }

3.网关配置

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway
{
   void sendToMqtt(byte[] data,@Header(MqttHeaders.TOPIC) String topic);
}

4.编写测试类

  @Autowired
  private MqttGateway mqttGateway;
  @RequestMapping("/sendTest")
  public String sendMqttTest(String msg)
  {
      mqttGateway.send("test",msg);
      return "OK";
  }

5.yml配置信息

mqtt:
  url: tcp://localhost:1883
  username: test
  password: test1234
  keep-alive: 30
  connection-timeout: 3000
  producerClientId:  test-producer
  producerQos: 1
  consumerClientId: test-consumer
  consumerQos: 1
  deafultTopic : test

到此这篇关于Spring boot 集成 MQTT详情的文章就介绍到这了,更多相关Spring boot 集成 MQTT内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • web项目WEB-INF下没有web.xml的解决方法

    web项目WEB-INF下没有web.xml的解决方法

    新手如果在web项目创建后WEB-INF下面没有出现web.xml,怎么办?别慌,没有web.xml文件的原因是因为在创建web项目的时候没有把创建web.xml勾上。这篇文章主要介绍了web项目WEB-INF下没有web.xml的解决方法,需要的朋友可以参考下
    2022-12-12
  • spring-boot报错java: 程序包javax.servlet.http不存在

    spring-boot报错java: 程序包javax.servlet.http不存在

    当springboot项目从2.7.x的升级到3.0.x的时候,会遇到一个问题java: 程序包javax.servlet.http不存在,下面就来具体介绍一下,感兴趣的可以了解一下
    2024-08-08
  • 以银行取钱为例模拟Java多线程同步问题完整代码

    以银行取钱为例模拟Java多线程同步问题完整代码

    这篇文章主要介绍了以银行取钱为例模拟Java多线程同步问题完整代码,具有一定借鉴价值,需要的朋友可以参考下。
    2017-12-12
  • 关于Java中你所不知道的Integer详解

    关于Java中你所不知道的Integer详解

    这篇文章主要给大家介绍了关于Java中你所不知道的一些关于Integer的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。
    2017-12-12
  • MyBatis3用log4j在控制台输出SQL的方法示例

    MyBatis3用log4j在控制台输出SQL的方法示例

    本篇文章主要介绍了MyBatis3用log4j在控制台输出SQL的方法示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-01-01
  • Java数据结构篇之实现二叉搜索树的核心方法

    Java数据结构篇之实现二叉搜索树的核心方法

    二叉搜索树是一种常用的数据结构,它是一棵二叉树,且每个节点的值都大于其左子树中任何节点的值,而小于其右子树中任何节点的值,这篇文章主要给大家介绍了关于Java数据结构篇之实现二叉搜索树的核心方法,需要的朋友可以参考下
    2023-12-12
  • java实现图片的上传与展示实例代码

    java实现图片的上传与展示实例代码

    这篇文章主要给大家介绍了关于java实现图片的上传与展示的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2018-12-12
  • Java动态数组ArrayList实现动态原理

    Java动态数组ArrayList实现动态原理

    ArrayList是一种动态数组,它可以在运行时自动调整大小以适应元素的添加和删除,在Java中,你可以使用ArrayList类来实现动态数组,本文将给大家介绍一下ArrayList动态数组,是怎么实现动态的
    2023-08-08
  • java实现操作系统的短进程作业调度示例分享

    java实现操作系统的短进程作业调度示例分享

    java编写的实现了操作系统中的短作业进程,可以实现几道作业同时作业调度
    2014-02-02
  • 一文带你快速学会JDBC及获取连接的五种方式

    一文带你快速学会JDBC及获取连接的五种方式

    JDBC(Java Database Connectivity)是一个独立于特定数据库管理系统、通用的SQL数据库存取和操作的公共接口,下面这篇文章主要给大家介绍了关于如何通过一文带你快速学会JDBC及获取连接的五种方式,需要的朋友可以参考下
    2022-09-09

最新评论