SpringCloudStream原理和深入使用小结

 更新时间:2024年06月20日 12:29:59   作者:7仔要加油  
Spring Cloud Stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架,本文给大家介绍SpringCloudStream原理和深入使用,感兴趣的朋友跟随小编一起看看吧

简单概述

Spring Cloud Stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架。

应用程序通过inputs或outputs来与Spring Cloud Stream中binder对象交互,binder对象负责与消息中间件交互。也就是说:Spring Cloud Stream能够屏蔽底层消息中间件【RabbitMQ,kafka等】的差异,降低切换成本,统一消息的编程模型

相关概念

Channel(通道):Channel是消息的传输管道,用于在生产者和消费者之间传递消息。生产者通过输出通道将消息发送到Destination,消费者通过输入通道从Destination接收消息。

在Spring Cloud Stream中,有两种类型的通道:输入(input)和输出(output)。这两种通道分别用于消费者接收消息和生产者发送消息。

  • Input(输入):Input通道用于消费者从消息代理接收消息。消费者可以通过监听Input通道来实时接收传入的消息
  • Output(输出):Output通道用于生产者向消息代理发送消息。生产者可以通过向Output通道发送消息来发布新的消息

Destination(目标):Destination是消息的目的地,通常对应于消息代理中的Topic或Queue。生产者将消息发送到特定的Destination,消费者从其中接收消息。

Binder(绑定器):Binder是Spring Cloud Stream的核心组件之一。它作为消息代理与外部消息中间件进行交互,并负责将消息发送到消息总线或从消息总线接收消息。Binder负责处理消息传递、序列化、反序列化、消息路由等底层细节,使得开发者能够以统一的方式与不同的消息中间件进行交互。Spring Cloud Stream提供了多个可用的Binder实现,包括RabbitMQ、Kafka等。

**消费者组:**在Spring Cloud Stream中,消费组(Consumer Group)是一组具有相同功能的消费者实例。当多个消费者实例属于同一个消费组时,消息代理会将消息均匀地分发给消费者实例,以实现负载均衡。如果其中一个消费者实例失效,消息代理会自动将消息重新分配给其他可用的消费者实例,以实现高可用性。(对于一个消息来说,每个消费者组只会有一个消费者消费消息)

分区:Spring Cloud Stream支持在多个消费者实例之间创建分区,这样我们通过某些特征量做消息分发,保证相同标识的消息总是能被同一个消费者处理

Spring Message

Spring Message是Spring Framework的一个模块,其作用就是统一消息的编程模型。

package org.springframework.messaging;
public interface Message<T> {
    T getPayload();
    MessageHeaders getHeaders();
}

消息通道 MessageChannel 用于接收消息,调用send方法可以将消息发送至该消息通道中:

@FunctionalInterface
public interface MessageChannel {
	long INDEFINITE_TIMEOUT = -1;
	default boolean send(Message<?> message) {
		return send(message, INDEFINITE_TIMEOUT);
	}
	boolean send(Message<?> message, long timeout);
}

消息通道里的消息由消息通道的子接口可订阅的消息通道SubscribableChannel实现,被MessageHandler消息处理器所订阅

public interface SubscribableChannel extends MessageChannel {
	boolean subscribe(MessageHandler handler);
	boolean unsubscribe(MessageHandler handler);
}

MessageHandler真正地消费/处理消息

@FunctionalInterface
public interface MessageHandler {
    void handleMessage(Message<?> message) throws MessagingException;
}

Spring Integration

Spring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。

它提出了不少新的概念,包括消息路由MessageRoute、消息分发MessageDispatcher、消息过滤Filter、消息转换Transformer、消息聚合Aggregator、消息分割Splitter等等。同时还提供了MessageChannel和MessageHandler的实现,分别包括 DirectChannel、ExecutorChannel、PublishSubscribeChannel和MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等内容。

Spring-Cloud-Stream的架构

img

快速入门

引入依赖

        <!--stream-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

增加配置文件

spring:
    cloud:
        stream:
            # 定义消息中间件
            binders:
              MyRabbit:
                  type: rabbit
                  environment:
                    spring:
                        rabbitmq:
                            host: localhost
                            port: 5672
                            username: root
                            password: root
                            vhost: /
            bindings:
            # 生产者中定义,定义发布对象
              myInput:
                destination: myStreamExchange
                group: myStreamGroup
                binder: MyRabbit
            # 消费者中定义,定义订阅的对象
              myOutput-in-0:
                destination: myStreamExchange
                group: myStreamGroup
                binder: MyRabbit
        # 消费者中定义,定义输出的函数
        function:
            definition: myOutput

