Spring Cloud Stream简单用法

 更新时间:2021年07月23日 14:50:36   作者:微瞰技术  
Spring cloud stream是为构建微服务消息驱动而产生的一种框架。Spring Cloud Stream基于Spring boot的基础上,可创建独立的、生产级别的Spring应用,并采用Spring Integration来连接消息中间件提供消息事件驱动,一起看看吧

Spring Cloud Stream对Spring Cloud体系中的Mq进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样)。如此⼀来,我们学习、开发、维护MQ都会变得轻松。⽬前Spring Cloud Stream原生⽀持RabbitMQ和Kafka,阿里在这个基础上提供了RocketMQ的支持

简单使用Spring Cloud Stream 构建基于RocketMQ的生产者和消费者

生产者

pom文件中加入依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>

    </dependencies>

配置文件中增加关于Spring Cloud Stream binder和bindings的配置

spring:
  application:
    name: zhao-cloud-stream-producer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              group: test
              sync: true
      bindings:
        output:
          destination: stream-test-topic
          content-type: text/plain # 内容格式。这里使用 JSON

其中destination代表生产的数据发送到的topic 然后定义一个channel用于数据发送

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface TestChannel {
    @Output("output")
    MessageChannel output();
}

最后构造数据发送的接口

@Controller
public class SendMessageController {
    @Resource
    private TestChannel testChannel;

    @ResponseBody
    @RequestMapping(value = "send", method = RequestMethod.GET)
    public String sendMessage() {
        String messageId = UUID.randomUUID().toString();
        Message<String> message = MessageBuilder
                .withPayload("this is a test:" + messageId)
                .setHeader(MessageConst.PROPERTY_TAGS, "test")
                .build();
        try {
            testChannel.output().send(message);
            return messageId + "发送成功";
        } catch (Exception e) {
            return messageId + "发送失败,原因:" + e.getMessage();
        }
    }
}

消费者

消费者的pom引入与生产者相同,在此不再赘述,配置时需要将stream的output修改为input并修改对应属性

spring:
  application:
    name: zhao-cloud-stream-consumer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          input:
            consumer:
              tags: test
      bindings:
        input:
          destination: stream-test-topic
          content-type: text/plain # 内容格式。这里使用 JSON
          group: test

另外关于channel的构造也要做同样的修改

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface TestChannel {
    @Input("input")
    SubscribableChannel input();
}

最后我在启动类中对收到的消息进行了监听

   @StreamListener("input")
    public void receiveInput(@Payload Message message) throws ValidationException {
        System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo"));
    }

测试结果

file

file

Stream其他特性

消息发送失败的处理

消息发送失败后悔发送到默认的一个“topic.errors"的channel中(topic是配置的destination)。要配置消息发送失败的处理,需要将错误消息的channel打开 消费者配置如下

spring:
  application:
    name: zhao-cloud-stream-producer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              group: test
              sync: true
      bindings:
        output:
          destination: stream-test-topic
          content-type: text/plain # 内容格式。这里使用 JSON
          producer:
            errorChannelEnabled: true

在启动类中配置错误消息的Channel信息

 @Bean("stream-test-topic.errors")
    MessageChannel testoutPutErrorChannel(){
        return new PublishSubscribeChannel();
    }

新建异常处理service

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

@Service
public class ErrorProducerService {

    @ServiceActivator(inputChannel = "stream-test-topic.errors")
    public void receiveProducerError(Message message){
        System.out.println("receive error msg :"+message);
    }
}

当发生异常时,由于测试类中已经将异常捕获,处理发送异常主要是在这里进行。模拟,应用与rocketMq断开的场景。可见

file file

消费者错误处理

首先增加配置为

spring:
  application:
    name: zhao-cloud-stream-producer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              group: test
              sync: true
      bindings:
        output:
          destination: stream-test-topic
          content-type: text/plain # 内容格式。这里使用 JSON
          producer:
            errorChannelEnabled: true

