0


Kafka入门及生产者详解

1. Kafka定义

传统定义:分布式的、基于发布/订阅模式消息队列,主要用于大数据实时处理领域。发布/订阅模式中,发布者不会直接将消息发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接受感兴趣的消息。

官网最新定义:开源的分布式事件流平台(Event Streaming Platform),用于高性能数据管道、流分析、数据集成。

2. 消息队列的应用场景及模式

传统的消息队列的主要应用场景包括:缓冲/消峰、解耦、异步通信

缓冲/消峰:有助于控制和优化数据流的速度,解决生产消息和消费消息的速度不一致的问题。

解耦:允许独立修改和扩展两边的处理过程,只需确保他们遵守相同的接口约束。

此时消息队列类似于一个超市,数据源是商品生产厂商,目的地是消费者,消费者无需跟各大厂商来往,而是去超市购物。

异步通信:允许用户把一个消息放入队列,不立即处理,再在需要的时候去处理。(比如发送验证码)

消息队列的两种模式:

1)点对点:消费者主动拉取数据,消息收到后清除消息

2)发布/订阅模式:有多个Topic主题;消费者消费完数据后,不删除数据,数据仍可以被其他消费者消费;每个消费者相互独立

3. Kafka基础架构

1)一个topic可以有多个分区,broker为服务器,即一份数据分为多个分区放在多个服务器

2)数据分为多块,消费者也有多个,组成一个消费者组,组内每个成员并行消费不同的分区

3)分区也有副本,不过和HDFS的副本有区别,HDFS的副本是相等的,而Kafka里的副本只有Leader的才能起作用,Follower的副本不能消费(除非Leader挂了,Follower成为Leader)

4)ZK里保存了Kafka的服务器id信息,以及每个topic的各个分区的Leader是哪个服务器,以及isr队列

4. Kafka命令行操作快速入门

针对Kafka基础架构的三大部分,分别有不同的脚本命令来操作。

生产者:kafka-console-producer.sh;

集群:kafka-topics.sh;

消费者:kafka-console-consumer.sh

1)kafka-topics.sh的命令参数如下

创建topic,1个分区,3个副本,并查看:

bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --create --partitions 1 --replication-factor 3

bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --list

bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --describe

结果如下:

修改分区数(分区数只能改大,不能改小)为3

bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --partitions 3

bin/kafka-topics.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --describe

结果如下:

另外副本数也不能通过命令行修改

2) kafka-console-producer.sh

向指定分区发送数据

bin/kafka-console-producer.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first

3) kafka-console-consumer.sh

消费者消费指定分区的数据

bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first

再在生产端发送数据,消费者端可以收到数据,但不能收到历史数据(即生产者在消费者起来之前发送的数据),要想消费历史数据,加上参数:--from-beginning

bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092,hadoop2:9092 --topic first --from-beginning

5. 生产者异步发送与同步发送

添加依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

创建Kafka生产者对象:

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRP_SERVERS_CONFIG, "hadoop102:9092");

// 指定key和value的序列化类型
// StringSerializer.class.getName()相当于StringSerializer的全路径名称
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

发送数据:

// new ProducerRecord第一个参数是topic,第二个参数是值(key为null)
kafkaProducer.send(new ProducerRecord<>("first", "value"));

ProducerRecord的多个构造函数:

关闭资源:

kafkaProducer.close();

发送数据也可以带回调函数,返回主题、分区等信息:

kafkaProducer.send(new ProducerRecord<>("first", "value"), new CallBack() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            System.out.println("topic: " + metadata.topic() + ", partition: " + metadata.partition());
        }
    }
});

同步发送只需在send方法之后加上get方法:

// new ProducerRecord第一个参数是topic,第二个参数是值(key为null)
kafkaProducer.send(new ProducerRecord<>("first", "value")).get();

6. 生产者分区策略

生产者的默认分区器:DefaultPatitioner,即如果指定分区,就发送到指定分区;如果没指定分区,指定了key,则将key的哈希值对分区数取模得到分区;如果也没指定key,选择粘性分区(sticky partition),即随机选取一个分区,本批次数据满了或者linger.ms时间到了,再次选择另一个分区。

