python 实时获取kafka消费队列信息示例详解
更新时间:2023年07月24日 08:40:03 作者:xiaoming0018
这篇文章主要介绍了python实时获取kafka消费队列信息,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
安装 pykafka
pip install pykafka
一、消费kafka消息
#!/usr/bin/env python # -*- coding: utf-8 -*- from pykafka import KafkaClient from pykafka.common import OffsetType from vpn_data_handler import handler_data bootstrap_servers = '10.*.**.**:9092' group_id = 'test1' class KConsumer(object): """kafka 消费者; 动态传参,非配置文件传入; kafka 的消费者应该尽量和生产者保持在不同的节点上;否则容易将程序陷入死循环中; """ _encode = "UTF-8" def __init__(self, topics, bootstrap_server=None, group_id=group_id, partitions=None): """ 初始化kafka的消费者; 1. 设置默认 kafka 的主题, 节点地址, 消费者组 id(不传入的时候使用默认的值) 2. 当需要设置特定参数的时候可以直接在 kwargs 直接传入,进行解包传入原始函数; Args: topics: str; kafka 的消费主题; bootstrap_server: list; kafka 的消费者地址; group_id: str; kafka 的消费者分组 id,默认是 start_task 主要是接收并启动任务的消费者,仅此一个消费者组id; """ if bootstrap_server is None: bootstrap_server = bootstrap_servers self.client = KafkaClient(hosts=bootstrap_server) # 选择要消费的topic vpn_topic = self.client.topics[topics] self.consumer = vpn_topic.get_simple_consumer(consumer_group=group_id, consumer_timeout_ms=200, auto_commit_enable=True,# 自动提交偏移量 auto_offset_reset=OffsetType.LATEST) #LATEST 获取当前偏移量最新消息 EARLIEST从头开始获取信息 def recv(self): """ 接收消费中的数据 Returns: """ return self.consumer def main(): """ kafka消费队列入口 :param topic: :return: """ obj = KConsumer(topics="topics_name") while True: for message in obj.recv(): data = eval(message.value.decode('utf-8')) handler_data(data) if __name__ == '__main__': main()
二、生产者推送消息
#!/usr/bin/python # -*- coding:utf-8 -*- from pykafka import KafkaClient client = KafkaClient(hosts="10.XX0.XX0.XX4:9092") # 可接受多个client # 查看所有的topic # print(client.topics) topic = client.topics['test_78'] # 选择一个topic message = "test message2 test message2" with topic.get_sync_producer() as producer: producer.produce(bytes(message, encoding='utf8')) #python3需要编码 print(message)
到此这篇关于python 实时获取kafka消费队列信息的文章就介绍到这了,更多相关python kafka消费队列信息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Python matplotlib如何删除subplots中多余的空白子图
这篇文章主要介绍了Python matplotlib如何删除subplots中多余的空白子图问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教2024-05-05Python实现数据库与Excel文件之间的数据自动化导入与导出
数据库和Excel文件是两种常见且重要的数据存储方式,本文将介绍如何使用Python有效地实现数据库与Excel文件之间的数据自动化导入与导出,以SQLite数据库为例,需要的朋友可以参考下2024-06-06Pycharm中SQL语句提示SQL Dialect is Not Config
这篇文章主要介绍了Pycharm中SQL语句提示SQL Dialect is Not Configured的解决方案,具有很好的参考价值,希望对大家有所帮助。2022-07-07Python subprocess.Popen 实时输出 stdout的解决方法(正确管道写法)
这篇文章主要介绍了Python subprocess.Popen实时输出stdout正确管道写法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2023-07-07
最新评论