spring boot项目中集成rocketmq详细步骤

 更新时间:2023年09月13日 09:07:39   作者:orton777  
这篇文章主要给大家介绍了关于spring boot项目中集成rocketmq的相关资料,springboot集成rocketmq的方法非常简单,文中直接上代码,需要的朋友可以参考下

集成Spring Boot和RocketMQ

在现代的微服务架构中,消息队列已经成为一种常见的异步处理模式,它能解决服务间的同步调用、耦合度高、流量高峰等问题。RocketMQ是阿里巴巴开源的一款消息中间件,性能优秀,功能齐全,被广泛应用在各种业务场景。

本文将详细介绍如何在Spring Boot项目中集成RocketMQ,实现消息的生产和消费。

开发环境

  • JDK 1.8 或更高
  • RocketMQ 4.8.0 或更高
  • Spring Boot 2.3.1.RELEASE 或更高
  • Maven 3.0 或更高

RocketMQ服务器部署

首先,我们需要在本地或服务器上部署RocketMQ。具体的部署步骤可以参考RocketMQ官方文档。为了简化部署,我们可以使用Docker进行部署。

Spring Boot项目创建

我们使用Spring Initializr创建一个新的Spring Boot项目,选择Web、Lombok和RocketMQ Spring Boot Starter为项目依赖。

pom.xml示例:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

配置RocketMQ

application.properties文件中配置RocketMQ的服务器地址和其他相关参数。

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

在这里,rocketmq.name-server是RocketMQ服务器的地址,rocketmq.producer.group是生产者的组名。

消息生产者

接下来,我们创建一个消息生产者。在Spring Boot项目中,我们可以使用RocketMQTemplate来发送消息。

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @GetMapping("/send")
    public String send(String message) {
        rocketMQTemplate.convertAndSend("test-topic", message);
        return "Message: '" + message + "' sent.";
    }
}

上述代码中,我们创建了一个RESTful接口/send,当接口被调用时,它将发送一个消息到test-topic主题。

消息消费者

接下来,我们创建一个消息消费者。在Spring Boot项目中,我们可以使用@RocketMQMessageListener注解来定义一个消息消费者。

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic")
public class ConsumerService implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.printf("------- StringConsumer received: %s \n", message);
    }
}

上述代码中,我们定义了一个消息消费者,它将监听test-topic主题的消息,当有新的消息时,它将打印消息内容。

测试

至此,我们已经完成了Spring Boot集成RocketMQ的所有代码。接下来,我们就可以运行Spring Boot项目,并通过访问/send接口来发送消息,查看控制台的输出来验证消息消费者是否可以正常接收消息。

这就是Spring Boot集成RocketMQ的全过程。RocketMQ作为一款功能强大的消息中间件,不仅支持基本的消息生产和消费,还支持许多高级特性,如事务消息、顺序消息、延迟消息等。在实际的项目开发中,我们可以根据业务需求选择合适的消息模型,提高系统的可用性和可靠性。

事务消息

RocketMQ支持发送事务消息,也就是说,在发送消息的同时,我们可以执行本地的数据库操作,只有当本地的数据库操作成功时,消息才会真正被发送出去。

下面是一个发送事务消息的例子:

import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.*;
@RestController
public class TransactionProducerController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @GetMapping("/sendTransaction")
    public String sendTransaction(String message) {
        ExecutorService executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(5000), r -> {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        });
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = rocketMQTemplate.createAndStartTransactionMQProducer("transaction-group",transactionListener,executor);
        producer.sendMessageInTransaction("test-topic", "TagA", message, null);
        return "Transaction Message: '" + message + "' sent.";
    }
}

在上述代码中,我们创建了一个TransactionMQProducer,并设置了一个TransactionListener来处理事务的提交和回滚。当发送事务消息时,我们需要调用sendMessageInTransaction方法。

顺序消息

RocketMQ支持发送顺序消息,也就是说,消息会按照发送的顺序被消费。

