SpringBoot 集成MQTT实现消息订阅的详细代码

 更新时间:2024年11月28日 14:35:49   作者:不甘平凡--liang  
本文介绍了如何在SpringBoot中集成MQTT并实现消息订阅,主要步骤包括添加依赖、配置文件设置、启动类注解、MQTT配置类、消息处理器配置、主题缓存、动态数据库主题配置以及消息处理服务,感兴趣的朋友跟随小编一起看看吧

1、引入依赖

  <!--MQTT start-->
 <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
  </dependency>
  <dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-mqtt</artifactId>
     <version>5.4.4</version>
 </dependency>
 <!--MQTT end-->
 <dependency>
     <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-configuration-processor</artifactId>
      <optional>true</optional>
  </dependency>

2、增加yml配置

  spring:
    mqtt:
      username: test
      password: test
      url: tcp://127.0.0.1:8080
      subClientId: singo_sub_client_id_888 #订阅 客户端id
      pubClientId: singo_pub_client_id_888 #发布 客户端id
      connectionTimeout: 30
      keepAlive: 60

3、资源配置类

@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfigurationProperties {
    private String username;
    private String password;
    private String url;
    private String subClientId;
    private String pubClientId;
    private int connectionTimeout;
    private int keepAlive;
}

注意启动类需要增加注解

@EnableConfigurationProperties(MqttConfigurationProperties.class)

4、MQTT配置类

@Configuration
public class MqttConfig {
    @Autowired
    private MqttConfigurationProperties mqttConfigurationProperties;
    /**
     * 连接参数
     *
     * @return
     */
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(mqttConfigurationProperties.getUsername());
        options.setPassword(mqttConfigurationProperties.getPassword().toCharArray());
        options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()});
        options.setConnectionTimeout(mqttConfigurationProperties.getConnectionTimeout());
        options.setKeepAliveInterval(mqttConfigurationProperties.getKeepAlive());
        options.setCleanSession(true); // 设置为false以便断线重连后恢复会话
        options.setAutomaticReconnect(true);
        return options;
    }
    /**
     * 连接工厂
     *
     * @param options
     * @return
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory(MqttConnectOptions options) {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }
    /**
     * 消息输入通道
     * 每次只有一个消息处理器可以消费消息。
     * 当前消息的处理完成之前,新消息需要排队等待,无法并行处理。
     * 默认是:单线程、顺序执行的
     * @return
     */
    // @Bean
    // public DirectChannel mqttInputChannel() {
    //     return new DirectChannel();
    // }
    /**
     * 支持多线程并发处理消息的输入通道
     *
     * @return
     */
    @Bean
    public ExecutorChannel mqttInputChannel() {
        return new ExecutorChannel(Executors.newFixedThreadPool(10)); // 线程池大小可以调整
    }
    /**
     * 配置入站适配器
     *
     * @param mqttClientFactory
     * @return
     */
    @Bean
    public MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter(MqttPahoClientFactory mqttClientFactory) {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getSubClientId(), mqttClientFactory);
        // adapter.addTopic("pub/300119110099"); 订阅主题,也可以放在初始化动态配置
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
    /**
     * 配置消息处理器
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel") // 指定通道
    public MessageHandler messageHandler() {
        return new MqttReceiverMessageHandler();
    }
}

5、消息处理器配置

@Slf4j
@Component
public class MqttReceiverMessageHandler implements MessageHandler {
    @Autowired
    private MqttMessageProcessingService mqttMessageProcessingService;
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        MessageHeaders headers = message.getHeaders();
        log.info("线程名称:{},收到消息,主题:{},消息:{}", Thread.currentThread().getName(), headers.get("mqtt_receivedTopic").toString(), message.getPayload());
        // log.info("收到消息主题:{}", headers.get("mqtt_receivedTopic").toString());
        // log.info("收到消息:{}", message.getPayload());
        // 消息保存到内存队列里面,定时批量入库,也可以在这里直接入库
        mqttMessageProcessingService.addMessage(message.getPayload().toString());
    }
}

6、消息主题缓存对象

@Component
public class MqttTopicStore {
    private final ConcurrentHashMap<String, String> topics = new ConcurrentHashMap<>();
    public ConcurrentHashMap<String, String> getTopics() {
        return topics;
    }
}

7、动态订阅数据库主题配置

@Slf4j
@Component
public class MqttInit {
    @Autowired
    private MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter;
    @Autowired
    private MqttTopicStore mqttTopicStore;
    @PostConstruct
    public void init() {
        subscribeAllTopics();
    }
    public void subscribeAllTopics() {
        // List<MqttTopicConfig> topics = topicConfigMapper.findAllEnabled();
        // for (MqttTopicConfig topic : topics) {
        //     subscribeTopic(topic);
        // }
        log.info("===================>从数据库里获取并初始化订阅所有主题");
        List<String> topics = ListUtil.list(false, "pub/300119110099", "pub1/3010230209810018992", "pub1/30102302098100");
        topics.stream().forEach(t -> {
            messageDrivenChannelAdapter.addTopic(t);
            // 同时往MqttTopicStore.topics中增加一条记录用于缓存
        });
    }
}

8、消息处理服务

@Service
public class MqttMessageProcessingService {
    @Autowired
    private MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter;
    @Autowired
    private MqttTopicStore mqttTopicStore;
    // 内存队列,用于暂存消息
    private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
    // 添加消息到队列
    public void addMessage(String message) {
        messageQueue.add(message);
    }
    /**
     * 可以放到定时任务里面去,注入后取队列方便维护
     * 定时任务,每5秒执行一次 ,建议2分钟一次 理想的触发间隔应略小于数据到达间隔,以确保及时处理和插入
     * 如果每 5 分钟收到一条数据,可以设置任务执行周期为4 分钟或更短,以便任务有足够的时间处理数据,同时减少积压的可能性。
     */
    @Scheduled(fixedRate = 1 * 60 * 1000)
    public void batchInsertToDatabase() {
        System.out.println("定时任务执行中,当前队列大小:" + messageQueue.size());
        List<String> batch = new ArrayList<>();
        messageQueue.drainTo(batch, 500); // 一次性取最多500条消息
        if (!batch.isEmpty()) {
            // 批量插入数据库
            saveMessagesToDatabase(batch);
        }
    }
    private void saveMessagesToDatabase(List<String> messages) {
        // 假设这是批量插入逻辑
        System.out.println("批量插入数据库,条数:" + messages.size());
        for (String message : messages) {
            System.out.println("插入消息:" + message);
        }
        // 实际数据库操作代码
    }
    /**
     * 订阅与取消订阅定时任务
     */
    public void subscribeAndUnsubscribeTask() {
        // 从数据库获取所有主题,正常状态、删除状态
        // 正常状态:判断mqttTopicStore.topics中是否存在,不存在则订阅,并在mqttTopicStore.topics中增加
        // 删除状态: 判断mqttTopicStore.topics中是否存在,存在则取消订阅,并在mqttTopicStore.topics中删除
        // messageDrivenChannelAdapter.addTopic(t);
    }
}

