spring-cloud-stream的手动消息确认问题
spring-cloud-stream的手动消息确认
对于kafka-binder来说,设置autoCommitOffset为false.然后在listen中手动确认
@StreamListener(Sink.INPUT) void listen(@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment){ //...业务代码 acknowledgment.acknowledge(); }
需要注意的是autoCommitOffset的设置位置.
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false#应该在这里设置 spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false#这里设置是无效的,获取Acknowledgment时会是null
springcloud的stream消息组件的使用@StreamListener
常见问题(使用rabbitmq)
消息分组防止多实例重复消费
在一个服务多实例场景下使用默认使用@StreamListener监听消息消费,yml中没有特殊配置的话是会导致消息重复消费的,原因是此时每个实例都是匿名在rabbitmq上注册的队列,需要给消费者指定一个消费组,让消息在组里只被消费一次;
spring.cloud.stream.bindings.xxx(消费者队列名).group=xxx(组名)
在springboot下在同一个服务(项目中)使用@input和@outPut时指定的队列名是不可以重复的.会在启动编译的时候报bean定义重复。需要在yml给生产者和消费者指定同一个交换机。
spring: rabbitmq: host: xxx.xxx.xxx.xx port: 35672 username: xxx password: xxx virtual-host: /xxx cloud: stream: bindings: in: #若消息系统是RabbitMQ,目的地(destination)就是指exchange,消息系统是Kafka,那么就是指topic destination: test #在多实例的时候需要制定一个消息分组,不然每个实例都是匿名方式把队列注册到rabbitmq上去,导致一个交换机下有多个队列 #并且默认生成的交换机是topic类型的,会导致重复消费 group: myIn out: destination: test
先上依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.8.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.fchan</groupId> <artifactId>springcloudstream</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springcloudstream</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <!-- <version>2.0.1.RELEASE</version>--> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-dependencies</artifactId> <version>Ditmars.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
再上yml配置
spring: rabbitmq: host: xxx.xxx.xxx.xx port: 35672 username: xxx password: xxx virtual-host: /xxx cloud: stream: bindings: in: #若消息系统是RabbitMQ,目的地(destination)就是指exchange,消息系统是Kafka,那么就是指topic destination: test #在多实例的时候需要制定一个消息分组,不然每个实例都是匿名方式把队列注册到rabbitmq上去,导致一个交换机下有多个队列 #并且默认生成的交换机是topic类型的,会导致重复消费 group: myIn out: destination: test
消息生产者
package com.fchan.springcloudstream.service; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; public interface MyMessageChannel { String out = "out"; String in = "in"; @Output(out) MessageChannel out(); @Input(in) SubscribableChannel in(); }
发送消息
package com.fchan.springcloudstream.controller; import com.fchan.springcloudstream.service.MyMessageChannel; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; @RestController public class MessageController { @Resource private MyMessageChannel myMessageChannel; @RequestMapping("test") public String testMessage(){ Map<String,Object> map = new HashMap<>(); map.put("shopId", "123"); myMessageChannel.out().send(MessageBuilder.withPayload(map).build()); return "success"; } }
消息消费者
package com.fchan.springcloudstream.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import java.util.Map; @Component @EnableBinding({MyMessageChannel.class}) public class MyConsumer { Logger log = LoggerFactory.getLogger(MyConsumer.class); @StreamListener(MyMessageChannel.in) public void input(Message<Map<String,Object>> message){ log.info("收到消息:{}", message.getPayload()); } }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
Vue+ElementUI技巧之自定义表单项label的文字提示方法
这篇文章主要给大家介绍了关于Vue+ElementUI技巧之自定义表单项label文字提示的相关资料,文中通过图文以及代码示例介绍的非常详细,对大家的学习或者工作具有一定的参考借鉴价值,需要的朋友可以参考下2024-02-02vue项目报错:Missing script:"serve"的解决办法
这篇文章主要给大家介绍了关于vue项目报错:Missing script:"serve"的解决办法,"missing script: serve"是一个错误信息,意味着在执行启动脚本时,找不到名为"serve"的脚本,需要的朋友可以参考下2023-11-11vue3如何添加eslint校验(eslint-plugin-vue)
这篇文章主要介绍了vue3如何添加eslint校验(eslint-plugin-vue),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教2024-01-01
最新评论