0


Kafka(四)【Kafka 消费者】

前言

    截止昨天 Flink 第一遍是过了,当然得深入复习,把相关的书都看一遍。那么今天开始就得同样抓紧把 Kafka 、Flume 过完第一遍,完了看看相关的书。最后用这些先做一个小的项目。至于剩下的时间,就是每天离线数仓、实时数仓的学了,必须掌握到 70~90%。虽然不知道结果怎么样,但是无路可退了,那就肝到底吧。

    此外还有 SSM、SpringBoot 也是需要掌握的,好在兴趣使然,没有多大压力。

Kafka 消费者

1、消费方式

    Kafka 消费者使用 pull 的方式从 broker 主动拉取数据,而不是让 broker 去主动把数据 push(推/主动发送)给消费者,因为毕竟每个消费者的速度是不同的,最好还是根据消费者自己的性能来获取数据。

2、消费者工作流程

2.1、消费者总体工作流程

  • 每个消费者可以消费多个分区,但是一个分区的数据只能被一个消费者组里的一个消费者消费。

如果消费者在消费完某个数据之后挂掉了,有后续新的消费者代替它,那么新的小肥猪怎么继续消费?

其实,在消费者读取数据的 offset 是会被保存在 Kafka broker 系统主题中的,也就是说,即使消费者挂了,下一个消费者可以从 broker 的系统主题里获得上次消费的 offset ,然后接着继续消费。(旧版本 offset 是存储在 zookeeper 中的,但是当消费者非常多的时候可能会造成大量的网络交互;)

2.2、消费者组原理

2.2.1、消费者组

consumer group:消费者组,由多个 consumer 组成。形成一个消费者组的条件是所有消费者的 groupid 相同。

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内的一个消费者消费(防止数据被重复消费)。
  • 消费者组之间互不影响。所有消费者都属于某个消费者,即消费者组是逻辑上的一个订阅者。
  • 如果消费者组的组员数量 > 分区数量,则就会有多余的消费者闲置。
2.2.2、消费者组初始化流程

  • coordinator:辅助实现消费者组的初始化和分区的分配。coordinator 节点选择 = groupid.hashcode()%50 (goupid 是我们自己写代码的时候指定的,50 是 _consumer_offsets 的分区数量默认是 50)

选定 coordinator 后:

  1. 消费者组内的每个消费者都会向 coordinator 发送一个 JoinGroup 请求(请求加入到 groupid 这个组)
  2. coordinator 会从发来请求的所有消费者中随机选择一个作为消费者组中的 leader。
  3. coordinator 会把自己从这些消费者中收集到的请求中的信息都发送给这个 leader,也就是说,coordinator 只是辅助消费者组的分区选择,真正的分区分配是由 leader 完成的。
  4. leader 会定制一个消费方案。
  5. 制定好消费方案后,leader 会把制定好的计划发送给 coordinator。
  6. coordinator 然后把消费方案下发给每个消费者。
  7. 每个消费者都会和 coordinator 保持心跳(默认 3s),一旦超时(45s)该消费者就会被移除,并触发再平衡;或者消费者处理时间太长(5分钟)也会触发再平衡

消费者消费数据的条件:

  • fetch.min.bytes 每批次最小抓取字节数:只要达到该字节数就进行返回
  • fetch.max.wait.ms 一批数据最小值未到达的超时时间 :即使没有达到最小字节数,当等待时间达到该值时也会进行返回
  • fetch.max.bytes 每批次最大抓取字节数

消费者消费的参数:

  • max.poll.records 每次拉取的最大消息数,默认 500 条

此外,消费者可以和生产者一样在拦截器这里对数据进行处理。

2.3、消费者重要参数

参数名称

描述

bootstrap.servers

向Kafka集群建立初始连接用到的host/port列表。

key.deserializer和value.deserializer

指定接收消息的key和value的反序列化类型。一定要写全类名。

group.id

标记消费者所属的消费者组。

enable.auto.commit

默认值为true,消费者会自动周期性地向服务器提交偏移量。

auto.commit.interval.ms

如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。

auto.offset.reset

当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。

offsets.topic.num.partitions

__consumer_offsets的分区数,默认是50个分区。

heartbeat.interval.ms