自定义分区,主要是实现Partitioner接口,重写其中的partition方法:

@Overrride
public int partition(String topic, Object key, byte[] keybytes, Object value, byte[] valuebytes, Cluster cluster) {
    String valueStr = value.toString();
    if (valueStr.contains("xxx")) {
        return 0;
    } else if (valueStr.contains("zzz")) {
        return 1;
    } else {
        return 2;
    }
}

配置关联自定义分区器:

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());

7. 如何提高吞吐量

batch.size:每批次数据大小,数据量达到这个值,就开始发送,默认为16K

linger.ms:等待时间,如果到了这个时间,无论数据量多大,立即发送,默认为0

如果linger.ms设置为0,意味着一旦有数据来就立马发送,这样效率并不高,所以适当提高linger.ms有利于提高吞吐量,但是不能太大,这样会造成较大的数据延迟。

也可以发送数据的过程中采用数据压缩(snappy)的方式,来提高实际发送的数据量。

还可以修改缓冲区大小RecordAccumulator

设置缓冲区大小:

properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32M

设置批次大小:

properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16K

设置linger.ms:

properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

设置压缩格式:

properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

8. 数据可靠性

生产者发送给Kakfa集群,会收到如下几种应答:

1)ack = 0:不需要等待数据落盘,可靠性最差,存在丢数风险,一般不会用这种模式

2)ack = 1:需要Leader收到数据并进行落盘,也有丢数风险,比如Leader刚应答完就挂了,还没来得及同步数据给Follower

3)ack = -1/all,需要Leader和isr队列中所有节点收到数据并进行落盘,可靠性最好,但是数据可能会重复。

所谓ISR队列,就是和Leader保持同步的Leader+Follower的集合,例如:leader:0; isr: 0,1。如果某个Follower长时间未与Leader通信,该Follower就会被踢出isr队列,这样就不会出现Leader长期等待某个故障Follower节点的问题。

ack = -1,如果分区副本数为1,或者isr队列里只有一个节点,则与ack=1效果一样,仍有丢数风险。

数据完全可靠 = (ACK = -1)+ (分区副本数 >= 2) + (ISR队列里节点数 >= 2)

代码配置:

properties.put(ProducerConfig.ACK_CONFIG, -1);
//重试次数,默认为int最大值
properties.put(ProducerConfig.RETRIES_CONFIG, 3);

为解决ack = -1时的数据重复性问题,kafka引入了幂等性和事务的概念。所谓幂等性,就是Producer无论向broker发送多少次重复数据,broker都只会持久化一条。

精确一次(Exactly Once) = 幂等性 + 数据完全可靠

重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,broker只会持久化一条。PID是每次Kafka重启时会分配一个新的,Partition表示分区号,SeqNumber是单调递增的。所以能保证单分区单次会话数据不重复。

开启幂等性,只需将enable.idempodence设为true即可(默认就是true)。

Kafka事务原理:

使用事务发送数据:

properties.put(ProducerConfig.TRANSACTION_ID_CONFIG, "01");

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

kafkaProducer.initTransactions();
kafkaProducer.beginTransactions();
try {
    kafkaProducer.send(new ProducerRecord<>("first", "value"));
    kafkaProducer.commitTransactions();
} catch (Exception e) {
    kafkaProducer.abortTransactions();
} finnaly {
    kafkaProducer.close();
}

9. 数据乱序

Kafka生产者发送数据给broker,每个broker默认缓存5个请求,如果其中一个请求发送失败,不影响后面请求发送,假如失败的请求后来又重试成功了,那么broker收到的数据会是乱序的。只需将max.in.flight.requests.per.connection设置小于等于5,broker就会自动排序。

标签: kafka 分布式

本文转载自: https://blog.csdn.net/weixin_46628668/article/details/136491773
版权归原作者 weixin_46628668 所有, 如有侵权,请联系我们删除。

“Kafka入门及生产者详解”的评论:

还没有评论