0


Kafka概念初识

前置知识 消息队列MQ

一.kafka基本概念大略

Apache Kafka 是一个开源的流处理平台,设计用于构建实时数据管道和流应用程序。它提供高性能的发布/订阅消息系统,允许生产者将消息发布到主题,消费者可以订阅这些主题以接收消息。Kafka 支持高吞吐量和低延迟的消息处理,同时将消息持久化存储在磁盘上,确保数据的可靠性。其分布式架构支持水平扩展,提供高可用性和容错能力,确保同一主题的消息在同一分区内保持顺序。Kafka 还提供 Kafka Streams 和 Kafka Connect 等工具,支持实时数据流处理和与其他系统的集成,广泛应用于实时数据处理、日志聚合、事件源系统等场景。

kafka 的架构

一个典型的 kafka 体系架构包括若干 Producer、若干 Consumer、以及一个 Zookeeper 集群(在2.8.0版本中移,除了 Zookeeper,通过 KRaft 进行自己的集群管理)

Producer 将消息发送到 Broker,Broker 负责将受到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。

Kafka 基本概念:

  • Producer :生产者,负责将消息发送到 Broker
  • Consumer :消费者,从 Broker 接收消息
  • Consumer Group :消费者组,由多个 Consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  • Broker :可以看做一个独立的 Kafka 服务节点或 Kafka 服务实例。如果一台服务器上只部署了一个 Kafka 实例,那么我们也可以将 Broker 看做一台 Kafka 服务器。
  • Topic :一个逻辑上的概念,包含很多 Partition,同一个 Topic 下的 Partiton 的消息内容是不相同的
  • Partition :为了实现扩展性,一个非常大的 topic 可以分布到多个 broker 上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
  • Replica :副本,同一分区的不同副本保存的是相同的消息,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
  • Leader :每个分区的多个副本中的"主副本",生产者以及消费者只与 Leader 交互
  • Follower :每个分区的多个副本中的"从副本",负责实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,从 Follower 副本中重新选举新的 Leader 副本对外提供服务。

二.安装kafka(单机)

前置条件:有jdk 与zookeeper

解压/与移动

tar -zxf kafka_2.11-2.0.0.tgz
mv kafka_2.11-2.0.0 soft/kafka200

修改kafka配置

cd /opt/soft/kafka200/config

vim server.properties

配置文件讲解

#broker.id属性在kafka集群中必须要是唯⼀

broker.id=0

#kafka部署的机器ip和提供服务的端⼝号

listeners=PLAINTEXT://192.168.65.60:9092

#kafka的消息存储⽂件

log.dir=/usr/local/data/kafka-logs

#kafka连接zookeeper的地址

zookeeper.connect=192.168.65.60:2181

(server.properties)补充

  • 产生的消息存放在/usr/local/data/kafka-logs 配置过的目录下.loc , .index文件用来快速定位消息,.timeindex 用来检测否可以删除
  • broker.id=0
  • listeners=PLAINTEXT://192.168.226.131:9092 socket监听地址
  • num.partitions=1 默认的分区
  • offsets.topic.replication.factor=1 默认的副本
  • log.retention.hours=1680 删除时间
  • log.retention.check.interval.ms=300000 检测时间
  • log.segment.bytes=1073741824 文件存放大小
  • zookeeper.connect=192.168.226.131:2181 zookeeper通信端口

核心概念

broker概念详解:

Broker 是 Kafka 集群中的核心节点,负责处理消息的接收、存储和转发。它为生产者提供消息发布服务,将消息持久化到磁盘,并通过分区和副本机制确保数据的高可用性和容错性。Broker 管理着主题和分区的元数据,执行负载均衡,并通过与 ZooKeeper 的交互处理集群的协调任务,如 leader 选举和成员管理。此外,Broker 支持数据压缩和灵活的配置选项,以优化性能和资源使用。总体而言,Broker 是 Kafka 实现高吞吐量、低延迟和可扩展消息系统的关键组件。

三.基本使用

1.启动

./kafka-server-start.sh -daemon ../config/server.properties

2.创建topic

kafka-topics.sh --create --zookeeper 192.168.226.131:2181 --topic batchmag --partitions 3 --replication-factor 1

3.删除topic

kafka-topics.sh --zookeeper 192.168.226.131:2181 --delete --topic batchmag

4.查看消息队列详情(topic)

kafka-topics.sh --describe --zookeeper 192.168.226.131:2181 --topic newsdemo

5. 查看主题分区内消息数量

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.226.131:9092 --topic newsdemo

6.重置消费者组对指定topic访问偏移量

kafka-streams-application-reset.sh --zookeeper 192.168.78.143:2181 --application-id kfkgroup1 --input-topics newsdemo

7.查看当前有哪些消费者组

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

8.查看某个特定消费者组的详细信息

kafka-consumer-groups.sh --bootstrap-server hd:9092 --describe --group Group1

9.消息生产

