python连接kafka加载数据的项目实践

 更新时间:2023年05月22日 10:46:28   作者:呼伦贝尔-钢蛋儿  
本文主要介绍了python连接kafka加载数据的项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

背景:读取TXT文件,加载到kafka中,然后通过logstash消费kafka中的数据加载到es中

第一步:导入相应的依赖包

pip install kafka-python   
pip install loguru
pip install msgpack

第二步:编写连接kafka的代码

# -*- coding: utf-8 -*-
import json
import json
import msgpack
from loguru import logger
from kafka import KafkaProducer
from kafka.errors import KafkaError
def kfk_produce_1():
    """
        发送 json 格式数据
    :return:
    """
    producer = KafkaProducer(
    //连接kafka集群的配置信息
        bootstrap_servers='192.168.85.109:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    //这里是你创建topic和打算发送数据的地方
    producer.send('python_test_topic', {'key': 'value'})
kfk_produce_1()

第三步:验证是否在kafka中创建topic

kafka的消费者界面上已经出现了创建的topic,并且数据也接收到了

注意:下面的消费者界面的按钮,要先运行起来,选择好kafka环境和topic,group以后,点击那个绿色的运行按钮,就能实时看到发送过来的消息了,😄

问题记录:

然后在使用时,报错提示:ImportError: cannot import name ‘KafkaConsumer’
找了一会儿最后发现自己创建的文件名叫做:kafka.py,突然意识到问题出在哪里了。

原因:

简单说就是因为,创建的文件名是kafka.py,这会导致代码运行时,python解释器查找kafka的模块时,就找到自身kafka.py了,所以就报错。
以后写代码的时候,还是要注意,切记不要用关键字去命名文件,避免不必要的麻烦。

到此这篇关于python连接kafka加载数据的项目实践的文章就介绍到这了,更多相关python连接kafka加载数据内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

最新评论