最新SpringCloud Stream消息驱动讲解

 更新时间:2022年11月02日 11:16:09   作者:小钟要学习!!!  
SpringCloud Stream 是一个构建消息驱动微服务的框架,通过 SpringCloud Stream 连接消息中间件,以实现消息事件驱动,这篇文章主要介绍了SpringCloud Stream消息驱动,需要的朋友可以参考下

SpringCloud Stream消息驱动

1、SpringCloud Stream概述

官方地址:https://spring.io/projects/spring-cloud-stream#overview

中文指导手册地址:https://m.wang1314.com/doc/webapp/topic/20971999.html

SpringCloud Stream 是一个构建消息驱动微服务的框架
应用程序通过 outputs 或 inputs 来与 SpringCloud Stream 中的 binder 对象交互
SpringCloud Stream 中的 binder 对象负责与消息中间件交互
通过 SpringCloud Stream 连接消息中间件,以实现消息事件驱动

什么是SpringCloudStream官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

目前仅支持RabbitMQ、Kafka。

1.1、设计思想

1、标注的MQ流程

生产者/消费者之间靠消息媒介传递信息内容【massage】

消息必须走特定的通道【消息通道MessageChannel】

消息通道里的消息如何被消费呢,谁负责收发处理

消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅

2、Cloud Stream的作用

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

3、什么是Binder

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。

通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。 4、Stream中的消息通信方式遵循了发布-订阅模式

使用Topic主题进行广播

  • 在RabbitMQ就是Exchange
  • 在Kakfa中就是Topic

1.2、标准的流程套路

1、Binder:很方便的连接中间件,屏蔽不同的差异

2、Channel

通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置

3、Source和Sink

简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

1.3、编码API和常用注解

组成和注解描述
Middleware中间件,目前只支持RabbitM和Kafka
BinderBinder是应用与消息中间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
@Input注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output注解标识输出通道,发布的消息将通过通道离开应用程序
@StreamListener监听队列,用户消费者的队列的消息接收
@EnableBinding指通道channel和exchange绑定在一起

2、消息驱动之生产者(output)

2.1、新建模块cloud-stream-rabbitmq-provider8801

2.2、引入pom.xml配置文件

如果是需要Stream整合的就将依赖改为spring-cloud-starter-stream-kafka

<dependencies>
    <!--stream整合rabbit依赖-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--基础配置-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2.3、YAML配置文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称,消息生产者
          destination: studyExchange # 表示要使用的Exchange名称定义【自定义】
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置【上面的配置】

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

2.4、生产者启动类

 package com.zcl.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 描述:消息生产者启动类
 *
 * @author zhong
 * @date 2022-09-22 12:19
 */
@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}

2.5、业务实现

2.5.1、服务接口实现类

自己创建一个实现的接口以及里面的方法

注意:在这个服务实现类里面不是使用@Service注解了,因为不是web应用,而是Stream消息驱动,是与中间件进行打交道的不是与数据库

package com.zcl.springcloud.service.Impl;

import com.zcl.springcloud.service.IMessageProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * 描述:发送接口实现类
 * 必须使用@EnableBinding(Source.class)注解开启消息推送管道
 *
 * @author zhong
 * @date 2022-09-22 12:24
 */
@Slf4j
@EnableBinding(Source.class)
public class IMessageProviderImpl implements IMessageProvider {

    /**
     * 消息发送管道
     */
    @Resource
    private MessageChannel output;

    /**
     * 发送消息
     * @return
     */
    @Override
    public String send() {
        // 定义消息
        String serial = UUID.randomUUID().toString();
        // 构建并发送消息
        this.output.send(MessageBuilder.withPayload(serial).build());
        log.info("-------------- " + serial + " ----------------");
        return serial;
    }
}

2.5.2、控制器实现

package com.zcl.springcloud.controller;

import com.zcl.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * 描述:消息发送控制器
 *
 * @author zhong
 * @date 2022-09-22 12:37
 */
@RestController
public class SendMessageController {

    /**
     * 注入消息发送管道接口
     */
    @Resource
    private IMessageProvider messageProvider;

    /**
     * 每调用一次接口发送一次消息
     * @return
     */
    @GetMapping(value = "/sendMessage")
    public String sendMessage()
    {
        return messageProvider.send();
    }
}

2.6、启动测试

  • 启动7001Eureka访问中心
  • 启动8801消息发送者,启动成功以及观察RabbitMQ的管理界面

3.访问接口发送消息,查看MQ的管理页面波峰情况

3、消息驱动之消费者(input)

同样的参考如下流程图

3.1、新建cloud-stream-rabbitmq-consumer8802模块

3.2、引入pom.xml依赖

与8801一样

3.3、添加YAML配置文件

配置文件与消息生产的区别在于:

