0


【深入理解Kafka系列】 第二章 生产者

  生产者就是负责向Kafka发送消息的应用程序。Kafka一共两个大版本的客户端,第一个是开源之处使用Scala编写的客户端;第二个是0.9.x版本开始推出的java编写的客户端。

1、客户端开发

一个正常的生产逻辑需要以下几个步骤:

(1)配置生产者客户端参数及创建相应的生产者实例。

(2)构建待发送的消息

(3)发送消息

(4)关闭生产者实例

需要单独说明下构建消息的ProducerRecord,它包含了多个属性,定义如下:

public class ProducerRecord<K, V> { 
private final String topic; // 主题
private final Integer partit on //分区号
private nal Headers headers; // 消息头部
private final K key; //键
private nal V value ; //值
private nal Long timestamp ; // 消息的时间戳
//省略其他成员方法和构造方法
}
    topic和partition 字段分别代表消息要发往的主题和分区号。key 是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类,而这个 key以让消息再进行二次归类,同一个 key 的消息会被划分到同一个分区 中。 value 是指消息体,一般不为空,如果为空则表示特定的消息一一墓碑消息。 timestamp 是指消息的时间戳,它有 CreateTime LogAppendTime 两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间。

1.1、必要的参数

    创建真正的生产者实例前需要配置相应的参数,在 Kafka 生产者客户端 KafkaProducer 中有3个参数是必填的。
  • bootstrap.servers :该参数用来指定生产者客户端连接 Kafka 集群所需的 broker 地址清单,具体的内容格式为 hostl:portl,hos t2:port2 ,可以设置1个或多个地址,中间以逗号隔开,注意这里并非需要所有的 broker址,因为生产者会从给定的 broker 里查找到其他 broker 的信息 ,建议设置2个。
  • key.serializer和value.serializer: broker 端接收的消息必须以字节数组 (byte [])的形式存在。key.serializer value .seriaizer 这两个参数分别用来指定 key、value序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名 。

KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。

1.2、发送消息

KafkaProducer的send()方法并非是void类型,而是Future<RecordMetadata>类型,send()
方法有2个重载方法,具体定义如下:

publicFuture<RecordMetadata>send(ProducerRecord<K,V>record)
publicFuture<RecordMetadata>send(ProducerRecordcK,V> record,Callback callback)

发送消息主要有三种模式:**发后即忘(fire-and-forget)、同步(sync)及异步(async)**。

  • 发后既忘:直接调用send()方法,它只管往Kafka发送消息不关系是否到达。
  • 同步发送:可以利用返回的Future对象实现,示例如下:
try{
    producer.send(record).get();
} catch (ExecutionException|InterruptedExceptione) {
    e.printStackTrace();
}

实际上 send()方法本身就是异步的 , send()方法返回的 Future 对象可以使调用方稍后获得发送的结果。

try {
    Future<RecordMetadata> future= producer.send(record) ;
    RecordMetadata metadata =future.get();
} catch (ExecutionException I InterruptedException e) {

}

可以获取到一个RecordMetadata对象,对象中包含了消息的一些元数据信息,比如消息的主题、分区号、分区中偏移量等。

  • 异步发送:一般是在send()方法里指定一个Callback的回调函数,kafka在返回响应时调用该函数实现异步的发送确认。

KafkaProducer中一般会发生两种类型的异常:可重试的异常和不可重试的异常。

  • 常见的可重试异常:NetworkException、LeaderNotAvailableException等,对于可重试的异常。 如果配置了retries参数,如果只要在规定的重试次数内自行恢复,就不会抛出异常。retries默认值为0。
  • 不可重试异常比如RecordToolLargeExcetption,表示发送的消息太大,KafkaProducer不会进行重试,直接抛出异常。

1.3、序列化

  生产者需要用序列化器( Serializer)把对象转换成字节数组才能通过网络发送给 Kafka,而消费者需要使用反序列化器(Deserializer)把从Kafka收到的字节数组转换成相应的对象。生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的。

1.4、分区器

  如果消息ProducerRecord中指定了partition字段,那么就不需要分区器,因为partition就是所要发往的分区号,如果没有指定,就需要依赖分区器,根据key这个字段来计算partition的值。

  默认分区器DefaultPartitioner的主要逻辑:如果key不为null,那么就对key进行哈希根据得到的哈希值计算分区号,如果key为null,那么消息以轮询的方式发往主题内的各个可用分区**。**

1.5、生产者拦截器

  生产者拦截器用来在消息发送前做一些准备工作,如按照某个规则过滤不符合消息、修改消息内容、计数等。

  生产者拦截器的实现:自定义实现org.apache.kafka.clients.producer.Producerlnterceptor接口。接口主要包含3个方法:
public ProducerRecord<K, V> onSend (ProducerRecord<K, V> record );
public void onAcknowledgement(RecordMetadata metadata , Excepti on exception );
public void close() ;

2、原理分析

2.1、整体架构

   整个生产者客户端由两个线程协调运行,这两个线程分别为**主线程和 Sender 线程** (发送线程)。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器( RecordAccumulator,也称为消息收集器〉中。 Sender 线程负责从RecordAccumulator 中 获取消息并将其发送到 Kafka 中 。
   **RecordAccumulator **主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能 。
   主线程中发送过来的消息都会被迫加到 RecordAccumulator 的某个双端队列( Deque )中,在 RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是**ProducerBatch**。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时 ,从双端队列的头部读取。

2.2、重要的生产者参数

· acks

指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks参数有3种类型的值。

  • acks = 1默认值即为 1。生产者发送消息之后,只要分区的 leader 副本成功写入消息,那么它就会收到来自服务端的成功响应 。 acks 设置为1,是消息可靠性和吞吐量之 间的折中方案。
  • acks = 0。生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入 Kafka 的过程中出现某些异常,导致 Kafka 并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下, acks设置为0可以达到到最大的吞吐量。
  • acks =-1或 acks =all。生产者在消息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。

· retries和retry.backoff.ms:

retries参数用来配置生产者重试的次数,默认值为0。

重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为 100 , 它用来设定两次重试之间的时间间隔,避免无效的频繁重试。

Kafka保证同一分区中的消息有序:
一般而言,在需要保证消息顺序的场合建议把参数max.in.flght.requestser.connection 配置为 1,而不是把 retries配置为 0, 不过这样也会影响整体的吞吐。

max.in.flght.requestser.connection:生产者在收到服务器响应之前可以发送多少消息;值越大,吞吐量越大(值为1可以保证消息按发送顺序写入服务器,即使发生了重试)。

·compression.type

指定消息的压缩方式,默认值为"none"。默认情况下消息不会压缩。可以配置为"gzip"等。消息压缩是一时间换空间的优化方式,如果对时延有要求,则不推荐压缩。

· linge.ms

这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息( ProducerRecord )加入
ProducerBatch 的时间,默认值为 0。生产者客户端会在 ProducerBatch 被填满或等待时间超过
linger.ms 值时发迭出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞
吐量。

部分其他客户端参数:

标签: kafka java 分布式

本文转载自: https://blog.csdn.net/u013308490/article/details/127116085
版权归原作者 代码搬运工. 所有, 如有侵权,请联系我们删除。

“【深入理解Kafka系列】 第二章 生产者”的评论:

还没有评论