Kafka3.x 学习
📈 锲而舍之,朽木不折;锲而不舍,金石可镂。 —— 荀况
观看b站千锋教育Kafka教学视频:https://www.bilibili.com/video/BV1Xy4y1G7zA?p=1&vd_source=f5ed15a716a0d2394ab18fcc53eac495
3.20
1、消息队列的两种模式
1)点对点模式
2)发布/订阅模式
2、Kafka 基础架构
3、Kafka 安装
3.1 安装部署
3.1.1 集群规划
hadoop102hadoop103hadoop103zkzkzkkafkakafkakafka
2、集群部署
Kafka下载
3.21
1、单播消息
启动 Kafka 服务器
bin/kafka-server-start.sh config/kraft/server.properties
创建 topic=test
bin/kafka-topics.sh --create--topictest --bootstrap-server localhost:9092
生产者
bin/kafka-console-producer.sh --topictest --bootstrap-server localhost:9092
消费者组 group.id=testGroup1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topictest
同一组同一主题的消费者只有一个能收到消息
2、多播消息
消费者组 group.id=testGroup1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topictest
消费者组 group.id=testGroup2
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup2 --topictest
不同组同一主题的消费者都能接受消息
3、查看消费组及信息
# 查看当前主题下有哪些消费者
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list#查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe--group testGroup1
4、主题和分区的概念
4.1 主题 topic
主题 topic 可以理解成一个类别的名称
4.2 分区 partition
创建分区
bin/kafka-topic.sh --create --bootstrap-server localhost:9092 --partitions2--topictest
查看 topic 的分区信息
bin/kafka-topic.sh --describe --bootstrap-server localhost:9092 --topictest
3.22
1、搭建 kafka 集群(三个 broker)
# 进入bin目录启动三台服务器
./kafka-server-start.sh -daemon../config/server.properties
./kafka-server-start.sh -daemon../config/server1.properties
./kafka-server-start.sh -daemon../config/server2.properties
基于 kraft 搭建集群,修改 kraft 下的 server.properities
配置 server1,server2 其余两个节点配置除了
node.id
不同,其他配置都相同。
去 zookeeper 检查是否启动成功或者直接查询进程
# 启动zookeeper客户端
./zkCli.sh
# 查看brokersls /brokers/ids
ps-aux|grep server.properties
ps-aux|grep server1.properties
ps-aux|grep server2.properties
2、副本的概念
副本是对分区的备份。在集群中,不同的副本会被部署在不同的 broker 上。下面例子:创建 1 个主题, 2 个分区、 3 个副本。
# 查看所有topic
./kafka-topics.sh --bootstrap-server 192.168.163.130:9092 --list# 创建 1个主题, 2 个分区、 3 个副本
./kafka-topics.sh --create --bootstrap-server 192.168.163.130:9092 --replication-factor 3--partitions2--topic my-replicated-topic
#查看特定topic# 查看topic情况
./kafka-topics.sh --describe --bootstrap-server 192.168.163.130:9092 --topic my-replicated-topic
isr: 可以同步的 broker 节点和已同步的 broker 节点,存放在 isr 集合中。
3.broker、主题、分区、副本
- kafka 集群中由多个 broker 组成
- 一个 broker 中存放一个 topic 的不同 partition——副本
4.kafka 集群消息的发送
./kafka-console-producer.sh --broker-list 192.168.163.130:9092,192.168.163.130:9093,192.168.163.130:9094 --topic my-replicated-topic
5.kafka 集群消息的消费
./kafka-console-consumer.sh --bootstrap-server 192.168.163.130:9092,192.168.163.130:9093,192.168.163.130:9094 --from-beginning --topic my-replicated-topic
6.关于分区消费组消费者的细节
消费组中消费者的数量不能比一个 topic 中的 partition 数量多,否则多出来的消费者消费不到消息。
3.25
1、Kafka 的 Java 客户端-生产者
1.1.引入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency>
2.生产者发送消息的基本实现
public class MySimpleProducer {
private final static String _TOPIC_NAME _="my-replicated-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig._BOOTSTRAP_SERVERS_CONFIG_, "192.168.163.130:9092,192.168.163.130:9093,192.168.163.130:9094");
//把发送的key从字符串序列化为字节数组
props.put(ProducerConfig._KEY_SERIALIZER_CLASS_CONFIG_, StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig._VALUE_SERIALIZER_CLASS_CONFIG_, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> producerRecord = new ProducerRecord(_TOPIC_NAME_, "mykey", "hellokafka");
//发送消息
RecordMetadata metadata = producer.send(producerRecord).get();
System._out_.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());}}
3.发送消息到指定分区上
ProducerRecord<String, String> producerRecord = new ProducerRecord(_TOPIC_NAME_, 0, "mykey", "hellokafka");
4.未指定分区,则会通过业务 key 的 hash 运算,算出消息往哪个分区上发
ProducerRecord<String, String> producerRecord = new ProducerRecord(_TOPIC_NAME_, "mykey", "hellokafka");
5.同步发送
生产者同步发消息,在收到 kafka 的 ack 告知发送成功之前一直处于阻塞状态
//等待消息发送成功的同步阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" +metadata.offset());
6.异步发送
生产者发消息,发送完后不用等待 broker 给回复,直接执行下面的业务逻辑。可以提供 callback,让 broker 异步的调用 callback,告知生产者,消息发送的结果
//异步发送消息
producer.send(producerRecord, new Callback(){
@Override
public void onCompletion(RecordMetadata metadata, Exception e){
if(e != null){
System._out_.println("发送消息失败" + e.getStackTrace());}if(metadata != null){
System._out_.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());}}});
7.关于生产者的 ack 参数配置
在同步发消息的场景下:生产者发动 broker 上后,ack 会有 3 种不同的选择:
- acks=0: 表示 producer 不需要等待任何 broker 确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
- acks=1: 至少要等待 leader 已经成功将数据写入本地 log,但是不需要等待所有 follower 是否成功写入。就可以继续发送下一条消息。这种情况下,如果 follower 没有成功备份数据,而此时 leader 又挂掉,则消息会丢失。
- acks=-1 或 all: 需要等待 min.insync.replicas(默认为 1 ,推荐配置大于等于 2) 这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
下面是关于 ack 重试的配置
// 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,
// 接收者那边做好接收消息的幂等性处理
props.put(ProducerConfig._RETRIES_CONFIG_, 3);
// 重试间隔设置
props.put(ProducerConfig._RETRY_BACKOFF_MS_CONFIG_, 300);
8.发送区消息的缓冲机制
- kafka 默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是 32M
props.put(ProducerConfig._BUFFER_MEMORY_CONFIG_, 33554432);
- kafka 本地线程会去缓冲区一次拉 16K 的数据,发送到 broker
props.put(ProducerConfig._BATCH_SIZE_CONFIG_, 16384);
- 如果线程拉不到 16K 的数据,间隔 10ms 会将已拉到的数据发到 broker
props.put(ProducerConfig._LINGER_MS_CONFIG_, 10);
CountDownLatch 计数,异步调用发完所有消息后主线程才结束
CountDownLatch countDownLatch = new CountDownLatch(msgNum);
//异步发送消息
countDownLatch.countDown();
countDownLatch.await(5, TimeUnit._SECONDS_);
3.26
1.消费者消费消息的基本实现
public class MyConsumer {
private final static String TOPIC_NAME ="my-replicated-topic";
private final static String CONSUMER_GROUP_NAME ="testGroup";
public static void main(String[] args){
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094");
// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//创建一个消费者的客户端
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
// 消费者订阅主题列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));while(true){
/*
* poll() API 是拉取消息的⻓轮询
*/
ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String, String> record : records){
System.out.printf("收到消息:partition = %d,offset = %d, key =%s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());}}}}
2.自动提交 offset
3.手动提交 offset
设置手动提交参数
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
在消费完消息后进行手动提交
- 手动同步提交
if(records.count()>0){//有消息
// 手动同步提交offset,当前线程会阻塞直到offset提交成功
// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
consumer.commitSync();//会阻塞,直到broker返回ack
}
- 手动异步提交
if(records.count()>0){
// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑
consumer.commitAsync(new OffsetCommitCallback(){
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata>offsets, Exception exception){if(exception != null){
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());}}});}
4、消费者 poll 消息的细节
- 消费者建立了与 broker 之间的⻓连接,开始 poll 消息。
- 默认一次 poll 500 条消息
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
- 如果每隔 1s 内没有 poll 到任何消息,则继续去 poll 消息,循环往复,直到 poll 到消息。如果超出了 1s,则此次⻓轮询结束。
ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(1000));
- 如果一次 poll 到 500 条,就直接执行 for
- 如果一次没有 poll 到 500 条,且时间在 1s 内,那么长轮询继续 poll,直到 500 条或者 1s
- 如果多次 poll 没到 500 条,但 1s 时间到了,则执行 for
- 但是 AI 这么说:在 Apache Kafka 中,当消费者调用
poll()
方法时,它会尝试从 Kafka 服务器拉取最多ConsumerConfig.MAX_POLL_RECORDS_CONFIG
设置数量的消息。如果在一次poll()
调用期间,服务端可用的消息条数少于配置的最大值,消费者并不会等待更多消息到来,而是立刻返回当前可用的消息。
- 如果两次 poll 的时间如果超出了 30s 的时间间隔,kafka 会认为其消费能力过弱,将其踢出消费组。将分区分配给其他消费者。可以通过这个值进行设置:
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
3.29
1、消费者健康状态检查
消费者发送心跳的时间间隔
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
kafka 如果超过 10 秒没有收到消费者的心跳,则会把消费者踢出消费组,进行 rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
2、指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));
3、消息回溯消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0 )));
4.指定 offset 消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));
consumer.seek(new TopicPartition(TOPIC_NAME, 0 ), 10 );
5.从指定时间点消费
List<PartitionInfo> topicPartitions =consumer.partitionsFor(TOPIC_NAME);
//从 1 小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();for(PartitionInfo par : topicPartitions){
map.put(new TopicPartition(TOPIC_NAME, par.partition()),fetchDataTime);}
Map<TopicPartition, OffsetAndTimestamp> parMap =consumer.offsetsForTimes(map);for(Map.Entry<TopicPartition, OffsetAndTimestamp> entry :parMap.entrySet()){
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();if(key == null || value == null)continue;
Long offset = value.offset();
System.out.println("partition-" + key.partition() +"|offset-" + offset);
System.out.println();
//根据消费里的timestamp确定offset
if(value != null){
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);}}
6.新消费组的消费偏移量
- latest(默认) :只消费自己启动之后发送到主题的消息
- earliest:第一次从头开始消费,以后按照消费 offset 记录继续消费,这个需要区别于 consumer.seekToBeginning(每次都从头开始消费)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
十、Kafka 线上问题优化
1.如何防止消息丢失
- 发送方: ack 是 1 或者-1/all 可以防止消息丢失,如果要做到 99.9999%,ack 设成 all,把 min.insync.replicas 配置成分区备份数
- 消费方:把自动提交改为手动提交。
因网络抖动,生产者未收到 ack
一条消息被消费者消费多次。如果为了消息的不重复消费,而把生产端的重试机制关闭、消费端的手动提交改成自动提交,这样反而会出现消息丢失,那么可以直接在防治消息丢失的手段上再加上消费消息时的幂等性保证,就能解决消息的重复消费问题。
3.30
Springboot 中使用 Kafka
1、引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、配置文件
server:
port: 8080
spring:
kafka:
bootstrap-servers: 172.16.253.21: 9093
producer: # 生产者
retries: 3# 设置大于 0 的值,则客户端会将发送失败的记录重新发送
batch-size: 16384
buffer-memory: 33554432
acks: 1# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500
listener:
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交# RECORD# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交# BATCH# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交# TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交# COUNT# TIME | COUNT 有一个条件满足时提交# COUNT_TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交# MANUAL# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种# MANUAL_IMMEDIATE
ack-mode: MANUAL_IMMEDIATE
3、消息生产者
@RestController
public class KafkaController {
private final static String TOPIC_NAME ="my-replicated-topic";
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public void send(){
kafkaTemplate.send(TOPIC_NAME, 0 , "key", "this is a msg");}}
4、消息消费者
@KafkaListener(topics ="my-replicated-topic",groupId ="MyGroup1")
public void listenGroup(ConsumerRecord<String, String> record,Acknowledgment ack){
String value = record.value();
System.out.println(value);
System.out.println(record);
//手动提交offset
ack.acknowledge();}
版权归原作者 zrj316 所有, 如有侵权,请联系我们删除。