0


Kafka 安装、使用

Kafka安装

kafka 安装_naki_bb的博客-CSDN博客

概念

名称
**解释 **

Broker

消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群

Topic

Kafka根据topic对消息进⾏归类,发布到Kafka集群的每条消息都需要指定 一个topic

Producer

消息生产者,向Broker发送消息的客户端

Consumer

消息消费者,从Broker读取消息的客户端

简单使用

创建topic

  • 通过kafka命令向zk中创建一个名为test的主题
cd /opt/kafka_2.13-2.8.1/bin
#创建test主题
./kafka-topics.sh --create --zookeeper 192.168.99.100:2181 --replication-factor 1 --partitions 1 --topic test

  • 查看当前zk中所有的主题
./kafka-topics.sh --list --zookeeper 192.168.99.100:2181

注意指令中的ip和端口都是zookeeper的

发送消息

kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令

行中直接输⼊内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一行会被当做成一个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic

./kafka-console-producer.sh --broker-list 192.168.99.100:9092 --topic test

消费消息

对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输

出,默认是消费最新的消息。使用kafka的消费者消息的客户端,从指定kafka服务器的指定

topic中消费消息

  • 方式一:从最后一条消息的偏移量+1开始消费

就是只能接受到连接之后,发送方发送的消息,连接之前的消息,接受不到

./kafka-console-consumer.sh --bootstrap-server 192.168.99.100:9092 --topic test

可以发现连接之前的消息没有收到

  • 方式二:从头开始消费
./kafka-console-consumer.sh --bootstrap-server 192.168.99.100:9092 --from-beginning --topic test

经过以上两个方式的测试,可以发现:

  • **消息会被存储 **
  • **消息是顺序存储 **
  • **消息是有偏移量的 **
  • 消费时可以指明偏移量进行消费

消息的细节

  • 生产者将消息发送给broker,broker会将消息保存在本地的日志文件中

具体的日志目录在service.properies中配置

在日志文件目录中可以查看到topic的信息,索引,时间等信息

  • 消息的保存是有序的,通过offset偏移量来描述消息的有序性
  • 消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置

单播消息

在kafka的一个topic中,启动两个消费者,一个生产者,问:生产者发送消息,这条消息是否

同时会被两个消费者消费?

  • 在没有消费组的情况下,消费者都可以收到消息,进行消费
  • 如果多个消费者在同一个消费组,那么只有一个消费者可以收到订阅的topic中的消息,换言之,同一个消费组中只能有一个消费者收到topic中的消息
./kafka-console-consumer.sh --bootstrap-server 192.168.99.100:9092 --consumer-property group.id=testGroup --topic test

多播消息

不同的消费组订阅同一个topic,那么每个消费组中只有个一个消费者能收到消息。

./kafka-console-consumer.sh --bootstrap-server 192.168.99.100:9092 --consumer-property group.id=testGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server 192.168.99.100:9092 --consumer-property group.id=testGroup2 --topic test

下图就是描述多播和单播消息的区别

查看消费组的详细信息

# 查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server 192.168.99.100:9092 --list

# 查看消费组中的具体信息:比如当前偏移量、最后⼀条消息的偏移量、堆积的消息数量
 ./kafka-consumer-groups.sh --bootstrap-server 192.168.99.100:9092 --describe --group testGroup

  • Currennt-offset: 当前消费组的已消费偏移量
  • Log-end-offset: 消息总量(最后一条消息的偏移量)
  • Lag: 当前消费组未消费的消息数

主题和分区的概念

1.主题topic

主题-topic在kafka中是一个逻辑的概念,kafka通过topic将消息进行分类。不同的topic会被

订阅该topic的消费者消费。

但是有一个问题,如果说这个topic中的消息非常非常多,多到需要几T来存,因为消息是会被

保存到log日志文件中的。为了解决这个文件过大的问题,kafka提出了Partition分区的概念

2.分区partition

分区的作用:

  • 可以分布式存储
  • 可以并行写

通过partition将一个topic中的消息在不同的分区来存储。这样的好处有多个:

  • 分区存储,可以解决统一存储文件过大的问题
  • 提供了读写的吞吐量:读和写可以同时在多个分区中进行

上图中一个topic创建了3个分区。那么topic中的消息就会分别存放在这三个分区中,所有分区的消息总和才是这个topic的全部消息。

创建多分区的主题

./kafka-topics.sh --create --zookeeper 192.168.99.100:2181 --replication-factor 1 --partitions 2 --topic test1

通过--partitions 参数设置分区个数,来表示用多少个分区存储这个topic的消息

