结合线程池实现apache kafka消费者组的误区及解决方法

 更新时间:2022年07月07日 11:03:27   作者:字母哥哥  
这篇文章主要介绍了结合线程池实现apache kafka消费者组的误区及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

一个错误:多线程使用单一消费者

下图显现了一种错误的使用KafkaConsumer的方法

  • 创建多个线程用来消费kafka数据
  • 多线程使用同一个KafkaConsumer对象
  • 在单线程中使用这个KafkaConsumer对象,完成数据拉取、处理、提交偏移量。

在这里插入图片描述

这种方式之所以错误的原因是:KafkaConsumer是线程不安全的,可能出现把同一批数据既给线程A处理,也交给线程B处理重复消费的问题。

一个误区:多线程就是消费者组

下图中体现的是一种正常的KafkaConsumer使用方式

  • 使用一个KafkaConsumer拉取数据
  • 拉取数据后将一个批次的数据交给一个线程去处理

在这里插入图片描述

这个处理方式不是错误,但是他只是一个消费者在消费kafka消息队列中的数据,不是消费者组的方式消费数据。无法充分利用kafka分区提升消息处理的吞吐量。

常规正确做法:使用线程池实现消费者组

下面的方法是常规的正确实现方式

在这里插入图片描述

  • 因为KafkaConsumer是线程不安全的,所以不能跨线程使用KafkaConsumer
  • 每个线程持有一个KafkaConsumer对象
  • 多个线程的实现可以使用线程池,线程池的线程数量等于消费者组内消费者的数量
public class MyConsumerGroup {
    public void groupConsumer(){
        ExecutorService executorService = Executors.newFixedThreadPool(6);
        for (int i = 0; i < 6; i++) {
            MyConsumer myConsumer = new MyConsumer();
            executorService.execute(myConsumer);
        }
    }
}

MyConsumer方法需要实现Runnable接口,并在run方法中调用MyConsumer#pollData。MyConsumer的代码参考本专栏的《消费者Java实现》( 集成apache kafka-clients实现数据消费者)

@Override
public void run() {
    MyConsumer myConsumer = new MyConsumer();
    myConsumer.pollData();
}

到此这篇关于结合线程池实现apache kafka消费者组的文章就介绍到这了,更多相关apache kafka消费者组内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java和 javaw 及 javaws的区别解析

    java和 javaw 及 javaws的区别解析

    这篇文章主要介绍了java和 javaw 及 javaws的区别解析,本文通过实例给大家详细介绍,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-06-06
  • 深入剖析Java工厂模式让你的代码更灵活

    深入剖析Java工厂模式让你的代码更灵活

    Java工厂模式是一种创建对象的设计模式,它可以在不暴露对象创建逻辑的情况下,将对象的创建委托给子类或者其他对象。本文就来深入剖析一下Java工厂模式是如何让你的代码更灵活、可扩展、易维护的
    2023-05-05
  • Java使用非覆盖的方法实现替换PDF中的文本

    Java使用非覆盖的方法实现替换PDF中的文本

    这篇文章主要为大家详细介绍了Java如何使用非覆盖的方法实现替换PDF中的文本,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2024-02-02
  • Java遍历Map的方法汇总

    Java遍历Map的方法汇总

    大家平时在使用Java开发时,经常会遇到遍历Map对象的问题,本文就给大家介绍几种Java遍历Map对象的方法,并简单分析一下每种方法的效率,需要的朋友可以参考下
    2023-12-12
  • Java SSM框架(Spring+SpringMVC+MyBatis)搭建过程

    Java SSM框架(Spring+SpringMVC+MyBatis)搭建过程

    最近一段时间搭建了ssm环境,并测试了几个小项目,下面小编通过图文并茂的形式给大家分享Java SSM框架(Spring+SpringMVC+MyBatis)搭建过程,需要的朋友参考下吧
    2017-11-11
  • Java中垃圾回收器GC对吞吐量的影响测试

    Java中垃圾回收器GC对吞吐量的影响测试

    这篇文章主要介绍了Java中垃圾回收器GC对吞吐量的影响测试,本文算是一个对垃圾回收器GC的优化文章,需要的朋友可以参考下
    2014-09-09
  • Java及Android中常用链式调用写法简单示例

    Java及Android中常用链式调用写法简单示例

    这篇文章主要介绍了Java及Android中常用链式调用写法,结合实例形式分析了java编程中的链式调用概念、简单使用方法及相关操作技巧,需要的朋友可以参考下
    2018-01-01
  • Spring如何使用PropertyPlaceholderConfigurer读取文件

    Spring如何使用PropertyPlaceholderConfigurer读取文件

    这篇文章主要介绍了Spring如何使用PropertyPlaceholderConfigurer读取文件,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-12-12
  • Java程序的初始化顺序,static{}静态代码块和实例语句块的使用方式

    Java程序的初始化顺序,static{}静态代码块和实例语句块的使用方式

    这篇文章主要介绍了Java程序的初始化顺序,static{}静态代码块和实例语句块的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01
  • java 算法之冒泡排序实例详解

    java 算法之冒泡排序实例详解

    这篇文章主要介绍了java 算法之冒泡排序实例详解的相关资料,冒泡排序,就是模仿泡泡从水中浮起跑到水面的过程需要的朋友可以参考下
    2017-07-07

最新评论