Kafka之kafka-topics.sh的使用解读

 更新时间:2023年03月07日 10:16:36   作者:Lion Long  
这篇文章主要介绍了Kafka之kafka-topics.sh的使用解读,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

一、kafka的基本操作

1.1、创建topic

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

参数说明:

  • –create 是创建主题的的动作指令。
  • –zookeeper 指定kafka所连接的zookeeper服务地址。
  • –replicator-factor 指定了副本因子(即副本数量); 表示该topic需要在不同的broker中保存几份,这里设置成1,表示在两个broker中保存两份Partitions分区数。
  • –partitions 指定分区个数;多通道,类似车道。
  • –topic 指定所要创建主题的名称,比如test。

成功则显示:

Created topic "test".

1.2、查看topic

sh kafka-topics.sh --list --zookeeper localhost:2181

显示:

test

1.3、查看topic属性

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

显示:

Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

1.4、发送消息

sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

发送端输入:

>hello
>where are you
>let's go

1.5、消费消息

sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
--from-beginning

消费端显示:

hello
where are you
let's go
^CProcessed a total of 3 messages

二、kafka-topics.sh 使用方式

创建、修改、删除以及查看等功能。

2.1、查看帮助

/bin目录下的每一个脚本工具,都有着众多的参数选项,不可能所有命令都记得住,这些脚本都可以使用 --help 参数来打印列出其所需的参数信息。

$ sh kafka-topics.sh --help
Command must include exactly one action: --list, --describe, --create, --alter or --delete
Option                                   Description                            
------                                   -----------                            
--alter                                  Alter the number of partitions,        
                                           replica assignment, and/or           
                                           configuration for the topic.         
--config <String: name=value>            A topic configuration override for the 
                                           topic being created or altered.The   
                                           following is a list of valid         
                                           configurations:                      
                                             cleanup.policy                        
                                             compression.type                      
                                             delete.retention.ms                   
                                             file.delete.delay.ms                  
                                             flush.messages                        
                                             flush.ms                              
                                             follower.replication.throttled.       
                                           replicas                             
                                             index.interval.bytes                  
                                             leader.replication.throttled.replicas 
                                             max.message.bytes                     
                                             message.downconversion.enable         
                                             message.format.version                
                                             message.timestamp.difference.max.ms   
                                             message.timestamp.type                
                                             min.cleanable.dirty.ratio             
                                             min.compaction.lag.ms                 
                                             min.insync.replicas                   
                                             preallocate                           
                                             retention.bytes                       
                                             retention.ms                          
                                             segment.bytes                         
                                             segment.index.bytes                   
                                             segment.jitter.ms                     
                                             segment.ms                            
                                             unclean.leader.election.enable        
                                         See the Kafka documentation for full   
                                           details on the topic configs.        
--create                                 Create a new topic.                    
--delete                                 Delete a topic                         
--delete-config <String: name>           A topic configuration override to be   
                                           removed for an existing topic (see   
                                           the list of configurations under the 
                                           --config option).                    
--describe                               List details for the given topics.     
--disable-rack-aware                     Disable rack aware replica assignment  
--force                                  Suppress console prompts               
--help                                   Print usage information.               
--if-exists                              if set when altering or deleting       
                                           topics, the action will only execute 
                                           if the topic exists                  
--if-not-exists                          if set when creating topics, the       
                                           action will only execute if the      
                                           topic does not already exist         
--list                                   List all available topics.             
--partitions <Integer: # of partitions>  The number of partitions for the topic 
                                           being created or altered (WARNING:   
                                           If partitions are increased for a    
                                           topic that has a key, the partition  
                                           logic or ordering of the messages    
                                           will be affected                     
--replica-assignment <String:            A list of manual partition-to-broker   
  broker_id_for_part1_replica1 :           assignments for the topic being      
  broker_id_for_part1_replica2 ,           created or altered.                  
  broker_id_for_part2_replica1 :                                                
  broker_id_for_part2_replica2 , ...>                                           
--replication-factor <Integer:           The replication factor for each        
  replication factor>                      partition in the topic being created.
--topic <String: topic>                  The topic to be create, alter or       
                                           describe. Can also accept a regular  
                                           expression except for --create option
--topics-with-overrides                  if set when describing topics, only    
                                           show topics that have overridden     
                                           configs                              
--unavailable-partitions                 if set when describing topics, only    
                                           show partitions whose leader is not  
                                           available                            
--under-replicated-partitions            if set when describing topics, only    
                                           show under replicated partitions     
--zookeeper <String: hosts>              REQUIRED: The connection string for    
                                           the zookeeper connection in the form 
                                           host:port. Multiple hosts can be     
                                           given to allow fail-over.         

2.2、副本数量规则

副本数量不能大于broker的数量。

kafka 创建主题的时候其副本数量不能大于broker的数量,否则创建主题 topic 失败。

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test1

报错:

Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2022-11-24 14:08:18,745] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
 (kafka.admin.TopicCommand$)

注意:副本数量和分区数量的区别。

2.3、创建主题

创建主题时候,有3个参数是必填的:

  • –partitions(分区数量)、
  • –topic(主题名) 、
  • –replication-factor(复制系数),