增加相应的模拟异常的操作

 @StreamListener("input")
    public void receiveInput(@Payload Message message) throws ValidationException {
        //System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo"));
        throw new RuntimeException("oops");
    }
    @ServiceActivator(inputChannel = "stream-test-topic.test.errors")
    public void receiveConsumeError(Message message){
        System.out.println("receive error msg"+message.getPayload());
    }

file

代码地址https://github.com/zhendiao/deme-code/tree/main/zp

到此这篇关于Spring Cloud Stream简单用法的文章就介绍到这了,更多相关Spring Cloud Stream使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 邻接表无向图的Java语言实现完整源码

    邻接表无向图的Java语言实现完整源码

    这篇文章主要介绍了邻接表无向图的Java语言实现完整源码,具有一定借鉴价值,需要的朋友可以参考下。
    2017-12-12
  • SpringBoot3解决跨域请求的方案小结

    SpringBoot3解决跨域请求的方案小结

    解决跨域请求,主要有JSONP,iframe,window.name,CORS等方式,其中CORS方式是最常用的跨域实现方式,而且是对各种请求方法、各种数据请求类型都是完美支持的,本文介绍了SpringBoot3解决跨域请求的方案小结,需要的朋友可以参考下
    2024-07-07
  • Java实现文件的加密解密功能示例

    Java实现文件的加密解密功能示例

    这篇文章主要介绍了Java实现文件的加密解密功能,结合具体实例形式详细分析了java针对文件的读取、判断、加密、解密等相关步骤与操作技巧,需要的朋友可以参考下
    2017-10-10
  • 一文深入理解Java中的深拷贝机制

    一文深入理解Java中的深拷贝机制

    在Java编程中,我们经常需要处理对象的复制问题,深拷贝和浅拷贝是两种常见的复制方式,它们在内存管理和对象引用方面存在不同特点,本文将带大家深入探究Java中的深拷贝机制,需要的朋友可以参考下
    2023-09-09
  • Java swing 图像处理多种效果实现教程

    Java swing 图像处理多种效果实现教程

    这篇文章主要介绍了Java swing 图像处理多种效果实现教程,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • Java使用DateUtils对日期进行数学运算经典应用示例【附DateUtils相关包文件下载】

    Java使用DateUtils对日期进行数学运算经典应用示例【附DateUtils相关包文件下载】

    这篇文章主要介绍了Java使用DateUtils对日期进行数学运算的方法,可实现针对日期时间的各种常见运算功能,并附带DateUtils的相关包文件供读者下载使用,需要的朋友可以参考下
    2017-11-11
  • 教你利用SpringBoot写一个属于自己的Starter

    教你利用SpringBoot写一个属于自己的Starter

    如果我们将可独立于业务代码之外的功配置模块封装成一个个starter,复用的时候只需要将其在pom中引用依赖即可,SpringBoot为我们完成自动装配,简直不要太爽,这篇文章主要给大家介绍了关于如何利用SpringBoot写一个属于自己的Starter,需要的朋友可以参考下
    2022-03-03
  • 将springboot项目生成可依赖的jar并引入到项目中的方法

    将springboot项目生成可依赖的jar并引入到项目中的方法

    SpringBoot项目默认打包的是可运行jar包,也可以打包成不可运行的jar包,本文给大家介绍将springboot项目生成可依赖的jar并引入到项目中的方法,感兴趣的朋友一起看看吧
    2023-11-11
  • java -jar命令及SpringBoot通过java -jav启动项目的过程

    java -jar命令及SpringBoot通过java -jav启动项目的过程

    本篇文章将为大家讲述关于 SpringBoot 项目工程完成后,是如何通过 java-jar 命令来启动的,以及介绍 java-jar 命令的详细内容,对SpringBoot java -jav启动过程感兴趣的朋友跟随小编一起看看吧
    2023-05-05
  • springBoot server.port=-1的含义说明

    springBoot server.port=-1的含义说明

    这篇文章主要介绍了springBoot server.port=-1的含义说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08

最新评论