文章目录
一、kafka是什么
1、Kafka 本质上是⼀个消息队列,一个高吞吐量、持久性、分布式的消息系统。
2、包含生产者(producer)和消费者(consumer),每个consumer属于一个特定的消费者组(Consumer Group)。
3、生产者生产消息(message)写入到kafka服务器(broker,kafka集群的节点),消费者从kafka服务器(broker)读取消息。
4、消息可分为不同的类型即不同的主题(topic)。
5、同一主题(topic)的消息可以分散存储到不同的服务器节点(partition)上,一个分区(partition)只能由一个消费者组内的一个消费者消费。
6、每个partition可以有多个副本,一个Leader和若干个Follower,Leader发生故障时,会选取某个Follower成为新的Leader。
二、kafka的安装
kafka集群管理依赖zookeeper的支持,kafka、zookeeper运行需要java环境。我的kafka安装在了windows wsl环境下。
1、jdk安装
1.1、https://www.oracle.com/cn/java/technologies/downloads/下载需要的jdk.
1.2、解压下载的jdk
tar -zxvf jdk-19_linux-x64_bin.tar.gz
1.3、配置jdk环境变量,/etc/profile影响所有用户,.bashrc影响当前用户。
vi /etc/profile
exportJAVA_HOME=/mnt/d/workspace/wsl/java/jdk-19.0.1
exportPATH=${JAVA_HOME}/bin:$PATH
1.4、测试jdk是否安装成功
java -version
java version "19.0.1"2022-10-18
Java(TM) SE Runtime Environment (build 19.0.1+10-21)
Java HotSpot(TM)64-Bit Server VM (build 19.0.1+10-21, mixed mode, sharing)
2、zookeeper单机安装
2.1、https://zookeeper.apache.org/releases.html zookeeper下载地址
2.2、解压zookeeper包 apache-zookeeper-3.8.0-bin.tar.gz
tar -zxvf apache-zookeeper-3.8.0-bin.tar.gz
2.3、配置zookeeper
解压后zookeeper配置文件名称默认为zoo_sample.cfg需要修改为zoo.cfg
mv zoo_sample.cfg zoo.cfg
2.4、zookeeper启动
进入到zookeeper安装目录,输入启动命令
bin/zkServer.sh start
输出如下信息说明启动成功。
Starting zookeeper ... STARTED
2.5、查看zookeeper运行信息
输入查看命令
bin/zkServer.sh status
输出zookeeper运行信息
ZooKeeper JMX enabled by default
Using config: /mnt/d/ProgramFiles/apache-zookeeper-3.8.0-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
2.6、zookeeper停止运行
bin/zkServer.sh stop
3、kafka单机安装
3.1、kafka包下载https://kafka.apache.org/downloads.html
3.2、解压kafka包
tar -zxvf kafka_2.12-3.3.1.tgz
3.3、kafka配置
3.3.0、kafka配置文件 config/server.properties
3.3.1、kafka数据日志目录:log.dirs=***
3.3.2、zookeeper连接地址:zookeeper.connect=localhost:2181
3.3.3、节点id集群时使用:broker.id=0
3.3.4、kafka服务监听的ip和端口:listeners=PLAINTEXT://172.24.198.152:9092。我的kafka安装在了windows wsl内,172.24.198.152是我的wsl地址可以通过如下命令查看:
命令: ip addr |grep eth0
输出: inet 172.24.198.152/20 brd .......
3.4、kafka启动
bin/kafka-server-start.sh config/server.properties
3.5、创建topic
bin/kafka-topics.sh --create --topic my_topic
3.6、kafka停止
bin/kafka-server-stop.sh
三、go连接kafka
1、go kafka安装
go get github.com/segmentio/kafka-go
2、生产者:官方github examples producer-api
package main
import("fmt""io/ioutil""log""net/http"
kafka "github.com/segmentio/kafka-go")funcproducerHandler(kafkaWriter *kafka.Writer)func(http.ResponseWriter,*http.Request){return http.HandlerFunc(func(wrt http.ResponseWriter, req *http.Request){
body, err := ioutil.ReadAll(req.Body)if err !=nil{
log.Fatalln(err)}
msg := kafka.Message{
Key:[]byte(fmt.Sprintf("address-%s", req.RemoteAddr)),
Value: body,}
err = kafkaWriter.WriteMessages(req.Context(), msg)if err !=nil{
wrt.Write([]byte(err.Error()))
log.Fatalln(err)}})}funcgetKafkaWriter(kafkaURL, topic string)*kafka.Writer {return&kafka.Writer{
Addr: kafka.TCP(kafkaURL),
Topic: topic,
Balancer:&kafka.LeastBytes{},}}funcmain(){// get kafka writer using environment variables.//kafkaURL := os.Getenv("kafkaURL")
kafkaURL :="172.24.198.152:9092"//topic := os.Getenv("topic")
topic :="my_topic"
kafkaWriter :=getKafkaWriter(kafkaURL, topic)defer kafkaWriter.Close()// Add handle func for producer.
http.HandleFunc("/",producerHandler(kafkaWriter))// Run the web server.
fmt.Println("start producer-api ... !!")
log.Fatal(http.ListenAndServe(":8081",nil))}
3、消费者:官方github examples consumer-logger
package main
import("context""fmt""log""strings"
kafka "github.com/segmentio/kafka-go")funcgetKafkaReader(kafkaURL, topic, groupID string)*kafka.Reader {
brokers := strings.Split(kafkaURL,",")return kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID,
Topic: topic,
MinBytes:10e3,// 10KB
MaxBytes:10e6,// 10MB})}funcmain(){// get kafka reader using environment variables.//kafkaURL := os.Getenv("kafkaURL")
kafkaURL :="172.24.198.152:9092"//topic := os.Getenv("topic")
topic :="my_topic"//groupID := os.Getenv("groupID")
groupID :=""
reader :=getKafkaReader(kafkaURL, topic, groupID)defer reader.Close()
fmt.Println("start consuming ... !!")for{
m, err := reader.ReadMessage(context.Background())if err !=nil{
log.Fatalln(err)}
fmt.Printf("message at topic:%v partition:%v offset:%v %s = %s\n", m.Topic, m.Partition, m.Offset,string(m.Key),string(m.Value))}}
版权归原作者 星星火光 所有, 如有侵权,请联系我们删除。