💻目录
前言
本文主要是介绍通过使用原生代码方式和结合springboot分别如何更好的去使用理解kafka
如果需要看理论或者安装kafka可以看我前面两篇内容
🍅kafka使用和安装
一、依赖
主要分为springboot和原生代码的依赖,还有hutool工具包
<dependencies><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.20</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- springbootkafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- springmvc原生kafka--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency></dependencies>
二、原生使用kafka
构建项目就不做过多说了,普通maven项目就行,重点在于方法API的使用。
1、发送消息
发送消息流程
- 首先是先创建一个Properties对象用于传递配置参数
- 然后通过props.put()方法添加需要添加的配置 - 添加连接kafka地址和设置序列化是必须的,后面的有默认的,可以根据情况设置
- 创建一个KafkaProducer连接客户端
- 创建ProducerRecord发送记录类,发送消息就是通过这个类进行发送
- 发送时可以选择同步或者异步进行发送
importlombok.extern.slf4j.Slf4j;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.clients.producer.RecordMetadata;importorg.apache.kafka.common.serialization.StringSerializer;importjava.util.Properties;importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.ExecutionException;importjava.util.concurrent.TimeUnit;/**
* @projectName: kafka-mode
* @package: com.zheng.kafkamode.kafka
* @className: MyProducer
* 消息的发送者---简单发送
* @version: 1.0
*/@Slf4jpublicclassMyProducer{privatefinalstaticStringTOPIC_NAME="my-replicated-topic";publicstaticvoidmain(String[] args)throwsExecutionException,InterruptedException{Properties props =newProperties();// 设置参数
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.211.55.6:9092");// 设置序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); 配置ACK// props.put(ProducerConfig.ACKS_CONFIG,"1"); 失败重试次数,3次// props.put(ProducerConfig.RETRIES_CONFIG,3);// 失败重试时间间隔,300毫秒后重试// props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG,300);// kafka消息缓冲区大小,用来存放要发送到消息,缓冲区是32m,
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);// 本地线程,一次性从缓冲区拉取的数据大小,16k
props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);// 如果线程拉取不到16k,间隔10ms也会将缓冲区的数据发送到kafka
props.put(ProducerConfig.LINGER_MS_CONFIG,10);// 连接客户端KafkaProducer<String,String> producer =newKafkaProducer<>(props);// 发送的消息记录器(topic,partition(指定发到哪个),key(用于计算发到哪个partition),value)// 默认partition数量和Broker创建的数量一致ProducerRecord<String,String> producerRecord =newProducerRecord<>(TOPIC_NAME,0,"my-keyValue3","hello");// 同步send(producer,producerRecord);// 异步asyncSend(producer,producerRecord);}/**
* @param producer: 客户端对象
* @return void
* 同步发送
* @date 2024/3/22 17:09
*/privatestaticvoidsend(KafkaProducer<String,String> producer,ProducerRecord<String,String> producerRecord)throwsInterruptedException,ExecutionException{// 等待发送成功的阻塞方法RecordMetadata metadata = producer.send(producerRecord).get();
log.info("同步发送消息"+"topic-"+metadata.topic()+"====partition:"+metadata.partition()+"=====offset:"+metadata.offset());}/**
* @param producer: 客户端对象
* @return void
* 异步发送
* @date 2024/3/22 17:09
*/privatestaticvoidasyncSend(KafkaProducer<String,String> producer,ProducerRecord<String,String> producerRecord)throwsInterruptedException{int sum =5;CountDownLatch countDownLatch =newCountDownLatch(sum);for(int i =0; i < sum; i++){ProducerRecord<String,String> producerRecord1 =newProducerRecord<>(TOPIC_NAME,"my-keyValue"+i,"zhangsan"+i);// 异步发送消息
producer.send(producerRecord1,(metadata, exception)->{
log.info("异步发送消息"+"topic-"+metadata.topic()+"====partition:"+metadata.partition()+"=====offset:"+metadata.offset());
countDownLatch.countDown();});}
countDownLatch.await(5,TimeUnit.SECONDS);}}
1.1、生产者同步发送消息
同步发送收到消息后会回复一个ack
ack有三个参数配置:(默认是1)
- cak = 0:kafka收到消息后,不需要关心消息是否成功写入到分区中,马上就返回ack, - 会容易丢失消息,但效率最高
- ack = 1 :多副本之间的leader分区已经收到消息,并把消息写入到本地log中,才会返回ack给生产者 - 性能和安全是最均衡的
- ack = -1/all :里面有默认配置min.insync.replicas=2(默认为1,推荐配置大于等于2), - min.insync.replicas:代表同步副本的个数(如果是1,则是只需要leader收到就可以)- 最安全但性能最差
生产者如果3秒没有收到回复(ack),则会重试,如果重试3次还没成功,则抛出异常。
- 消息丢失概率较小
1.2、生产者异步发送消息
异步发送不需要等待客户端回复ack
生产者发送消息后就可以做之后的业务,不需要等待broker在收到消息后异步调用生产者提供的callback。
- 会出现消息丢失问题
1.3、常用配置:
- 基础配置
// 设置参数
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.8.62:3392");// 设置序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
- 发送消息配置
// 配置ACK
props.put(ProducerConfig.ACKS_CONFIG,"1");// 失败重试次数,3次
props.put(ProducerConfig.RETRIES_CONFIG,3);// 失败重试时间间隔,300毫秒后重试
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG,300);
- 缓冲区配置kafka发送消息的流程是先创建一个缓冲区,把消息先发送到缓冲区,然后再有一个本地线程,来这个缓冲区拉取数据,通过本地线程把数据从缓冲区拉取发送到kafka客户端
- kafka默认会创建一个消息缓冲区,用来存放要发送到消息,缓冲区是32m,
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
- 本地线程,一次性从缓冲区拉取的数据大小,16k
props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
- 如果线程拉取不到16k,间隔10ms也会将缓冲区的数据发送到kafka
props.put(ProducerConfig.LINGER_MS_CONFIG,10);
2、接收消息
消费消息流程
- 也是首先是先创建一个Properties对象用于传递配置参数
- 然后在props.put()方法传递参数
- 在创建一个连接客户端携带上Properties。
- 然后通过消费者客户端去poll()消息,默认可以一次可以poll到五百条消息下来, - Duration.ofMillis(1000):表示如果没poll到五百条,1000ms后也结束这次poll。然后处理poll下来的消息,在继续循环poll下一次
@Slf4jpublicclassMyConsumer{privatefinalstaticStringTOPIC_NAME="my-replicated-topic";privatefinalstaticStringCONSUMER_GROUP_NAME="testGroup";publicstaticvoidmain(String[] args){Properties props =newProperties();// 设置参数
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.8.62:3392");// 设置消费组名
props.put(ConsumerConfig.GROUP_ID_CONFIG,"CONSUMER_GROUP_NAME");// 设置序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 创建一个消费者的客户端KafkaConsumer<String,String> consumer =newKafkaConsumer<>(props);// 消费者订阅主题列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));while(true){// poll() API 是拉取消息的长轮训ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));// 取出单个offsetfor(ConsumerRecord record : records){
log.info("收到的消息:partition = {};offset = {};key = {};value = {}",record.partition(),record.offset(),record.key(),record.value());}}}}
2.1、关于消费者的自动提交和手动提交
消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息交给集群的_consumer_offsets主题里面。
- 自动提交消费者poll消息下来以后就会自动提交offset
// 设置自动提交offset:true=自动提交、false=手动提交。 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");// 自动提交offset的时间间隔 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
注意:自动提交可能会丢失消息 - 手动提交需要把自动提交的配置改为false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
手动提交分为两种:- 手动同步提交在消费完消息后调用同步方法,会阻塞等待提交成功返回ack// 取出单个offsetfor(ConsumerRecord record : records){ log.info("收到的消息:partition = {};offset = {};key = {};value = {}",record.partition(),record.offset(),record.key(),record.value());}// 所有的消息已消费完if(records.count()>0){//有消息// 手动提交offset,当前线程会阻塞直到offset提交成功// 一般使用同步提交,因为提交之后页没什么业务逻辑 consumer.commitSync();}
- 手动异步提交在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用// 取出单个offsetfor(ConsumerRecord record : records){ log.info("收到的消息:partition = {};offset = {};key = {};value = {}",record.partition(),record.offset(),record.key(),record.value());}// 所有的消息已消费完if(records.count()>0){//有消息// 手动异步提交,异步回调处理 consumer.commitAsync((offsetAndMetadataMap,exception)->{if(exception !=null){System.out.println("提交失败!失败原因:"+exception.getStackTrace());}System.out.println("提交成功!=="+offsetAndMetadataMap);});}
2.2、长轮训poll消息
- 默认情况下,消费者一次性会poll500条消息。
// 一下poll消费的消息数,可以根据消费消息的快慢来决定,一次性消费多少消息
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
- 代码中设置了长轮训的时间是1000毫秒
while(true){// poll() API 是拉取消息的长轮训,1000代表本次拉取1秒钟就结束// 拉取消息到records中,ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));// 取出单个offset进行消费for(ConsumerRecord record : records){
log.info("收到的消息"}}}
意味着:
- 结束单次消费原因可能是: - 如果一次poll500条,就直接结束这次poll,去进入到for循环去消费这次poll到的消息。- 如果一次没有poll到500条,且时间在1秒内,那么长轮训继续poll;要么拉取到500条,要么达到1秒,才会结束拉取- 如果多次poll都没达到500条,且1秒时间到了,那么直接进入for循环。
- 如果两次poll的时间间隔超过30s,集群会认为该消费者的消费能力过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让一次poll的消息条数少一点
// 如果两次poll的时间如果超过30s的时间间隔,kafka会认为其消费能力过弱,将其踢出消费组,将分区分配给其他消费者。
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,30*1000);
2.3、消费者的健康状态检查
消费者每隔1s向kafka集群发送心跳,集群发现如果超过10s没有续约的消费者,将被踢出消费组,触发rebalance机制,将该分区交给消费组里的其他消费者进行消费。
// consumer给broker发送心跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,1000);// 如果超过10s没收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,10*1000);
2.4、指定分区和偏移量,时间消费
- 指定分区消费
consumer.assign(Arrays.asList(newTopicPartition(TOPIC_NAME,0)));
- 从头消费
consumer.assign(Arrays.asList(newTopicPartition(TOPIC_NAME,0))); consumer.seekToBeginning(Arrays.asList(newTopicPartition(TOPIC_NAME,0)));
- 指定offset
consumer.assign(Arrays.asList(newTopicPartition(TOPIC_NAME,0))); consumer.seek(newTopicPartition(TOPIC_NAME,0),10);
- 指定时间节点开始消费
List<PartitionInfo> topicPartition = consumer.partitionsFor(TOPIC_NAME);// key=分区:value=偏移量HashMap<TopicPartition,Long> map =newHashMap<>();// 从1小时前开始消费long fetchDateTime =newDate().getTime()-1000*60*60;for(PartitionInfo par : topicPartition){ map.put(newTopicPartition(TOPIC_NAME, par.partition()),fetchDateTime);}// 根据时间查找指定分区的偏移量Map<TopicPartition,OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);for(Map.Entry<TopicPartition,OffsetAndTimestamp> entry:parMap.entrySet()){TopicPartition key = entry.getKey();OffsetAndTimestamp value = entry.getValue();if(key ==null|| value ==null)continue;long offset = value.offset();System.out.println("partition-"+key.partition()+"===offset-"+offset);System.out.println();// 再通过指定offset消费if(value !=null){ consumer.assign(Arrays.asList(key)); consumer.seek(key,offset);}}
2.5、新消费组的消费offset规则
新消费组的消费者在启动以后,默认会从连接后的分区的offset开始消费(消费新消息)。可以通过下面设置,让新消费者第一次从头开始消费,之后则会只消费新的消费者(最后消费的位置的偏移量+1)
- Latest:默认的,消费新消息
- earliest:第一次从头开始消费。之后开始消费新消息
// 新连接的消费组是否需要从头开始消费
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
三、Spring boot配置连接kafka
1、配置yml配置文件
和原生的区别是把客户端交给spring来管理,通过yml进行配置。配置的参数名字也基本上都是一样的,参数也是一样的。
spring:kafka:bootstrap-servers: 10.211.55.6:9092# 生产者producer:retries:3#设置大于0的值,则客户端会将发送失败的记录重新发送batch-size:16384#一次从缓冲区拉取的大小16kbuffer-memory:33554432#本地缓冲区大小32macks:1# 编解码规则(默认)key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消费者consumer:group-id: default-group #消费组enable-auto-commit:false#手动提交auto-offset-reset: earliest #新消费组从头消费#编解码规则key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records:500#一次拉取五百条listener:ack-mode: MANUAL_IMMEDIATE
# 手动调用acknowledge()后立即提交,一般使用这个# MANUAL_IMMEDIATE# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交# RECORD# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交# BATCH# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于time时提交# TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于count时提交# COUNT# TIME |COUNT 有一个条件满足时提交# COUNT_TIME# poll()拉取一批消息,处理完业务后,手动调用acknowledge()后立即提交# MANUAL
2、配置生产者
简单创建一个Controller用来生产消息
生产消息主要是通过kafkaTemplate模版类,
@RestController@RequestMapping("/kafka")publicclassKafkaController{privatefinalstaticStringTOPIC_NAME="my-replicated-topic";@ResourceprivateKafkaTemplate<String,Object> kafkaTemplate;@PostMapping("/test")publicStringtest(@RequestBodyUser user){JSONObject jsonObject =newJSONObject(user);ListenableFuture<SendResult<String,Object>> send = kafkaTemplate.send(TOPIC_NAME,0,"key", jsonObject.toString());return"成功!";}}
3、配置消费者
可以通过ConsumerRecord一条一条的接收处理或者通过ConsumerRecords批量接收处理,但我们还是得for一条一条的处理,所以一般选择第一种就好
通过KafkaListener注解,配置接收的topics,以及消费者组id,以及一些其他的消费者信息
如:
listenGroupPro
方法的使用
@Slf4j@ComponentpublicclassMySpringBootConsumer{/**
* @param record:
* @param ack:
* @return void
* 一次性读取一条,实际上也是一次性接收500条,然后一条消息回调一次,和下面的一样
* @date 2024/3/26 21:12
*/@KafkaListener(topics ="my-replicated-topic",groupId ="testGroup")publicvoidlistenGroup(ConsumerRecord<String,String> record,Acknowledgment ack){
log.info("testGroup收到的消息:partition = {};offset = {};key = {};value = {}",record.partition(),record.offset(),record.key(),record.value());// 手动提交ack,每处理完一条消息提交一次
ack.acknowledge();}/**
* @param records:
* @param ack:
* @return void
* 一次性全部接收,指定分组和topic,和上面的区别是提交ack时机会不一样,
* @date 2024/3/26 21:09
*/@KafkaListener(topics ="my-replicated-topic",groupId ="testGroup1")publicvoidlistenGroupS(ConsumerRecords<String,Object> records,Acknowledgment ack){for(ConsumerRecord<String,Object> record: records){
log.info("testGroup1收到的消息:partition = {};offset = {};key = {};value = {}",record.partition(),record.offset(),record.key(),record.value());}// 手动提交ack,处理完records一批消息提交一次
ack.acknowledge();}@KafkaListener(groupId ="testGroup2",topicPartitions ={@TopicPartition(topic ="my-replicated-topic2",partitions ={"0","1"}),//指定多个分区@TopicPartition(topic ="my-replicated-topic",partitions ="0",
partitionOffsets =@PartitionOffset(partition ="1",initialOffset ="5"))//从5号offset开始消费},concurrency ="3")//concurrency就是同区消费组下的消费者个数,建议小于分区总数publicvoidlistenGroupPro(ConsumerRecord<String,String> record,Acknowledgment ack){
log.info("testGroup收到的消息:partition = {};offset = {};key = {};value = {};topic={}",record.partition(),record.offset(),record.key(),record.value(),record.topic());// 手动提交ack,每处理完一条消息提交一次
ack.acknowledge();}}
版权归原作者 方渐鸿 所有, 如有侵权,请联系我们删除。