0


Kafka-Java客户端数据生产流程解析,从发送类型实现代码到序列化器实现代码!

// 生产者会从给定的 broker 里查找到其它 broker 的信息。——建议至少提供两个 broker 的信息,因为一旦其中一个宕机,生产者仍然能够连接到集群上。
props.put(“bootstrap.servers”, brokerList);
// 将 key 转换为字节数组的配置,必须设定为一个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,
// 生产者会用这个类把键对象序列化为字节数组。
// ——kafka 默认提供了 StringSerializer和 IntegerSerializer、 ByteArraySerializer。当然也可以自定义序列化器。
props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
// 和 key.serializer 一样,用于 value 的序列化
props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
// 用来设定KafkaProducer对应的客户端ID,默认为空,如果不设置KafkaProducer会自动 生成一个非空字符串。
// 内容形式如:“producer-1”
props.put(“client.id”, “producer.client.id.demo”);
return props;
}

Properties props = initConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// KafkaProducer<String, String> producer = new KafkaProducer<>(props,
// new StringSerializer(), new StringSerializer());
//生成 ProducerRecord 对象,并制定 Topic,key 以及 value
ProducerRecord<String, String> record = new ProducerRecord<>(topic, “hello, Kafka!”);
try {

// 发送消息
producer.send(record);
}

3.发送类型

发送即忘记

producer.send(record)

同步发送

//通过send()发送完消息后返回一个Future对象,然后调用Future对象的get()方法等待kafka响应
//如果kafka正常响应,返回一个RecordMetadata对象,该对象存储消息的偏移量
// 如果kafka发生错误,无法正常响应,就会抛出异常,我们便可以进行异常处理
producer.send(record).get();

异步发送

producer.send(record, new Callback() {

public void onCompletion(RecordMetadata metadata, Exception exception) {

if (exception == null) {

System.out.println(metadata.partition() + “:” + metadata.offset());
}
}
});

4.序列化器

消息要到网络上进行传输,必须进行序列化,而序列化器的作用就是如此。

Kafka 提供了默认的字符串序列化器(

org.apache.kafka.common.serialization.StringSerializer

),还有整型(

IntegerSerializer

)和字节数组(

BytesSerializer

标签: kafka java linq

本文转载自: https://blog.csdn.net/2401_84415615/article/details/137942838
版权归原作者 2401_84415615 所有, 如有侵权,请联系我们删除。

“Kafka-Java客户端数据生产流程解析,从发送类型实现代码到序列化器实现代码!”的评论:

还没有评论