0


Kafka 使用java实现,快速入门

一、kafka的生产者和消费者

1. 生产者发送消息的流程

2. 消费者接收消息的流程

二、 java 代码实现

1. 添加依赖:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
        </dependency>

2. 实现生产者

public class NormalProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        //    1.配置生产者启动的关键属性参数

        //    1.1    BOOTSTRAP_SERVERS_CONFIG:连接kafka集群的服务列表,如果有多个,使用"逗号"进行分隔
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
        //    1.2    CLIENT_ID_CONFIG:这个属性的目的是标记kafkaclient的ID
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "quickstart-producer");
        //    1.3 KEY_SERIALIZER_CLASS_CONFIG VALUE_SERIALIZER_CLASS_CONFIG
        //    Q: 对 kafka的 key 和 value 做序列化,为什么需要序列化?
        //    A: 因为KAFKA Broker 在接收消息的时候,必须要以二进制的方式接收,所以必须要对KEY和VALUE进行序列化
        //    字符串序列化类:org.apache.kafka.common.serialization.StringSerializer
        //    KEY: 是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //    VALUE: 实际发送消息的内容
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //    2.创建kafka生产者对象 传递properties属性参数集合
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for(int i = 0; i <10; i ++) {
            //    3.构造消息内容
            User user = new User("00" + i, "张三");
            ProducerRecord<String, String> record =
                    //    arg1:topic , arg2:实际的消息体内容,quick_start 是 topic 名称
                    new ProducerRecord<String, String>("quick_start",
                            JSON.toJSONString(user));

            //    4.发送消息
            producer.send(record);
        }

        //    5.关闭生产者
        producer.close();

    }
}

其中的 User 对象为:

public class User {

    private String id;
    
    private String name;

    public User() {
    }

    public User(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

3. 实现消费者

public class NormalConsumer {

    public static void main(String[] args) {

        //    1. 配置属性参数
        Properties properties = new Properties();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");

        //    org.apache.kafka.common.serialization.StringDeserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //    非常重要的属性配置:与我们消费者订阅组有关系
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "quickstart-group");
        //    常规属性:会话连接超时时间
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        //    消费者提交offset: 自动提交 & 手工提交,默认是自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

        //    2. 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        //    3. 订阅你感兴趣的主题:quick_start
        consumer.subscribe(Collections.singletonList("quick_start"));

        System.err.println("quickstart consumer started...");

        try {
            //    4.采用拉取消息的方式消费数据
            while(true) {
                //    等待多久拉取一次消息
                //    拉取TOPIC_QUICKSTART主题里面所有的消息
                //    topic 和 partition是 一对多的关系,一个topic可以有多个partition
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                //    因为消息是在partition中存储的,所以需要遍历partition集合
                for(TopicPartition topicPartition : records.partitions()) {
                    //    通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
                    //    获取TopicPartition对应的主题名称
                    String topic = topicPartition.topic();
                    //    获取当前topicPartition下的消息条数
                    int size = partitionRecords.size();

                    System.err.println(String.format("--- 获取topic: %s, 分区位置:%s, 消息总数: %s",
                            topic,
                            topicPartition.partition(),
                            size));

                    for(int i = 0; i < size; i++) {
                        ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
                        //    实际的数据内容
                        String value = consumerRecord.value();
                        //    当前获取的消息偏移量
                        long offset = consumerRecord.offset();
                        //    ISR : High Watermark, 如果要提交的话,比如提交当前消息的offset+1
                        //    表示下一次从什么位置(offset)拉取消息
                        long commitOffser = offset + 1;
                        System.err.println(String.format("获取实际消息 value:%s, 消息offset: %s, 提交offset: %s",
                                value, offset, commitOffser));
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}

4. 测试结果

生产者发送的消息在消费者端可以正常接收:

标签: kafka spring boot java

本文转载自: https://blog.csdn.net/Qynwang/article/details/130609499
版权归原作者 Java知者 所有, 如有侵权,请联系我们删除。

“Kafka 使用java实现,快速入门”的评论:

还没有评论