Kafka消费客户端协调器GroupCoordinator详解

 更新时间:2022年10月18日 10:21:27   作者:石臻臻的杂货铺  
这篇文章主要为大家介绍了Kafka消费客户端协调器GroupCoordinator使用示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

协调器的生命周期

  • 什么是协调器
  • 协调器工作原理
  • 协调器的Rebalance机制

GroupCoordinator的创建

在Kafka启动的时候, 会自动创建并启动GroupCoordinator

这个GroupCoordinator对象创建的时候传入的几个属性需要介绍一下

    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time, metrics)

offsetConfig相关配置

  private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
    maxMetadataSize = config.offsetMetadataMaxSize,
    loadBufferSize = config.offsetsLoadBufferSize,
    offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
    offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
    offsetsTopicNumPartitions = config.offsetsTopicPartitions,
    offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
    offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
    offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
    offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
    offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
  )
属性介绍默认值
offset.metadata.max.bytes  
offsets.load.buffer.size  
offsets.retention.minutes  
offsets.retention.check.interval.ms  
offsets.topic.num.partitions  
offsets.commit.timeout.ms  
offsets.topic.segment.bytes  
offsets.topic.replication.factor  
offsets.topic.compression.codec  
offsets.commit.timeout.ms  
offsets.commit.required.acks  

groupConfig相关配置

属性介绍默认值
group.min.session.timeout.ms  
group.max.session.timeout.ms  
group.initial.rebalance.delay.ms  
group.max.size  
group.initial.rebalance.delay.ms  

groupMetadataManager

组元信息管理类

heartbeatPurgatory

心跳监测操作,每一秒执行一次

joinPurgatory

GroupCoordinator的启动

  def startup(enableMetadataExpiration: Boolean = true): Unit = {
    info("Starting up.")
    groupManager.startup(enableMetadataExpiration)
    isActive.set(true)
    info("Startup complete.")
  }

这个启动对于GroupCoordinator来说只是给属性isActive标记为了true, 但是同时呢也调用了GroupMetadataManager.startup

定时清理Group元信息

这个Group元信息管理类呢启动了一个定时任务, 名字为:delete-expired-group-metadata

每隔600000ms的时候就执行一下 清理过期组元信息的操作, 这个600000ms时间是代码写死的。

TODO:GroupMetadataManager#cleanupGroupMetadata

GroupCoordinator OnElection

当内部topic __consumer_offsets 有分区的Leader变更的时候,比如触发了 LeaderAndIsr的请求, 发现分区Leader进行了切换。

那么就会执行 GroupCoordinator#OnElection 的接口, 这个接口会把任务丢个一个单线程的调度程序, 专门处理offset元数据缓存加载和卸载的。线程名称前缀为group-metadata-manager- ,一个分区一个任务

最终执行的任务内容是:GroupMetadataManager#doLoadGroupsAndOffsets

__consumer_offsets 的key有两种消息类型

key version 0: 消费组消费偏移量信息 -> value version 0: [offset, metadata, timestamp]

key version 1: 消费组消费偏移量信息-> value version 1: [offset, metadata, commit_timestamp, expire_timestamp]

key version 2: 消费组的元信息 -> value version 0: [protocol_type, generation, protocol, leader,

例如 version:3 的schemaForGroupValue

Version-0

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	members: ARRAY({
		member_id: STRING,
		client_id: STRING,
		client_host: STRING,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Version-1

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	members: ARRAY({
		member_id: STRING,
		client_id: STRING,
		client_host: STRING,
		rebalance_timeout: INT32,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Version-2

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	current_state_timestamp: INT64,
	members: ARRAY({
		member_id: STRING,
		client_id: STRING,
		client_host: STRING,
		rebalance_timeout: INT32,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Version-3

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	current_state_timestamp: INT64,
	members: ARRAY({
		member_id: STRING,
		group_instance_id: NULLABLE_STRING,
		client_id: STRING,
		client_host: STRING,
		rebalance_timeout: INT32,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Value每个版本的 Scheme如下

  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
  private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
  private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))

GroupCoordinator onResignation

以上就是Kafka消费客户端协调器GroupCoordinator详解的详细内容,更多关于Kafka GroupCoordinator的资料请关注脚本之家其它相关文章!

相关文章

  • 通过Java视角简单谈谈局部性原理

    通过Java视角简单谈谈局部性原理

    程序的局部性原理是指程序在执行时呈现出局部性规律,即在一段时间内,整个程序的执行仅限于程序中的某一部分,这篇文章主要给大家介绍了关于通过Java视角简单谈谈局部性原理的相关资料,需要的朋友可以参考下
    2021-07-07
  • 使用SpringBoot进行身份验证和授权的示例详解

    使用SpringBoot进行身份验证和授权的示例详解

    在广阔的 Web 开发世界中,身份验证是每个数字领域的守护者,在本教程中,我们将了解如何以本机方式保护、验证和授权 Spring-Boot 应用程序的用户,并遵循框架的良好实践,希望对大家有所帮助
    2023-11-11
  • IntelliJ IDEA 安装及初次使用图文教程(2020.3.2社区版)

    IntelliJ IDEA 安装及初次使用图文教程(2020.3.2社区版)

    这篇文章主要介绍了IntelliJ IDEA 安装及初次使用(2020.3.2社区版),本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-03-03
  • JPA中EntityListeners注解的使用详解

    JPA中EntityListeners注解的使用详解

    这篇文章主要介绍了JPA中EntityListeners注解的使用详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-01-01
  • idea引入外部jar包的方法实现

    idea引入外部jar包的方法实现

    本文主要介绍了idea引入外部jar包的方法实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-06-06
  • Struts2学习教程之输入校验示例详解

    Struts2学习教程之输入校验示例详解

    这篇文章主要给大家介绍了关于Struts2学习教程之输入校验的相关资料,文中通过示例介绍的非常详细,对大家学习或者使用struts2具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2018-05-05
  • java程序员必须知道的4个书写代码技巧

    java程序员必须知道的4个书写代码技巧

    本篇文章主要给大家讲述了作为JAVA程序员如何能写出高效的代码以及运行效率更高的代码,一起学习分享下吧。
    2017-12-12
  • Java实现并发执行定时任务并手动控制开始结束

    Java实现并发执行定时任务并手动控制开始结束

    这篇文章主要介绍了Java实现并发执行定时任务并手动控制开始结束,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-05-05
  • Ubuntu快速安装eclipse

    Ubuntu快速安装eclipse

    这篇文章主要为大家详细介绍了Ubuntu快速安装eclipse的简单教程,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-04-04
  • 使用Mybatis-plus实现对数据库表的内部字段进行比较

    使用Mybatis-plus实现对数据库表的内部字段进行比较

    这篇文章主要介绍了使用Mybatis-plus实现对数据库表的内部字段进行比较方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-07-07

最新评论