0


二、使用java简单操作kafka

系列文章目录

一、kafka基本原理
二、使用java简单操作kafka
三、简单了解kafka设计原理


文章目录


一、搭建一个kafka的demo

创建一个maven的项目

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2.引入依赖

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.41</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>1.1.3</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.1.1</version></dependency></dependencies>

3.创建对应的类

1.订单类

packagecom.july.kafka;publicclassOrder{privateInteger orderId;privateInteger productId;privateInteger productNum;privateDouble orderAmount;publicOrder(){}publicOrder(Integer orderId,Integer productId,Integer productNum,Double orderAmount){super();this.orderId = orderId;this.productId = productId;this.productNum = productNum;this.orderAmount = orderAmount;}publicIntegergetOrderId(){return orderId;}publicvoidsetOrderId(Integer orderId){this.orderId = orderId;}publicIntegergetProductId(){return productId;}publicvoidsetProductId(Integer productId){this.productId = productId;}publicIntegergetProductNum(){return productNum;}publicvoidsetProductNum(Integer productNum){this.productNum = productNum;}publicDoublegetOrderAmount(){return orderAmount;}publicvoidsetOrderAmount(Double orderAmount){this.orderAmount = orderAmount;}}

二、生产者

  • BOOTSTRAP_SERVERS_CONFIG 配置kafka集群的机器地址

一些比较重要的参数:

  1. ACKS_CONFIG 发出消息持久化机制参数,上篇我们知道kafka集群配置了多个副本之后,读写是在leader进行的,但是需要同步follower副本,那么这个参数其实就是等待配置数量写入成功的副本个数,如果为1则是leader写入成功就可以继续发送下一条消息,2则为一个leader,和同步成功一个follower成功后可以发送一下条消息。以此类推。如果是全部则为-1或者all
  2. RETRIES_CONFIG 失败之后重试次数
  3. RETRY_BACKOFF_MS_CONFIG 重试间隔(毫秒)
  4. BATCH_SIZE_CONFIG 缓存区大小(字节Bytes),客户端不是生产一条消息就发送给kafka一条消息,客户端有一个缓存区先把消息放到这个缓存区,然后达到配置的大小,把这一批发送到kafka。那如果过了一年我的缓存区还没达到我配置的大小怎么办。请往下看。
  5. LINGER_MS_CONFIG 如果缓存区没满,也按照这个配置(毫秒),每隔这个时间就发送一次缓存区中的消息,不管满没满。

2-1发送到指定分区,等待消息发送成功(会阻塞)

packagecom.july.kafka;importcom.alibaba.fastjson.JSON;importorg.apache.kafka.clients.producer.*;importorg.apache.kafka.common.serialization.StringSerializer;importjava.util.Properties;importjava.util.concurrent.ExecutionException;publicclassMsgProducer{privatefinalstaticStringTOPIC_NAME="test4";publicstaticvoidmain(String[] args)throwsInterruptedException,ExecutionException{Properties props =newProperties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.220.66:9092,192.168.220.66:9093,192.168.220.66:9094");/*
         发出消息持久化机制参数
        (1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
        (2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一
             条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
        (3)acks=-1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略
            会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
         */
        props.put(ProducerConfig.ACKS_CONFIG,"1");/*
        发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在
        接收者那边做好消息接收的幂等性处理
        */
        props.put(ProducerConfig.RETRIES_CONFIG,3);//重试间隔设置
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300);//设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);/*
        kafka本地线程会从缓冲区取数据,批量发送到broker,
        设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去
        */
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);/*
        默认值是0,意思就是消息必须立即被发送,但这样会影响性能
        一般设置10毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果10毫秒内,这个batch满了16kb就会随batch一起被发送出去
        如果10毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长
        */
        props.put(ProducerConfig.LINGER_MS_CONFIG,10);//把发送的key从字符串序列化为字节数组
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//把发送消息value从字符串序列化为字节数组
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());Producer<String,String> producer =newKafkaProducer<String,String>(props);int msgNum =5;for(int i =1; i <= msgNum; i++){Order order =newOrder(i,100+ i,1,1000.00);//指定发送分区ProducerRecord<String,String> producerRecord =newProducerRecord<String,String>(TOPIC_NAME,0, order.getOrderId().toString(),JSON.toJSONString(order));//等待消息发送成功的同步阻塞方法RecordMetadata metadata = producer.send(producerRecord).get();System.out.println("同步方式发送消息结果:"+"topic-"+ metadata.topic()+"|partition-"+ metadata.partition()+"|offset-"+ metadata.offset()+"|order-"+order.getProductId());}
        producer.close();}}

在这里插入图片描述
进行消费

bin/kafka-console-consumer.sh --bootstrap-server 192.168.220.66:9092--from-beginning --topic test4

在这里插入图片描述