以上是简单的对接步骤,部分类、方法可以根据实际情况进行合并处理!!!!

9、定时任务

@Slf4j
@Configuration
@EnableScheduling
public class MqttJob {
    @Value("${schedule.enable}")
    private boolean enable;
    @Autowired
    private MqttMessageProcessingService mqttMessageProcessingService;
    /**
     * 定时订阅与取消订阅主题,从共享主题对象MqttTopicStore里面取出主题列表,然后进行订阅或取消订阅
     * 每分钟一次
     */
    public void subscribeAndUnsubscribe() {
        if (!enable) return;
        mqttMessageProcessingService.subscribeAndUnsubscribeTask();
    }
    /**
     * 定时处理队列里面的订阅消息,会有丢失风险,宕机时会丢失队列里面的消息
     * 每分钟一次 要考虑一次消息处理的时间;也可先不使用队列,每次收到消息直接实时入库,有性能问题时在启用
     */
    public void batchSaveSubscribeMessage() {
    }
}

到此这篇关于SpringBoot 集成MQTT实现消息订阅的文章就介绍到这了,更多相关SpringBoot MQTT消息订阅内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring Boot管理用户数据的操作步骤

    Spring Boot管理用户数据的操作步骤

    SpringBoot结合Thymeleaf模板引擎,可以快速搭建Web应用,介绍了使用SpringBoot处理JSON数据的基本过程,包括创建实体类、视图页面和控制器,通过这些步骤,即可完成基于SpringBoot和Thymeleaf的简单Web开发,感兴趣的朋友跟随小编一起看看吧
    2024-09-09
  • 深入理解Java序列化与反序列化

    深入理解Java序列化与反序列化

    今天教大家深入理解Java的序列化与反序列化,文中介绍的非常详细,有很多代码示例,对正在学习Java的小伙伴们很有帮助,需要的朋友可以参考下
    2021-05-05
  • Java操作数据库(行级锁,for update)

    Java操作数据库(行级锁,for update)

    这篇文章主要介绍了Java操作数据库(行级锁,for update),文章围绕Java操作数据库的相关资料展开详细内容,需要的小伙伴可以参考一下,希望对你有所帮助
    2021-12-12
  • DynamicDataSource怎样解决多数据源的事务问题

    DynamicDataSource怎样解决多数据源的事务问题

    这篇文章主要介绍了DynamicDataSource怎样解决多数据源的事务问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-07-07
  • java将String字符串转换为List<Long>类型实例方法

    java将String字符串转换为List<Long>类型实例方法

    在本篇文章里小编给大家整理的是关于java将String字符串转换为List<Long>类型实例方法,需要的朋友们可以参考下。
    2020-03-03
  • 使用Java操作TensorFlow的方法

    使用Java操作TensorFlow的方法

    TensorFlow是一个功能强大且广泛使用的框架,它不断得到改进,并最近被引入新语言包括Java和JavaScript,这篇文章主要介绍了如何使用Java操作TensorFlow,需要的朋友可以参考下
    2023-05-05
  • java调用微信现金红包接口的心得与体会总结

    java调用微信现金红包接口的心得与体会总结

    这篇文章主要介绍了java调用微信现金红包接口的心得与体会总结,有需要的朋友可以了解一下。
    2016-11-11
  • Java遍历集合的三种方式

    Java遍历集合的三种方式

    本文主要对于遍历集合获取其对象,总结的三种简单的方式进行介绍,文章中举了两个案例进行对比,具有很好的参考价值,下面就跟小编一起来看下吧
    2016-12-12
  • SpringBoot disruptor高性能队列使用

    SpringBoot disruptor高性能队列使用

    这篇文章主要介绍了SpringBoot disruptor高性能队列使用,Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题
    2023-02-02
  • IDEA断点调试,断点不起作用的解决

    IDEA断点调试,断点不起作用的解决

    这篇文章主要介绍了IDEA断点调试,断点不起作用的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03

最新评论