主流的消息队列有:ActiveMQ、RabbitMQ、RocketMQ、Kafka;主要应用场景是解耦、异步、消峰;可以参考链接
0. 主要参考:
https://www.bilibili.com/video/BV1vr4y1677k
1. Kafka基础架构组成:
主要由生产者(组)、Broke、消费者(组)、Topic主题、Partition分区 、Replica副本(Leader副本、Follower副本)
【架构组成】:
- Broker:一台 Kafka 服务器就是一个 broker,Kafka集群就会有多个 broker
- Producer:消息生产者,就是向 broker 发消息的客户端
- Consumer:消息消费者,就是向 broker 取消息的客户端
- Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic
- Partition:一个 Topic 可以分为多个 partition分区,每个 partition 是一个有序的队列,可以分布到多个 broker上
- Replica:每个分区有多个副本,每个分区都有一个Leader副本和若干Follower副本 `
【消费者组-主题关系】:
- 消费者组由多个消费者组成,每一个消费者订阅的同一个Topic
- 一个消费者可以消费多个分区但一个分区只能被一个消费者组的消费者消费,此外消费者组之间互不影响; `
【Topic主题-Partition分区-Replica副本关系】:
- 一个Topic有多个Partition,每个分区有多个副本,如上图,一个TopicA有2个分区,三个副本,其中Broke1的分区0是主副本,其他的为从副本
2. Kafka的一些操作命令:
2.1.【启动】
- sh kafka-server-start.sh -daemon …/config/server.properties ·
2.2.【创建Topic】
- bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic first ·选项说明: – topic 定义 topic 名 –replication-factor 定义副本 –partitions 定义分区数 ·
2.3.【启动生产者】
- bin/kafka-console-producer.sh --bootstrap-server 192.169.182.128:9092 --topic first ·
2.4.【启动消费者者消费主题】
- bin/kafka-console-consumer.sh --bootstrap-server 192.169.182.128:9092 --topic first ·
2.5.【启动消费者者消费主题,包括历史数据】
- bin/kafka-console-consumer.sh --bootstrap-server 192.169.182.128:9092 --from-beginning --topic first ·
2.6. 【查看当前服务器中的所有 Topic】
- bin/kafka-topics.sh --bootstrap-server 192.169.182.128:9092 --list `
2.7. 【查看 first 主题的详情】
- bin/kafka-topics.sh --bootstrap-server 192.169.182.128:9092 --describe --topic first ·
2.8.【修改分区数(注意:分区数只能增加,不能减少)】
- bin/kafka-topics.sh --bootstrap-server 192.169.182.128:9092 --alter --topic first --partitions 3 ·
2.9. 【删除 topic】
- bin/kafka-topics.sh --bootstrap-server 192.169.182.128:9092 --delete --topic first
3. Kafka 生产者消息发送流程:
【发送原理】
- 1)生产者实例Producer调用send方法发送消息
- 2)先经过一层拦截器处理,在经过kafka的序列化器兑key和value进行序列化,然后经过分区器选择消息发送的分区
- 3)消息会先发到RecordAccumulator缓存区,默认32M,由生产者配置buffer.memory 设置
- ** 3.1)首先根据分区,创建队列Deque<ProducerBatch>,然后将消息追到队列批次ProducerBatch**
- ** 3.2)批次大小默认16K,由生产者配置batch.size设置**
- 4)队列批次已满或者经过linger.ms等待时间(默认0ms),就唤醒sender线程进行消息发送到broker
- ** 4.1)会先判断RecordAccumulator缓存区是否满足发送条件:达到batch.size或linger.ms**
- ** 4.2)发送时会先过滤出可发送的broke节点,然后进行封装成ClientRequest请求,通过selector发送到broker**** 4.3)发送时broker可以缓存最近的5个请求**
- 5)请求发送到集群后,Kafka集群会返回对应的acks应答,发送成功则区移除缓存区的队列批次
- ** 5.1)如果失败并允许重试(配置retries重试次数),则会进行重试发送;不管最终成功还是失败都要移除队列批次**
4. Kafka 的ack机制:
** Kafka 的ack机制是指生产者的消息发送确认机制,有0,1,-1可选值,不同的值会影响消息发送的吞吐量和可靠性**
- 通过acks进行配置properties.put(ProducerConfig.ACKS_CONFIG,“1”);
- 0:生产者发送过来的数据,不需要等数据落盘应答,可靠性差(消息没落盘,leader挂了,消息丢失),但效率高。
- 1:生产者发送过来的数据,Leader收到数据落盘后应答,可靠性和效率都适中
- -1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答,可靠性高,效率低。
- 一般acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用对可靠性要求比较高的场景,比如转账。
5. Kafka 生产者消息发送模式(同步/异步):
【普通异步发送】:
- 异步:消息发送不需要等broke落盘确认,便可以继续发送,数据会一直先发到RecordAccumulator缓存区;
- kafkaProducer.send(new ProducerRecord<>(“first”,“aaaa”));
- ·
【普通异步回调发送】:
- 异步回调可以知道生产者发送异步消息有没有异常,如果有异常exception不为null,可以进行再处理
【同步发送】:
- 同步:消息发送需要等broke落盘确认,才可以继续发送,再确认之前,下一批数据不会发到RecordAccumulator缓存区;
- kafkaProducer.send(new ProducerRecord<>(“first”,“aaaa”)).get();
6. Kafka发送消息的分区策略:
kafkaProducer.send(new ProducerRecord<>(“first”, 1,“”,“hello”)
7. Kafka消息发送可靠性保证(消息不丢失):
- 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
- 但会存在数据重复落盘的情况:即消息都落盘后,leader挂了,然后选举新的leader,消息重试,重复落盘
8. Kafka消息发送去重:
Kafka默认是开启幂等性的,即enable.idempotence 默认为 true
8.1. 幂等性:
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
- 重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。
- 比如上述ack=-1,消息重试时,SeqNumber是一致的,就不能落盘
8.2. 生产者事务:
【事务原理】:
【发送代码】:
·
9. Kafka的有序消息保证:
【有序】:在同一个分区是有序的,但在多个分区是无法保证有序的
·
【有序消息保证】:
- 如果要保证某一类业务有序,可以通过自定义分区器(实现Partitioner接口),根据某一唯一key,进行发送到指定broker分区
- 如果不区分业务,要保证强有序,只能使用一个分区;但性能会大大降低
·
【重试乱序问题】:
- 往同一分区发送abcdef六个消息,生产者配置是重试(retries),比如c异常重试,理论上会造成乱序abdefc;
- 【解决】:开启幂等性(enable.idempotence=true),broke缓存请求max.in.flight.requests.per.connection需要设置小于等于5
- 【原理】:缓存请求5个时:ab无异常,正常broke数据落盘;c异常重试,def正常发送,由于幂等性,def还在内存中并不会落盘,等c重试时,在broke重新排序,最后cdef落盘
- 如果c一直重试失败,最后被异常,那是不是cdef都发送失败??
10. Kafka提高生产者吞吐量:
可以增大缓冲区、批次大小,或者稍微调大批次等待时间linger.ms,以及对批次消息压缩
// 缓冲区大小 ,默认32M
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// 批次大小,默认16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// linger.ms,默认0ms
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 压缩,默认不压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
版权归原作者 做猪呢,最重要的是开森啦 所有, 如有侵权,请联系我们删除。