各位佬们大家好,很抱歉番薯停更了这么久,因为最近一直在闭关修炼,也就没来得及整理自己所学的东西。
今天跟新一个大家以后实际项目和应用中最常用的一种组件,消息队列Kafka,并且番薯在文末会附带kafka常见的面试题。。望大家理解。
Kafka
1.什么是Kafka?
1.1 Kafka的定义
Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
Kafka最新定义:Kafka是 一个开源的分布式事件流平台 (Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
1.2 消息队列
目 前企 业中比 较常 见的 消息 队列产 品主 要有 Kafka、ActiveMQ 、RabbitMQ 、RocketMQ 等。
在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。
1.2.1 传统消息队列的应用场景
传统的消息队列的主要应用场景包括:缓存/消峰、解耦和异步通信。
1.3消息队列的好处
1.3.1解耦合
耦合的状态表示当你实现某个功能的时候,是直接接入当前接口,而利用消息队列,可以将相应的消息发送到消息队列,这样的话,如果接口出了问题,将不会影响到当前的功能。
1.3.2 异步处理
异步处理替代了之前的同步处理,异步处理不需要让流程走完就返回结果,可以将消息发送到消息队列中,然后返回结果,剩下让其他业务处理接口从消息队列中拉取消费处理即可。
1.3.3 流量削峰
高流量的时候,使用消息队列作为中间件可以将流量的高峰保存在消息队列中,从而防止了系统的高请求,减轻服务器的请求处理压力。
Kafka消费模式
Kafka的消费模式主要有两种:一种是一对一的消费,也即点对点的通信,即一个发送一个接收。第二种为一对多的消费,即一个消息发送到消息队列,消费者根据消息队列的订阅拉取消息消费。
一对一
消息生产者发布消息到Queue队列中,通知消费者从队列中拉取消息进行消费。消息被消费之后则删除,Queue支持多个消费者,但对于一条消息而言,只有一个消费者可以消费,即一条消息只能被一个消费者消费。
一对多
这种模式也称为发布/订阅模式,即利用Topic存储消息,消息生产者将消息发布到Topic中,同时有多个消费者订阅此topic,消费者可以从中消费消息,注意发布到Topic中的消息会被多个消费者消费,消费者消费数据之后,数据不会被清除,Kafka会默认保留一段时间,然后再删除。
1.4 Kafka的基础架构
名称说明Topic主题,可以理解为一个队列Partition分区,为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序Offset偏移量,kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafkaOffsetBroker一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topicProducer消息生产者,向kafka broker发消息的客户端Consumer消息消费者,向kafka broker取消息的客户端Consumer Group消费者组,这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。Replica副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower。Leader每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。Follower每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。
2.Kafka的安装
这里内容太多博主就不一一写出了,大体步骤是安装zk在安装Kafka。
3.Kafka 生产者
3.1 生产者消息发送流程
3.1.1 发送原理
在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。
发送流程
3.1.2 生产者重要参数列表
参数名称描述bootstrap.servers生产者连接集群所需的 broker 地 址 清 单 。 例 如hadoop1:9092,hadoop2:9092,hadoop3:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker 信息。key.serializer 和 value.serializer指定发送消息的 key 和 value 的序列化类型。一定要写全类名。buffer.memory RecordAccumulator缓冲区总大小,默认 32m。batch.size缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。linger.ms如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。acks0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader 收到数据后应答。-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的。max.in.flight.requests.per.connection允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。retry.backoff.ms两次重试之间的时间间隔,默认是 100ms。enable.idempotence是否开启幂等性,默认 true,开启幂等性。compression.type生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。
3.2 异步发送 API
3.2.1 普通异步发送
1)需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker
3.2.2 带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
3.3 同步发送 API
3.4 生产者分区策略
3.4.1分区的原因
方便在集群中扩展:每个partition通过调整以适应它所在的机器,而一个Topic又可以有多个partition组成,因此整个集群可以适应适合的数据
可以提高并发:以Partition为单位进行读写。类似于多路。
3.4.2分区的原则
指明partition(这里的指明是指第几个分区)的情况下,直接将指明的值作为partition的值
没有指明partition的情况下,但是存在值key,此时将key的hash值与topic的partition总数进行取余得到partition值
值与partition均无的情况下,第一次调用时随机生成一个整数,后面每次调用在这个整数上自增,将这个值与topic可用的partition总数取余得到partition值,即round-robin算法。
3.4.3分区的好处
3.4.4 自定义分区器
如果研发人员可以根据企业需求,自己重新实现分区器。
1)需求
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,
不包含 atguigu,就发往 1 号分区。
2)实现步骤
(1)定义类实现 Partitioner 接口。
(2)重写 partition()方法。
package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/public class MyPartitioner implements Partitioner {
/**
* 返回信息对应的分区
* @param topic 主题
* @param key 消息的 key
* @param keyBytes 消息的 key 序列化后的字节数组
* @param value 消息的 value
* @param valueBytes 消息的 value 序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
* @return
*/@Overridepublicintpartition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取消息
String msgValue =value.toString();// 创建 partitionintpartition;// 判断消息是否包含 atguiguif(msgValue.contains("atguigu")){
partition=0;
}else {
partition=1;
}
// 返回分区号returnpartition;
}
// 关闭资源@Overridepublic void close() {
}
// 配置方法@Overridepublic void configure(Map<String, ?> configs) {
}
}
(3)使用分区器的方法,在生产者的配置中添加分区器参数。
package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallbackPartitions {
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop1:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atgui
gu.kafka.producer.MyPartitioner");
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<>(properties);for(int i =0; i <5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","atguigu "+ i), new Callback() {
@Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {
if(e ==null){
System.out.println(" 主题: "+ metadata.topic()+"->"+"分区:"+ metadata.partition());
}else { e.printStackTrace();
}
}
});
}
kafkaProducer.close();
}
}
3.4.5 如何提高Kafka的吞吐量
我们在运用Kafka的时候,确实解决了很多问题,但是我们怎么做到对Kafka的数据吞吐以及优化呢。
更改以下配置参数即可!!!
• batch.size:批次大小,默认16k
• linger.ms:等待时间,修改为5-100ms
• compression.type:压缩snappy
生产经验——生产者如何提高吞吐量
• RecordAccumulator:缓冲区大小,修改为64m
3.5 Kafka如何保证数据可靠性
发送流程
1)ack 应答原理
可靠性总结:
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,
对可靠性要求比较高的场景。
3.6 数据去重
3.6.1 数据传递语义
数据传递语义
• 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
• 最多一次(At Most Once)= ACK级别设置为0 •
总结:
At Least Once可以保证数据不丢失,但是不能保证数据不重复;
At Most Once可以保证数据不重复,但是不能保证数据不丢失。
• 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。
Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。
3.6.2 幂等性
1)幂等性原理
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。
重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其 中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复。
2)如何使用幂等性
开启参数 enable.idempotence 默认为 true,false 关闭。
3.6.3 生产者事务
1)Kafka 事务原理
说明:开启事务,必须开启幂等性。
2)Kafka 的事务一共有如下 5 个 API
// 1 初始化事务
void initTransactions();// 2 开启事务
void beginTransaction() throws ProducerFencedException;// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>offsets,
String consumerGroupId) throws
ProducerFencedException;// 4 提交事务
void commitTransaction() throws ProducerFencedException;// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
3.7 数据有序
3.8 数据乱序
1)kafka在1.x版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。
2)kafka在1.x及以后版本保证数据单分区有序,条件如下:
(1)未开启幂等性
max.in.flight.requests.per.connection需要设置为1。
(2)开启幂等性
max.in.flight.requests.per.connection需要设置小于等于5。
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,
故无论如何,都可以保证最近5个request的数据都是有序的。
4.Kafka的Broker
4.1 Kafka Broker 工作流程
4.1.1 Zookeeper 存储的 Kafka 信息
Zookeeper中存储的Kafka 信息
4.1.2 Kafka Broker 总体工作流程
4.1.3 Broker 重要参数
参数名称描述replica.lag.time.max.msISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认 30s。auto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡。leader.imbalance.per.broker.percentage默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔时间。log.segment.bytesKafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G。log.index.interval.bytes默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。log.retention.hoursKafka 中数据保存的时间,默认 7 天。log.retention.minutesKafka 中数据保存的时间,分钟级别,默认关闭。log.retention.msKafka 中数据保存的时间,毫秒级别,默认关闭。log.retention.check.interval.ms检查数据是否保存超时的间隔,默认是 5 分钟。log.retention.bytes默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。log.cleanup.policy默认是 delete,表示所有数据启用删除策略;如果设置值为 compact,表示所有数据启用压缩策略。num.io.threads默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。 num.replica.fetchers 副本拉取线程数,这个参数占总核数的 50%的 1/3num.network.threads默认是 3。数据传输线程数,这个参数占总核数的50%的 2/3 。log.flush.interval.messages强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。log.flush.interval.ms每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。
ps:内容太多俺分两章更新。。。
感谢大家的支持,有建议可以私聊博主撒。
版权归原作者 红糖番薯 所有, 如有侵权,请联系我们删除。