0


一文教你Go语言如何轻松使用kafka

Go语言操作Kafka文档

本文档详细介绍了如何使用Go语言对Kafka进行基础操作。我们将介绍如何使用Go连接Kafka、生产消息、消费消息。以下是详细操作步骤:

1. 安装驱动

首先,使用以下命令安装Sarama,一个优秀的Kafka Go客户端库:

go get github.com/Shopify/sarama

2. 导入依赖

导入必要的依赖包:

import("fmt""github.com/Shopify/sarama""log""os""os/signal""strings""sync""time")

3. 生产者(Producer)

创建一个函数,用于连接并返回一个Kafka生产者:

funccreateProducer(brokers []string)(sarama.AsyncProducer,error){
    config := sarama.NewConfig()
    config.Producer.Return.Successes =true
    config.Producer.Timeout =5* time.Second
    return sarama.NewAsyncProducer(brokers, config)}

创建一个函数,用于发送消息到指定的Kafka主题:

funcproduceMessage(producer sarama.AsyncProducer, topic, value string){
    message :=&sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.StringEncoder(value),}

    producer.Input()<- message
}

4. 消费者(Consumer)

创建一个函数,用于连接并返回一个Kafka消费者:

funccreateConsumer(brokers []string, groupID string)(sarama.ConsumerGroup,error){
    config := sarama.NewConfig()
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    return sarama.NewConsumerGroup(brokers, groupID, config)}

定义一个消费者组对象:

type KafkaConsumerGroupHandler struct{
    ready chanbool}func(handler *KafkaConsumerGroupHandler)Setup(_ sarama.ConsumerGroupSession)error{close(handler.ready)returnnil}func(handler *KafkaConsumerGroupHandler)Cleanup(_ sarama.ConsumerGroupSession)error{returnnil}func(handler *KafkaConsumerGroupHandler)ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim)error{for message :=range claim.Messages(){
        fmt.Printf("消息: 主题=%s 分区=%d 偏移量=%d\n", message.Topic, message.Partition, message.Offset)
        fmt.Printf("消息内容: %s\n",string(message.Value))
        sess.MarkMessage(message,"")}returnnil}

创建一个函数,用于消费指定的Kafka主题:

funcconsumeMessages(consumer sarama.ConsumerGroup, topics []string){
    handler :=&KafkaConsumerGroupHandler{
        ready:make(chanbool),}for{
        err := consumer.Consume(context.Background(), topics, handler)if err !=nil{
            log.Printf("消费者错误: %v", err)}select{case<-handler.ready:default:return}}}

参考代码

main

函数中调用以上方法展示生产和消费操作:

funcmain(){
    brokers := strings.Split("localhost:9092",",")
    topic :="my_topic"
    groupID :="my_group"// 创建生产者
    producer, err :=createProducer(brokers)if err !=nil{
        log.Fatal("无法创建生产者:", err)}deferfunc(){if err := producer.Close(); err !=nil{
            log.Fatal("无法关闭生产者:", err)}}()// 发送消息produceMessage(producer, topic,"hello world")// 创建消费者
    consumer, err :=createConsumer(brokers, groupID)if err !=nil{
        log.Fatal("无法创建消费者:", err)}deferfunc(){if err := consumer.Close(); err !=nil{
            log.Fatal("无法关闭消费者:", err)}}()

    topics :=[]string{topic}
    wg :=&sync.WaitGroup{}
    wg.Add(1)gofunc(){defer wg.Done()consumeMessages(consumer, topics)}()// 监听退出信号
    sigterm :=make(chan os.Signal,1)
    signal.Notify(sigterm, os.Interrupt)<-sigterm

    // 优雅关闭消费者
    wg.Wait()}

当你运行上述程序时,你将首先连接到Kafka集群并创建一个生产者,然后发送一条"hello world"消息到名为 “my_topic” 的主题。接下来,程序创建一个消费者,用于消费刚刚发送的消息并在终端输出消息内容。程序运行过程中,使用Ctrl+C或发送中断信号,可以优雅终止消费者并退出程序。

标签: kafka golang 分布式

本文转载自: https://blog.csdn.net/weixin_59801183/article/details/130575127
版权归原作者 ~奔跑的简默~ 所有, 如有侵权,请联系我们删除。

“一文教你Go语言如何轻松使用kafka”的评论:

还没有评论