被kafka-client和springkafka版本坑到自闭及解决

 更新时间:2022年03月26日 10:22:13   作者:追月亮的猴子  
这篇文章主要介绍了被kafka-client和springkafka版本坑到自闭及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

被kafka-client和springkafka版本坑

上周刚刚欢天喜地的在linux上部了kafka,这周打算用spring-boot框架写个简单demo跑一下,结果悲剧就此展开。

首先建立maven工程:pom中添加spring boot kafka依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.1.5.RELEASE</version>
      <relativePath/> <!-- lookup parent from repository -->
   </parent>
   <groupId>com.example</groupId>
   <artifactId>kafkaproducer</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>kafkaproducer</name>
   <description>Demo project for Spring Boot</description>
 
   <properties>
      <java.version>1.8</java.version>
   </properties>
 
   <dependencies>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
 
      <dependency>
         <groupId>org.projectlombok</groupId>
         <artifactId>lombok</artifactId>
         <optional>true</optional>
      </dependency>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-test</artifactId>
         <scope>test</scope>
      </dependency>
      <dependency>
         <groupId>org.springframework.kafka</groupId>
         <artifactId>spring-kafka</artifactId>
      </dependency>
   </dependencies>
 
   <build>
      <plugins>
         <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
         </plugin>
      </plugins>
   </build>
 
 
</project>

配置文件如下:

server.port=8089
spring.kafka.bootstrap-servers=ip:port
spring.kafka.producer.retries= 0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.linger.ms=1

然后新建一个Producer类

package com.example.kafkaproducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
 
@Component
public class KafkaProducer {
    @Autowired
    KafkaTemplate kafkaTemplate;
    public void produce(){
        kafkaTemplate.send("test","hello word");
        System.out.println("发送消息");
    }
}

在test类中调用

package com.example.kafkaproducer;  
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; 
 
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaproducerApplicationTests { 
   @Autowired KafkaProducer kafkaProducer; 
   @Test
   public void contextLoads() {
      kafkaProducer.produce();
   } 
}

然后控制台就会打印一个莫名奇妙的错误,没有打印任何堆栈信息,大概意思只是表达了连接不上。

Exception thrown when sending a message with key='null' and payload='' to topic

telnet ip+port 是可以通的

随后发现,xshell上启动的kafka-server在报这样一个错,更详细的没有留存。

ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 18

百度了一下,很可能是Linux上的kafka版本和pom中引入的spring-kafka依赖不匹配造成的,于是查看对应关系

查看kafka,发现装的是一个0.8.2.1 版本的kafka,该版本的kafka是2015年3月发布的版本,可以说是十分古老,真是不知道为什么当初要选这么老的版本。

换了几次spring-kafka的pom之后,依然在报这个问题,于是我选择换更新的kafka的包。

换了2.2.0版本kafka的包,问题得到解决。

其中consumer的创建命令和老版本的不太一样,且consumer和producer需使用相同的端口号,而不是像之前producer配置为broker的端口,consumer配置为zookeeper的端口号。

./bin/kafka-console-consumer.sh --bootstrap-server ip:9092  --topic test

且config文件夹下server.properties文件中的一些配置和之前不太一样,需要注意的是,以下两行配置原来是被注解了的,需要在这里取消掉注解,并配置自己的ip。

listeners = PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://your.host.name:9092

springboot、spring-kafka、kafka-client三者兼容性关系

spring官方描述的spring-kafka的版本和kafka-clients的版本对应关系:

官方地址:https://spring.io/projects/spring-kafka

中间列:“Spring Integration for Apache Kafka Version 可忽略不看:

也就是说spring-kafka与spring-client是存在在一对多关系的,那是不是他所有的spring-client都可以选呢?

接着往下看(摘自官网):