下面是一个发送顺序消息的例子:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.messaging.support.MessageBuilder;
@RestController
public class OrderlyProducerController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @GetMapping("/sendOrderly")
    public String sendOrderly(String message) {
        for (int i = 0; i < 100; i++) {
            rocketMQTemplate.syncSendOrderly("orderly_topic", MessageBuilder.withPayload(message + i).build(), "hashkey");
        }
        return "Orderly Message: '" + message + "' sent.";
    }
}

在上述代码中,我们调用syncSendOrderly方法发送顺序消息。该方法的第三个参数是hashkey,RocketMQ会根据这个key来决定消息发送到哪个队列,具有相同hashkey的消息会发送到同一个队列。

延迟消息

RocketMQ支持发送延迟消息,也就是说,消息不会立即被消费,而是会在指定的时间后被消费。

下面是一个发送延迟消息的例子:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.messaging.support.MessageBuilder;
@RestController
public class DelayProducerController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @GetMapping("/sendDelay")
    public String sendDelay(String message) {
        rocketMQTemplate.syncSend("delay_topic", MessageBuilder.withPayload(message).build(), 1000, 4);
        return "Delay Message: '" + message + "' sent.";
    }
}

在上述代码中,我们调用syncSend方法发送延迟消息。该方法的第三个参数是延迟时间,第四个参数是延迟级别。

总结

到此这篇关于spring boot项目中集成rocketmq详细步骤的文章就介绍到这了,更多相关springboot集成rocketmq内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 基于logback.xml不生效问题的解决

    基于logback.xml不生效问题的解决

    这篇文章主要介绍了基于logback.xml不生效问题的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-06-06
  • 浅谈springboot项目中定时任务如何优雅退出

    浅谈springboot项目中定时任务如何优雅退出

    这篇文章主要介绍了浅谈springboot项目中定时任务如何优雅退出?具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • Java中将String类型转换为int类型的几种常见方法

    Java中将String类型转换为int类型的几种常见方法

    在java中经常会遇到需要对数据进行类型转换的场景,这篇文章主要给大家介绍了关于Java中将String类型转换为int类型的几种常见方法,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-07-07
  • Springboot中的@ComponentScan注解使用解析

    Springboot中的@ComponentScan注解使用解析

    这篇文章主要介绍了Springboot中的@ComponentScan注解使用解析,@ComponentScan用于类或接口上主要是指定扫描路径,spring会把指定路径下带有指定注解的类注册到IOC容器中,需要的朋友可以参考下
    2024-01-01
  • spring 整合mybatis后用不上session缓存的原因分析

    spring 整合mybatis后用不上session缓存的原因分析

    因为一直用spring整合了mybatis,所以很少用到mybatis的session缓存。什么原因呢?下面小编给大家介绍spring 整合mybatis后用不上session缓存的原因分析,需要的朋友可以参考下
    2017-02-02
  • JVM Metaspace内存溢出问题解决方案

    JVM Metaspace内存溢出问题解决方案

    这篇文章主要介绍了JVM Metaspace内存溢出排查总结过程图解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-11-11
  • mybatis中返回主键一直为1的问题

    mybatis中返回主键一直为1的问题

    这篇文章主要介绍了mybatis中返回主键一直为1的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03
  • idea2020.1.3 手把手教你创建web项目的方法步骤

    idea2020.1.3 手把手教你创建web项目的方法步骤

    这篇文章主要介绍了idea 2020.1.3 手把手教你创建web项目的方法步骤,文中通过图文介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-08-08
  • springboot启动过程中常用的回调示例详解

    springboot启动过程中常用的回调示例详解

    springboot提供非常丰富回调接口,利用这些接口可以做非常多的事情,本文通过实例代码给大家介绍springboot启动过程中常用的回调知识感兴趣的朋友跟随小编一起看看吧
    2022-01-01
  • @PathVariable获取路径中带有 / 斜杠的解决方案

    @PathVariable获取路径中带有 / 斜杠的解决方案

    这篇文章主要介绍了@PathVariable获取路径中带有 / 斜杠的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10

最新评论