Golang操作Kafka的实现示例

 更新时间:2023年02月19日 09:00:04   作者:YUHAOHAO  
本文主要介绍了Golang操作Kafka的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

一.使用库说明

Golang中连接kafka可以使用第三方库:github.com/Shopify/sarama

二.Kafka Producer发送消息

package main 

import (
    "fmt"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follower都确认
    config.Producer.Partitioner = sarama.NewRandomPartitioner  //写到随机分区中,我们默认设置32个分区
    config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回

    // 构造一个消息
    msg := &sarama.ProducerMessage{}
    msg.Topic = "task"
    msg.Value = sarama.StringEncoder("producer kafka messages...")

    // 连接kafka
    client, err := sarama.NewSyncProducer([]string{"192.20.216.8:9092"}, config)
    if err != nil {
        fmt.Println("Producer closed, err:", err)
        return
    }
    defer client.Close()

    // 发送消息
    pid, offset, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send msg failed, err:", err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

三.Kafka Consumer消费消息

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    consumer, err := sarama.NewConsumer([]string{"192.20.216.8:9092"}, nil)
    if err != nil {
        fmt.Println("Failed to start consumer: %s", err)
        return
    }
    partitionList, err := consumer.Partitions("task-status-data") // 通过topic获取到所有的分区
    if err != nil {
        fmt.Println("Failed to get the list of partition: ", err)
        return
    }
    fmt.Println(partitionList)

    for partition := range partitionList{ // 遍历所有的分区
        pc, err := consumer.ConsumePartition("task", int32(partition), sarama.OffsetNewest) // 针对每个分区创建一个分区消费者
        if err != nil {
            fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)
        }
        wg.Add(1)
        go func(sarama.PartitionConsumer) { // 为每个分区开一个go协程取值
            for msg := range pc.Messages() { // 阻塞直到有值发送过来,然后再继续等待
                fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            }
            defer pc.AsyncClose()
            wg.Done()
        }(pc)
    }
    wg.Wait()
    consumer.Close()
}

到此这篇关于Golang操作Kafka的实现示例的文章就介绍到这了,更多相关Golang操作Kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Golang常用环境变量说明与设置详解

    Golang常用环境变量说明与设置详解

    这篇文章主要介绍了Golang常用环境变量说明与设置,需要的朋友可以参考下
    2020-02-02
  • 深入探索Golang中的SM4加密解密算法

    深入探索Golang中的SM4加密解密算法

    SM4加密算法在安全性、高效性、简单性、标准化和广泛支持等方面具有优势,适用于各种数据保护和加密应用场景,这篇文章就来和大家探索一下Golang中的SM4加密解密算法吧
    2023-06-06
  • go语言方法集为类型添加方法示例解析

    go语言方法集为类型添加方法示例解析

    这篇文章主要为大家介绍了go语言方法集以及为类型添加方法示例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步早日升职加薪
    2022-04-04
  • 解决go获取文件md5值不正确的问题

    解决go获取文件md5值不正确的问题

    本文主要介绍了解决go获取文件md5值不正确的问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-01-01
  • 在ubuntu下安装go开发环境的全过程

    在ubuntu下安装go开发环境的全过程

    Go语言是谷歌公司开发的编程语言,虽然安装和配置go很简单,但是很多初学者在第一次安装go环境时会遇到各种坑,下面这篇文章主要给大家介绍了关于在ubuntu下安装go开发环境的相关资料,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2022-08-08
  • GO语言异常处理机制panic和recover分析

    GO语言异常处理机制panic和recover分析

    这篇文章主要介绍了GO语言异常处理机制panic和recover,分析了捕获运行时发生错误的方法,是非常实用的技巧,需要的朋友可以参考下
    2014-12-12
  • Go 数据结构之堆排序示例详解

    Go 数据结构之堆排序示例详解

    这篇文章主要为大家介绍了Go 数据结构之堆排序示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-08-08
  • GO语言基础入门第一个go程序解读

    GO语言基础入门第一个go程序解读

    这篇文章主要为大家介绍了GO语言基础入门的第一个go程序解读,下面来带大家进入Go语言世界helloworld的大门吧,有需要的朋友可以借鉴参考下,希望能够有所帮助
    2021-11-11
  • Go语言题解LeetCode下一个更大元素示例详解

    Go语言题解LeetCode下一个更大元素示例详解

    这篇文章主要为大家介绍了Go语言题解LeetCode下一个更大元素示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-12-12
  • 如何基于Golang实现Kubernetes边车模式

    如何基于Golang实现Kubernetes边车模式

    本文介绍了如何基于Go实现Kubernetes Sidecar模式,并通过实际示例演示创建Golang实现的微服务服务、Docker 容器化以及在 Kubernetes 上的部署和管理,感兴趣的朋友一起看看吧
    2024-08-08

最新评论