1.引入依赖(pox.xml文件)
<dependencies> <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.2</version>
</dependency>
</dependencies>
2.创建java类
3.配置运行属性
//连接的服务器
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
//指定对应的key和value的序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//关联自定义分区器
//properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.ljr.kafka.producer.MyPartitioner");
4.创建生产者对象
键入new KafkaProducer<>(),光标置于括号内CTRL+P可以显示需要对象为properties;
键入new Properties().var 回车,键入new KafkaProducer<>(properties).var 回车,选择变量名
5.发送消息并返回发送结果
键入KafkaProducer.send(),提示需要对象ProducerRecord;键入topic名(order)和要发送的信息(“0000”+i),new Callback()回车会弹出需要重写的抽象类,补全返回条件、需要返回的信息即可实现抽象类;
e == null 表示消息全部发送完毕;
6.关闭资源
KafkaProducer.close();
7.运行查看结果
运行:
可以看到有返回信息;
另开窗口查看发送结果
kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092 --topic order
信息发送成功;
8.完整代码
package com.ljr.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerCallback {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
/关联自定义分区器
// properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.ljr.kafka.producer.MyPartitioner");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
for(int i =0; i < 3; i++){
kafkaProducer.send(new ProducerRecord<>("customers", "LiSi" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println("topic:" + recordMetadata.topic() + " partition:" + recordMetadata.partition());
}
}
});
}
kafkaProducer.close();
}
}
版权归原作者 独孤雨鸿 所有, 如有侵权,请联系我们删除。