0


51.Go操作kafka示例(kafka-go库)

文章目录

代码地址:https://gitee.com/lymgoforIT/golang-trick/tree/master/31-kafka-go

一、简介

之前已经介绍过一个操作kafka的go库了,28.windows安装kafka,Go操作kafka示例(sarama库) ,但是这个库比较老了,当前比较流行的库是

github.com/segmentio/kafka-go

,所以本次我们就使用一下它。

我们在

GitHub

直接输入

kafka

并带上

language

标签为

Go

时,可以可以看到当前

get github.com/segmentio/kafka-go

库是最流行的。
在这里插入图片描述

首先启动kafka的服务器,然后在项目中

go get github.com/segmentio/kafka-go

接着我们就可以创建生产者和消费者了,

注意:在实际工作中,一般是一个服务为生产者,另一个服务作为消费者,但是本案例中不涉及微服务,就是演示一下生成和消费的示例代码,因此写到了一个服务当中。

代码文件组织如下:
在这里插入图片描述
user.go :用于测试发送和消费结构体字符串消息

package model

type User struct{
    Id       int64`json:"id"`
    UserName string`json:"user_name"`
    Age      int64`json:"age"`}

二、生产者

启动

zookeeper

kafka

,并创建名为

test

topic

,步骤可以参考:28.windows安装kafka,Go操作kafka示例(sarama库)

producer.go

package producer

import("context""encoding/json""fmt""golang-trick/31-kafka-go/model""time""github.com/segmentio/kafka-go")var(
    topic    ="user"
    Producer *kafka.Writer
)funcinit(){
    Producer =&kafka.Writer{
        Addr:                   kafka.TCP("localhost:9092"),//TCP函数参数为不定长参数,可以传多个地址组成集群
        Topic:                  topic,
        Balancer:&kafka.Hash{},// 用于对key进行hash,决定消息发送到哪个分区
        MaxAttempts:0,
        WriteBackoffMin:0,
        WriteBackoffMax:0,
        BatchSize:0,
        BatchBytes:0,
        BatchTimeout:0,
        ReadTimeout:0,
        WriteTimeout:           time.Second,// kafka有时候可能负载很高,写不进去,那么超时后可以放弃写入,用于可以丢消息的场景
        RequiredAcks:           kafka.RequireNone,// 不需要任何节点确认就返回
        Async:false,
        Completion:nil,
        Compression:0,
        Logger:nil,
        ErrorLogger:nil,
        Transport:nil,
        AllowAutoTopicCreation:false,// 第一次发消息的时候,如果topic不存在,就自动创建topic,工作中禁止使用}}// 生产消息,发送user信息funcSendMessage(ctx context.Context, user *model.User){
    msgContent, err := json.Marshal(user)if err !=nil{
        fmt.Println(fmt.Sprintf("json marshal user err,user:%v,err:%v", user, err))}
    msg := kafka.Message{
        Topic:"",
        Partition:0,
        Offset:0,
        HighWaterMark:0,
        Key:[]byte(fmt.Sprintf("%d", user.Id)),
        Value:         msgContent,
        Headers:nil,
        WriterData:nil,
        Time:          time.Time{},}

    err = Producer.WriteMessages(ctx, msg)if err !=nil{
        fmt.Println(fmt.Sprintf("写入kafka失败,user:%v,err:%v", user, err))}}
main.go

: 测试消息发送

package main

import("context""fmt""golang-trick/31-kafka-go/model""golang-trick/31-kafka-go/producer")funcmain(){
    ctx := context.Background()for i :=0; i <5; i++{
        user :=&model.User{
            Id:int64(i +1),
            UserName: fmt.Sprintf("lym:%d", i),
            Age:18,}
        producer.SendMessage(ctx, user)}
    producer.Producer.Close()// 消息发送完毕后,关闭生产者}

可以看到五条消息都发送成功
在这里插入图片描述

三、消费者

consumer.go

package consumer

import("context""encoding/json""fmt""golang-trick/24-gin-learning/class08/model""time""github.com/segmentio/kafka-go")var(
    topic    ="user"
    Consumer *kafka.Reader
)funcinit(){
    Consumer = kafka.NewReader(kafka.ReaderConfig{
        Brokers:[]string{"localhost:9092"},// broker地址 数组
        GroupID:"test",// 消费者组id,每个消费者组可以消费kafka的完整数据,但是同一个消费者组中的消费者根据设置的分区消费策略共同消费kafka中的数据
        GroupTopics:nil,
        Topic:                  topic,// 消费哪个topic
        Partition:0,
        Dialer:nil,
        QueueCapacity:0,
        MinBytes:0,
        MaxBytes:0,
        MaxWait:0,
        ReadBatchTimeout:0,
        ReadLagInterval:0,
        GroupBalancers:nil,
        HeartbeatInterval:0,
        CommitInterval:         time.Second,// offset 上报间隔
        PartitionWatchInterval:0,
        WatchPartitionChanges:false,
        SessionTimeout:0,
        RebalanceTimeout:0,
        JoinGroupBackoff:0,
        RetentionTime:0,
        StartOffset:            kafka.FirstOffset,// 仅对新创建的消费者组生效,从头开始消费,工作中可能更常用从最新的开始消费kafka.LastOffset
        ReadBackoffMin:0,
        ReadBackoffMax:0,
        Logger:nil,
        ErrorLogger:nil,
        IsolationLevel:0,
        MaxAttempts:0,
        OffsetOutOfRangeError:false,})}// 消费消息funcReadMessage(ctx context.Context){// 消费者应该通过协程一直开着,一直消费for{if msg, err := Consumer.ReadMessage(ctx); err !=nil{
            fmt.Println(fmt.Sprintf("读kafka失败,err:%v", err))break// 当前消息读取失败时,并不退出for终止所有后续消费,而是跳过该消息即可}else{
            user :=&model.User{}
            err := json.Unmarshal(msg.Value, user)if err !=nil{
                fmt.Println(fmt.Sprintf("json unmarshal msg value err,msg:%v,err:%v", user, err))break// 当前消息处理失败时,并不退出for终止所有后续消费,而是跳过该消息即可}

            fmt.Println(fmt.Sprintf("topic=%s,partition=%d,offset=%d,key=%s,user=%v", msg.Topic, msg.Partition, msg.Offset, msg.Key, user))}}}

