Two ways to integrate Kafka with Spring Boot for sending and subscribing to data
Kafka installation
Installing Kafka on a MacBook (Linux-style install)
Installing Kafka on Linux from a tarball
Installing and using Kafka Tool, a Kafka GUI client
Setting up a Kafka cluster and kafka-manager
Approach 1
Add the dependency to Maven's pom.xml:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Configuration file (application.yml):
spring:
  kafka:
    bootstrap-servers: 192.168.1.7:9092
    producer:
      retries: 3
      acks: 1
      batch-size: 16384
      properties:
        linger.ms: 0
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      properties:
        group.id: defaultConsumerGroup
        session.timeout.ms: 120000
        request.timeout.ms: 180000
      enable-auto-commit: true
      auto-commit-interval: 1000
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 50
    listener:
      missing-topics-fatal: false
      type: batch
In application code you can use KafkaTemplate directly, because Spring Boot auto-configures the bean and registers it in the container at startup.
Sending data to Kafka:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

// Send data to Kafka as a JSON string
// (JSON here is assumed to be fastjson's com.alibaba.fastjson.JSON;
//  any serializer that turns the map into a String works)
private void sendKafka(String abcd, String efg) {
    Map<String, Object> body = new HashMap<>(8);
    body.put("time", System.currentTimeMillis());
    body.put("abcd", abcd);
    body.put("efg", efg);
    kafkaTemplate.send("test-topic", JSON.toJSONString(body));
}
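KafkaTemplate.send() is asynchronous. If you need delivery confirmation or failure logging, a minimal sketch is shown below, assuming a spring-kafka 2.x version where send() returns a ListenableFuture (spring-kafka 3.x returns a CompletableFuture instead); the SendCallbackDemo class is illustrative, not from the original article:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class SendCallbackDemo {
    private static final Logger log = LoggerFactory.getLogger(SendCallbackDemo.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendWithCallback(String payload) {
        // send() returns immediately; the callback fires when the broker
        // acknowledges the record (success) or the send ultimately fails
        kafkaTemplate.send("test-topic", payload).addCallback(
                result -> log.info("sent to partition {} at offset {}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset()),
                ex -> log.error("send failed", ex));
    }
}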
Subscribing to data. Because the configuration above sets listener.type to batch, each @KafkaListener method receives a List of records per poll:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class KafkaConsumer {

    @Autowired
    private ProcessDataComponent processDataComponent;

    /**
     * Subscribe to a single topic
     */
    @KafkaListener(topics = "test-topic1")
    public void onDeviceSubStatusMessageDevice(List<ConsumerRecord<String, ?>> listRecord) {
        process(listRecord);
    }

    /**
     * Subscribe to multiple topics
     */
    @KafkaListener(topics = {"topic1", "topic2", "topic3"})
    public void onDeviceMessage(List<ConsumerRecord<String, ?>> listRecord) {
        process(listRecord);
    }

    private void process(List<ConsumerRecord<String, ?>> listRecord) {
        listRecord.forEach(record -> {
            processDataComponent.process(record.key(), record.value() + "");
        });
    }
}
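For comparison: if listener.type is left at its default (single), each listener invocation delivers one record rather than a batch. A minimal record-mode sketch (the class name is illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SingleRecordConsumer {

    // With the default (single) listener type, the method
    // parameter is one ConsumerRecord instead of a List
    @KafkaListener(topics = "test-topic1")
    public void onMessage(ConsumerRecord<String, ?> record) {
        System.out.println(record.key() + " -> " + record.value());
    }
}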
Approach 2
Add the dependency to Maven's pom.xml:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.2</version>
</dependency>
Sending data to Kafka:
package com.test.kafka.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.7:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // send 100 records with identical string keys and values
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("my-topic",
                    Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}
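Note that producer.send() is asynchronous here too and returns a Future<RecordMetadata>. When you need to confirm a write before continuing, one option is to block on that future; a minimal sketch (SyncSendDemo is illustrative, not from the original article):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class SyncSendDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.7:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // get() blocks until the broker acknowledges the record (or the send fails)
            RecordMetadata meta = producer.send(
                    new ProducerRecord<>("my-topic", "key-1", "value-1")).get();
            System.out.printf("topic=%s partition=%d offset=%d%n",
                    meta.topic(), meta.partition(), meta.offset());
        }
    }
}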
Subscribing to data:
package com.test.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerDemo {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerDemo.class);

    public static void main(String[] args) {
        try {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.1.7:9092");
            props.put("group.id", "group-foo1");
            props.put("auto.offset.reset", "earliest");
            // Strategy 1: auto-commit, periodically commits offsets
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            // Strategy 2: consumer.commitSync()  — manual synchronous ack, one commitSync per processed message
            // Strategy 3: consumer.commitAsync() — manual asynchronous ack
            // (strategies 2 and 3 are sketched after this block)
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
            kafkaConsumer.subscribe(Arrays.asList("my-topic"));

            boolean flag = true;
            while (flag) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("offset = {}, key = {}, value = {}",
                            record.offset(), record.key(), record.value());
                }
            }
            kafkaConsumer.close();
            logger.info("consumer client has been closed");
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}
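The comments above name strategies 2 and 3, but the demo only exercises strategy 1 (auto-commit). A minimal sketch of strategy 2, manual synchronous commit with auto-commit disabled (ManualCommitDemo is illustrative; for strategy 3 swap in consumer.commitAsync()):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ManualCommitDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.7:9092");
        props.put("group.id", "group-foo1");
        // turn off auto-commit so offsets advance only on explicit commits
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("my-topic"));
            boolean flag = true;
            while (flag) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // process each record here before committing
                }
                // Strategy 2: commit synchronously once the polled batch is processed;
                // Strategy 3 would call consumer.commitAsync() here instead
                consumer.commitSync();
            }
        }
    }
}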