Kafka消费者和coordinator之间的心跳时间,默认3s。

该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。

session.timeout.ms

Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms

消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。

fetch.min.bytes

默认1个字节。消费者获取服务器端一批消息最小的字节数。

fetch.max.wait.ms

默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。

fetch.max.bytes

默认Default:52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。

max.poll.records

一次poll拉取数据返回消息的最大条数,默认是500条。

3、消费者 API

我们分三部分来实践消费者 API,一种是用消费者来消费一个主题(一个消费者消费多个分区),另一种是用一个消费者来只消费一个分区,最后一种是用一个消费者组来消费(也就是消费者组内的每个消费者消费一个分区)。

使用 API 的注意事项:

注意:在消费者API代码中必须配置消费者组id。命令行启动消费者不填写消费者组id会被自动填写随机的消费者组id。

不管有没有消费者组,都需要配置消费者id!(因为独立消费者相当于特殊的消费者组,也就是相同消费者组 id 的消费者只有一个)

3.1、独立消费者案例(订阅主题)

public class CustomConsumer {
    public static void main(String[] args) {
        // 1. 创建消费者配置对象
        Properties properties = new Properties();

        // 2. 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        // 配置k,v反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        
        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        // 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 注册要消费的主题
        List<String> topics = new ArrayList<>();
        topics.add("first");
        consumer.subscribe(topics);

        // 拉取数据打印
        while (true){
            // 设置 1s 消费一批数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }

    }
}

注意:消费者这里是反序列化!

测试:

在 hadoop102 生产数据:

bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

记得修改分区数为 3:

在 IDEA 消费数据:

3.2、独立消费者案例(订阅分区)

只需要稍微修改一下上面的代码;

List<TopicPartition> topics = new ArrayList<>();
topics.add(new TopicPartition("first",0));
consumer.assign(topics);

可以看到,直接消费主题中所有分区时,我们直接传入一个主题名称即可,而指定消费主题的特定分区时,需要传入一个或多个 TopicPartition 对象。

这次我们使用带回调的生产者来生产消息:

public class CustomProducerCallback {

    public static void main(String[] args) throws InterruptedException {

        Properties properties = new Properties();
        // 连接集群 bootstrap.servers 多写几个主机地址 防止一个客户端挂掉
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的 key 和 value 的序列化类型 key.serialize
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1. 创建 Kafka 生产者对象
        // 需要指定键值的类型
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2. 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like", "test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });
            // 确保数据发往不同的分区
            Thread.sleep(2);
        }

        // 3. 关闭资源
        kafkaProducer.close();
    }
}

可以看到我们共往0号分区发了 2 条消息:

观察消费者窗口:

可以看到,消费者只消费到了我们指定的分区数据。

3.3、消费者组案例

   要实现消费者组很简单,我们直接复制上面 3.1 中独立消费者代码为 CustomConsumer1,让 CustomConsumer1 去消费分区1的数据,这样两个 main 方法同时执行就实现相当于两个消费者同时消费了。

我们继续使用上面带回调函数的生产者:

可以看到生产者往主题中发送了 5 条数据,我们观察消费者:

可以看到,消费者0 接收了 0 号分区,而消费者 1 接收了 1号和2号分区的数据。

4、生产经验-分区的分配以及再平衡

4.1、Range 以及再平衡

1)Range 分区策略原理

一个主题有多个分区,而一个消费者组有多个消费者,那么每个消费者消费哪一个分区呢?

目前,Kafka 有 4 种主流的分区分配策略:Range、RoundRobin、Sticky、CooperativeStick(Kafka 3.0 新特性)。可以通过配置参数 partition.assignment.strategy ,修改分区的分配策略。默认策略是 Range + CooperativeStick。Kafka 可以同时使用多个分配策略。

参数名称

描述

heartbeat.interval.ms

Kafka消费者和coordinator之间的心跳时间,默认3s。

该条目的值必须小于 session.timeout.ms,也不应该高于 session.timeout.ms 的1/3。

session.timeout.ms

Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms

消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。

partition.assignment.strategy

消费者分区分配策略,默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。可以选择的策略包括:Range、RoundRobin、Sticky、CooperativeSticky

2)Range 分区分配策略案例

