0


【2024】kafka原生以及配合springboot的使用(Kafka-3)

💻目录

前言

本文主要是介绍通过使用原生代码方式和结合springboot分别如何更好的去使用理解kafka

如果需要看理论或者安装kafka可以看我前面两篇内容
🍅kafka使用和安装

一、依赖

主要分为springboot和原生代码的依赖,还有hutool工具包

<dependencies><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.20</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--    springbootkafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!--    springmvc原生kafka--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency></dependencies>

二、原生使用kafka

构建项目就不做过多说了,普通maven项目就行,重点在于方法API的使用。

1、发送消息

发送消息流程

  1. 首先是先创建一个Properties对象用于传递配置参数
  2. 然后通过props.put()方法添加需要添加的配置 - 添加连接kafka地址和设置序列化是必须的,后面的有默认的,可以根据情况设置
  3. 创建一个KafkaProducer连接客户端
  4. 创建ProducerRecord发送记录类,发送消息就是通过这个类进行发送
  5. 发送时可以选择同步或者异步进行发送
importlombok.extern.slf4j.Slf4j;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.clients.producer.RecordMetadata;importorg.apache.kafka.common.serialization.StringSerializer;importjava.util.Properties;importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.ExecutionException;importjava.util.concurrent.TimeUnit;/**
 * @projectName: kafka-mode
 * @package: com.zheng.kafkamode.kafka
 * @className: MyProducer
 * 消息的发送者---简单发送
 * @version: 1.0
 */@Slf4jpublicclassMyProducer{privatefinalstaticStringTOPIC_NAME="my-replicated-topic";publicstaticvoidmain(String[] args)throwsExecutionException,InterruptedException{Properties props =newProperties();//        设置参数
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.211.55.6:9092");//        设置序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());        配置ACK//        props.put(ProducerConfig.ACKS_CONFIG,"1");        失败重试次数,3次//        props.put(ProducerConfig.RETRIES_CONFIG,3);//        失败重试时间间隔,300毫秒后重试//        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG,300);//          kafka消息缓冲区大小,用来存放要发送到消息,缓冲区是32m,
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);//          本地线程,一次性从缓冲区拉取的数据大小,16k
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);//          如果线程拉取不到16k,间隔10ms也会将缓冲区的数据发送到kafka
        props.put(ProducerConfig.LINGER_MS_CONFIG,10);//        连接客户端KafkaProducer<String,String> producer =newKafkaProducer<>(props);//        发送的消息记录器(topic,partition(指定发到哪个),key(用于计算发到哪个partition),value)//          默认partition数量和Broker创建的数量一致ProducerRecord<String,String> producerRecord =newProducerRecord<>(TOPIC_NAME,0,"my-keyValue3","hello");//        同步send(producer,producerRecord);//        异步asyncSend(producer,producerRecord);}/**
     * @param producer: 客户端对象
     * @return void
     * 同步发送
     * @date 2024/3/22 17:09
     */privatestaticvoidsend(KafkaProducer<String,String> producer,ProducerRecord<String,String> producerRecord)throwsInterruptedException,ExecutionException{//          等待发送成功的阻塞方法RecordMetadata metadata = producer.send(producerRecord).get();

        log.info("同步发送消息"+"topic-"+metadata.topic()+"====partition:"+metadata.partition()+"=====offset:"+metadata.offset());}/**
     * @param producer: 客户端对象
     * @return void
     * 异步发送
     * @date 2024/3/22 17:09
     */privatestaticvoidasyncSend(KafkaProducer<String,String> producer,ProducerRecord<String,String> producerRecord)throwsInterruptedException{int sum =5;CountDownLatch countDownLatch =newCountDownLatch(sum);for(int i =0; i < sum; i++){ProducerRecord<String,String> producerRecord1 =newProducerRecord<>(TOPIC_NAME,"my-keyValue"+i,"zhangsan"+i);//        异步发送消息
            producer.send(producerRecord1,(metadata, exception)->{

                log.info("异步发送消息"+"topic-"+metadata.topic()+"====partition:"+metadata.partition()+"=====offset:"+metadata.offset());

                countDownLatch.countDown();});}

        countDownLatch.await(5,TimeUnit.SECONDS);}}

1.1、生产者同步发送消息

同步发送收到消息后会回复一个ack

