RabbitMQ Stream插件使用案例代码
2.4版为RabbitMQ流插件引入了对RabbitMQStream插件Java客户端的初始支持。
- RabbitStreamTemplate
- StreamListener容器
将spring rabbit流依赖项添加到项目中:
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-stream</artifactId> <version>3.1.4</version> </dependency>
您可以使用RabbitAdmin bean,使用QueueBuilder.stream()方法指定队列类型,正常地配置队列。例如:
@Bean Queue stream() { return QueueBuilder.durable("stream.queue1") .stream() .build(); }
然而,这仅在您还使用non-stream 组件(如SimpleMessageListenerContainer或DirectMessageListeneerContainer)时才有效,因为在打开AMQP连接时会触发管理员来声明定义的bean。如果您的应用程序仅使用流组件,或者您希望使用高级流配置功能,则应改为配置StreamAdmin:
@Bean StreamAdmin streamAdmin(Environment env) { return new StreamAdmin(env, sc -> { sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create(); sc.stream("stream.queue2").create(); }); }
一、Sending Messages
RabbitStreamTemplate提供RabbitTemplate(AMQP)功能的子集。
public interface RabbitStreamOperations extends AutoCloseable { CompletableFuture<Boolean> send(Message message); CompletableFuture<Boolean> convertAndSend(Object message); CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp); CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message); MessageBuilder messageBuilder(); MessageConverter messageConverter(); StreamMessageConverter streamMessageConverter(); @Override void close() throws AmqpException; }
RabbitStreamTemplate实现具有以下构造函数和属性:
public RabbitStreamTemplate(Environment environment, String streamName) { } public void setMessageConverter(MessageConverter messageConverter) { } public void setStreamConverter(StreamMessageConverter streamConverter) { } public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) { }
MessageConverter在convertAndSend方法中用于将对象转换为Spring AMQP消息。
StreamMessageConverter用于将Spring AMQP消息转换为本机流消息。
您也可以直接发送本机流消息;使用messageBuilder()方法提供对生产者的消息生成器的访问。
ProducerCustomizer提供了一种机制,用于在生成生产者之前对其进行自定义。
二、Receiving Messages
异步消息接收由StreamListenerContainer(以及使用@RabbitListener时的StreamRabbitListerContainerFactory)提供。
侦听器容器需要一个Environment以及一个流名称。
您可以使用经典的MessageListener接收Spring AMQP消息,也可以使用新接口接收本地流消息:
public interface StreamMessageListener extends MessageListener { void onStreamMessage(Message message, Context context); }
有关支持的属性的信息,请参阅消息侦听器容器配置。
与模板类似,容器具有ConsumerCustomizer属性。
有关自定义环境和使用者的信息,请参阅Java客户端文档。
使用@RabbitListener时,配置StreamRabbitListerContainerFactory;此时,大多数@RabbitListener属性(并发等)将被忽略。仅支持id、队列、autoStartup和containerFactory。此外,队列只能包含一个流名称。
三、Examples
@Bean RabbitStreamTemplate streamTemplate(Environment env) { RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1"); template.setProducerCustomizer((name, builder) -> builder.name("test")); return template; } @Bean RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) { return new StreamRabbitListenerContainerFactory(env); } @RabbitListener(queues = "test.stream.queue1") void listen(String in) { ... } @Bean RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) { StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env); factory.setNativeListener(true); factory.setConsumerCustomizer((id, builder) -> { builder.name("myConsumer") .offset(OffsetSpecification.first()) .manualTrackingStrategy(); }); return factory; } @RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory") void nativeMsg(Message in, Context context) { ... context.storeOffset(); } @Bean Queue stream() { return QueueBuilder.durable("test.stream.queue1") .stream() .build(); } @Bean Queue stream() { return QueueBuilder.durable("test.stream.queue2") .stream() .build(); }
2.4.5版将adviceChain属性添加到StreamListenerContainer(及其工厂)。还提供了一个新的工厂bean来创建一个无状态重试拦截器,该拦截器带有一个可选的StreamMessageRecoverer,用于在使用原始流消息时使用。
@Bean public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) { StreamRetryOperationsInterceptorFactoryBean rfb = new StreamRetryOperationsInterceptorFactoryBean(); rfb.setRetryOperations(retryTemplate); rfb.setStreamMessageRecoverer((msg, context, throwable) -> { ... }); return rfb; }
四、Super Streams
超级流是分区流的抽象概念,通过将多个流队列绑定到具有参数x-Super-Stream:true的交换来实现。
1、调配
为了方便起见,可以通过定义类型为SuperStream的单个bean来提供超级流。
@Bean SuperStream superStream() { return new SuperStream("my.super.stream", 3); }
RabbitAdmin检测到这个bean,并将声明交换(my.super.stream)和3个队列(分区)-my.super-stream-n,其中n是0,1,2,绑定的路由密钥等于n。
如果您还希望通过AMQP向exchange 发布,您可以提供自定义路由密钥:
@Bean SuperStream superStream() { return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i) .mapToObj(j -> "rk-" + j) .collect(Collectors.toList())); }
key 的数量必须等于分区的数量。
2、向超级流生产消息
你必须向 RabbitStreamTemplate
添加一个 superStreamRoutingFunction
:
@Bean RabbitStreamTemplate streamTemplate(Environment env) { RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1"); template.setSuperStreamRouting(message -> { // some logic to return a String for the client's hashing algorithm }); return template; }
你也可以通过AMQP发布,使用 RabbitTemplate
。
到此这篇关于RabbitMQ Stream插件使用详解的文章就介绍到这了,更多相关RabbitMQ Stream插件内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
IntelliJ IDEA 2023.1.4 无法刷新Maven项目模块的问题及解决方法
这篇文章主要介绍了如何排查 IDEA 自身报错问题,本文以IntelliJ IDEA 2023.1.4无法刷新项目Maven模块的问题为例,给大家详细讲解,需要的朋友可以参考下2023-08-08springboot+thymeleaf+shiro标签的实例
这篇文章主要介绍了springboot+thymeleaf+shiro标签的实例,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2022-01-01输出java进程的jstack信息示例分享 通过线程堆栈信息分析java线程
通过ps到java进程号将进程的jstack信息输出。jstack信息是java进程的线程堆栈信息,通过该信息可以分析java的线程阻塞等问题。2014-01-01Spring Boot集成Shiro实现动态加载权限的完整步骤
这篇文章主要给大家介绍了关于Spring Boot集成Shiro实现动态加载权限的完整步骤,文中通过示例代码介绍的非常详细,对大家学习或者使用Spring Boot具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧2019-09-09
最新评论