Range 是针对每个 topic 而言的。

  • 首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
  • 通过 partition数/consumer数 来决定每个消费者应该消费几个分区。如果除不尽,那么前几个消费者会多消费几个分区。

比如上面的 topic 一共 7 个分区,我们的消费者组有 3 个消费者,7/3=2,7%3=1,多 1 个分区没人处理,于是交给消费者0处理。

注意:这种方式容易造成数据倾斜!因为,如果我们有多个 topic 由这一个消费者组来消费,那么每个 topic 如果都把剩余的分区交给前面的消费者,那么我们前面的消费者和后面的消费者的压力差距就会特别大。所以,这种方式只适合于 topic 较少的情况。

  1. 我们修改上面创建过的主题 first 的分区数为 7 。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 7

注意:分区数只能增加,不能减少。

  1. 复制CustomConsumer类,创建CustomConsumer2。这样可以由三个消费者CustomConsumer、CustomConsumer1、CustomConsumer2组成消费者组,组名都为“test”,同时启动3个消费者。

  2. 启动CustomProducer生产者,发送 7 条消息,发送到不同的分区。

public class CustomProducer {
    public static void main(String[] args) throws InterruptedException {

        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        for (int i = 0; i < 7; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", i, "test", "lyh"));
        }

        kafkaProducer.close();
    }
}

说明:Kafka默认的分区分配策略就是Range + CooperativeSticky,所以不需要修改策略。

观查3个消费者分别消费哪些分区的数据:

消费者0 消费了 0、1、2号分区的数据

消费者1 消费了 5、6 号数据

消费者 2 消费了 3、4号数据

3)Range 分区分配再平衡案例
  1. 停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。

     1号消费者:消费到3、4号分区数据。
    
     2号消费者:消费到5、6号分区数据。
    
     0号消费者的任务会整体被分配到1号消费者或者2号消费者。
    

说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。

  1. 再次重新发送消息观看结果(45s以后)。

     1号消费者:消费到0、1、2、3号分区数据。
    
     2号消费者:消费到4、5、6号分区数据。
    

说明:消费者0已经被踢出消费者组,所以重新按照range方式分配。

4.2、RoundRobin 以及再平衡

1)RoundRobin 分区策略原理

RoundRobin 是针对集群中所有 topic 而言的。它会把所有 topic 的 partition 和所有的 consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 都列给消费者。

2)RoundRobin 分区分配策略案例
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");  

消费者0 消费了分区 0,3,6

消费者1 消费了分区 2,5

消费者2 消费了分区 1,4

3)RoundRobin 分区分配再平衡案例

(1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。

1号消费者:消费到2、5号分区数据

2号消费者:消费到4、1号分区数据

0号消费者的任务会按照RoundRobin的方式,把数据轮询分成0 、6和3号分区数据,分别由1号消费者或者2号消费者消费。

说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。

(2)再次重新发送消息观看结果(45s以后)。

1号消费者:消费到0、2、4、6号分区数据

2号消费者:消费到1、3、5号分区数据

说明:消费者0已经被踢出消费者组,所以重新按照RoundRobin方式分配。

4.3、Sticky 以及再平衡

**** 粘性分区定义:****可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

    粘性分区是Kafka从0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

    粘性分区策略会尽量均匀分配分区并随机分配给每个消费者,比如一共有 0~6 7个分区要分配给3个消费者,那么可能的一种结果就是消费者0:1,4 消费者1:0,3,6 消费者2:2,5

(1)修改分区分配策略为粘性。

注意:3个消费者都应该注释掉,之后重启3个消费者,如果出现报错,全部停止等会再重启,或者修改为全新的消费者组。

 // 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
Sticky 分区分配再平衡案例

(1)停止掉0号消费者(0号消费者消费的是 0,1号分区的数据),快速重新发送消息观看结果(45s以内,越快越好)。

    1号消费者:消费到2、5、3号分区数据。

    2号消费者:消费到4、6号分区数据。

    0号消费者的任务会按照粘性规则,尽可能均衡的随机分成0和1号分区数据,分别由1号消费者或者2号消费者消费。

说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。

(2)再次重新发送消息观看结果(45s以后)。

    1号消费者:消费到2、3、5号分区数据。

    2号消费者:消费到0、1、4、6号分区数据。

说明:消费者0已经被踢出消费者组,所以重新按照粘性方式分配。

5、offset 位移

5.1、offset 的默认维护位置

__consumer_offsets主题里面采用key和value的方式存储数据。key是group.id+topic+分区号,value就是当前offset的值。每隔一段时间,kafka内部会对这个topic进行compact,也就是 key 保持不变,不断更新value。

1****)消费offset案例

