一.本地连接kafka发送消息:生产者同步发送消息和异步发送消息
导入依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency>
代码:生产者
/*生产者同步发送消息*/
public class MySimpleProducer {
public final static String TOPIC_NAME = "my-replication-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
//设置参数
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.133.133:8082,192.168.133.133:9093,192.168.133.133:9094");
//把发送的key从字符串序列化为字节数组
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//创建生产消息的客户端,传入参数
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
//创建消息
//key:作用是决定往那个分区上发,value:具体要发送的消息内容
ProducerRecord producerRecord = new ProducerRecord<>(TOPIC_NAME,"mykeyvalue","hellokafka");
//发送消息,得到消息发送的元数据并输出
Future future = kafkaProducer.send(producerRecord);
RecordMetadata recordMetadata = (RecordMetadata) future.get();
System.out.println("同步发送消息结果:"+"topic"+recordMetadata.topic()+"partition"+recordMetadata.partition()
+"offset"+recordMetadata.offset());
}
}
/*生产者异步发送消息*/
public class MySimpleProducer2 {
public final static String TOPIC_NAME = "my-replication-topic";
public static void main(String[] args) {
//设置参数
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.133.133:8082,192.168.133.133:9093,192.168.133.133:9094");
//把发送的key从字符串序列化为字节数组
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//创建生产消息的客户端,传入参数
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
//创建消息
//key:作用是决定往那个分区上发,value:具体要发送的消息内容
ProducerRecord<String,String> producerRecord = new ProducerRecord<>(TOPIC_NAME,"mykeyvalue","hellokafka");
kafkaProducer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e!=null){
System.out.println("发送消息失败");
}
if (recordMetadata!=null){
//消息发送的元数据为
System.out.println("异步发送消息结果:"+"topic"+recordMetadata.topic()+"partition"+recordMetadata.partition()
+"offset"+recordMetadata.offset());
}
}
});
}
}
消费者:
本文转载自: https://blog.csdn.net/weixin_61407147/article/details/130759097
版权归原作者 难熬的日子终会过去 所有, 如有侵权,请联系我们删除。
版权归原作者 难熬的日子终会过去 所有, 如有侵权,请联系我们删除。