ack有三个参数配置:(默认是1)

  • cak = 0:kafka收到消息后,不需要关心消息是否成功写入到分区中,马上就返回ack, - 会容易丢失消息,但效率最高
  • ack = 1 :多副本之间的leader分区已经收到消息,并把消息写入到本地log中,才会返回ack给生产者 - 性能和安全是最均衡的
  • ack = -1/all :里面有默认配置min.insync.replicas=2(默认为1,推荐配置大于等于2), - min.insync.replicas:代表同步副本的个数(如果是1,则是只需要leader收到就可以)- 最安全但性能最差

生产者如果3秒没有收到回复(ack),则会重试,如果重试3次还没成功,则抛出异常。

  • 消息丢失概率较小

在这里插入图片描述

1.2、生产者异步发送消息

异步发送不需要等待客户端回复ack

生产者发送消息后就可以做之后的业务,不需要等待broker在收到消息后异步调用生产者提供的callback。

  • 会出现消息丢失问题在这里插入图片描述

1.3、常用配置:

  • 基础配置
//        设置参数
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.8.62:3392");//        设置序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  • 发送消息配置
//        配置ACK
        props.put(ProducerConfig.ACKS_CONFIG,"1");//        失败重试次数,3次
        props.put(ProducerConfig.RETRIES_CONFIG,3);//        失败重试时间间隔,300毫秒后重试
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG,300);
  • 缓冲区配置kafka发送消息的流程是先创建一个缓冲区,把消息先发送到缓冲区,然后再有一个本地线程,来这个缓冲区拉取数据,通过本地线程把数据从缓冲区拉取发送到kafka客户端

在这里插入图片描述

  • kafka默认会创建一个消息缓冲区,用来存放要发送到消息,缓冲区是32m,
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
  • 本地线程,一次性从缓冲区拉取的数据大小,16k
props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
  • 如果线程拉取不到16k,间隔10ms也会将缓冲区的数据发送到kafka
props.put(ProducerConfig.LINGER_MS_CONFIG,10);

2、接收消息

消费消息流程

  1. 也是首先是先创建一个Properties对象用于传递配置参数
  2. 然后在props.put()方法传递参数
  3. 在创建一个连接客户端携带上Properties。
  4. 然后通过消费者客户端去poll()消息,默认可以一次可以poll到五百条消息下来, - Duration.ofMillis(1000):表示如果没poll到五百条,1000ms后也结束这次poll。然后处理poll下来的消息,在继续循环poll下一次
@Slf4jpublicclassMyConsumer{privatefinalstaticStringTOPIC_NAME="my-replicated-topic";privatefinalstaticStringCONSUMER_GROUP_NAME="testGroup";publicstaticvoidmain(String[] args){Properties props =newProperties();//        设置参数
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.8.62:3392");//        设置消费组名
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"CONSUMER_GROUP_NAME");//        设置序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//          创建一个消费者的客户端KafkaConsumer<String,String> consumer =newKafkaConsumer<>(props);//        消费者订阅主题列表
        consumer.subscribe(Arrays.asList(TOPIC_NAME));while(true){//            poll() API 是拉取消息的长轮训ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));//            取出单个offsetfor(ConsumerRecord record : records){
                
                log.info("收到的消息:partition = {};offset = {};key = {};value = {}",record.partition(),record.offset(),record.key(),record.value());}}}}

2.1、关于消费者的自动提交和手动提交

在这里插入图片描述

消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息交给集群的_consumer_offsets主题里面。

  • 自动提交消费者poll消息下来以后就会自动提交offset// 设置自动提交offset:true=自动提交、false=手动提交。 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");// 自动提交offset的时间间隔 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");注意:自动提交可能会丢失消息
  • 手动提交需要把自动提交的配置改为false props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");手动提交分为两种:- 手动同步提交在消费完消息后调用同步方法,会阻塞等待提交成功返回ack// 取出单个offsetfor(ConsumerRecord record : records){ log.info("收到的消息:partition = {};offset = {};key = {};value = {}",record.partition(),record.offset(),record.key(),record.value());}// 所有的消息已消费完if(records.count()>0){//有消息// 手动提交offset,当前线程会阻塞直到offset提交成功// 一般使用同步提交,因为提交之后页没什么业务逻辑 consumer.commitSync();}- 手动异步提交在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用// 取出单个offsetfor(ConsumerRecord record : records){ log.info("收到的消息:partition = {};offset = {};key = {};value = {}",record.partition(),record.offset(),record.key(),record.value());}// 所有的消息已消费完if(records.count()>0){//有消息// 手动异步提交,异步回调处理 consumer.commitAsync((offsetAndMetadataMap,exception)->{if(exception !=null){System.out.println("提交失败!失败原因:"+exception.getStackTrace());}System.out.println("提交成功!=="+offsetAndMetadataMap);});}