kafka-console-producer.sh --topic newsdemo --broker-list 192.168.226.131:9092

10.消费消息

⽅式⼀:从当前主题中的最后⼀条消息的offset(偏移量位置)+1开始消费

/kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --topic test

⽅式⼆:从当前主题中的第⼀条消息开始消费

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --from-beginning --topic test

**5.**关于消息的细节

  • ⽣产者将消息发送给broker,broker会将消息保存在本地的⽇志⽂件
  • 消息的保存是有序的,通过offset偏移量来描述消息的有序性
  • 消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置

6**.**单播消息与多播消息

在⼀个kafka的topic中,启动两个消费者,⼀个⽣产者,问:⽣产者发送消息,这条消息是否

同时会被两个消费者消费?

单播:如果多个消费者在同⼀个消费组,那么只有⼀个消费者可以收到订阅的topic中的消息。换⾔

之,同⼀个消费组中只能有⼀个消费者收到⼀个topic中的消息

多播:不同的消费组订阅同⼀个topic,那么不同的消费组中只有⼀个消费者能收到消息。实际上也

是多个消费组中的多个消费者收到了同⼀个消息。

kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --consumer-property group.id=testGroup --topic test

kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --consumer-property group.id=testGroup1 --topic test
kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --consumer-property group.id=testGroup2 --topic test

四.java操作kafka

1.生产者的实现

public class MyProducer {
    public static void main(String[] args) {

        Properties prop = new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.226.131:9092");
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        prop.put(ProducerConfig.ACKS_CONFIG, "1");
        KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);
        Scanner scanner = new Scanner(System.in);
        String content = "1";
        while (content.equals("1")){
            System.out.println("请输入返送到Kafka的内容");
            String txt = scanner.next();
            System.out.println("您输入的内容"+txt);
            ProducerRecord<String, String> record = new ProducerRecord<>("newsdemo", txt);
            producer.send(record);
            System.out.println("是否推出 1:继续 2:推出");
            content = scanner.next();
        }
    }
}

ProducerConfig的参数说名

必备参数

  • **bootstrap.servers **Kafka 集群的地址和端口,生产者用来连接到 Kafka。
  • **key.serializer: **用于将消息键序列化的类。
  • value.serializer: 用于将消息值序列化的类。
同步发送与异步发送
  • 同步发送:⽣产者同步发消息,在收到kafka的ack告知发送成功之前⼀直处于阻塞状态 // 同步发送消息 RecordMetadata metadata = producer.send(record).get();// 这里使用 get() 方法进行同步等待
  • 异步发送:⽣产者发消息,发送完后不⽤等待broker给回复,直接执⾏下⾯的业务逻辑。 // 异步发送消息 producer.send(record, (RecordMetadata metadata, Exception exception) -> { if (exception != null) { exception.printStackTrace(); } else { System.out.printf("Message sent to topic %s partition %d with offset %d%n", metadata.topic(), metadata.partition(), metadata.offset()); } });
ack应答机制

生产者配置中一个非常重要的参数,它决定了生产者在发送消息时需要等待多少个副本的确认。这一机制直接影响消息的可靠性和吞吐量。

参数

  • 0:描述: 生产者发送消息后不等待任何确认。
  • 1:生产者发送消息后,只需要等待领导者(Leader)确认。
  • -1:生产者发送消息后,需要等待所有副本的确认。
props.put("acks", "all"); // 设置 ack 机制
消息发送缓冲区
  • kafka默认会创建⼀个消息缓冲区,⽤来存放要发送的消息,缓冲区是32m
  • kafka本地线程会去缓冲区中⼀次拉16k的数据,发送到broker
  • 如果线程拉不到16k的数据,间隔10ms也会将已拉到的数据发到broker
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 

props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

2.消费者实现

 Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.226.131:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkagroup1");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"500");
       

        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(prop);
        //消费者订阅

        consumer.subscribe(Collections.singleton("demo1"));
        while (true){
            //
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String,String> record : records){
                long offset = record.offset();
                String topic = record.topic();
                int partition = record.partition();
                String value = record.value();
                String key = record.key();
                long timetamp = record.timestamp();
                System.out.println(topic +"--"+partition+"--"+offset+"--"+key+"--"+value+"--"+timetamp);
                consumer.commitAsync();
            }

        }
消息提交内容

无论是手动提交还是自动提交,都需要把所属的消费者组,主题,分区和偏移量提交到_consumer_offests

自动提交与手动提交(同步与异步)
1.自动提交:

消费者poll消息下来以后就会⾃动提交offset。⾃动提交会丢消息。因为消费者在消费前提交offset。

// 是否⾃动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// ⾃动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
2.手动提交:在消费后再提交offset

1)手动同步:手动同步提交是指消费者在处理完消息后,使用

commitSync()

方法提交 offset。在这种方式下,消费者会阻塞,直到 Kafka 确认 offset 已成功提交。

