详解C/C++如何发送与接收Kafka消息
一、背景
在实际工程中,难免会遇到不通系统之间通信,如何进行系统之间通信呢?(作为一个“全栈工程师”,必须要解决它!)。
系统之间通信方式很多如:系统之间调用(http/rpc等),异步间接调用如发送消息、公共存储等。目前,本人从事的项目中遇到web业务工程(Java)依赖与算法工程(C++) 处理的视频/图片分类与标记结果。两个系统之前数据通信采用了kafka消息方式。
算法工程为C/C++工程,本文将介绍如何在C/C++中如何发送与接收Kakfa消息(包含:Kafka的SASL认证方式),并提供了详细的源码和讲解。(至于Java中如何发送与接收Kakfa消息如有需要,可留言或私聊!)
二、环境依赖安装
# 下载librdkafka git clone https://github.com/edenhill/librdkafka.git # 编译 cd librdkafka ./configure --prefix=/usr/local # 安装 sudo make install # 验证:查看/usr/local/lib目录下是否有librdkafka文件 ls /usr/local/lib | grep kafka
三、编写kakfa生产者消费者
3.1 生产者
#include <rdkafka.h> // 包含C API头文件 #include <iostream> #include <cstring> #include <cerrno> int main() { const char *brokers = "xx.xx.xx.xx:7091"; // Kafka broker地址 const char *topic_name = "kafka_msg_topic_test"; const char *payload = "Hello, Kafka from librdkafka!"; size_t len = strlen(payload); // 创建配置对象 rd_kafka_conf_t *conf = rd_kafka_conf_new(); if (!conf) { std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } // 设置broker地址 if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 创建生产者实例 rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0); if (!rk) { std::cerr << "Failed to create producer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 创建topic句柄(可选,但推荐) rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL); if (!rkt) { std::cerr << "Failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_destroy(rk); // rd_kafka_conf_destroy(conf); return 1; } // 发送消息 int32_t partition = RD_KAFKA_PARTITION_UA; // 自动选择分区 int err = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, const_cast<char *>(payload), len, NULL, 0, NULL); if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { std::cerr << "Failed to produce to topic " << topic_name << ": " << err << std::endl; } else { std::cout << "Produced " << len << " bytes to topic " << topic_name << std::endl; } // 等待所有消息发送完成(可选,但推荐) // 在实际生产代码中,您可能需要更复杂的逻辑来处理消息的发送和确认 int msgs_sent = 0; while (rd_kafka_outq_len(rk) > 0) { rd_kafka_poll(rk, 100); // 轮询Kafka队列,直到所有消息都发送出去 msgs_sent += rd_kafka_outq_len(rk); } // 销毁topic句柄 rd_kafka_topic_destroy(rkt); // 销毁生产者实例 rd_kafka_destroy(rk); // 销毁配置对象 // rd_kafka_conf_destroy(conf); return 0; }
3.2 消费者
#include <rdkafka.h> #include <iostream> #include <cerrno> #include <cstring> #include <cstdlib> void error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) { // 错误处理回调 std::cerr << "Kafka error: " << err << ": " << reason << std::endl; } int main() { std::cerr << "start " << std::endl; const char *brokers = "xx.xx.xx.xx:7091"; // Kafka broker地址 const char *group_id = "kafka_msg_topic_test"; // 消费者组ID const char *topic_name = "kafka_msg_topic_test"; // Kafka topic名称 // 创建配置对象 rd_kafka_conf_t *conf = rd_kafka_conf_new(); if (!conf) { std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } // 设置broker地址 if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 设置消费者组ID if (rd_kafka_conf_set(conf, "group.id", group_id, NULL, 0) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set group.id: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 设置错误处理回调(可选) rd_kafka_conf_set_error_cb(conf, error_cb); // 创建消费者实例 rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0); if (!rk) { std::cerr << "Failed to create consumer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } // 创建一个topic分区列表 rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1); if (!topics) { std::cerr << "Failed to create topic partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_destroy(rk); return 1; } // 添加topic到分区列表 if (!rd_kafka_topic_partition_list_add(topics, topic_name, RD_KAFKA_PARTITION_UA)) { std::cerr << "Failed to add topic to partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_topic_partition_list_destroy(topics); rd_kafka_destroy(rk); return 1; } // 订阅topic rd_kafka_resp_err_t err = rd_kafka_subscribe(rk, topics); if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { std::cerr << "Failed to subscribe to topic: " << rd_kafka_err2str(err) << std::endl; rd_kafka_topic_partition_list_destroy(topics); rd_kafka_destroy(rk); return 1; } // 销毁分区列表(订阅后不再需要) rd_kafka_topic_partition_list_destroy(topics); // 轮询消息 while (true) { rd_kafka_message_t *rkmessage; rkmessage = rd_kafka_consumer_poll(rk, 1000); // 等待1秒以获取消息 if (rkmessage == NULL) { // 没有消息或者超时 continue; } if (rkmessage->err) { // 处理错误 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { // 消息流的末尾 std::cout << "End of partition event" << std::endl; } else { // 打印错误并退出 std::cerr << "Kafka consumer error: " << rd_kafka_message_errstr(rkmessage) << std::endl; break; } } else { // 处理消息 std::cout << "Received message at offset " << rkmessage->offset << " from partition " << rkmessage->partition << " with key \"" << rkmessage->key << "\" and payload size "<< rkmessage->len << " value :" <<(char *)rkmessage->payload << std::endl; // 如果需要,可以在这里处理消息内容 // 例如,使用rkmessage->payload()获取消息内容 // 释放消息 rd_kafka_message_destroy(rkmessage); } } // 清理 rd_kafka_destroy(rk); return 0; }
3.3 编译运行
3.3.1 编译生产者消费者
g++ -o send_kafka SendKakfaMessage.cpp -I/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread
g++ -o receive_kafka ReceiveKafkaMessage.cpp -I/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread
3.3.2 运行验证
执行时,若出现错误: error while loading shared libraries: librdkafka++.so.1: cannot open shared object file: No such file or directory
则需要执行下面环境变量配置:
export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
生产者:发送消息
消费者:接收消息
3.4 SASL认证kakfa
下面是,支持sasl认证的kakka生产者完整代码
#include <rdkafka.h> #include <iostream> #include <cstring> #include <cerrno> int main(int argc, char *argv[]) { const char *brokers = "xx.xx.xx.xx:8092"; // Kafka broker地址 const char *username = "xxx"; const char *password = "xxx"; const char *topic_name = "kafka_msg_test_sasl"; const char *payload = "Hello, Kafka from librdkafka! sasl"; size_t len = strlen(payload); // 初始化配置 rd_kafka_conf_t *conf = rd_kafka_conf_new(); if (!conf) { std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } char errstr[512]; // 声明一个足够大的字符数组来存储错误信息 // 设置SASL相关的配置 if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set security.protocol: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set sasl.mechanisms: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "sasl.username", username, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set sasl.username: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "sasl.password", password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set sasl.password: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 检查配置是否设置成功 if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set configuration: " << errstr << std::endl; return 1; } // 创建producer实例 rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) { std::cerr << "Failed to create new producer: " << errstr << std::endl; return 1; } // 创建topic句柄(可选,但推荐) rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL); if (!rkt) { std::cerr << "Failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_destroy(rk); // rd_kafka_conf_destroy(conf); return 1; } // 发送消息 int32_t partition = RD_KAFKA_PARTITION_UA; // 自动选择分区 int err = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, const_cast<char *>(payload), len, NULL, 0, NULL); if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { std::cerr << "Failed to produce to topic " << topic_name << ": " << err << std::endl; } else { std::cout << "Produced " << len << " bytes to topic " << topic_name << std::endl; } // 等待所有消息发送完成(可选,但推荐) // 在实际生产代码中,您可能需要更复杂的逻辑来处理消息的发送和确认 int msgs_sent = 0; while (rd_kafka_outq_len(rk) > 0) { rd_kafka_poll(rk, 100); // 轮询Kafka队列,直到所有消息都发送出去 msgs_sent += rd_kafka_outq_len(rk); } // 销毁topic句柄 rd_kafka_topic_destroy(rkt); // 清理资源 rd_kafka_destroy(rk); return 0; }
在kafka map 管理界面中查看发送效果如下:
3.5 结束语
到此这篇关于详解C/C++如何发送与接收Kafka消息的文章就介绍到这了,更多相关C/C++发送与接收Kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
深入解析Radix Sort基数排序算法思想及C语言实现示例
基数排序和桶排序、计数排序共同是三种最常用的线性排序算法,这里我们就来深入解析Radix Sort基数排序算法思想及C语言实现示例,需要的朋友可以参考下2016-07-07
最新评论