main.go: 测试接收消息

package main

import("context""fmt""golang-trick/31-kafka-go/consumer""os""os/signal""syscall")// 需要监听信息2和15,在程序退出时,关闭ConsumerfunclistenSignal(){
    c :=make(chan os.Signal,1)
    signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
    sig :=<-c
    fmt.Printf("收到信号 %s ", sig.String())if consumer.Consumer !=nil{
        consumer.Consumer.Close()}
    os.Exit(0)}funcmain(){
    ctx := context.Background()//for i := 0; i < 5; i++ {//    user := &model.User{//        Id:       int64(i + 1),//        UserName: fmt.Sprintf("lym:%d", i),//        Age:      18,//    }//    producer.SendMessage(ctx, user)//}//producer.Producer.Close()go consumer.ReadMessage(ctx)listenSignal()}

启动后,因为我们设置的从头开始消费,所以原有的五条消息消费成功,然后在等待着队列中有消息时继续消费
在这里插入图片描述
我们可以通过

kafka

客户端发两条消息,看看我们的消费者程序是否能消费到

在这里插入图片描述
最后关闭服务停止消费
在这里插入图片描述


本文转载自: https://blog.csdn.net/YouMing_Li/article/details/134818855
版权归原作者 百里守约学编程 所有, 如有侵权,请联系我们删除。

“51.Go操作kafka示例(kafka-go库)”的评论:

还没有评论