2-2发送到指定分区,异步方式

packagecom.july.kafka;importcom.alibaba.fastjson.JSON;importorg.apache.kafka.clients.producer.*;importorg.apache.kafka.common.serialization.StringSerializer;importjava.util.Properties;importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.ExecutionException;importjava.util.concurrent.TimeUnit;publicclassMsgProducer{privatefinalstaticStringTOPIC_NAME="test4";publicstaticvoidmain(String[] args)throwsInterruptedException,ExecutionException{Properties props =newProperties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.220.66:9092,192.168.220.66:9093,192.168.220.66:9094");/*
         发出消息持久化机制参数
        (1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
        (2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一
             条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
        (3)acks=-1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略
            会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
         */
        props.put(ProducerConfig.ACKS_CONFIG,"1");/*
        发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在
        接收者那边做好消息接收的幂等性处理
        */
        props.put(ProducerConfig.RETRIES_CONFIG,3);//重试间隔设置
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300);//设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);/*
        kafka本地线程会从缓冲区取数据,批量发送到broker,
        设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去
        */
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);/*
        默认值是0,意思就是消息必须立即被发送,但这样会影响性能
        一般设置10毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果10毫秒内,这个batch满了16kb就会随batch一起被发送出去
        如果10毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长
        */
        props.put(ProducerConfig.LINGER_MS_CONFIG,10);//把发送的key从字符串序列化为字节数组
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//把发送消息value从字符串序列化为字节数组
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());Producer<String,String> producer =newKafkaProducer<String,String>(props);int msgNum =5;finalCountDownLatch countDownLatch =newCountDownLatch(msgNum);for(int i =1; i <= msgNum; i++){Order order =newOrder(i,100+ i,1,1000.00);//指定发送分区ProducerRecord<String,String> producerRecord =newProducerRecord<String,String>(TOPIC_NAME,0, order.getOrderId().toString(),JSON.toJSONString(order));//异步回调方式发送消息
            producer.send(producerRecord,newCallback(){publicvoidonCompletion(RecordMetadata metadata,Exception exception){//回调产生异常,打印异常if(exception !=null){System.err.println("发送消息失败:"+ exception.getStackTrace());}if(metadata !=null){System.out.println("同步方式发送消息结果:"+"topic-"+ metadata.topic()+"|partition-"+ metadata.partition()+"|offset-"+ metadata.offset()+"|order-"+order.getProductId());}
                    countDownLatch.countDown();}});}

        countDownLatch.await(5,TimeUnit.SECONDS);
        producer.close();}}

2-3其余两种情况,不指定分区的同步异步发送消息(通过对key进行某种算法来计算发送到那个分区)

packagecom.july.kafka;importcom.alibaba.fastjson.JSON;importorg.apache.kafka.clients.producer.*;importorg.apache.kafka.common.serialization.StringSerializer;importjava.util.Properties;importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.ExecutionException;importjava.util.concurrent.TimeUnit;publicclassMsgProducer{privatefinalstaticStringTOPIC_NAME="test4";publicstaticvoidmain(String[] args)throwsInterruptedException,ExecutionException{Properties props =newProperties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.220.66:9092,192.168.220.66:9093,192.168.220.66:9094");/*
         发出消息持久化机制参数
        (1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
        (2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一
             条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
        (3)acks=-1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略
            会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
         */
        props.put(ProducerConfig.ACKS_CONFIG,"1");/*
        发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在
        接收者那边做好消息接收的幂等性处理
        */
        props.put(ProducerConfig.RETRIES_CONFIG,3);//重试间隔设置
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300);//设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);/*
        kafka本地线程会从缓冲区取数据,批量发送到broker,
        设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去
        */
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);/*
        默认值是0,意思就是消息必须立即被发送,但这样会影响性能
        一般设置10毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果10毫秒内,这个batch满了16kb就会随batch一起被发送出去
        如果10毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长
        */
        props.put(ProducerConfig.LINGER_MS_CONFIG,10);//把发送的key从字符串序列化为字节数组
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//把发送消息value从字符串序列化为字节数组
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());Producer<String,String> producer =newKafkaProducer<String,String>(props);int msgNum =5;finalCountDownLatch countDownLatch =newCountDownLatch(msgNum);for(int i =1; i <= msgNum; i++){Order order =newOrder(i,100+ i,1,1000.00);//指定发送分区//            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME//                    , 0, order.getOrderId().toString(), JSON.toJSONString(order));//未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNumProducerRecord<String,String> producerRecord =newProducerRecord<String,String>(TOPIC_NAME, order.getOrderId().toString(),JSON.toJSONString(order));//等待消息发送成功的同步阻塞方法//            RecordMetadata metadata = producer.send(producerRecord).get();//            System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"//                    + metadata.partition() + "|offset-" + metadata.offset()+"|order-"+order.getProductId());//异步回调方式发送消息
            producer.send(producerRecord,newCallback(){publicvoidonCompletion(RecordMetadata metadata,Exception exception){if(exception !=null){System.err.println("发送消息失败:"+ exception.getStackTrace());}if(metadata !=null){System.out.println("异步方式发送消息结果:"+"topic-"+ metadata.topic()+"|partition-"+ metadata.partition()+"|offset-"+ metadata.offset()+"|order-"+order.getProductId());}
                    countDownLatch.countDown();}});}

        countDownLatch.await(5,TimeUnit.SECONDS);
        producer.close();}}

