Spring Cloud Stream简单用法
Spring Cloud Stream对Spring Cloud体系中的Mq进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样)。如此⼀来,我们学习、开发、维护MQ都会变得轻松。⽬前Spring Cloud Stream原生⽀持RabbitMQ和Kafka,阿里在这个基础上提供了RocketMQ的支持
简单使用Spring Cloud Stream 构建基于RocketMQ的生产者和消费者
生产者
pom文件中加入依赖
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> <version>2.1.0.RELEASE</version> </dependency> </dependencies>
配置文件中增加关于Spring Cloud Stream binder和bindings的配置
spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: output: producer: group: test sync: true bindings: output: destination: stream-test-topic content-type: text/plain # 内容格式。这里使用 JSON
其中destination代表生产的数据发送到的topic 然后定义一个channel用于数据发送
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface TestChannel { @Output("output") MessageChannel output(); }
最后构造数据发送的接口
@Controller public class SendMessageController { @Resource private TestChannel testChannel; @ResponseBody @RequestMapping(value = "send", method = RequestMethod.GET) public String sendMessage() { String messageId = UUID.randomUUID().toString(); Message<String> message = MessageBuilder .withPayload("this is a test:" + messageId) .setHeader(MessageConst.PROPERTY_TAGS, "test") .build(); try { testChannel.output().send(message); return messageId + "发送成功"; } catch (Exception e) { return messageId + "发送失败,原因:" + e.getMessage(); } } }
消费者
消费者的pom引入与生产者相同,在此不再赘述,配置时需要将stream的output修改为input并修改对应属性
spring: application: name: zhao-cloud-stream-consumer cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: input: consumer: tags: test bindings: input: destination: stream-test-topic content-type: text/plain # 内容格式。这里使用 JSON group: test
另外关于channel的构造也要做同样的修改
import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface TestChannel { @Input("input") SubscribableChannel input(); }
最后我在启动类中对收到的消息进行了监听
@StreamListener("input") public void receiveInput(@Payload Message message) throws ValidationException { System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo")); }
测试结果
Stream其他特性
消息发送失败的处理
消息发送失败后悔发送到默认的一个“topic.errors"的channel中(topic是配置的destination)。要配置消息发送失败的处理,需要将错误消息的channel打开 消费者配置如下
spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: output: producer: group: test sync: true bindings: output: destination: stream-test-topic content-type: text/plain # 内容格式。这里使用 JSON producer: errorChannelEnabled: true
在启动类中配置错误消息的Channel信息
@Bean("stream-test-topic.errors") MessageChannel testoutPutErrorChannel(){ return new PublishSubscribeChannel(); }
新建异常处理service
import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; @Service public class ErrorProducerService { @ServiceActivator(inputChannel = "stream-test-topic.errors") public void receiveProducerError(Message message){ System.out.println("receive error msg :"+message); } }
当发生异常时,由于测试类中已经将异常捕获,处理发送异常主要是在这里进行。模拟,应用与rocketMq断开的场景。可见
消费者错误处理
首先增加配置为
spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: output: producer: group: test sync: true bindings: output: destination: stream-test-topic content-type: text/plain # 内容格式。这里使用 JSON producer: errorChannelEnabled: true
增加相应的模拟异常的操作
@StreamListener("input") public void receiveInput(@Payload Message message) throws ValidationException { //System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo")); throw new RuntimeException("oops"); } @ServiceActivator(inputChannel = "stream-test-topic.test.errors") public void receiveConsumeError(Message message){ System.out.println("receive error msg"+message.getPayload()); }
代码地址https://github.com/zhendiao/deme-code/tree/main/zp
到此这篇关于Spring Cloud Stream简单用法的文章就介绍到这了,更多相关Spring Cloud Stream使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Java使用DateUtils对日期进行数学运算经典应用示例【附DateUtils相关包文件下载】
这篇文章主要介绍了Java使用DateUtils对日期进行数学运算的方法,可实现针对日期时间的各种常见运算功能,并附带DateUtils的相关包文件供读者下载使用,需要的朋友可以参考下2017-11-11将springboot项目生成可依赖的jar并引入到项目中的方法
SpringBoot项目默认打包的是可运行jar包,也可以打包成不可运行的jar包,本文给大家介绍将springboot项目生成可依赖的jar并引入到项目中的方法,感兴趣的朋友一起看看吧2023-11-11java -jar命令及SpringBoot通过java -jav启动项目的过程
本篇文章将为大家讲述关于 SpringBoot 项目工程完成后,是如何通过 java-jar 命令来启动的,以及介绍 java-jar 命令的详细内容,对SpringBoot java -jav启动过程感兴趣的朋友跟随小编一起看看吧2023-05-05springBoot server.port=-1的含义说明
这篇文章主要介绍了springBoot server.port=-1的含义说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-08-08
最新评论