环境的搭建
Kafka以及相关组件的下载
我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境,这里我们个大家找了一篇博文供大家参考:
Windows下安装Kafka(图文记录详细步骤)
sarama包的安装
今天我们所时机的内容需要用到go语言的第三方包
sarama
,由于1.19版本后添加了ztcd压缩算法,需要用到cgo,这里我们为了方便考虑选择下载
sarama v1.19.0
,所以这里我们不能直接使用
go get'
命令来安装第三方包,我们要使用/
go mod
文件来实现,下面是主要步骤:
- 在项目中创建文件夹(博主的是Kafkademo)
- 打开终端,输入
go mod init
,进行go.mod
文件的初始化: - 我们在.mod文件内指定第三方包及其版本:
module Kafkademo
require (
github.com/Shopify/sarama v1.19)go1.21.6
其实这是已经可以使用命令
go mod tidy
了,但是博主在做的时候发现,这样会直接清除掉
.mod
文件里面的内容,所以建议先创建一个producer文件,在文件里面写:
package main
import("fmt""github.com/Shopify/sarama")funcmain(){
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
}
这时候再打开终端输入
go mod tidy
等待命令运行完毕,打开
.mod
文件,看到如下内容就OK了:
利用sarama向Kafka发送消息(消息的生产)
代码
package main
import("fmt""github.com/Shopify/sarama")funcmain(){
config := sarama.NewConfig()//创建config实例
config.Producer.RequiredAcks = sarama.WaitForAll //发送完数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner //创建随机分区
config.Producer.Return.Successes =true//成功交付的消息将在success channel返回//创建信息
msg :=&sarama.ProducerMessage{}
msg.Topic ="web.log"
msg.Value = sarama.StringEncoder("this is a test log")//连接KafKa
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)if err !=nil{
fmt.Println("producer closed, err:", err)return}defer client.Close()//发送消息
pid, offset, err := client.SendMessage(msg)if err !=nil{
fmt.Println("send msg failed,err:", err)return}
fmt.Printf("pid:%v offset:%v\n", pid, offset)}
运行过程
- 首先我们打开终端开起ZooKepper服务
zkServer
- 然后再Kafka所在文件夹下输入命令运行Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties
最后运行程序即可,输出结果为:
补充:消息的消费
代码
package main
import("fmt""github.com/Shopify/sarama""time")funcmain(){
customer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"},nil)if err !=nil{
fmt.Println("failed init customer,err:", err)return}
partitionlist, err := customer.Partitions("web.log-0")//获取topic的所有分区if err !=nil{
fmt.Println("failed get partition list,err:", err)return}
fmt.Println("partitions:", partitionlist)for partition :=range partitionlist {// 遍历所有分区//根据消费者对象创建一个分区对象
pc, err := customer.ConsumePartition("web.log",int32(partition), sarama.OffsetNewest)if err !=nil{
fmt.Println("failed get partition consumer,err:", err)return}defer pc.Close()// 移动到这里gofunc(consumer sarama.PartitionConsumer){defer pc.AsyncClose()// 移除这行,因为已经在循环结束时关闭了for msg :=range pc.Messages(){
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)}}(pc)
time.Sleep(time.Second *10)}}
不过这个不能实现直接的消费,后续我们会对这个进行补充,这里仅作介绍。
版权归原作者 落雨便归尘 所有, 如有侵权,请联系我们删除。