

Reading from and writing to Kafka in Java

1. Prerequisite: the following listeners are enabled in Kafka's server.properties (use your broker's own IP address):

listeners=PLAINTEXT://192.168.137.141:9092
advertised.listeners=PLAINTEXT://192.168.137.141:9092
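
A quick way to verify that the broker is actually reachable from the client machine is to list topics with the AdminClient that ships with kafka-clients. The following is a minimal sketch under the same assumptions as above (broker at 192.168.137.141:9092; the class name ConnectivityCheck is only for illustration):

import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ConnectivityCheck {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        // Same broker address as in server.properties above
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.141:9092");

        // try-with-resources closes the AdminClient when done
        try (AdminClient adminClient = AdminClient.create(properties)) {
            // Listing topics forces a round trip to the broker, so it fails fast
            // if listeners, advertised.listeners or the firewall are misconfigured
            Set<String> topics = adminClient.listTopics().names().get();
            System.out.println("Connected. Topics: " + topics);
        }
    }
}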

2. Firewall: either stop firewalld or open port 9092 so the client can reach the broker.

systemctl status firewalld
systemctl stop firewalld

3. Java code

Producer

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer123 {

    public static void main(String[] args) {
        Producer123 producer = new Producer123();
        // Send 50 messages, one every 300 ms
        for (int i = 0; i < 50; i++) {
            producer.send1();
            System.out.println(i);
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void send1() {
        String topic = "CHINA";

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.141:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // The message body is the current timestamp
        Date dNow = new Date();
        SimpleDateFormat ft = new SimpleDateFormat("E yyyy.MM.dd 'at' hh:mm:ss a zzz");
        String msg = "It is now " + ft.format(dNow);
        System.out.println(msg);

        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, msg);
        kafkaProducer.send(producerRecord);
        // close() flushes any buffered records before shutting the producer down
        kafkaProducer.close();
    }

}
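
Creating and closing a new KafkaProducer inside send1() for every message works, but it opens a fresh broker connection each time. A more idiomatic pattern is to reuse one producer for all records and attach a callback that reports the result of each asynchronous send. The sketch below makes the same broker/topic assumptions as above; the class name ReusedProducer is only for illustration.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReusedProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.141:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // One producer instance is created once and shared by all sends;
        // try-with-resources closes it (and flushes buffered records) at the end
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 50; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("CHINA", "message " + i);
                // send() is asynchronous; the callback reports success or failure per record
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.println("sent to partition " + metadata.partition()
                                + " at offset " + metadata.offset());
                    }
                });
            }
        }
    }
}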

4. Consumer

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Main1 {

    public static void main(String[] args) {
        System.out.println("hello");
        Main1 m1 = new Main1();
        m1.consumer1();
    }

    public String consumer1() {
        System.out.println("starting......");
        String out = "";
        String topic = "CHINA";

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.141:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "lowLevel");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        kafkaConsumer.subscribe(Collections.singletonList(topic));

        // Poll ten times, then stop
        int i = 0;
        while (i < 10) {
            i++;
            System.out.println("poll " + i);
            // poll() blocks for up to 3000 ms waiting for new records
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(3000));
            System.out.println(consumerRecords.count());
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                out = consumerRecord.value();
                System.out.println("receive data: " + out);
            }
        }
        kafkaConsumer.close();
        return out;
    }

}
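
In practice a consumer usually polls in an endless loop rather than a fixed ten iterations. The sketch below, under the same broker/topic assumptions, runs until the JVM shuts down and uses consumer.wakeup() from a shutdown hook to break out of the blocking poll() cleanly; it also sets auto.offset.reset=earliest so a brand-new group starts reading from the beginning of the topic. The class name LoopingConsumer is only for illustration.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LoopingConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.141:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "lowLevel");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Start from the earliest offset when this group has no committed position yet
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("CHINA"));

        // wakeup() is the one thread-safe KafkaConsumer method; calling it from a
        // shutdown hook makes the blocked poll() throw WakeupException so the loop exits
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("receive data: " + record.value());
                }
            }
        } catch (WakeupException e) {
            // Expected during shutdown; nothing to do
        } finally {
            consumer.close();
        }
    }
}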

5. Dependencies; the kafka-clients version should ideally match the Kafka broker version.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>test-kafka</groupId>
  <artifactId>test-kafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>test-kafka</name>
  <description>test-kafka</description>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.8.0</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

This article is reposted from: https://blog.csdn.net/mteng59101/article/details/126956253
Copyright belongs to the original author, mteng59101. In case of infringement, please contact us for removal.
