0


java操作kafka

一.本地连接kafka发送消息:生产者同步发送消息和异步发送消息

导入依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>

代码:生产者

/*生产者同步发送消息*/
public class MySimpleProducer {

    public final static String TOPIC_NAME = "my-replication-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //设置参数
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.133.133:8082,192.168.133.133:9093,192.168.133.133:9094");

        //把发送的key从字符串序列化为字节数组
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //把发送消息value从字符串序列化为字节数组
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //创建生产消息的客户端,传入参数
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);

        //创建消息
        //key:作用是决定往那个分区上发,value:具体要发送的消息内容
        ProducerRecord producerRecord = new ProducerRecord<>(TOPIC_NAME,"mykeyvalue","hellokafka");

        //发送消息,得到消息发送的元数据并输出
        Future future = kafkaProducer.send(producerRecord);
        RecordMetadata recordMetadata = (RecordMetadata) future.get();
        System.out.println("同步发送消息结果:"+"topic"+recordMetadata.topic()+"partition"+recordMetadata.partition()
        +"offset"+recordMetadata.offset());
    }
}
/*生产者异步发送消息*/
public class MySimpleProducer2 {

    public final static String TOPIC_NAME = "my-replication-topic";

    public static void main(String[] args) {

        //设置参数
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.133.133:8082,192.168.133.133:9093,192.168.133.133:9094");

        //把发送的key从字符串序列化为字节数组
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //把发送消息value从字符串序列化为字节数组
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //创建生产消息的客户端,传入参数
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);

        //创建消息
        //key:作用是决定往那个分区上发,value:具体要发送的消息内容
        ProducerRecord<String,String> producerRecord = new ProducerRecord<>(TOPIC_NAME,"mykeyvalue","hellokafka");

        kafkaProducer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e!=null){
                    System.out.println("发送消息失败");
                }
                if (recordMetadata!=null){
                    //消息发送的元数据为
                    System.out.println("异步发送消息结果:"+"topic"+recordMetadata.topic()+"partition"+recordMetadata.partition()
                    +"offset"+recordMetadata.offset());
                }
            }
        });
    }
}

消费者:

标签: kafka java 分布式

本文转载自: https://blog.csdn.net/weixin_61407147/article/details/130759097
版权归原作者 难熬的日子终会过去 所有, 如有侵权,请联系我们删除。

“java操作kafka”的评论:

还没有评论