源码中指定分区计算分区
在这里插入图片描述

三、消费者

一些比较重要的参数配置:

  1. GROUP_ID_CONFIG 消费组名
  2. ENABLE_AUTO_COMMIT_CONFIG 是否自动提交offerset,true自动提交,false不自动提交(要提交的话需要代码中自己写提交)。
  3. AUTO_COMMIT_INTERVAL_MS_CONFIG 自动提交间隔(毫秒)— (会有丢消息的情况:如果设置间隔为1s,消费者拉取了一批消息,假如说有5条消息,处理完第一条消息,耗时2s,但是在1s的时候,已经自动提交offerset了,那么这时消费者炸了,余下的四条消息则没有被消费,而offerset已经被提交了。 重复消费情况:设置时间间隔为1s,但是你拉写的消费消息业务贼拉好,0.1s就处理完了这批消息。但是消费者又炸了,那么就是已经消费了,但是没有自动提交。 一般情况下都是设置手动提交offerset,而不是自动提交)
  4. AUTO_OFFSET_RESET_CONFIG 默认是latest:只消费自己启动之后产生的新消息。 earliest:如果是第一次设置消费组,没有offset的时候,第一次会从头开始消费,消费完之后,你把它停了,再启动也不会从头消费了,因为之前已经产生了offset相关信息。
  5. HEARTBEAT_INTERVAL_MS_CONFIG 指定时间向kafka发送心跳
  6. SESSION_TIMEOUT_MS_CONFIG 允许指定最大时间内没有收到心跳,超过这个时间消费者会被踢出消费组
  7. MAX_POLL_RECORDS_CONFIG 一次最多拉取多少消息
  8. MAX_POLL_INTERVAL_MS_CONFIG 两次拉取消息的时间超过配置时间,则会被踢出消费组。

一些比较重要的方法:

  1. subscribe() 指定消费的主题,可以是多个主题(不指定分区)。
  2. assign() 消费指定分区(与subscribe方法不能同时使用)
  3. seekToBeginning() 设置从头开始消费,(与assign配合使用)
  4. seek() 指定offset消费(assign配合使用)
  5. commitSync() 手动同步提交offerset,只有写入成功之后,才会执行它下边的代码。拉一批消息提交一批offerset,也支持单个单个提交。
  6. commitAsync() 手动异步提交
  7. poll() 长轮询去拉取消息,传参为时间(毫秒)。再指定传参的时间里,会不断的进行拉取消息。如果拉到消息则停止拉取,否则这个时间周期内,会不断的重试拉取。如果还是没有拉到最终返回空。
  8. offsetsForTimes() 指定时间点消费

增加一个配置文件,为了防止启动消费者后,控制台一直打印日志.
在这里插入图片描述

<?xml version="1.0" encoding="UTF-8"?><configurationscan="true"scanPeriod="60 seconds"debug="false"><contextName>logback</contextName><!--输出到控制台--><appendername="console"class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern></encoder></appender><appendername="logFile"class="ch.qos.logback.core.rolling.RollingFileAppender"><Prudent>true</Prudent><rollingPolicyclass="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><FileNamePattern>
                poslog/%d{yyyy-MM-dd}/%d{yyyy-MM-dd}.log
            </FileNamePattern></rollingPolicy><layoutclass="ch.qos.logback.classic.PatternLayout"><Pattern>
                %d{yyyy-MM-dd HH:mm:ss} -%msg%n
            </Pattern></layout></appender><appendername="STDOUT"class="ch.qos.logback.core.ConsoleAppender"><layoutclass="ch.qos.logback.classic.PatternLayout"><Pattern>
                %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
            </Pattern></layout></appender><loggername="org.apache.kafka"level="OFF"><appender-refref="STDOUT"/></logger><rootlevel="INFO,ERROR"><appender-refref="console"/><appender-refref="logFile"/></root></configuration>