output: # 这个名字是一个通道的名称
	destination: studyExchange # 表示要使用的Exchange名称定义
server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

3.4、添加启动类StreamMQMain8802

与消息生产者一样

3.5、业务实现

必须要有@Component注解注入到Spring容器中

package com.zcl.springcloud.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * 描述:消息消费者控制器
 *
 * @author zhong
 * @date 2022-09-22 13:18
 */
@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {

    /**
     * 注入消费者的端口号
     */
    @Value("${server.port}")
    private String port;

    /**
     * 监听消息
     * @param message
     * @return
     */
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        log.info("消费者1号接收到的消息 ----- " + message.getPayload() + " -----,port: " + port);
    }
}

3.6、启动项目测试

  • 启动7001
  • 启动8801,消息发送者
  • 启动8802,消息消费者
  • 8801发送消息,8802消费消息,并查看具体的MQ波峰图

控制器输出

4、分组消费与持久化

4.1、完整参考cloud-stream-rabbitmq-consumer8802,创建8803项目

除了启动的端口号不一样之外其他的配置都一样

4.2、启动项目发现问题

  • 启动7001(Eureka服务中心)
  • 启动8801(生产)、8802(消费)、8803(消费)
  • 测试发送消失是否两个消费者都可以接收到

4.2.1、重复消费

目前是8802/8803同时都收到了,存在重复消费问题

解决方案:分组和持久化属性group

常见案例

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。

不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。

4.2.2、分组

自定义配置分组,自定义分为同一个组,解决重复消费问题

配置文件分组

分别给8801、8802进行分组【orderA】

重启项目查看MQ管理

orderB是历史记录,上面的配置以及都分为了ordeerA组,进入orderA组可以查看实际的消费者数量

同一组内会发生竞争关系,只有其中一个可以消费,启动项目测试是否为真

4.2.3、持久化

通过上述,解决了重复消费问题,再看看持久化

  • 停止8802/8803并去除掉8802的分组group: atguiguA,8803保留

  • 8801先发送7条消息到rabbitmq

3.先启动8802,无分组属性配置,后台没有打出来消息

8802因为取消了groupA的分组所以获取不到持久化的数据(如果重启mq也会消失)

4.再启动8803,有分组属性配置,后台打出来了MQ上的消息

8803保存groupA的分组所以在启动的时候就会将持久化的数据消费

到此这篇关于SpringCloud Stream消息驱动的文章就介绍到这了,更多相关SpringCloud Stream消息驱动内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java使用http实现文件下载学习示例

    java使用http实现文件下载学习示例

    这篇文章主要介绍了java使用http实现文件下载学习示例,需要的朋友可以参考下
    2014-04-04
  • @Autowired注解注入的xxxMapper报错问题及解决

    @Autowired注解注入的xxxMapper报错问题及解决

    这篇文章主要介绍了@Autowired注解注入的xxxMapper报错问题及解决,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • 详解Java中的静态代理模式

    详解Java中的静态代理模式

    这篇文章主要为大家介绍了Java中的静态代理模式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2022-12-12
  • 解决Spring Cloud Gateway获取body内容,不影响GET请求的操作

    解决Spring Cloud Gateway获取body内容,不影响GET请求的操作

    这篇文章主要介绍了解决Spring Cloud Gateway获取body内容,不影响GET请求的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-12-12
  • SpringCloud hystrix服务降级概念介绍

    SpringCloud hystrix服务降级概念介绍

    什么是服务降级?当服务器压力剧增的情况下,根据实际业务情况及流量,对一些服务和页面有策略的不处理或换种简单的方式处理,从而释放服务器资源以保证核心交易正常运作或高效运作
    2022-09-09
  • java数独游戏完整版分享

    java数独游戏完整版分享

    这篇文章主要为大家分享了java数独游戏的完整版,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-12-12
  • Spring Security单项目权限设计过程解析

    Spring Security单项目权限设计过程解析

    这篇文章主要介绍了Spring Security单项目权限设计过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • 使用SpringBoot开发Restful服务实现增删改查功能

    使用SpringBoot开发Restful服务实现增删改查功能

    Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程。这篇文章主要介绍了基于SpringBoot开发一个Restful服务,实现增删改查功能,需要的朋友可以参考下
    2018-01-01
  • 一文详解Java中Stream流的使用

    一文详解Java中Stream流的使用

    JDK8新增了Stream(流操作)处理集合的数据,可执行查找、过滤和映射数据等操作.本文将通过一些实例介绍stream流的使用,需要的可以参考一下
    2022-05-05
  • java非法字符‘\ufeff‘解决方法

    java非法字符‘\ufeff‘解决方法

    本文主要介绍了java非法字符‘\ufeff‘解决方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-07-07

最新评论