Spring Cloud Stream如何实现服务之间的通讯

 更新时间:2019年10月15日 11:37:59   作者:维晟  
这篇文章主要介绍了Spring Cloud Stream如何实现服务之间的通讯,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

Spring Cloud Stream

Srping cloud Bus的底层实现就是Spring Cloud Stream,Spring Cloud Stream的目的是用于构建基于消息驱动(或事件驱动)的微服务架构。Spring Cloud Stream本身对Spring Messaging、Spring Integration、Spring Boot Actuator、Spring Boot Externalized Configuration等模块进行封装(整合)和扩展,下面我们实现两个服务之间的通讯来演示Spring Cloud Stream的使用方法。

整体概述

服务要想与其他服务通讯要定义通道,一般会定义输出通道和输入通道,输出通道用于发送消息,输入通道用于接收消息,每个通道都会有个名字(输入和输出只是通道类型,可以用不同的名字定义很多很多通道),不同通道的名字不能相同否则会报错(输入通道和输出通道不同类型的通道名称也不能相同),绑定器是操作RabbitMQ或Kafka的抽象层,为了屏蔽操作这些消息中间件的复杂性和不一致性,绑定器会用通道的名字在消息中间件中定义主题,一个主题内的消息生产者来自多个服务,一个主题内消息的消费者也是多个服务,也就是说消息的发布和消费是通过主题进行定义和组织的,通道的名字就是主题的名字,在RabbitMQ中主题使用Exchanges实现,在Kafka中主题使用Topic实现。

准备环境

创建两个项目spring-cloud-stream-a和spring-cloud-stream-b,spring-cloud-stream-a我们用Spring Cloud Stream实现通讯,spring-cloud-stream-b我们用Spring Cloud Stream的底层模块Spring Integration实现通讯。

两个项目的POM文件依赖都是:

<dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-test-support</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

spring-cloud-stream-binder-rabbit是指绑定器的实现使用RabbitMQ。

项目配置内容application.properties:

spring.application.name=spring-cloud-stream-a
server.port=9010

#设置默认绑定器
spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.application.name=spring-cloud-stream-b
server.port=9011

#设置默认绑定器
spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

启动一个rabbitmq:

docker pull rabbitmq:3-management
docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

编写A项目代码

在A项目中定义一个输入通道一个输出通道,定义通道在接口中使用@Input和@Output注解定义,程序启动的时候Spring Cloud Stream会根据接口定义将实现类自动注入(Spring Cloud Stream自动实现该接口不需要写代码)。

A服务输入通道,通道名称ChatExchanges.A.Input,接口定义输入通道必须返回SubscribableChannel:

public interface ChatInput {
  String INPUT = "ChatExchanges.A.Input";
  @Input(ChatInput.INPUT)
  SubscribableChannel input();
}

A服务输出通道,通道名称ChatExchanges.A.Output,输出通道必须返回MessageChannel:

public interface ChatOutput {

  String OUTPUT = "ChatExchanges.A.Output";

  @Output(ChatOutput.OUTPUT)
  MessageChannel output();
}

定义消息实体类:

public class ChatMessage implements Serializable {

  private String name;
  private String message;
  private Date chatDate;

  //没有无参数的构造函数并行化会出错
  private ChatMessage(){}

  public ChatMessage(String name,String message,Date chatDate){
    this.name = name;
    this.message = message;
    this.chatDate = chatDate;
  }

  public String getName(){
    return this.name;
  }

  public String getMessage(){
    return this.message;
  }

  public Date getChatDate() { return this.chatDate; }

  public String ShowMessage(){
    return String.format("聊天消息:%s的时候,%s说%s。",this.chatDate,this.name,this.message);
  }
}

在业务处理类上用@EnableBinding注解绑定输入通道和输出通道,这个绑定动作其实就是创建并注册输入和输出通道的实现类到Bean中,所以可以直接是使用@Autowired进行注入使用,另外消息的串行化默认使用application/json格式(com.fastexml.jackson),最后用@StreamListener注解进行指定通道消息的监听:

//ChatInput.class的输入通道不在这里绑定,监听到数据会找不到AClient类的引用。
//Input和Output通道定义的名字不能一样,否则程序启动会抛异常。
@EnableBinding({ChatOutput.class,ChatInput.class})
public class AClient {

  private static Logger logger = LoggerFactory.getLogger(AClient.class);

  @Autowired
  private ChatOutput chatOutput;

  //StreamListener自带了Json转对象的能力,收到B的消息打印并回复B一个新的消息。
  @StreamListener(ChatInput.INPUT)
  public void PrintInput(ChatMessage message) {

    logger.info(message.ShowMessage());

    ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date());

    chatOutput.output().send(MessageBuilder.withPayload(replyMessage).build());
  }
}

到此A项目代码编写完成。

编写B项目代码