packagecom.july.kafka;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.Arrays;importjava.util.Properties;publicclassMsgConsumer{privatefinalstaticStringTOPIC_NAME="test4";privatefinalstaticStringCONSUMER_GROUP_NAME="testGroup";publicstaticvoidmain(String[] args)throwsException{Properties props =newProperties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.220.66:9092,192.168.220.66:9093,192.168.220.66:9094");// 消费分组名
        props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);// 是否自动提交offset,默认就是true
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");// 自动提交offset的间隔时间
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");//        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");/*
        当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费
        latest(默认) :只消费自己启动之后发送到主题的消息
        earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)
        *///props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");/*
        consumer给broker发送心跳的间隔时间,broker接收到心跳如果此时有rebalance发生会通过心跳响应将
        rebalance方案下发给consumer,这个时间可以稍微短一点
        */
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,1000);/*
        服务端broker多久感知不到一个consumer心跳就认为他故障了,会将其踢出消费组,
        对应的Partition也会被重新分配给其他consumer,默认是10秒
        */
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,10*1000);//一次poll最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,50);/*
        如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,
        会将其踢出消费组,将分区分配给别的consumer消费
        */
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,30*1000);

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());KafkaConsumer<String,String> consumer =newKafkaConsumer<String,String>(props);

        consumer.subscribe(Arrays.asList(TOPIC_NAME));// 消费指定分区//consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));//消息回溯消费/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*///指定offset消费/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*///从指定时间点开始消费/*List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
        //从1小时前开始消费
        long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
        Map<TopicPartition, Long> map = new HashMap<>();
        for (PartitionInfo par : topicPartitions) {
            map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
        }
        Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
            TopicPartition key = entry.getKey();
            OffsetAndTimestamp value = entry.getValue();
            if (key == null || value == null) continue;
            Long offset = value.offset();
            System.out.println("partition-" + key.partition() + "|offset-" + offset);
            System.out.println();
            //根据消费里的timestamp确定offset
            if (value != null) {
                consumer.assign(Arrays.asList(key));
                consumer.seek(key, offset);
            }
        }*/while(true){/*
             * poll() API 是拉取消息的长轮询
             */ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records){System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
                        record.offset(), record.key(), record.value());}if(records.count()>0){// 手动同步提交offset,当前线程会阻塞直到offset提交成功// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
                consumer.commitSync();// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑/*consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception != null) {
                            System.err.println("Commit failed for " + offsets);
                            System.err.println("Commit failed exception: " + exception.getStackTrace());
                        }
                    }
                });*/}}}}

四、springboot整合kafka

pom

<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>spring-boot-kafka</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>spring-boot-kafka</name><description>Demo project for Spring Boot</description><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.2.RELEASE</version><relativePath/><!-- lookup parent from repository --></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

生产者

packagecom.kafka;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassKafkaController{privatefinalstaticStringTOPIC_NAME="test4";@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;@RequestMapping("/send")publicvoidsend(){
        kafkaTemplate.send(TOPIC_NAME,0,"key","产生的消息");}}

消费者

packagecom.kafka;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.kafka.support.Acknowledgment;importorg.springframework.stereotype.Component;@ComponentpublicclassMyConsumer{/**
     * @KafkaListener(groupId = "testGroup", topicPartitions = {
     *             @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
     *             @TopicPartition(topic = "topic2", partitions = "0",
     *                     partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
     *     },concurrency = "6")
     *  //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数
     * @param record
     */@KafkaListener(topics ="test4",groupId ="zhugeGroup")publicvoidlistenZhugeGroup(ConsumerRecord<String,String> record,Acknowledgment ack){String value = record.value();System.out.println(value);System.out.println(record);//手动提交offset//ack.acknowledge();}//配置多个消费组/*@KafkaListener(topics = "test4",groupId = "tulingGroup")
    public void listenTulingGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        ack.acknowledge();
    }*/}

配置文件application.yml

server:port:8080spring:kafka:bootstrap-servers: 192.168.220.66:9092,192.168.220.66:9093,192.168.220.66:9094producer:# 生产者retries:3# 设置大于0的值,则客户端会将发送失败的记录重新发送batch-size:16384buffer-memory:33554432acks:1# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:group-id: default-group
      enable-auto-commit:falseauto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交# RECORD# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交# BATCH# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交# TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交# COUNT# TIME | COUNT 有一个条件满足时提交# COUNT_TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交# MANUAL# 手动调用Acknowledgment.acknowledge()后立即提交# MANUAL_IMMEDIATEack-mode: MANUAL_IMMEDIATE

启动类

packagecom.kafka;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublicclassApplication{publicstaticvoidmain(String[] args){SpringApplication.run(Application.class, args);}}

总结

这里只是简单了解一下kafka常用的参数,详细了解这些参数还是要去参考kafka的官方文档。

标签: kafka java 大数据

本文转载自: https://blog.csdn.net/xiaobai_july/article/details/127823456
版权归原作者 叫我柒月 所有, 如有侵权,请联系我们删除。

“二、使用java简单操作kafka”的评论:

还没有评论