本篇我们将从 Kafka 生产者的设计和组件讲起,学习如何使用 Kafka 生产者。
将演示如何创建 KafkaProducer 和 ProducerRecords 对象、如何将记录发送给 Kafka,以及如何处理Kafka 返回的错误,然后介绍用于控制生产者行为的重要配置选项,最后深入探讨如何使用不同的分区方法和序列化器,以及如何自定义序列化器和分区器。
生产者概览
很多情况下我们需要往 Kafka 写入消息,然而不同的场景对写入消息的要求也不一样,比如:是否允许消息丢失?是否允许重复消息?是否有严格的延迟和吞吐量要求?
不同的场景对上述要求往往都是不一样的。
因此,不同的使用场景对生产者 API 的使用和配置会有直接的影响。尽管生产者 API 使用起来很简单,但消息的发送过程还是有点复杂的。
Kafka发送消息的主要步骤如下图所示:
- 创建包含目标主题和要发送的内容的ProducerRecord对象,还可以指定键或分区。在发送ProducerRecord对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
- 然后数据被传给分区器。若之前在ProducerRecord对象内指定了分区,则直接把指定的分区返回。如果没有指定分区,则分区器会根据ProducerRecord对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。
- 紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的broker上。
- 服务器在收到消息时会返回一个响应。 1. 如果消息成功写入Kafka,就返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录在分区里的偏移量。2. 如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。
创建Kafka生产者
向Kafka 写入消息的第一步是要创建一个生产者对象,并设置一些属性。
Kafka 生产者有 3个必选的属性。
- bootstrap.servers
该属性指定 broker 的地址清单,地址的格式为 host:port。
清单里不需要包含所有的broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。
不过建议至少要提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。
- key.serializer
broker 希望接收到的消息的键和值都是字节数组。
因此生产者需要知道如何把这些 Java 对象转换成字节数组。
key.serializer 必须被设置为一个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,生产者会使用这个类把键对象序列化成字节数组。Kafka 客户端默认提供了 ByteArraySerializer(这个只做很少的事情)、StringSerializer 和 IntegerSerializer,因此,如果你只使用常见的几种 Java 对象类型,那么就没必要实现自己的序列化器。
注意,key.serializer 是必须设置的,就算你打算只发送值内容。
- value.serializer
与 key.serializer 一样,value.serializer 指定的类会将值序列化。
如果键和值都是字符串,可以使用与 key.serializer 一样的序列化器。如果键是整数类型而值是字符串,那么需要使用不同的序列化器。
下面用实际代码演示如何创建一个新的生产者(只指定了必要的属性,其他使用默认设置):
privateProperties kafkaProps =newProperties();// 1
kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");// 2
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
producer =newKafkaProducer<String,String>(kafkaProps);// 3
- 1,新建一个 Properties 对象。
- 2,因为我们打算把键和值定义成字符串类型,所以使用内置的 StringSerializer。
- 3,这里创建了一个新的生产者对象,并为键和值设置了恰当的类型,然后把Properties 对象传给它。
这个接口很简单,通过配置生产者的不同属性就可以很大程度地控制它的行为。我们将在后面部分介绍其中几个比较重要的参数。
实例化生产者对象后,接下来就可以开始发送消息了。发送消息主要有以下 3 种方式:
- 发送并忘记(fire-and-forget)
把消息发送给服务器,但并不关心它是否正常到达。
大多数情况下,消息会正常到达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。
- 同步发送
我们使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待,就可以知道消息是否发送成功。
- 异步发送
调用 send() 方法,并指定一个回调函数,服务器在返回响应时调用该函数。
在下面的几个例子中,我们会介绍如何使用上述几种方式来发送消息,以及如何处理可能发生的异常情况。
发送消息到Kafka
最简单的消息发送方式如下所示:
ProducerRecord<String,String>record=newProducerRecord<>("CustomerCountry","Precision Products","France");// ➊ try{
producer.send(record);// ➋ }catch(Exception e){
e.printStackTrace();// ➌ }
- ➊生产者的 send()方法将ProducerRecord对象作为参数, 所以要先创建一ProducerRecord 对象。ProducerRecord 有多个构造函数,后面详细讨论。
这里的构造函数需要目标主题的名字和要发送的键和值对象,它们都是字符串。键和值对象的类型必须与序列化器和生产者对象相匹配。
- ➋用生产者的 send() 方法发送 ProducerRecord 对象。从生产者的架构图里可知,消息先是被放进缓冲区,然后使用单独的线程发送到服务器端。
send() 方法会返回一个包含 RecordMetadata 的 Future 对象,不过因为我们会忽略返回值,所以无法知道消息是否发送成功。如果不关心发送结果,那么可以使用这种发送方式。
- ➌可以忽略服务器端可能发生的错误,但在发送消息之前,生产者还是有可能发生其他的异常。
这些异常有可能是 SerializationException(说明序列化消息失败)、BufferExhaustedException 或 TimeoutException(说明缓冲区已满),又或者是 InterruptException(说明发送线程被中断)。
同步发送消息
最简单的同步发送消息方式如下所示:
ProducerRecord<String,String>record=newProducerRecord<>("CustomerCountry","Precision Products","France");try{
producer.send(record).get(); ➊
}catch(Exception e){
e.printStackTrace(); ➋
}
- ➊这里,producer.send() 方法先返回一个 Future 对象,然后调用 Future 对象的 get()方法等待 Kafka 响应。如果服务器返回错误,get() 方法会抛出异常。如果没有发生错误,我们会得到一个 RecordMetadata 对象,可以用它获取消息的偏移量。
- ➋如果发生了任何错误,比如 broker 返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常。这里只简单地把异常信息打印出来。
KafkaProducer 一般会发生两类错误:
- 一类是可重试错误,这类错误可以通过重发消息来解决。
比如对于连接错误,可以通过再次建立连接来解决,“
无主(no leader)”错误则可以通过重新为分区选举首领来解决。
KafkaProducer 可以被配置成自动重试,如果在多次重试后仍无法解决问题,应用程序会收到一个重试异常。
- 另一类错误无法通过重试解决,比如“消息太大”异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。
异步发送消息
由于消息发送需要时间,获取响应需要等待。但大多数时候,我们并不需要等待响应——尽管 Kafka
会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来说不是必需的。
在遇到消息发送失败时,我们可以抛出异常、记录错误日志,或者把消息写入“错误消息”文件以便日后分析。
为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。
下面是使用回调的一个例子:
privateclassDemoProducerCallbackimplementsCallback{// ➊@OverridepublicvoidonCompletion(RecordMetadata recordMetadata,Exception e){if(e !=null){
e.printStackTrace();// ➋}}}ProducerRecord<String,String>record=newProducerRecord<>("CustomerCountry","Biomedical Materials","USA");// ➌
producer.send(record,newDemoProducerCallback());// ➍
- ➊ 使用回调,需要一个实现了 org.apache.kafka.clients.producer.Callback 接口的类,这个接口只有一个 onCompletion 方法。
- ➋ 如果 Kafka 返回错误,onCompletion 方法会抛出一个非空(non null)异常。
- ➌ 初始化record。
- ➍ 发送消息时将回调对象传进去。
注意:
这些回调在生产者的主线程中执行。这保证了当我们向同一个分区相继发送两条消息时,它们的回调将按照我们发送它们的相同顺序执行。
但这也意味着回调应该相当快,以避免延迟生产者并阻止发送其他消息。不建议在回调中执行阻塞操作。
相反,应该使用另一个线程来并发地执行任何阻塞操作。
生产者的配置
上文只介绍了生产者的几个必要配置参数——bootstrap.servers API 以及序列化器。
生产者还有很多可配置的参数,在 Kafka 文档里都有说明,一般情况没必要去修改它们。
不过有几个参数在内存使用、性能和可靠性方面对生产者影响比较大,接下来我们会一一说明:
client.id
client.id是客户端和它所使用的应用程序的一个逻辑标识符。
它可以是任何字符串,Broker将使用它来识别从客户端发送的消息。
它用于日志记录和metrics以及quotas,选择一个好的客户端名称将使故障排除更加容易。
这就是“我们发现来自 IP 104.27.155.134 的身份验证失败率很高”和“订单验证服务似乎无法通过身份验证——你能请 Laura 看一下吗?”之间的区别。
acks
acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。
这个参数对消息丢失的可能性有重要影响。该参数有如下选项:
- 如果 acks=0,生产者在确认消息写入成功之前不会等待任何来自服务器的响应。
也就是说, 生产者无法知道服务器有没有收到消息,即无法知道消息是否丢失。
不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大 速度发送消息,从而达到很高的吞吐量。
- 如果 acks=1,只要集群的Leader节点收到消息,生产者就会收到一个来自服务器的成功响应。
如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来), 生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。
- 如果 acks=all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的。
它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。
不过,它的延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。
虽然,在较低和较不可靠的acks配置下,生产者将能够更快地发送记录。因为你用可靠性换取了生产者的延迟。然而,端到端的延迟是指从记录产生到消费者可以读取的时间,对于这三个选项来说是相同的。
原因是,为了保持一致性,Kafka直到它们被写入所有同步的副本,才允许消费者读取记录。
因此,如果你关心的是端到端的延迟,而不仅仅是生产者的延迟,就不需要做任何权衡:即使你选择最可靠的选项,你会得到相同的端到端延迟。
Message Delivery Time
生产者有多个配置参数,它们相互作用以控制开发者最感兴趣的下列行为:从调用send()到返回成功或失败需要多长时间。这个时间,是我们愿意等待直到Kafka成功响应,或者直到我们愿意放弃并认为调用失败的时间。
多年来,这些配置和它们的行为被多次修改。
从Apache Kafka 2.1开始,我们将发送ProduceRecord的时间分为两个时间间隔,分别进行处理:
- 异步调用 send() 返回之前的时间。在此间隔期间,调用 send() 的线程将被阻塞
- 从异步调用send()成功返回到回调被触发(成功或失败)的时间。
这与将 Produce Record 放入批处理中进行发送开始,直到Kafka回应成功、不可逆转的失败,或者我们分配的发送时间用完为止的时间是相同的。
注意:
如果你同步使用send(),发送线程将在两个时间间隔内连续阻塞,将无法知道每个时间间隔内分别花了多少时间。
所我们将推荐和讨论使用带有回调的异步send()。
生产者内部的数据流以及不同的配置参数如何相互影响,可以概括为下图:
我们将通过不同的配置参数来控制这两个区间的等待时间,以及它们如何相互作用。
max.block.ms
这个参数控制生产者在调用send()和通过partitionsFor()显式请求元数据时可以阻塞多长时间。
当生产者的发送缓冲区已满或元数据不可用时,这些方法可能会阻塞。
当达阻塞时间到max.block.ms时,会抛出一个超时异常。
delivery.timeout.ms
此配置将限制从记录准备好发送(send() 成功返回并且记录被放入批处理)到Broker响应或客户端放弃所花费的时间,包括重试所花费的时间。
如上图所示,这个时间应该大于linger.ms和request.timeout.ms。如果你尝试创建一个超时配置不一致的生产者,你会得到一个异常(这句话没太理解)。消息通常会比 delivery.timeout.ms 更快地成功发送。
如果生产者在重试时超过了 delivery.timeout.ms,回调将被调用,其异常与Broker在重试前返回的错误相对应。
如果在记录批处理仍在等待发送时超过delivery.timeout.ms,则回调将被调用并出现超时异常。
你可以将发送超时配置为你希望等待信息发送的最长时间,通常是几分钟,然后保留默认的重试次数(实际上是无限的)。
在这种配置下,只要没超时iu继续尝试,生产者就会不断重试(或直到成功),这是一种更合理的重试的方式。
request.timeout.ms
该参数控制生产者在发送数据时等待服务器回复的时间。
注意,这是在放弃之前等待每个生产者请求所花费的时间; 它不包括重试、发送前花费的时间等.
如果达到超时而没有回复,生产者将重试发送,或以一个TimeoutException完成回调。
retries and retry.backoff.ms
生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过retry.backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,
不过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。
一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。
你只需要处理那些不可重试的错误或重试次数超出上限的情况。
linger.ms
linger.ms 控制在发送当前批次前等待更多消息的时间。KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。
把 linger.ms 设置成比 0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)
buffer.memory
buffer.memory用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。
如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。
这个时候,额外的 send() 调用将阻塞 max.block.ms的时间, 并在抛出异常之前等待空间释放。
compression.type
默认情况下,消息发送时不会被压缩。
该参数可以设置为 snappy、gzip 或 lz4,它指定了消息被发送给 broker 之前使用哪一种压缩算法进行压缩。
- snappy 压缩算法由 Google 发明,它占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。
- gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。
使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
batch.size
当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。
该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。
当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满
的批次,甚至只包含一个消息的批次也有可能被发送。
所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。
但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。
max.in.flight.requests.per.connection
该参数指定了生产者在收到服务器响应之前可以发送多少个消息。
它的值越高,就会占用越多的内存,不过也会提升吞吐量。
把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
Kafka会保留分区内消息的顺序。
这意味着如果消息以特定顺序从生产者发送,Broker将按该顺序将它们写入分区,所有消费者将按该顺序读取它们。
max.request.size
该参数用于控制生产者发送的请求大小。
它同时限制了发送的单个消息的最大值,和单个请求里所有消息总的大小。
例如,假设这个值为 1MB,那么可以发送的单个最大消息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1KB。
另外,broker 对可接收的消息最大值也有自己的限制(message.max.bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。
receive.buffer.bytes and send.buffer.bytes
这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。
如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
enable.idempotence
从 0.11 版本开始,Kafka 支持 exactly once 语义。
Exactly once 是一个相当大的话题,后面专门讨论它,但idempotent producer(幂等生产者)是其中比较简单且非常有用的部分。
假设你配置你的生产者以最大化可靠性:acks=all和一个适当大的delivery.timeout.ms以允许足够的重试。这些配置确保每条消息至少写入一次 Kafka。
例如,假设一个 broker 从生产者那里收到一条记录,将其写入本地磁盘,并且记录已成功复制到其他brokers,但随后第一个broker在向生产者发送响应之前崩溃了。生产者将等待,直到到达request. timeout.ms,然后重试。重试将转到新的leader,该leader已经拥有该记录的副本,因为之前的写被成功复制了,所以记录就会重复。
为避免这种情况,您可以设置 enable.idempotence=true。当闲置的idempotent producer被启用时,生产者将为其发送的每条记录附上一个序列号。
如果broker收到具有相同序列号的记录,它将拒绝第二个副本,生产者将收到无害的DuplicateSequenceException。
参考
版权归原作者 zxu_er 所有, 如有侵权,请联系我们删除。