同时还需使用 --create 参数表明本次操作是想要创建一个主题操作。

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

返回显示:

Created topic "test1".

另外在创建主题的时候,还可以附加以下两个选项:–if-not-exists 和 --if-exists . 第一个参数表明仅当该主题不存在时候,创建; 第二个参数表明当修改或删除这个主题时候,仅在该主题存在的时候去执行操作。

2.4、查看broker上所有的主题

–list。

sh kafka-topics.sh --list --zookeeper localhost:2181

结果显示:

__consumer_offsets
test
test1

2.5、查看指定主题 topic 的详细信息

–describe。

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1 

结果显示:

Topic:test1    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test1    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

2.6、修改主题信息之增加主题分区数量

–alter。

sh kafka-topics.sh --zookeeper localhost:2181 --topic test1 --alter --partitions 2

结果显示:

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

查看主题信息:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

可以看到已经成功的将主题的分区数量从1修改为了2。

Topic:test1    PartitionCount:2    ReplicationFactor:1    Configs:
    Topic: test1    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: test1    Partition: 1    Leader: 0    Replicas: 0    Isr: 0

当去修改一个不存在的topic信息时(比如修改主题 test2,当前这主题是不存在的)。

sh kafka-topics.sh --zookeeper localhost:2181 --topic test2 --alter --partitions 2

会报错:

Error while executing topic command : Topic test2 does not exist on ZK path localhost:2181
[2022-11-24 14:21:33,564] ERROR java.lang.IllegalArgumentException: Topic test2 does not exist on ZK path localhost:2181
    at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:123)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)

注意:不要使用 --alter 去尝试减少分区的数量,如果非要减少分区的数量,只能删除整个主题 topic, 然后重新创建。

2.7、删除主题

–delete。

sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1

日志信息提示,主题 test1已经被标记删除状态,但是若delete.topic.enable 没有设置为 true , 则将不会有任何作用。

Topic test1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

可以测试一些:

# 一个终端启动生产者:
sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test1

# 另一个终端启动消费者:
sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test1--from-beginning

发现此时还是可以发送消息和接收消息。如果要支持能够删除主题的操作,则需要在 /bin 的同级目录 /config目录下的文件server.properties中,修改配置delete.topic.enable=true(如果置为false,则kafka broker 是不允许删除主题的)。

然后就重启kafka:

# 停止:
sh kafka-server-stop.sh -daemon ../config/server.properties
# 启动:
sh kafka-server-start.sh -daemon ../config/server.properties

再次删除就可以了。

sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • SpringBoot 图形验证码的生成和校验

    SpringBoot 图形验证码的生成和校验

    随着系统和业务的不停升级,前后端代码放在一起的项目越来越臃肿,已经无法快速迭代和职责区分了,于是纷纷投入了前后端分离的怀抱,发现代码和职责分离以后,开发效率越来越高了,但是以前的验证码登录方案就要更改了。本文来看一下SpringBoot 图形验证码的生成和校验
    2021-05-05
  • JWT原理与java操作jwt验证详解

    JWT原理与java操作jwt验证详解

    这篇文章主要介绍了JWT原理与java操作jwt验证,详细分析了JWT的基本概念、原理与java基于JWT进行token验证的相关操作技巧,需要的朋友可以参考下
    2023-06-06
  • SpringBoot 配置文件中配置的中文,程序读取出来是乱码的解决

    SpringBoot 配置文件中配置的中文,程序读取出来是乱码的解决

    这篇文章主要介绍了SpringBoot 配置文件中配置的中文,程序读取出来是乱码的解决,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • jdk自带定时器使用方法详解

    jdk自带定时器使用方法详解

    这篇文章主要为大家详细介绍了jdk自带定时器的使用方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-06-06
  • 灵活控制任务执行时间的Cron表达式范例

    灵活控制任务执行时间的Cron表达式范例

    这篇文章主要为大家介绍了灵活控制任务执行时间的Cron表达式范例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-10-10
  • java基于jcifs.smb实现远程发送文件到服务器

    java基于jcifs.smb实现远程发送文件到服务器

    这篇文章主要介绍了java基于jcifs.smb实现远程发送文件到服务器,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-01-01
  • SpringBoot整合rockerMQ消息队列详解

    SpringBoot整合rockerMQ消息队列详解

    今天和大家一起深入生产级别消息中间件 - RocketMQ 的内核实现,来看看真正落地能支撑万亿级消息容量、低延迟的消息队列到底是如何设计的。我会先介绍整体的架构设计,然后再深入各核心模块的详细设计、核心流程的剖析
    2022-07-07
  • springboot 实战:异常与重定向问题

    springboot 实战:异常与重定向问题

    这篇文章主要介绍了springboot实战:异常与重定向问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • Springboot读取配置文件及自定义配置文件的方法

    Springboot读取配置文件及自定义配置文件的方法

    这篇文章主要介绍了Springboot读取配置文件及自定义配置文件的方法,非常不错,具有参考借鉴价值,需要的朋友可以参考下
    2017-12-12
  • Java编程—在测试中考虑多态

    Java编程—在测试中考虑多态

    这篇文章主要介绍了Java编程—在测试中考虑多态,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11

最新评论