2.2、长轮训poll消息

  • 默认情况下,消费者一次性会poll500条消息。
//        一下poll消费的消息数,可以根据消费消息的快慢来决定,一次性消费多少消息
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
  • 代码中设置了长轮训的时间是1000毫秒
while(true){//            poll() API 是拉取消息的长轮训,1000代表本次拉取1秒钟就结束//              拉取消息到records中,ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));//            取出单个offset进行消费for(ConsumerRecord record : records){
                log.info("收到的消息"}}}

意味着:

  • 结束单次消费原因可能是: - 如果一次poll500条,就直接结束这次poll,去进入到for循环去消费这次poll到的消息。- 如果一次没有poll到500条,且时间在1秒内,那么长轮训继续poll;要么拉取到500条,要么达到1秒,才会结束拉取- 如果多次poll都没达到500条,且1秒时间到了,那么直接进入for循环。
  • 如果两次poll的时间间隔超过30s,集群会认为该消费者的消费能力过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让一次poll的消息条数少一点
//        如果两次poll的时间如果超过30s的时间间隔,kafka会认为其消费能力过弱,将其踢出消费组,将分区分配给其他消费者。
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,30*1000);

2.3、消费者的健康状态检查

消费者每隔1s向kafka集群发送心跳,集群发现如果超过10s没有续约的消费者,将被踢出消费组,触发rebalance机制,将该分区交给消费组里的其他消费者进行消费。

//        consumer给broker发送心跳的间隔时间
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,1000);//        如果超过10s没收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,10*1000);

2.4、指定分区和偏移量,时间消费

  • 指定分区消费 consumer.assign(Arrays.asList(newTopicPartition(TOPIC_NAME,0)));
  • 从头消费 consumer.assign(Arrays.asList(newTopicPartition(TOPIC_NAME,0))); consumer.seekToBeginning(Arrays.asList(newTopicPartition(TOPIC_NAME,0)));
  • 指定offset consumer.assign(Arrays.asList(newTopicPartition(TOPIC_NAME,0))); consumer.seek(newTopicPartition(TOPIC_NAME,0),10);
  • 指定时间节点开始消费List<PartitionInfo> topicPartition = consumer.partitionsFor(TOPIC_NAME);// key=分区:value=偏移量HashMap<TopicPartition,Long> map =newHashMap<>();// 从1小时前开始消费long fetchDateTime =newDate().getTime()-1000*60*60;for(PartitionInfo par : topicPartition){ map.put(newTopicPartition(TOPIC_NAME, par.partition()),fetchDateTime);}// 根据时间查找指定分区的偏移量Map<TopicPartition,OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);for(Map.Entry<TopicPartition,OffsetAndTimestamp> entry:parMap.entrySet()){TopicPartition key = entry.getKey();OffsetAndTimestamp value = entry.getValue();if(key ==null|| value ==null)continue;long offset = value.offset();System.out.println("partition-"+key.partition()+"===offset-"+offset);System.out.println();// 再通过指定offset消费if(value !=null){ consumer.assign(Arrays.asList(key)); consumer.seek(key,offset);}}

2.5、新消费组的消费offset规则

新消费组的消费者在启动以后,默认会从连接后的分区的offset开始消费(消费新消息)。可以通过下面设置,让新消费者第一次从头开始消费,之后则会只消费新的消费者(最后消费的位置的偏移量+1)

  • Latest:默认的,消费新消息
  • earliest:第一次从头开始消费。之后开始消费新消息
//        新连接的消费组是否需要从头开始消费
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

三、Spring boot配置连接kafka

1、配置yml配置文件

和原生的区别是把客户端交给spring来管理,通过yml进行配置。配置的参数名字也基本上都是一样的,参数也是一样的。