他说啥 ?

  • springboot 1.5 你应该用的是spring-kafka 1.3.x.
  • springboot2.0你应该使用的是spring-kafka2.0.x.
  • 如果用的是spring boot2.1.x,那么你必须使用spring-kafka的版本是2.2.x。否则就会出现noClass等等各种异常。
  • spring-kafka的版本是2.1默认使用的spring-client是1.1.x,当你要使用另外两个时,你就要使用如下的版本配置.
  • 如果你用的是2.2.x的spring-kafka,只看第一张图,你会以为2.1.x的kafka-clients也可以用。但是spring说了,此时默认用的kafka-clients是2.0.x,如果你想用2.1.x,必须看文档附录,下图的大概意思,必须换掉下图所示的所有依赖版本。

也就是说并不是一对多 他默认的还是只有一个kafka-client来给你的,你要选其他的可以的,你添加一些额外配置

例如:

 Spring Boot 2.1.0.RELEASE 版本,因此 spring-kafka 的版本为 2.2.0.RELEASE,kafka-clients 的默认版本为2.0.0,所以 kafka 的版本选用为 kafka_2.11-2.1.0 (前面的2.11代表的是Scala的版本后面为kafka的版本号)

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • java多线程中线程封闭详解

    java多线程中线程封闭详解

    在本文里我们给大家分享了关于java多线程中线程封闭的知识点内容以及用法,有需要读者们可以参考下。
    2019-07-07
  • 为Java应用创建Docker镜像的3种方式总结

    为Java应用创建Docker镜像的3种方式总结

    Docker的使用可以将应用程序做成镜像,这样可以将镜像发布到私有或者公有仓库中,在其他主机上也可以pull镜像,并且运行容器,运行程,下面这篇文章主要给大家总结介绍了关于为Java应用创建Docker镜像的3种方式,需要的朋友可以参考下
    2023-06-06
  • springdoc openapi使用解决方案

    springdoc openapi使用解决方案

    SpringDoc注解的使用,它是基于OpenAPI 3和Swagger 3的现代化解决方案,相较于旧版的Swagger2即SpringFox,SpringDoc提供了更简洁、更直观的注解方式,这篇文章主要介绍了springdoc openapi使用,需要的朋友可以参考下
    2024-04-04
  • 详解Spring AOP

    详解Spring AOP

    本文非常详细讲解了Spring AOP,本篇文章通过大量的代码,讲解了Spring AOP的使用,以下就是详细内容,需要的朋友可以参考下
    2021-08-08
  • Spring Boot 整合 Mybatis Annotation 注解的完整 Web 案例

    Spring Boot 整合 Mybatis Annotation 注解的完整 Web 案例

    这篇文章主要介绍了Spring Boot 整合 Mybatis Annotation 注解的完整 Web 案例,需要的朋友可以参考下
    2017-05-05
  • 如何用Netty实现高效的HTTP服务器

    如何用Netty实现高效的HTTP服务器

    这篇文章主要介绍了如何用Netty实现高效的HTTP服务器,对HTTP感兴趣的同学可以参考一下
    2021-04-04
  • Java判断List中有无重复元素的方法

    Java判断List中有无重复元素的方法

    今天小编就为大家分享一篇Java判断List中有无重复元素的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-07-07
  • MyBatis接口绑定的实现方式和工作原理

    MyBatis接口绑定的实现方式和工作原理

    在日常开发中,数据持久层是几乎每个项目都会涉及的一个关键组成部分,MyBatis作为一个流行的持久层框架,其提供的接口绑定机制极大地简化了数据库操作,本文将通过详细的代码示例和讲解,带你深入理解MyBatis接口绑定的工作原理和实践方式,需要的朋友可以参考下
    2024-03-03
  • Java 遍历取出Map集合key-value数据的4种方法

    Java 遍历取出Map集合key-value数据的4种方法

    这篇文章主要介绍了Java 遍历取出Map集合key-value数据的4种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-09-09
  • MyBatis自定义TypeHandler如何解决字段映射问题

    MyBatis自定义TypeHandler如何解决字段映射问题

    这篇文章主要介绍了MyBatis自定义TypeHandler如何解决字段映射问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12

最新评论