(0)思想:__consumer_offsets为Kafka中的topic,那就可以通过消费者进行消费。

(1)在配置文件config/consumer.properties中添加配置exclude.internal.topics=false,默认是true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为false。

(2)采用命令行方式,创建一个新的topic

(3)向刚创建的主题 lyh 中发送数据

(4)启动一个消费者来消费 lyh 主题中的数据

消费数据才会有 offset 生成,同时我们需要指定组 id ,因为如果我们不指定,kafka 默认也会给我们指定一个组id,这样我们就不方便查找了。

(5)查看消费者消费主题 __consumer_offsets

bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server  hadoop102:9092 --consumer.config config/consumer.properties  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

5.2、自动提交 offset

为了使我们能够专注自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。

参数名称

描述

enable.auto.commit

默认值为true,消费者会自动周期性地向服务器提交偏移量。

auto.commit.interval.ms

如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。

配置参数:

// 设置为自动提交 默认为true
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        // 设置自动提交间隔 默认5000ms
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);

5.3、手动提交 offset

    虽然自动提交十分方便,但是由于自动提交的频率通常是固定的,这可能不适应所有场景。如果自动提交的间隔设置得过大,当消费者在自动提交偏移量之前异常退出时,可能会导致 Kafka 未提交偏移量,进而出现重复消费的问题。

    所以 Kafka 也提供了基于事件的手动提交,也就是消费完一批数据之后就提交一个 offset,这样就不用像自动提交那样出现一个攒批的过程,就不用担心出现 offset 丢失这种情况了。而手动提交又分为同步提交和异步提交。它俩的相同点是都会将一批数据最高的偏移量提交,不同点是,同步提交会阻塞当前线程,直到提交成功才会继续消费,如果失败会进行重试,但是异步提交消费完数据后不会等待提交完 offset 才消费,也没有失败重试机制,所以可能会出现提交失败。
5.3.1、同步提交offset

由于同步提交offset有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。以下为同步提交offset的示例。

public class CustomConsumerByHand {
    public static void main(String[] args) {
        // 1. 创建消费者配置对象
        Properties properties = new Properties();

        // 2. 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        // 配置k,v序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");
        // 修改分区分配策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");

        // 设置为手动提交 默认为自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 注册要消费的主题
        List<String> topics = new ArrayList<>();
        topics.add("first");
        consumer.subscribe(topics);

        // 拉取数据打印
        while (true){
            // 设置 1s 消费一批数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
            // 手动提交 offset
            consumer.commitSync();//同步提交
        }

    }
}
5.3.2、异步提交offset

虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交offset的方式。

修改为异步提交 offset 只需要修改上面的代码:

consumer.commitAsync();

通常我们用异步发送多一点,因为这样效率高一点。

5.4、指定 offset 消费

auto.offset.reset = earliest | latest | none 默认是latest。

当Kafka中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

(1)earliest:自动将偏移量重置为最早的偏移量,--from-beginning。

(2)latest(默认值):自动将偏移量重置为最新偏移量。