spring:kafka:bootstrap-servers: 10.211.55.6:9092#    生产者producer:retries:3#设置大于0的值,则客户端会将发送失败的记录重新发送batch-size:16384#一次从缓冲区拉取的大小16kbuffer-memory:33554432#本地缓冲区大小32macks:1#      编解码规则(默认)key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
#     消费者consumer:group-id: default-group #消费组enable-auto-commit:false#手动提交auto-offset-reset: earliest  #新消费组从头消费#编解码规则key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records:500#一次拉取五百条listener:ack-mode: MANUAL_IMMEDIATE
#       手动调用acknowledge()后立即提交,一般使用这个#      MANUAL_IMMEDIATE#       当每一条记录被消费者监听器(ListenerConsumer)处理之后提交#      RECORD#       当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交#      BATCH#       当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于time时提交#      TIME#       当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于count时提交#       COUNT#       TIME |COUNT 有一个条件满足时提交#      COUNT_TIME#       poll()拉取一批消息,处理完业务后,手动调用acknowledge()后立即提交#      MANUAL

2、配置生产者

简单创建一个Controller用来生产消息

生产消息主要是通过kafkaTemplate模版类,

@RestController@RequestMapping("/kafka")publicclassKafkaController{privatefinalstaticStringTOPIC_NAME="my-replicated-topic";@ResourceprivateKafkaTemplate<String,Object> kafkaTemplate;@PostMapping("/test")publicStringtest(@RequestBodyUser user){JSONObject jsonObject =newJSONObject(user);ListenableFuture<SendResult<String,Object>> send = kafkaTemplate.send(TOPIC_NAME,0,"key", jsonObject.toString());return"成功!";}}

3、配置消费者

可以通过ConsumerRecord一条一条的接收处理或者通过ConsumerRecords批量接收处理,但我们还是得for一条一条的处理,所以一般选择第一种就好

通过KafkaListener注解,配置接收的topics,以及消费者组id,以及一些其他的消费者信息

如:

listenGroupPro

方法的使用

@Slf4j@ComponentpublicclassMySpringBootConsumer{/**
     * @param record:
     * @param ack:
     * @return void
     * 一次性读取一条,实际上也是一次性接收500条,然后一条消息回调一次,和下面的一样
     * @date 2024/3/26 21:12
     */@KafkaListener(topics ="my-replicated-topic",groupId ="testGroup")publicvoidlistenGroup(ConsumerRecord<String,String> record,Acknowledgment ack){
        log.info("testGroup收到的消息:partition = {};offset = {};key = {};value = {}",record.partition(),record.offset(),record.key(),record.value());//        手动提交ack,每处理完一条消息提交一次
        ack.acknowledge();}/**
     * @param records:
     * @param ack:
     * @return void
     * 一次性全部接收,指定分组和topic,和上面的区别是提交ack时机会不一样,
     * @date 2024/3/26 21:09
     */@KafkaListener(topics ="my-replicated-topic",groupId ="testGroup1")publicvoidlistenGroupS(ConsumerRecords<String,Object> records,Acknowledgment ack){for(ConsumerRecord<String,Object> record: records){
            log.info("testGroup1收到的消息:partition = {};offset = {};key = {};value = {}",record.partition(),record.offset(),record.key(),record.value());}//        手动提交ack,处理完records一批消息提交一次
        ack.acknowledge();}@KafkaListener(groupId ="testGroup2",topicPartitions ={@TopicPartition(topic ="my-replicated-topic2",partitions ={"0","1"}),//指定多个分区@TopicPartition(topic ="my-replicated-topic",partitions ="0",
                    partitionOffsets =@PartitionOffset(partition ="1",initialOffset ="5"))//从5号offset开始消费},concurrency ="3")//concurrency就是同区消费组下的消费者个数,建议小于分区总数publicvoidlistenGroupPro(ConsumerRecord<String,String> record,Acknowledgment ack){

        log.info("testGroup收到的消息:partition = {};offset = {};key = {};value = {};topic={}",record.partition(),record.offset(),record.key(),record.value(),record.topic());//        手动提交ack,每处理完一条消息提交一次
        ack.acknowledge();}}

本文转载自: https://blog.csdn.net/weixin_52315708/article/details/137247961
版权归原作者 方渐鸿 所有, 如有侵权,请联系我们删除。

“【2024】kafka原生以及配合springboot的使用(Kafka-3)”的评论:

还没有评论