0


spring boot集成Kafka发送和订阅数据两种方式

spring boot集成Kafka发送和订阅数据两种方式

Kafka安装

MacBook Linux安装Kafka

Linux解压安装Kafka

kafka可视化工具Kafka Tool安装使用

Kafka集群和kafka-manager安装

方式一:使用 spring-kafka(Spring Boot 自动配置)

maven的pom.xml引入依赖

<!-- Spring for Apache Kafka. No <version> is given here; presumably it is
     managed by the Spring Boot parent/BOM of the project (verify). -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

配置文件

# Spring Boot Kafka settings (application.yml).
spring:
  kafka:
    # Broker address list used by both producer and consumer.
    bootstrap-servers: 192.168.1.7:9092
    producer:
      retries: 3
      acks: 1
      batch-size: 16384
      # Entries under "properties" are passed to the Kafka client as raw
      # client properties; this one is "linger.ms".
      properties:
        linger:
          ms: 0
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Raw client properties: group.id, session.timeout.ms, request.timeout.ms.
      properties:
        group:
          id: defaultConsumerGroup
        session:
          timeout:
            ms: 120000
        request:
          timeout:
            ms: 180000
      enable-auto-commit: true
      # Spring Boot's key for the Kafka client's auto.commit.interval.ms;
      # a bare number is interpreted as milliseconds.
      auto-commit-interval: 1000
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 50
    listener:
      missing-topics-fatal: false
      # Batch mode: listener methods receive a List of records per poll.
      type: batch

代码里可以直接注入并使用 KafkaTemplate:Spring Boot 启动时会根据上面的配置自动创建该 bean 并注册到容器中。

发送数据到kafka

@AutowiredprivateKafkaTemplate kafkaTemplate;//发送数据到kafkaprivatevoidsendKafka(String abcd,String efg){Map<String,Object> body =newHashMap<>(8);
        body.put("time",System.currentTimeMillis());
        body.put("abcd", abcd);
        body.put("efg", efg);
        kafkaTemplate.send("test-topic", JSON.toJSONString(body));}

订阅数据

importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importjava.util.List;@ComponentpublicclassKafkaConsumer{@AutowiredprivateProcessDataComponent processDataComponent;/**
     * 单个topic订阅
     */@KafkaListener(topics ="test-topic1")publicvoidonDeviceSubStatusMessageDevice(List<ConsumerRecord<String,?>> listRecord){process(listRecord);}/**
     * 订阅多个topic
     */@KafkaListener(topics ={"topic1","topic2","topic3"})publicvoidonDeviceMessage(List<ConsumerRecord<String,?>> listRecord){process(listRecord);}privatevoidprocess(List<ConsumerRecord<String,?>> listRecord){
        listRecord.forEach(record->{
            processDataComponent.process(record.key(),record.value()+"");});}}

方式二:直接使用原生 kafka-clients

maven的pom.xml引入依赖

<!-- Plain Apache Kafka Java client, used without Spring. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.2</version>
</dependency>

发送数据到kafka

package com.test.kafka.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Minimal producer example using the plain kafka-clients API.
 *
 * <p>Sends 100 records to {@code my-topic}; the key and the value of record
 * {@code i} are both the decimal string of {@code i}.
 */
public class KafkaProducerDemo {

    /**
     * Configures a producer, sends the records and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.7:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even when send() throws.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String payload = Integer.toString(i);
                producer.send(new ProducerRecord<>("my-topic", payload, payload));
            }
        }
    }
}

订阅数据

package com.test.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Properties;

/**
 * Minimal consumer example using the plain kafka-clients API.
 *
 * <p>Subscribes to {@code my-topic} and logs offset, key and value of every
 * record until the JVM is asked to shut down.
 */
public class KafkaComsumerDemo {

    private static final Logger logger = LoggerFactory.getLogger(KafkaComsumerDemo.class);

    /**
     * Configures a consumer and polls in a loop.
     *
     * <p>A JVM shutdown hook calls {@code wakeup()} so that the blocked
     * {@code poll} throws {@link WakeupException}; that ends the loop and lets
     * try-with-resources close the consumer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.7:9092");
        props.put("group.id", "group-foo1");
        props.put("auto.offset.reset", "earliest");

        // Strategy 1: auto commit - offsets are committed periodically.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // Strategy 2: consumer.commitSync() - manual synchronous commit,
        //             called once after each processed message.
        // Strategy 3: consumer.commitAsync() - manual asynchronous commit.

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
            // wakeup() is the consumer method intended to be called from another
            // thread; the hook then waits for the main thread so close() can finish.
            final Thread mainThread = Thread.currentThread();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                kafkaConsumer.wakeup();
                try {
                    mainThread.join();
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                }
            }));

            kafkaConsumer.subscribe(Arrays.asList("my-topic"));
            while (true) {
                // poll(long) matches kafka-clients 1.0.2 used by this example.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> consumerRecord : records) {
                    logger.info("offset = {},key = {},value = {}",
                            consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
                }
            }
        } catch (WakeupException ignored) {
            // Expected on shutdown: raised by the wakeup() call in the hook.
            logger.info("consumer wakeup received, shutting down");
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            logger.error("{}", e.getMessage(), e);
        } finally {
            // The resource has already been closed when this block runs.
            logger.info("consumer client has been closed");
        }
    }
}
标签: spring boot kafka

本文转载自: https://blog.csdn.net/yinjl123456/article/details/124502869
版权归原作者 beyond阿亮 所有, 如有侵权,请联系我们删除。

“spring boot集成Kafka发送和订阅数据两种方式”的评论:

还没有评论