kafka监听问题的解决和剖析

 更新时间:2020年12月03日 11:31:42   作者:知晓汝名,吓吾一跳!  
这篇文章主要给大家介绍了关于kafka监听问题的解决和剖析的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

问题如下:

  1. kafka为什么监听不到数据
  2. kafka为什么会有重复数据发送
  3. kafka数据重复如何解决
  4. 为什么kafka会出现俩个消费端都可以消费问题
  5. kafka监听配置文件 

一. 解决问题一(kafka监听不到数据)

  首先kafka监听不得到数据,检查如下

  • 检查配置文件是否正确(可能会出现改了监听地址,监听Topic,监听的地址的数量问题)
  • 检查接收数据的正确性(比如原生的代码,可能是用byte序列化接收的数据,而你接收使用String。也是配置文件序列化问题,还有与发送者商量问题)
  • 检查kafka版本问题(一般的版本其实是没什么问题的,只有个别版本会出现监听不到问题)
  • 没有加
    @Component    犯了最不应该出差错的问题

  如果出现监听不到数据的问题,那么就试试更改方法一二,如果不可以在去试试方法三,之前出现这个问题也是查过 一般查到都会说  “低版本的服务器接收不到高版本的生产者发送的消息”,但是净由测试使用 用1.0.5RELEASE 和 2.6.3反复测试,并没有任何的问题。

如果按照版本一致,那么根本就不现实,因为可能不同的项目,springboot版本不一致的话,可能有的springboot版本低,那么你还得要求自己维护项目版本升级?如果出现第四种情况就无话可说了。

二. 解决问题二(kafka为什么会有重复数据发送)

  重复数据的发送问题如下

  1. 可能在发送者的那里的事务问题。mysql存储事务发生异常导致回滚操作,但是kafka消息却是已经发送到了服务器中。此事肯定会出现重复问题
  2. 生产者设置时间问题,生产发送设置的时间内,消息没完成发送,生产者以为消费者挂掉,便重新发送一个,导致重复
  3. offset问题,当项目重启,offset走到某一个位置已扔到kafka服务器中,但是项目被重启.那么offset会是在原本重启的那一个点的地方再次发送一次,这是kafka设计的问题,防止出现丢失数据问题

三. 解决问题三(kafka数据重复如何解决)

  目前我是使用的Redis进行的排重法,用的是Redis中的set,保证里面不存在重复,保证Redis里面不会存入太多的脏数据。并定期清理

  粘贴一下我的排重(Redis排重法)

//kafka prefix
  String cache = "kafka_cache";
  //kafka suffix
  Calendar c = Calendar.getInstance();
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  //0点,目前是为了设置为这一天的固定时间。这个完全可以去写个工具类自己弄,为了看的更清楚,麻烦了一点的写入
  SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd 00:00:00");
  String gtimeStart = sdf2.format(c.getTime());
  long time = sdf.parse(gtimeStart).getTime();


  //此位置为了设置是否是新的一天,新的一天需要设置定时时间,保证redis中不会存储太多无用数据
  Boolean flag = false;
  //数据接收
  Set<String> range = new HashSet<>();
  //判断是否存在
  if (redisTemplate.hasKey(cache + time)) {
  //存在则取出这个set
  range = redisTemplate.opsForSet().members(cache + time);
  }else {
  //不存在,则为下面过期时间的设置铺垫
  flag = true;
  }
  //判断监听到的数据是否是重复
  if (range.contains("测试需要")) {
  //重复则排出,根据逻辑自己修改
  continue;
  } else {
  //添加进去
  redisTemplate.opsForSet().add(cache + time, i+"");
  if (flag){
   //设置为24小时,保证新一天使用,之前使用的存储会消失掉
   redisTemplate.expire(cache + time,24,TimeUnit.HOURS);
   //不会在进入这个里面,如果多次的存入过期时间,那么这个key的过期时间就永远是24小时,一直就不会过期
   flag = false;
  }
  }

四. 解决问题四(为什么kafka会出现俩个消费端都可以消费问题)

  原因是因为在不同groupId之下,kafka接收到以后,会给监听他的每一个组发送一个他所收到的消息,但是两个消费端监听同一个租,那么就只有一个消费端可以消费到。

五. 粘一下我的监听配置文件

# 指定kafka 代理地址,可以多个,用逗号间隔
spring.kafka.bootstrap-servers= localhost:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id= test
# 是否自动提交
spring.kafka.consumer.enable-auto-commit= true
# 提交间隔的毫秒
spring.kafka.consumer.auto-commit-interval.ms=60000
# 最大轮询的次数
spring.kafka.consumer.max-poll-records=1
# 将偏移量重置为最新偏移量
spring.kafka.consumer.auto-offset-reset=earliest
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

总结

到此这篇关于kafka监听问题的解决和剖析的文章就介绍到这了,更多相关kafka监听问题内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:

相关文章

  • Mybatis动态SQL的实现示例

    Mybatis动态SQL的实现示例

    这篇文章主要介绍了Mybatis动态SQL的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-10-10
  • JavaWeb实现文件的上传与下载

    JavaWeb实现文件的上传与下载

    这篇文章主要为大家详细介绍了JavaWeb实现文件的上传与下载,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-04-04
  • 详解Java 集合系列(三)—— LinkedList

    详解Java 集合系列(三)—— LinkedList

    这篇文章主要介绍了Java 集合系列(三)—— LinkedList,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-04-04
  • java中SPI服务提供者的接口使用总结

    java中SPI服务提供者的接口使用总结

    SPI英文全称为Service Provider Interface,顾名思义,服务提供者接口,它是jdk提供给“服务提供厂商”或者“插件开发者”使用的接口
    2022-02-02
  • Java中List add添加不同类型元素的讲解

    Java中List add添加不同类型元素的讲解

    今天小编就为大家分享一篇关于java的List add不同类型的对象,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-03-03
  • spring boot实现profiles动态切换的示例

    spring boot实现profiles动态切换的示例

    Spring Boot支持在不同的环境下使用不同的配置文件,该技术非常有利于持续集成,在构建项目的时候只需要使用不同的构建命令就可以生成不同运行环境下war包,而不需要手动切换配置文件。
    2020-10-10
  • Mockito mock Kotlin Object类方法报错解决方法

    Mockito mock Kotlin Object类方法报错解决方法

    这篇文章主要介绍了Mockito mock Kotlin Object类方法报错解决方法,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-09-09
  • 修改jar包package目录结构操作方法

    修改jar包package目录结构操作方法

    这篇文章主要介绍了修改jar包package目录结构操作方法,本文给大家介绍的非常详细,具有一定的参考借鉴价值 ,需要的朋友可以参考下
    2019-07-07
  • Java实现利用图片或视频生成GIF并发送微信

    Java实现利用图片或视频生成GIF并发送微信

    这篇文章主要为大家详细介绍了Java语言如何利用图片或视频实现生成GIF并发送微信的功能,文中的示例代码讲解详细,感兴趣的小伙伴可以尝试一下
    2022-11-11
  • java8中的List<String>转List<Integer>的实例代码

    java8中的List<String>转List<Integer>的实例代码

    这篇文章主要介绍了java8中的List<String>转List<Integer>,转换list列表String到列表Intger,java8提供了stream很好的进行操作,本文通过示例代码给大家介绍的非常详细,需要的朋友可以参考下
    2023-07-07

最新评论