1,前提,kafka 的 server.properties里面开通了
listeners=PLAINTEXT://192.168.137.141:9092
advertised.listeners=PLAINTEXT://192.168.137.141:9092
2,防火墙
systemctl status firewalld
systemctl stop firewalld
3,java代码
生产者
import java.text.SimpleDateFormat;
import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
public class Producer123 {
public static void main(String[] args) {
// TODO Auto-generated method stub
Producer123 aa1=new Producer123();
for(int i=0;i<50;i++) {
aa1.send1();
System.out.println(i);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public void send1() {
String out = "";
String topic, msg;
Properties properties = new Properties();
KafkaProducer<String, String> kafkaProducer;
topic = "CHINA";
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.141:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaProducer = new KafkaProducer<String, String>(properties);
Date dNow = new Date( );
SimpleDateFormat ft = new SimpleDateFormat ("E yyyy.MM.dd 'at' hh:mm:ss a zzz");
msg = "现在是" + ft.format(dNow);
System.out.println(msg);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, msg);
kafkaProducer.send(producerRecord);
kafkaProducer.close();
// return out;
}
}
4,消费者
import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class Main1 {
public static void main(String[] args) {
// TODO Auto-generated method stub
System.out.println("hello");
Main1 m1=new Main1();
m1.consumer1();
}
public String consumer1() {
System.out.println("starting......");
String out = "";
String topic;
Properties properties = new Properties();
KafkaConsumer<String, String> kafkaConsumer;
topic = "CHINA";
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.141:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "lowLevel");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaConsumer = new KafkaConsumer<String, String>(properties);
kafkaConsumer.subscribe(Collections.singletonList(topic));
int i=0;
while(i<10){
i++;
System.out.println("aaa:"+String.valueOf(i));
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(3000);//3000代表每3秒读一次
System.out.println(consumerRecords.count());
if(consumerRecords.count() > 0){
for (ConsumerRecord<String, String> consumerRecord : consumerRecords){
out = consumerRecord.value();
System.out.println("receive data: "+out);
}
// break;
}
}
return out;
}
}
5,依赖项,版本最好和kafka服务端保持一致
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>test-kafka</groupId>
<artifactId>test-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>test-kafka</name>
<description>test-kafka</description>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
本文转载自: https://blog.csdn.net/mteng59101/article/details/126956253
版权归原作者 mteng59101 所有, 如有侵权,请联系我们删除。
版权归原作者 mteng59101 所有, 如有侵权,请联系我们删除。