0


springboot的kafka使用

kafka的一些概念

  • 分组:同一组内的consumer对于队列里的消息只会有一个consumer消费一次(一对一),不同组的consumer对队列里的消息会同时消费(一对多)。
  • 分区:kafka将同一队列的消息存在不同服务器上该队列中(消息分区,避免消息集中到一个服务器上)。
  • 偏移量:分区中的消息的序列号,在每个分区中此偏移量都是唯一的。在这里插入图片描述
  • 分区策略:轮询策略(按顺序轮流将每条数据分配到每个分区中),随机策略(每次都随机的将消息分配到每个分区),按键保存策略(生产者发送数据的时候,可以指定一个key,计算这个key的hashcode值,按照hashcode的值对不同消息进行存储)。
  • 备份:kafka中消息的备份又叫做副本,kafka定义了两种副本:领导者副本(leader),追随者副本(follower)。在这里插入图片描述
  • 备份机制:同步方式,第一种ISR选定方式具有高可靠性,第二种其他follower方式具有高可用性。在这里插入图片描述在这里插入图片描述
  • 发送消息方式:同步方式和异步的差别在于异步加上一个回调函数,即加上了回调函数的就是异步发送消息的方式。
  • 消息确认机制:通过配置文件中ack的值配置。在这里插入图片描述
  • 消息重试次数:通过配置文件中retries的值配置。
  • 消息压缩:通过配置文件中的compression-type的值配置。在这里插入图片描述
  • 消息有序性:topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。
  • 提交偏移量:通过配置文件中enable-auto-commit的值配置手动提交和自动提交。kafka不会像其他JMS队列那样需要得到消费者的确认,消费者可以使用kafka来追踪消息在分区的位置(偏移量)。消费者会往一个叫做 consumer offset的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡(把消息分区重新均衡的分配给所有可用的消费者,保证大家一起消费)。在这里插入图片描述

配置文件

spring:
kafka:
  bootstrap-servers:192.168.101.125:9092
  producer:
    retries:0
    batch-size:16KB
    buffer-memory:32MB
    acks:1
    key-serializer:org.apache.kafka.common.serialization.StringSerializer
    value-serializer:org.apache.kafka.common.serialization.ByteArraySerializer
    properties:
      linger.ms:5
  consumer:
    # 消费者需要在自己的配置文件里设置为true
    enable:false
    enable-auto-commit:false
    # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    auto-offset-reset: earliest
    # 批量拉取时的最大拉取数量
    max-poll-records:1000
    key-deserializer:org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer
    # 默认group-id为应用名称
    group-id: ${spring.application.name}
  topics:
      command: msg_command
    offline-download: dev_websocket_unDownFile

依赖坐标

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

生产者

// 配置文件中设置了key的序列化器StringDeserializer和value的序列化器为ByteArrayDeserializer@AutowiredprivateKafkaTemplate<String,byte[]> kafkaTemplate;@Value("${spring.kafka.topics.offline-download}")privateString topic;// 这里发送的参数是字节类型,因为配置文件中设置了value的序列化器为ByteArrayDeserializer(对象类型的消息通常用json来传输,对象->json字符串->字节数组)
kafkaTemplate.send(topic,JsonUtils.serialize(OfflineDownload.builder().fileId(id).unDown(false).userId(userId).build()).getBytes()).addCallback(success ->// 消息发送成功
                log.debug("kafka sending success, topic:{}, body:{}",
                        topic,JsonUtils.serialize(OfflineDownload.builder().fileId(id).unDown(false).userId(userId).build())), failure ->// 消息发送失败
                log.error("kafka sending failure, topic:{}, body:{}",
                        topic,JsonUtils.serialize(OfflineDownload.builder().fileId(id).unDown(false).userId(userId).build())));

消费者

@KafkaListener(
        topics ={"${spring.kafka.topics.offline-download}",},
        containerFactory ="stringBytesKafkaFactory")publicvoidreceiveUnDownFileMessage(List<ConsumerRecord<String,byte[]>> records,Acknowledgment ack){debugLogMessageSize(records);List<OfflineDownload> unDownFileMessageDtoList =newArrayList<>();for(ConsumerRecord<String,byte[]> consumerRecord : records){try{// 将json数组转成json字符串String msgStr =newString(consumerRecord.value());// json字符串转成对象OfflineDownload unDownFileMessageDto =JsonUtils.deserialize(msgStr,OfflineDownload.class);
            unDownFileMessageDtoList.add(unDownFileMessageDto);}catch(Exception e){errorLogMessage(consumerRecord, e,false);}debugLogMessage(consumerRecord,false);}try{WebSocketServer.dealUnDownFileMessage(unDownFileMessageDtoList);
        ack.acknowledge();}catch(CommitFailedException ce){errorLog(ce, ack);}catch(Exception e){errorLog(e, ack);}}
// 消费对象key是string,value是baye[],因为配置文件中设置了key的序列化器StringDeserializer和value的序列化器为ByteArrayDeserializer@KafkaListener(
        topics ={"${spring.kafka.topics.command}"},
        containerFactory ="stringBytesKafkaFactory")publicvoidreceiveCommandByteMessage(List<ConsumerRecord<String,byte[]>> records,Acknowledgment ack){if(log.isDebugEnabled()){
        log.debug("从kafka拉取事件数据{}条", records.size());}List<byte[]> byteMessage =newArrayList<>();for(ConsumerRecord<String,byte[]> consumerRecord : records){
        byteMessage.add(consumerRecord.value());if(log.isDebugEnabled()){
            log.debug("topic:{}, key:{}, bytes:{}",
                    consumerRecord.topic(),
                    consumerRecord.key(),ByteBufUtil.hexDump(consumerRecord.value()));}}try{
        mqCommandMessageService.dealCommandMessageBatch(byteMessage);
        ack.acknowledge();if(log.isDebugEnabled()){
            log.debug("提交偏移量,提交时间:{}",System.currentTimeMillis());}}catch(Exception e){
        log.error("处理连接事件异常", e);
        ack.nack(0,5*1000L);}}

注:ack.acknowledge()用来确认消息,ack.nack()会确认第一个参数偏移量之前的消息,抛弃后续消息重新返回到队列中。第二个参数时间后再次poll查询未消费消息。

标签: kafka spring boot java

本文转载自: https://blog.csdn.net/Lemon_MY/article/details/129078402
版权归原作者 Lemon_MY 所有, 如有侵权,请联系我们删除。

“springboot的kafka使用”的评论:

还没有评论