// 手动同步提交 offset
consumer.commitSync();
2)手动异步:手动异步提交是指消费者在处理完消息后,使用 
commitAsync()

方法提交 offset。在这种方式下,消费者不会阻塞,而是立即返回,提交的结果会通过回调函数处理。

 consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
        System.err.printf("Error committing offsets: %s%n",exception.getMessage());
                        } else {
                    System.out.printf("Offsets committed: %s%n", offsets);
                 }
           });
⻓轮询poll消息:
 //一次poll的消息数
        prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
        //如果两此poll超过30秒间隔,Kafka就会认为起消费能力弱,将其剔除消费者组,将分区分给其他消费者 --reblance
        prop.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,30*1000);

        //检测健康状态

        //consumer给broker发送⼼跳的间隔时间 1s
        prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        //kafka如果超过10秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏rebalance,把分区分配给其他消费者。
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

如果⼀次poll到500条,就执行逻辑操作。如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500条,要么到1s,如果多次poll都没达到500条,且1秒时间到了,那么直接执⾏for循环。如果两次poll的间隔超过30s,集群会认为该消费者的消费能⼒过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,
让⼀次poll的消息条数少⼀点。

小细节:
指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); 1
消息回溯消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
新消费组的消费offset规则

新消费组中的消费者在启动以后,默认会从当前分区的最后⼀条消息的offset+1开始消费

  • Latest:默认的,消费新消息
  • earliest:第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

五.kafka集群的controller ISR,LOD 及其他细节

controller

每个broker启动时会向zk创建⼀个临时序号节点,获得的序号最⼩的那个broker将会作为集

群中的controller,负责这么⼏件事:

  • 当集群中有⼀个副本的leader挂掉,需要在集群中选举出⼀个新的leader,选举的规是从集合中最左边获得。
  • 当集群中有分区新增或减少,controller会同步信息给其他broker
  • 当集群中有broker新增或减少,controller会同步信息给其他broker

HW

HW俗称高水位,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。LEO,LogEndOffset的缩写,表示每个partition的log最后一条Message的位置。

另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。

考虑如下情况:

LeaderA同步了消息4,5,6,followerB同步了消息4,5,与此同时followerB被选举为leader,假如没有HW,在A重新恢复之后会做同步(makeFollower)操作,在宕机时log文件之后直接做追加操作,而假如B的LEO已经达到了A的LEO,会产生数据不一致的情况,所以使用HW来避免这种情况。

A在做同步操作的时候,先将log文件截断到之前自己的HW的位置,即3,之后再从B中拉取消息进行同步。当ISR中的个副本的LEO不一致时,如果此时leader挂掉,选举新的leader时并不是按照LEO的高低进行选举,而是按照ISR中的顺序选举。

ISR

SR 是 Kafka 中确保数据一致性和系统可靠性的关键机制。通过管理与领导者副本同步的副本,Kafka 能够实现高可用性和容错性,确保消息的可靠传输。

所有的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度。任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。leader有单独的线程定期检测ISR中follower是否脱离ISR, 如果发现ISR变化,则会将新的ISR的信息返回到Zookeeper的相关节点中。

文件存储机制

同一个topic 上可以有多个partition,每个partition为一个目录,partition为实际物理上的概念,topic是逻辑上的概念。partition还可以再细分为segment,每个parititon相当于一个巨型文件被平均分配到多个大小相等的segment的数据文件中,这样方便old,segment的删除。每个partition只需要支持顺序读写就行,segment的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours}等若干参数)决定。segment文件由两部分组成”.index“文件和".log"文件,分别表示segment索引文件和,和数据文件。

那么如何从partition中通过offset查找message呢?
以上图为例,读取offset=170418的消息,首先查找segment文件,其中00000000000000000000.index为最开始的文件,第二个文件为00000000000000170410.index(起始偏移为170410+1=170411),而第三个文件为00000000000000239430.index(起始偏移为239430+1=239431),所以这个offset=170418就落到了第二个文件之中。其他后续文件可以依次类推,以其实偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。其次根据00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置进行读取。

Leader选举

一种非常常用的选举leader的方式是“少数服从多数”,Kafka并不是采用这种方式。这种模式下,如果我们有2f+1个副本,那么在commit之前必须保证有f+1个replica复制完消息,同时为了保证能正确选举出新的leader,失败的副本数不能超过f个。HDFS的HA功能也是基于“少数服从多数”的方式,但是其数据存储并不是采用这样的方式。Kafka在Zookeeper中为每一个partition动态的维护了一个ISR,这个ISR里的所有replica都跟上了leader,只有ISR里的成员才能有被选为leader的可能(unclean.leader.election.enable=false)。

标签: kafka 分布式

本文转载自: https://blog.csdn.net/2302_77073920/article/details/140524454
版权归原作者 疏影横斜 所有, 如有侵权,请联系我们删除。

“Kafka概念初识”的评论:

还没有评论