1、Kafka的安装
1、下载与安装Kafka
Kafka官网https://Kafka.apache.org/downloads
所以这里推荐的版本是 : https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
将下载下来的安装包直接解压到一个路径下即可完成Kafka的安装,这里统一将Kafka安装到/usr/local目录下
基本操作过程如下:
mkdir-p /www/kuangstudy
cd /www/kuangstudy
wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
tar-zxvf kafka_2.12-2.7.2.tgz -C /usr/local/
mv /usr/local/kafka_2.12-2.7.2 /usr/local/kafka
#新建存放日志和数据的文件夹mkdir /usr/local/kafka/logs
这里我们将Kafka安装到了/usr/local目录下。
2、配置Kafka
这里将Kafka安装到/usr/local目录下
因此,Kafka的主配置文件为/usr/local/Kafka/config/server.properties,这里以节点Kafkazk1为例,重点介绍一些常用配置项的含义:
broker.id=1listeners=PLAINTEXT://127.0.0.1:9092
num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/usr/local/Kafka/logs
num.partitions=6num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=localhost:2181
#不是集群,所以可以写成localhost#zookeeper.connect=127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181zookeeper.connection.timeout.ms=18000group.initial.rebalance.delay.ms=0auto.create.topics.enable=true
delete.topic.enable=true
每个配置项含义如下:
broker.id
:每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况。listeners
:设置Kafka的监听地址与端口,可以将监听地址设置为主机名或IP地址,这里将监听地址设置为IP地址。log.dirs
:这个参数用于配置Kafka保存数据的位置,Kafka中所有的消息都会存在这个目录下。可以通过逗号来指定多个路径, Kafka会根据最少被使用的原则选择目录分配新的parition。需要注意的是,Kafka在分配parition的时候选择的规则不是按照磁盘的空间大小来定的,而是根据分配的 parition的个数多小而定。num.partitions
:这个参数用于设置新创建的topic有多少个分区,可以根据消费者实际情况配置,配置过小会影响消费性能。这里配置6个。log.retention.hours
:这个参数用于配置Kafka中消息保存的时间,还支持log.retention.minutes和 log.retention.ms配置项。这三个参数都会控制删除过期数据的时间,推荐使用log.retention.ms。如果多个同时设置,那么会选择最小的那个。log.segment.bytes
:配置partition中每个segment数据文件的大小,默认是1GB,超过这个大小会自动创建一个新的segment file。zookeeper.connect
:这个参数用于指定zookeeper所在的地址,它存储了broker的元信息。 这个值可以通过逗号设置多个值,每个值的格式均为:hostname:port/path,每个部分的含义如下:- hostname:表示zookeeper服务器的主机名或者IP地址,这里设置为IP地址。- port: 表示是zookeeper服务器监听连接的端口号。- /path:表示Kafka在zookeeper上的根目录。如果不设置,会使用根目录。auto.create.topics.enable
:这个参数用于设置是否自动创建topic,如果请求一个topic时发现还没有创建, Kafka会在broker上自动创建一个topic,如果需要严格的控制topic的创建,那么可以设置auto.create.topics.enable为false,禁止自动创建topic。delete.topic.enable
:在0.8.2版本之后,Kafka提供了删除topic的功能,但是默认并不会直接将topic数据物理删除。如果要从物理上删除(即删除topic后,数据文件也会一同删除),就需要设置此配置项为true。
3、添加环境变量
$ vim /etc/profile
exportkafka_HOME=/usr/local/kafka
exportPATH=$PATH:$kafka_HOME/bin
#生效
$ source /etc/profile
zookeeper服务的启动
cd /usr/local/kafka/bin
# 占用启动
./zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &# 后台启动nohup ./zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
4、Kafka启动脚本
$ vim /usr/lib/systemd/system/kafka.service
[Unit]Description=Apache kafka server (broker)After=network.target zookeeper.service
[Service]Type=simple
User=root
Group=root
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure
[Install]WantedBy=multi-user.target
systemctl daemon-reload
5、启动Kafka
在启动Kafka集群前,需要确保ZooKeeper集群已经正常启动。接着,依次在Kafka各个节点上执行如下命令即可:
$ cd /usr/local/kafka
$ nohup bin/kafka-server-start.sh config/server.properties &# 或者
$ systemctl start kafka
$ jps
21840 kafka
15593 Jps
15789 QuorumPeerMain
这里将Kafka放到后台运行,启动后,会在启动Kafka的当前目录下生成一个nohup.out文件,可通过此文件查看Kafka的启动和运行状态。通过jps指令,可以看到有个Kafka标识,这是Kafka进程成功启动的标志。
6、测试Kafka基本命令操作
kefka提供了多个命令用于查看、创建、修改、删除topic信息,也可以通过命令测试如何生产消息、消费消息等,这些命令位于Kafka安装目录的bin目录下,这里是/usr/local/Kafka/bin。
登录任意一台Kafka集群节点,切换到此目录下,即可进行命令操作。
下面列举Kafka的一些常用命令的使用方法。
(1)显示topic列表
#kafka-topics.sh --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --list
$ kafka-topics.sh --zookeeper127.0.0.1:2181 --list
topic123
(2)创建一个topic,并指定topic属性(副本数、分区数等)
#kafka-topics.sh --create --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --replication-factor 1 --partitions 3 --topic topic123
$ kafka-topics.sh --create--zookeeper127.0.0.1:2181 --replication-factor 1--partitions3--topic topic123
Created topic topic123.
#--replication-factor表示指定副本的个数
(3)查看某个topic的状态
#kafka-topics.sh --describe --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --topic topic123
$ kafka-topics.sh --describe--zookeeper127.0.0.1:2181 --topic topic123
Topic: topic123 PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: topic123 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: topic123 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: topic123 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
(4)生产消息 阻塞状态
#kafka-console-producer.sh --broker-list 127.0.0.1:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123
$ kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic topic123
(5)消费消息 阻塞状态
#kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123
$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic123
#从头开始消费消息#Kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic123 --from-beginning
$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123 --from-beginning
(6)删除topic
#kafka-topics.sh --delete --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --topic topic123
$ kafka-topics.sh --delete--zookeeper127.0.0.1:2181 --topic topic_
2、GO整合Kafka实现消息发送和订阅
4.1 消息生产代码示例
package main
import("fmt""github.com/IBM/sarama")funcmain(){// 配置生产者信息
conf := sarama.NewConfig()
conf.Producer.RequiredAcks = sarama.WaitForAll // 生产者等待所有分区副本成功提交消息
conf.Producer.Return.Successes =true// 成功消息写入返回
client, err := sarama.NewSyncProducer([]string{"47.115.230.36:9092"}, conf)ifnil!= err {
fmt.Println("create Kafka sync producer failed", err)return}defer client.Close()
msg :=&sarama.ProducerMessage{
Topic:"topic123",// 指定消息主题
Value: sarama.StringEncoder("hello world"),// 构造消息}// 发送消息_,_, err = client.SendMessage(msg)ifnil!= err {
fmt.Println("send message to Kafka failed", err)return}
fmt.Println("send message success")}
4.2 消息消费代码示例
package main
import("fmt""github.com/IBM/sarama")/**
* @desc 生产者
* @author feige
* @date 2023-11-15
* @version 1.0
*/funcmain(){// 创建一个消费者
consumer, err := sarama.NewConsumer([]string{"47.115.230.36:9092"},nil)if err !=nil{
fmt.Println("消费者kafka连接服务失败,失败的原因:", err)return}// 从topic123这个主题去获取消息
partitions, err := consumer.Partitions("topic123")if err !=nil{
fmt.Println("主题获取失败,失败的原因:", err)return}
fmt.Println(partitions)// 开始遍历分区中的消息,开始进行消费for_, partition :=range partitions {
pc, err := consumer.ConsumePartition("topic123",int32(partition), sarama.OffsetNewest)if err !=nil{
fmt.Println("分区数据获取失败,失败的原因:", err)return}defer pc.AsyncClose()// 开始异步获取消息gofunc(sarama.PartitionConsumer){for message :=range pc.Messages(){
fmt.Printf("当前消费的分区是:%d,offset:%d,key:%v,消息的内容是:%v", message.Partition,
message.Offset, message.Key,string(message.Value))
fmt.Println("")}}(pc)}// 阻塞让消费一直处于监听状态select{}}
4.3 创建主题代码示例
package main
import("fmt""github.com/Shopify/sarama")funcCreateTopic(addrs []string, topic string)bool{
config := sarama.NewConfig()
config.Version = sarama.V2_0_0_0 // 设置客户端版本
config.Admin.Timeout =3* time.Second // 设置Admin请求超时时间
admin, err := sarama.NewClusterAdmin(addrs, config)if err!=nil{returnfalse}defer admin.Close()
err = admin.CreateTopic(topic,&sarama.TopicDetail{NumPartitions:3, ReplicationFactor:2},false)if err ==nil{
fmt.Println("success create topic:", topic)}else{
fmt.Println("failed create topic:", topic)}return err ==nil}
4.4 性能测试结果
Kafka目前已经成为云计算领域中的“事件驱动”架构、微服务架构中的主要消息队列,随着越来越多的公司和组织开始采用Kafka作为基础消息队列技术,越来越多的性能测试报告也陆续出来。笔者提前做了一轮性能测试,并发现它的消费性能比其它消息队列还要好,甚至更好些。下面是测试结果:
测试环境:
- 操作系统:Ubuntu 16.04
- CPU:Intel® Xeon® Gold 6148 CPU @ 2.40GHz
- 内存:128G DDR4 ECC
- Kafka集群:3节点,每节点配置6个CPU、32G内存、SSD
- 测试用例:生产者每秒钟发送2万条消息,消费者每秒钟消费100条消息。
测试结果:
Kafka消费者
每秒消费100条消息,平均耗时:67毫秒
每秒消费1000条消息,平均耗时:6.7毫秒
RabbitMQ消费者
每秒消费100条消息,平均耗时:1038毫秒
每秒消费1000条消息,平均耗时:10.38毫秒
3、参考
github.com/Shopify/sarama
github.com/bsm/sarama-cluster
生产者
import("fmt""math/rand""os""strconv""strings""time""github.com/Shopify/sarama""github.com/golang/glog")//同步生产者
func Produce(){
config :=sarama.NewConfig()config.Producer.RequiredAcks=sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner=sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes=true
msg :=&sarama.ProducerMessage{}msg.Topic= `test0`
msg.Value=sarama.StringEncoder("Hello World!")
client, err :=sarama.NewSyncProducer([]string{"Kafka_master:9092"}, config)if err != nil {fmt.Println("producer close err, ", err)return}
defer client.Close()
pid, offset, err :=client.SendMessage(msg)if err != nil {fmt.Println("send message failed, ", err)return}fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}//异步生产者
func AsyncProducer(){var topics ="test0"
config :=sarama.NewConfig()config.Producer.Return.Successes=true//必须有这个选项config.Producer.Timeout=5*time.Second
p, err :=sarama.NewAsyncProducer(strings.Split("Kafka_master:9092",","), config)
defer p.Close()if err != nil {return}//这个部分一定要写,不然通道会被堵塞
go func(p sarama.AsyncProducer){
errors :=p.Errors()
success :=p.Successes()for{
select {case err :=<-errors:if err != nil {glog.Errorln(err)}case<-success:}}}(p)for{
v :="async: "+strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))fmt.Fprintln(os.Stdout, v)
msg :=&sarama.ProducerMessage{Topic: topics,Value:sarama.ByteEncoder(v),}p.Input()<- msg
time.Sleep(time.Second*1)}}
消费者
package consumer
import("fmt""strings""sync""time""github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster""github.com/golang/glog")//单个消费者funcConsumer(){var wg sync.WaitGroup
consumer, err := sarama.NewConsumer([]string{"Kafka_master:9092"},nil)if err !=nil{
fmt.Println("Failed to start consumer: %s", err)return}
partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区if err !=nil{
fmt.Println("Failed to get the list of partition:, ", err)return}for partition :=range partitionList {
pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)if err !=nil{
fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}
wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值for msg :=range pc.Messages(){//阻塞直到有值发送过来,然后再继续等待
fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))}defer pc.AsyncClose()
wg.Done()}(pc)}
wg.Wait()}//消费组funcConsumerGroup(){
groupID :="test-consumer-group"
config := cluster.NewConfig()
config.Group.Return.Notifications =true
config.Consumer.Offsets.CommitInterval =1* time.Second
config.Consumer.Offsets.Initial = sarama.OffsetNewest //初始从最新的offset开始
c, err := cluster.NewConsumer(strings.Split("Kafka_master:9092",","), groupID, strings.Split("test0",","), config)if err !=nil{
glog.Errorf("Failed open consumer: %v", err)return}defer c.Close()gofunc(c *cluster.Consumer){
errors := c.Errors()
noti := c.Notifications()for{select{case err :=<-errors:
glog.Errorln(err)case<-noti:}}}(c)for msg :=range c.Messages(){
fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))
c.MarkOffset(msg,"")//MarkOffset 并不是实时写入Kafka,有可能在程序crash时丢掉未提交的offset}}
主函数
package main
import("strom-huang-go/go_Kafka/consumer")funcmain(){// produce.AsyncProducer()
consumer.Consumer()}
版权归原作者 我不是你的梦 所有, 如有侵权,请联系我们删除。