(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

(4)任意指定offset位移开始消费

public class CustomConsumerSeek {
    public static void main(String[] args) {
        // 1. 创建消费者配置对象
        Properties properties = new Properties();

        // 2. 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        // 配置k,v序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");

        // 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 注册要消费的主题
        consumer.subscribe(Arrays.asList("first"));

        // 指定消费位置 offset
        // 获取分区信息 需要时间
        Set<TopicPartition> assignment = consumer.assignment();

        // 保证分区分配方案制定完毕
        while (assignment.size() == 0){
            consumer.poll(Duration.ofSeconds(1));

            assignment = consumer.assignment();
        }

        for (TopicPartition partition : assignment) {
            // 指定从 offset=100 的位置开始消费
            consumer.seek(partition,33);
        }

        // 拉取数据打印
        while (true){
            // 设置 1s 消费一批数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }

    }
}

5.5、指定时间消费

需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

获取一天前的消息数据:

public class CustomConsumerSeekTime {
    public static void main(String[] args) {
        // 1. 创建消费者配置对象
        Properties properties = new Properties();

        // 2. 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        // 配置k,v序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");

        // 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 注册要消费的主题
        consumer.subscribe(Arrays.asList("first"));

        // 指定消费位置 offset
        // 获取分区信息 需要时间
        Set<TopicPartition> assignment = consumer.assignment();

        // 保证分区分配方案制定完毕
        while (assignment.size() == 0){
            consumer.poll(Duration.ofSeconds(1));

            assignment = consumer.assignment();
        }

        // 希望通过时间获得相应的 offset
        HashMap<TopicPartition, Long> map = new HashMap<>();

        // 遍历每个分区添加到集合
        for (TopicPartition topicPartition : assignment) {
            map.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
        }

        // 通过集合得到 map<分区,offset信息>
        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(map);

        for (TopicPartition partition : assignment) {
            // 指定时间开始消费
            // 把时间转为 offset
            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(partition);
            consumer.seek(partition,offsetAndTimestamp.offset());
        }

        // 拉取数据打印
        while (true){
            // 设置 1s 消费一批数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }

    }
}

5.6、漏消费和重复消费

重复消费:已经消费了数据,但是offset没提交。

漏消费:先提交offset后消费,有可能会造成数据的漏消费。

  • 重复消费:在自动提交的时候,提交 offset (默认是5s提交一次最大 offset)和消费者是互不影响的,所以提交完 offset 同时,消费者已经又消费了一些大于当前 offset 的数据,所以如果在下一次提交 offset 之前如果消费者挂掉的话,那么这一部分已经被消费的数据由于没有提交 offset 就会被其它消费者重复消费。
  • 漏消费:手动提交的时候,当消费者拿到这个数据的时刻就会提交 offset,但是如果数据在消费者这里还没有被处理就挂机了,那么这个数据就会被漏掉

6、生产经验-消费者事务

    正因为有重复消费和漏消费,所以就引入了消费者事务。就像我们之前学 Flink 容错机制的时候讲的输出端一致性保证时用到的两阶段提交(2PC)我们写入 Kafka 的过程其实是一个两段式的提交处理完毕,得到结果写入 Kafka 是基于事物的“预提交”,等到检查点保存完毕才会提交事务,进行正式提交,如果中间出现故障,事故进行回滚,预提交就会被放弃,恢复状态之后也只能恢复所有已确认提交的操作。

    这里的消费者事务需要下游消费者(比如 Spark、Flink、MySQL)也支持事务才能做到精确一次消费(比如 HBase 就不支持事务),其实我们上面说的 Flink Sink 连接 Kafka 为保证精确一次而提出的两阶段提交、还有 Flink 事务回滚checkpoint恢复,Kafka重置偏移量都是通过事务确保数据精准一次的例子。

7、生产经验-数据积压(消费者如何提高吞吐量)

  • 如果说 Kafka 消费能力不足,则可以考虑增加 topic 的分区数量;并且同时提高消费者组的消费者数量,消费者数 = 分区数(二者缺一不可)
  • 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压

参数名称

描述

fetch.max.bytes

默认Default:52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。

max.poll.records

一次poll拉取数据返回消息的最大条数,默认是500条

回忆之前生产者到 Kafka 提高吞吐量的策略:

  • batch.size :内存队列中每个批次的大小,默认 16K
  • linger.ms:等待时间,修改为 5-100ms
  • compression.type:压缩 snappy
  • RecordAccumulator:缓冲区大小

这里我们又学习了怎么提高 Kafka 到消费者的吞吐量,这两个应该配合起来使用。


总结

    自此,Kafka 的第一遍学习基本上是完成了,之后开学的任务就是在课上把《Kafka 权威指南》看完理解记忆消化,Kafka 是十分重要的内容,需要不断学习加深理解。
标签: kafka 分布式

本文转载自: https://blog.csdn.net/m0_64261982/article/details/135815390
版权归原作者 让线程再跑一会 所有, 如有侵权,请联系我们删除。

“Kafka(四)【Kafka 消费者】”的评论:

还没有评论