0


Go语言中使用kafka

1.Windows环境下安装zookeeper和kafka

Windows环境下安装zookeeper和kafka

2.运行zookeeper

3.运行kafka

4.生产者

import("encoding/json""github.com/Shopify/sarama""strconv")type Product struct{
    Id    int
    Name  string
    Title string}funcNewProduct()error{
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max =10
    config.Producer.Return.Successes =true
    brokers :=[]string{"localhost:9092"}
    producer, err := sarama.NewAsyncProducer(brokers, config)if err !=nil{return err
    }
    p :=&Product{
        Id:1,
        Name:"钻戒",
        Title:"那戒指的质地似乎是钻石制成的吧,闪闪发光又不失内敛,清雅又不失高贵,阳光洒下来,发出淡淡的光,和淡淡的清香,有着像是通了灵般的仙气",}

    key := sarama.StringEncoder(strconv.Itoa(p.Id))
    value, err := json.Marshal(p)if err !=nil{return err
    }
    msg :=&sarama.ProducerMessage{
        Topic:"new-products",
        Key:   key,
        Value: sarama.ByteEncoder(value),}
    producer.Input()<- msg
    returnnil}

5.消费者

import("encoding/json""fmt""github.com/Shopify/sarama""log")funcConsume()error{// 初始化 Kafka 消费者
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max =10
    config.Producer.Return.Successes =true
    brokers :=[]string{"localhost:9092"}
    consumer, err := sarama.NewConsumer(brokers, config)
    partitionConsumer, err := consumer.ConsumePartition("newProduct",0, sarama.OffsetNewest)if err !=nil{
        log.Printf("Error consuming partition: %v", err)return err
    }for{select{case msg :=<-partitionConsumer.Messages():var product Product
            err = json.Unmarshal(msg.Value,&product)if err !=nil{
                log.Printf("Error unmarshaling product: %v", err)return err
            }else{
                fmt.Printf("New product: %+v\n", product)}case err =<-partitionConsumer.Errors():
            log.Printf("Error consuming message: %v", err)return err
        }}}

6.main函数

import("fmt""golang_test/kafka_test/kafka""log""sync")var wg sync.WaitGroup

funcmain(){
    wg.Add(2)gofunc(){defer wg.Done()if err := kafka.NewProduct(); err !=nil{
            log.Println("kafka生产者运行失败")return}}()gofunc(){defer wg.Done()if err := kafka.Consume(); err !=nil{
            log.Println("kafka生产者运行失败")return}}()
    wg.Wait()
    fmt.Println("运行结束")}

本文转载自: https://blog.csdn.net/qq_51537858/article/details/132010709
版权归原作者 终生成长者 所有, 如有侵权,请联系我们删除。

“Go语言中使用kafka”的评论:

还没有评论