生产者

@Resource
	private StreamBridge streamBridge;
	public void sendNormal() {
		streamBridge.send("myInput", "hello world");
	}

消费者

@Bean("myOutput")
	public Consumer<Message<String>> myOutput() {
		return (message) -> {
			MessageHeaders headers = message.getHeaders();
			System.out.println("myOutput head is : " + headers);
			String payload = message.getPayload();
			System.out.println("myOutput payload is : " + payload);
		};
	}

如何自定义Binder

  • 添加spring-cloud-stream依赖
  • 提供ProvisioningProvider的实现提供
  • MessageProducer的实现提供
  • MessageHandler的实现提供
  • Binder的实现创建Binder的配置
  • META-INF/spring.binders中定义绑定器

添加spring-cloud-stream依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>${spring.cloud.stream.version}</version>
</dependency>

提供ProvisioningProvider的实现

ProvisioningProvider负责提供消费者和生产者目的地,并需要将 application.yml 或 application.properties 文件中包含的逻辑目的地转换为物理目的地引用。

public class FileProvisioningProvider implements ProvisioningProvider<
	ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>> {
	public FileProvisioningProvider() {
		super();
	}
	@Override
	public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<FileProducerProperties> properties) throws ProvisioningException {
		return new FileMessageDestination(name);
	}
	@Override
	public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws ProvisioningException {
		return new FileMessageDestination(name);
	}
	private static class FileMessageDestination implements ProducerDestination, ConsumerDestination {
		private final String destination;
		private FileMessageDestination(final String destination) {
			this.destination = destination;
		}
		@Override
		public String getName() {
			return destination.trim();
		}
		@Override
		public String getNameForPartition(int partition) {
			throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
		}
	}
}

提供MessageProducer的实现

MessageProducer负责使用事件并将其作为消息处理,发送给配置为使用此类事件的客户端应用程序。

