0


大数据-kafka学习(六)——简单代码实现生产者消费者

Maven 依赖导入

  <!-- Declares the Kafka dependency org.apache.kafka:kafka_2.12:2.2.0 for the producer and consumer examples in this article. -->
  <!-- NOTE(review): those examples import only org.apache.kafka.clients.* and org.apache.kafka.common.* classes, so the smaller kafka-clients artifact is presumably sufficient; confirm before switching. -->
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>2.2.0</version>
    </dependency>
  </dependencies>

生产者

package my.kafka.producer;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Minimal Kafka producer example: sends 100 string messages to the {@code first} topic
 * via the broker at {@code localhost:9092} and echoes each message to standard output.
 *
 * @author :jizhibing
 * @date :Created in 2022/3/31
 */
public class MyKafkaProducer {

    /**
     * Configures a producer with string serializers, sends {@code helloworld0} through
     * {@code helloworld99} (value only, no key) to the {@code first} topic, then closes the producer.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared broadly; any failure while creating or using the producer propagates
     */
    public static void main(String[] args) throws Exception {
        // 0: configuration
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Serialization: both keys and values are written as strings.
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 1: connect. The fully-qualified name is kept because KafkaProducer is not imported in this file.
        //    try-with-resources makes sure the producer is closed even if a send throws.
        try (org.apache.kafka.clients.producer.KafkaProducer<String, String> kafkaProducer =
                     new org.apache.kafka.clients.producer.KafkaProducer<>(properties)) {
            // 2: send the messages
            for (int i = 0; i < 100; i++) {
                String message = "helloworld" + i;
                kafkaProducer.send(new ProducerRecord<>("first", message));
                System.out.println(message);
            }
        }
        // 3: the producer has been closed by the try-with-resources block above
    }

}

消费者

package my.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Minimal Kafka consumer example: joins consumer group {@code test}, subscribes to the
 * {@code first} topic via the broker at {@code localhost:9092} and prints every record it receives.
 *
 * @author :jizhibing
 * @date :Created in 2022/3/31
 */
public class MyKafkaConsumer {

    /**
     * Configures a consumer with string deserializers and polls the {@code first} topic forever,
     * printing each received record to standard output. The loop never terminates on its own;
     * the method only exits when an exception is thrown or the process is stopped.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared broadly; any failure while creating or using the consumer propagates
     */
    public static void main(String[] args) throws Exception {
        // 0: configuration (consumer-side keys come from ConsumerConfig, not ProducerConfig)
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Deserialization: both keys and values are read as strings.
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group this consumer belongs to.
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // 1: connect. try-with-resources closes the consumer if the polling loop is left via an exception.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
            kafkaConsumer.subscribe(Arrays.asList("first"));
            // 2: receive messages, waiting up to one second per poll
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
                records.forEach(record -> System.out.println(record));
            }
        }
    }

}

