1.Windows环境下安装zookeeper和kafka
Windows环境下安装zookeeper和kafka
2.运行zookeeper
3.运行kaka
4.生产者
import("encoding/json""github.com/Shopify/sarama""strconv")type Product struct{
Id int
Name string
Title string}funcNewProduct()error{
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max =10
config.Producer.Return.Successes =true
brokers :=[]string{"localhost:9092"}
producer, err := sarama.NewAsyncProducer(brokers, config)if err !=nil{return err
}
p :=&Product{
Id:1,
Name:"钻戒",
Title:"那戒指的质地似乎是钻石制成的吧,闪闪发光又不失内敛,清雅又不失高贵,阳光洒下来,发出淡淡的光,和淡淡的清香,有着像是通了灵般的仙气",}
key := sarama.StringEncoder(strconv.Itoa(p.Id))
value, err := json.Marshal(p)if err !=nil{return err
}
msg :=&sarama.ProducerMessage{
Topic:"new-products",
Key: key,
Value: sarama.ByteEncoder(value),}
producer.Input()<- msg
returnnil}
5.消费者
import("encoding/json""fmt""github.com/Shopify/sarama""log")funcConsume()error{// 初始化 Kafka 消费者
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max =10
config.Producer.Return.Successes =true
brokers :=[]string{"localhost:9092"}
consumer, err := sarama.NewConsumer(brokers, config)
partitionConsumer, err := consumer.ConsumePartition("newProduct",0, sarama.OffsetNewest)if err !=nil{
log.Printf("Error consuming partition: %v", err)return err
}for{select{case msg :=<-partitionConsumer.Messages():var product Product
err = json.Unmarshal(msg.Value,&product)if err !=nil{
log.Printf("Error unmarshaling product: %v", err)return err
}else{
fmt.Printf("New product: %+v\n", product)}case err =<-partitionConsumer.Errors():
log.Printf("Error consuming message: %v", err)return err
}}}
6.main函数
import("fmt""golang_test/kafka_test/kafka""log""sync")var wg sync.WaitGroup
funcmain(){
wg.Add(2)gofunc(){defer wg.Done()if err := kafka.NewProduct(); err !=nil{
log.Println("kafka生产者运行失败")return}}()gofunc(){defer wg.Done()if err := kafka.Consume(); err !=nil{
log.Println("kafka生产者运行失败")return}}()
wg.Wait()
fmt.Println("运行结束")}
本文转载自: https://blog.csdn.net/qq_51537858/article/details/132010709
版权归原作者 终生成长者 所有, 如有侵权,请联系我们删除。
版权归原作者 终生成长者 所有, 如有侵权,请联系我们删除。