super.onInit();
		executorService = Executors.newScheduledThreadPool(1);
	}
	@Override
	public void doStart() {
		executorService.scheduleWithFixedDelay(() -> {
			String payload = getPayload();
			if (payload != null) {
				Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
				sendMessage(receivedMessage);
			}
		}, 0, 50, TimeUnit.MILLISECONDS);
	}
	@Override
	protected void doStop() {
		executorService.shutdownNow();
	}
	private String getPayload() {
		try {
			List<String> allLines = Files.readAllLines(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"));
			String currentPayload = allLines.get(allLines.size() - 1);
			if (!currentPayload.equals(previousPayload)) {
				previousPayload = currentPayload;
				return currentPayload;
			}
		} catch (IOException e) {
			FileUtil.touch(new File(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"));
		}
		return null;
	}
}

提供MessageHandler的实现

MessageHandler提供产生事件所需的逻辑。

public class FileMessageHandler extends AbstractMessageHandler {
	FileExtendedBindingProperties fileExtendedBindingProperties;
	ProducerDestination destination;
	public FileMessageHandler(ProducerDestination destination, FileExtendedBindingProperties fileExtendedBindingProperties) {
		this.destination = destination;
		this.fileExtendedBindingProperties = fileExtendedBindingProperties;
	}
	@Override
	protected void handleMessageInternal(Message<?> message) {
		try {
			if (message.getPayload() instanceof byte[]) {
				Files.write(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"), (byte[]) message.getPayload());
			} else {
				throw new RuntimeException("处理消息失败");
			}
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}
}

提供Binder的实现

提供自己的Binder抽象实现:

  • 扩展AbstractMessageChannelBinder
  • 将自定义的 ProvisioningProvider 指定为 AbstractMessageChannelBinder 的通用参数
  • 重写createProducerMessageHandlercreateConsumerEndpoint方法
public class FileMessageChannelBinder extends AbstractMessageChannelBinder
	<ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>, FileProvisioningProvider>
	implements ExtendedPropertiesBinder<MessageChannel, FileConsumerProperties, FileProducerProperties> {
	FileExtendedBindingProperties fileExtendedBindingProperties;
	public FileMessageChannelBinder(String[] headersToEmbed, FileProvisioningProvider provisioningProvider, FileExtendedBindingProperties fileExtendedBindingProperties) {
		super(headersToEmbed, provisioningProvider);
		this.fileExtendedBindingProperties = fileExtendedBindingProperties;
	}
	@Override
	protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<FileProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {
		FileMessageHandler fileMessageHandler = new FileMessageHandler(destination, fileExtendedBindingProperties);
		return fileMessageHandler;
	}
	@Override
	protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws Exception {
		FileMessageProducerAdapter fileMessageProducerAdapter = new FileMessageProducerAdapter(destination, fileExtendedBindingProperties);
		return fileMessageProducerAdapter;
	}
	@Override
	public FileConsumerProperties getExtendedConsumerProperties(String channelName) {
		return fileExtendedBindingProperties.getExtendedConsumerProperties(channelName);
	}
	@Override
	public FileProducerProperties getExtendedProducerProperties(String channelName) {
		return fileExtendedBindingProperties.getExtendedProducerProperties(channelName);
	}
	@Override
	public String getDefaultsPrefix() {
		return fileExtendedBindingProperties.getDefaultsPrefix();
	}
	@Override
	public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
		return fileExtendedBindingProperties.getExtendedPropertiesEntryClass();
	}
}

创建Binder的配置

严格要求创建一个 Spring 配置来初始化你的绑定器实现的 bean

@EnableConfigurationProperties(FileExtendedBindingProperties.class)
@Configuration
public class FileMessageBinderConfiguration {
	@Bean
	@ConditionalOnMissingBean
	public FileProvisioningProvider fileMessageBinderProvisioner() {
		return new FileProvisioningProvider();
	}
	@Bean
	@ConditionalOnMissingBean
	public FileMessageChannelBinder fileMessageBinder(FileProvisioningProvider fileMessageBinderProvisioner, FileExtendedBindingProperties fileExtendedBindingProperties) {
		return new FileMessageChannelBinder(null, fileMessageBinderProvisioner, fileExtendedBindingProperties);
	}
	@Bean
	public FileProducerProperties fileConsumerProperties() {
		return new FileProducerProperties();
	}
}

详细的代码见https://gitee.com/xiaovcloud/spring-cloud-stream

到此这篇关于SpringCloudStream原理和深入使用的文章就介绍到这了,更多相关SpringCloudStream原理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • IDEA无法识别SpringBoot项目的简单解决办法

    IDEA无法识别SpringBoot项目的简单解决办法

    今天使用idea的时候,遇到idea无法启动springboot,所以这篇文章主要给大家介绍了关于IDEA无法识别SpringBoot项目的简单解决办法,需要的朋友可以参考下
    2023-08-08
  • java实现单机版五子棋小游戏

    java实现单机版五子棋小游戏

    这篇文章主要为大家详细介绍了java实现单机版五子棋小游戏,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-12-12
  • Spring Boot集成Seata实现基于AT模式的分布式事务的解决方案

    Spring Boot集成Seata实现基于AT模式的分布式事务的解决方案

    Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务,这篇文章主要介绍了Spring Boot集成Seata实现基于AT模式的分布式事务,需要的朋友可以参考下
    2024-08-08
  • Kafka是什么及如何使用SpringBoot对接Kafka(最新推荐)

    Kafka是什么及如何使用SpringBoot对接Kafka(最新推荐)

    这篇文章主要介绍了Kafka是什么,以及如何使用SpringBoot对接Kafka,今天我们通过一个Demo讲解了在SpringBoot中如何对接Kafka,也介绍了下关键类 KafkaTemplate,需要的朋友可以参考下
    2023-11-11
  • IDEA Ultimate2020.2版本配置Tomcat详细教程

    IDEA Ultimate2020.2版本配置Tomcat详细教程

    这篇文章主要介绍了IDEA Ultimate2020.2版本配置Tomcat教程,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-09-09
  • java 启动exe程序,传递参数和获取参数操作

    java 启动exe程序,传递参数和获取参数操作

    这篇文章主要介绍了java 启动exe程序,传递参数和获取参数操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-01-01
  • 使用Springboot打成jar包thymeleaf的问题

    使用Springboot打成jar包thymeleaf的问题

    这篇文章主要介绍了使用Springboot打成jar包thymeleaf的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • SpringBoot项目中的视图解析器问题(两种)

    SpringBoot项目中的视图解析器问题(两种)

    SpringBoot官网推荐使用HTML视图解析器,但是根据个人的具体业务也有可能使用到JSP视图解析器,所以本文介绍了两种视图解析器,感兴趣的可以了解下
    2020-06-06
  • AbstractQueuedSynchronizer内部类Node使用讲解

    AbstractQueuedSynchronizer内部类Node使用讲解

    这篇文章主要为大家介绍了AbstractQueuedSynchronizer内部类Node使用讲解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-07-07
  • 说说Spring中为何要引入Lookup注解

    说说Spring中为何要引入Lookup注解

    这篇文章主要给大家介绍了关于Spring中为何要引入Lookup注解的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-01-01

最新评论