消费者结果展示

ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 0, CreateTime = 1649949091174, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld1)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 1, CreateTime = 1649949091175, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld4)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 2, CreateTime = 1649949091175, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld7)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 3, CreateTime = 1649949091175, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld10)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 4, CreateTime = 1649949091175, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld13)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 5, CreateTime = 1649949091175, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld16)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 6, CreateTime = 1649949091176, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld19)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 7, CreateTime = 1649949091176, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld22)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 8, CreateTime = 1649949091176, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld25)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 9, CreateTime = 1649949091176, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld28)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 10, CreateTime = 1649949091177, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld31)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 11, CreateTime = 1649949091177, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld34)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 12, CreateTime = 1649949091177, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld37)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 13, CreateTime = 1649949091177, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld40)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 14, CreateTime = 1649949091178, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld43)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 15, CreateTime = 1649949091178, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld46)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 16, CreateTime = 1649949091178, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld49)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 17, CreateTime = 1649949091182, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld52)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 18, CreateTime = 1649949091182, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld55)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 19, CreateTime = 1649949091182, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld58)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 20, CreateTime = 1649949091182, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld61)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 21, CreateTime = 1649949091182, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld64)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 22, CreateTime = 1649949091183, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld67)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 23, CreateTime = 1649949091183, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld70)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 24, CreateTime = 1649949091183, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld73)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 25, CreateTime = 1649949091183, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld76)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 26, CreateTime = 1649949091184, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld79)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 27, CreateTime = 1649949091184, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld82)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 28, CreateTime = 1649949091184, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld85)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 29, CreateTime = 1649949091184, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld88)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 30, CreateTime = 1649949091184, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld91)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 31, CreateTime = 1649949091185, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld94)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 32, CreateTime = 1649949091185, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld97)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1649949091174, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld2)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 1, CreateTime = 1649949091175, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld5)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 2, CreateTime = 1649949091175, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld8)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 3, CreateTime = 1649949091175, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld11)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 4, CreateTime = 1649949091175, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld14)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 5, CreateTime = 1649949091176, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld17)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 6, CreateTime = 1649949091176, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld20)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 7, CreateTime = 1649949091176, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld23)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 8, CreateTime = 1649949091176, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld26)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 9, CreateTime = 1649949091176, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld29)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 10, CreateTime = 1649949091177, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld32)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 11, CreateTime = 1649949091177, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld35)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 12, CreateTime = 1649949091177, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld38)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 13, CreateTime = 1649949091177, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld41)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 14, CreateTime = 1649949091178, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld44)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 15, CreateTime = 1649949091178, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld47)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 16, CreateTime = 1649949091182, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld50)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 17, CreateTime = 1649949091182, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld53)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 18, CreateTime = 1649949091182, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld56)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 19, CreateTime = 1649949091182, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld59)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 20, CreateTime = 1649949091182, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld62)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 21, CreateTime = 1649949091183, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld65)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 22, CreateTime = 1649949091183, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld68)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 23, CreateTime = 1649949091183, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld71)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 24, CreateTime = 1649949091183, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld74)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 25, CreateTime = 1649949091183, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld77)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 26, CreateTime = 1649949091184, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld80)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 27, CreateTime = 1649949091184, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld83)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 28, CreateTime = 1649949091184, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld86)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 29, CreateTime = 1649949091184, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld89)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 30, CreateTime = 1649949091184, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld92)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 31, CreateTime = 1649949091185, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld95)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 32, CreateTime = 1649949091185, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld98)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1649949091165, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld0)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1649949091175, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld3)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1649949091175, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld6)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1649949091175, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld9)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1649949091175, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld12)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1649949091175, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld15)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1649949091176, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld18)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1649949091176, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld21)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1649949091176, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld24)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 9, CreateTime = 1649949091176, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld27)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 10, CreateTime = 1649949091177, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld30)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 11, CreateTime = 1649949091177, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld33)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 12, CreateTime = 1649949091177, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld36)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 13, CreateTime = 1649949091177, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld39)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 14, CreateTime = 1649949091177, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld42)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 15, CreateTime = 1649949091178, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld45)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 16, CreateTime = 1649949091178, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld48)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 17, CreateTime = 1649949091182, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld51)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 18, CreateTime = 1649949091182, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld54)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 19, CreateTime = 1649949091182, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld57)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 20, CreateTime = 1649949091182, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld60)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 21, CreateTime = 1649949091182, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld63)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 22, CreateTime = 1649949091183, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld66)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 23, CreateTime = 1649949091183, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld69)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 24, CreateTime = 1649949091183, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld72)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 25, CreateTime = 1649949091183, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld75)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 26, CreateTime = 1649949091183, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld78)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 27, CreateTime = 1649949091184, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld81)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 28, CreateTime = 1649949091184, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld84)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 29, CreateTime = 1649949091184, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld87)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 30, CreateTime = 1649949091184, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld90)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 31, CreateTime = 1649949091184, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld93)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 32, CreateTime = 1649949091185, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld96)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 33, CreateTime = 1649949091185, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld99)
标签: java kafka 大数据

本文转载自: https://blog.csdn.net/a6470831/article/details/124184555
版权归原作者 机智兵 所有, 如有侵权,请联系我们删除。

“大数据-kafka学习(六)——简单代码实现生产者消费者”的评论:

还没有评论