B项目使用Spring Integration实现消息的发布和消费,定义通道时我们要交换输入通道和输出通道的名称:

public interface ChatProcessor {

  String OUTPUT = "ChatExchanges.A.Input";
  String INPUT = "ChatExchanges.A.Output";

  @Input(ChatProcessor.INPUT)
  SubscribableChannel input();

  @Output(ChatProcessor.OUTPUT)
  MessageChannel output();
}

消息实体类:

public class ChatMessage {
  private String name;
  private String message;
  private Date chatDate;

  //没有无参数的构造函数并行化会出错
  private ChatMessage(){}

  public ChatMessage(String name,String message,Date chatDate){
    this.name = name;
    this.message = message;
    this.chatDate = chatDate;
  }

  public String getName(){
    return this.name;
  }

  public String getMessage(){
    return this.message;
  }

  public Date getChatDate() { return this.chatDate; }

  public String ShowMessage(){
    return String.format("聊天消息:%s的时候,%s说%s。",this.chatDate,this.name,this.message);
  }
}

业务处理类用@ServiceActivator注解代替@StreamListener,用@InboundChannelAdapter注解发布消息:

@EnableBinding(ChatProcessor.class)
public class BClient {

  private static Logger logger = LoggerFactory.getLogger(BClient.class);

  //@ServiceActivator没有Json转对象的能力需要借助@Transformer注解
  @ServiceActivator(inputChannel=ChatProcessor.INPUT)
  public void PrintInput(ChatMessage message) {

    logger.info(message.ShowMessage());
  }

  @Transformer(inputChannel = ChatProcessor.INPUT,outputChannel = ChatProcessor.INPUT)
  public ChatMessage transform(String message) throws Exception{
    ObjectMapper objectMapper = new ObjectMapper();
    return objectMapper.readValue(message,ChatMessage.class);
  }

  //每秒发出一个消息给A
  @Bean
  @InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000"))
  public GenericMessage<ChatMessage> SendChatMessage(){
    ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date());
    GenericMessage<ChatMessage> gm = new GenericMessage<>(message);
    return gm;
  }
}

运行程序

启动A项目和B项目:


源码

Github仓库:https://github.com/sunweisheng/spring-cloud-example

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • Java 运算符 动力节点Java学院整理

    Java 运算符 动力节点Java学院整理

    这篇文章主要介绍了Java 运算符 动力节点Java学院整理,需要的朋友可以参考下
    2017-04-04
  • MybatisPlus的MetaObjectHandler与@TableLogic使用

    MybatisPlus的MetaObjectHandler与@TableLogic使用

    这篇文章主要介绍了MybatisPlus的MetaObjectHandler与@TableLogic使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-04-04
  • 详解Spring Boot 项目启动时执行特定方法

    详解Spring Boot 项目启动时执行特定方法

    这篇文章主要介绍了详解Spring Boot 项目启动时执行特定方法,Springboot给我们提供了两种“开机启动”某些方法的方式:ApplicationRunner和CommandLineRunner。感兴趣的小伙伴们可以参考一下
    2018-06-06
  • SpringBoot获取配置文件内容的几种方式总结

    SpringBoot获取配置文件内容的几种方式总结

    大家都知道SpringBoot获取配置文件的方法有很多,下面这篇文章主要给大家介绍了关于SpringBoot获取配置文件内容的几种方式,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2023-02-02
  • java编程实现求解八枚银币代码分享

    java编程实现求解八枚银币代码分享

    这篇文章主要介绍了java编程实现求解八枚银币代码分享,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11
  • SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

    SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

    分布式事务是在微服务开发中经常会遇到的一个问题,之前的文章中我们已经实现了利用Seata来实现强一致性事务,其实还有一种广为人知的方案就是利用消息队列来实现分布式事务,保证数据的最终一致性,也就是我们常说的柔性事务
    2022-09-09
  • 你什么是Elastic Stack(ELK)

    你什么是Elastic Stack(ELK)

    这篇文章主要介绍了你什么是Elastic Stack(ELK),ELK是三款软件的简称,分别是Elasticsearch、Logstash、Kibana组成,需要的朋友可以参考下
    2023-04-04
  • SpringMvc @Valid如何抛出拦截异常

    SpringMvc @Valid如何抛出拦截异常

    这篇文章主要介绍了SpringMvc @Valid如何抛出拦截异常,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-09-09
  • java jar包后台运行的两种方式详解

    java jar包后台运行的两种方式详解

    后台运行jar的方法有多种方法可以实现Java后台运行jar文件,下面介绍其中两种常见的方法,下面这篇文章主要给大家介绍了关于java jar包后台运行的两种方式,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-07-07
  • java String[]字符串数组自动排序的简单实现

    java String[]字符串数组自动排序的简单实现

    下面小编就为大家带来一篇java String[]字符串数组自动排序的简单实现。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-09-09

最新评论