查看主题的分区消息

./kafka-topics.sh --describe --zookeeper 192.168.99.100:2181 --topic test1

消息日志文件

  • 00000.log: 这个文件中保存的就是消息
  • __consumer_offsets-49:kafka内部自己创建了__consumer_offsets主题包含了50个分区。这个主题用来存放所有消费 者消费某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量,每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题:consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区
  1. 提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets 主题的分区数
  2. 提交到该主题中的内容是:key是consumerGroupId+topic+分区号,value就是当前 offset的值
  • 文件中保存的消息,默认保存7天。七天到后消息会被删除。

kafka集群及副本的概念

kafka集群搭建

kafka 集群_naki_bb的博客-CSDN博客

副本

副本是对分区的备份。在集群中,不同的副本会被部署在不同的broker上。

下面的例子:创建1个主题,2个分区、3个副本。

./kafka-topics.sh --create --zookeeper 192.168.99.100:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic

--replication-factor表示副本数,只有broker的数量>副本数,才有意义

副本是为了为主题中的分区创建多个备份,多个副本在kafka集群的多个broker中,会有一个

副本作为leader,其他是follower。

查看topic情况:

# 查看topic情况
./kafka-topics.sh --describe --zookeeper 192.168.99.100:2181 --topic myreplicated-topic

通过查看主题信息,其中的关键数据:

  • replicas: 当前副本存在的broker节点
  • leader:副本里的概念 ,每个partition都有1个broker作为leader。kafka的写和读的操作,都发生在leader上。leader负责把数据同步给follower。当leader挂了,经过主从选举,从多个follower中选举产生一个新的leader
  • follwer : 接受leader的同步的数据
  • isr : 可以同步和已同步的节点会存入到isr集合中,如果isr中的节点性能较差,会被提出isr集合。

集群中有多个broker,创建主题时可以指明主题有多个分区(把消息拆分到不同的分区中存储),可以为分区创建多个副本,不同的副本存放在不同的broker里。

集群消费

1.向集群发送消息:

./kafka-console-producer.sh --broker-list 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --topic my-replicated-topic

2.从集群中消费消息(不带消费组)

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --topic my-replicated-topic

3.指定消费组消费消息

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic

4.多分区,多消费组消费消息

  • 一个partition只能被一个消费组中的一个消费者消费,目的是为了保证消费的顺序性,但是多个partion的多个消费者消费的总的顺序性是得不到保证的
  • partition的数量决定了消费组中消费者的数量,建议同一个消费组中消费者的数量不要超过partition的数量,否则多的消费者消费不到消息
  • 如果消费者挂了,那么会触发rebalance机制,会让同一个group的其他消费者来消费该分区需要消费的消息​​​​​​​

集群Controller、Rebalance、和HW

Controller

Kafka集群中broker在zookeeper中创建临时序号节点,序号最小的节点(最先创建的节点)将作为集群中的controller,负责管理整个集群中的所有分区和副本的状态:

  • 当某个分区的leader副本出现故障时,由controller控制器负责为该分区选取新的leader副本,选举的规则是从isr集合中最左边获得。
  • 当集群中有broker新增或者减少,controller会同步信息给其他broker
  • 当集群中有分区的新增和减少,controller会同步信息给其他broker

Rebalance机制

前提:消费组的消费者没有指定分区来消费

触发条件:当消费组中的消费者和分区发生变化的时候

分区分配的策略:在rebalance之前,分区怎么分配会有以下3中策略

range:根据公示计算得到每个消费者消费哪几个分区:剩余分区总数 / 消费者数量 +1

轮询 :每个消费者轮询所有分区

sticky:粘合策略,如果需要rebalance,会在之前的已分配的基础上调整,不会改变之前的分配状况。如果这个策略没有打开,那么就要进行全部的重新分配,建议开启。

HW 和 LEO

LEO是某个副本最后消息的位置,每一个部分都有自己的LEO

HW是已完成同步的位置,消息在写入broker时,且每个broker完成这条消息的同步后(更新了最新消息的LEO),HW才会变化。在这之前消费者是消费不到这条消息的,在同步完成以后,HW在更新,更新完成消费者才能消费到这条消息,这样的目的是防止消息的丢失(当leader所在的broker失效后,该消息仍然可以从新的选举的leader中获取到)

标签: kafka 大数据 MQ

本文转载自: https://blog.csdn.net/qq_33753147/article/details/127013053
版权归原作者 naki_bb 所有, 如有侵权,请联系我们删除。

“Kafka 安装、使用”的评论:

还没有评论