0


【Kafka系列二-生产者】

系列文章目录

第一章 初识kafka
第二章 生产者
第三章 消费者
第四章 kafka的可靠性保证及其运行机制
第五章 参数配置及如何排除线上问题

文章目录


前言

Kafka系列第二篇章,生产者详解。

一、kafka生产者概念

一个应用程序在很多情况下需要往 Kafka 写入消息:记录用户的活动(用于审计和分析)、记录度量指标、保存日志消息、与其他应用程序进行异步通信、缓冲即将写入到数据库的数据,等等。
多样的使用场景意味着多样的需求:是否每个消息都很重要?是否允许丢失一小部分消息?偶尔出现重复消息是否可以接受?是否有严格的延迟和吞吐量要求?
比如:

  • 信用卡事务处理系统里,消息丢失或消息重复是不允许的,可以接受的延迟最大为500ms,对吞吐量要求较高——我们希望每秒钟可以处理一百万个消息。
  • 保存网站的点击信息是另一种使用场景。在这个场景里,允许丢失少量的消息或出现少量的消息重复,延迟可以高一些,只要不影响用户体验就行。换句话说,只要用户点击链接后可以马上加载页面,那么我们并不介意消息要在几秒钟之后才能到达Kafka 服务器。吞吐量则取决于网站用户使用网站的频度。 不同的使用场景对生产者 API 的使用和配置会有直接的影响。尽管生产者 API 使用起来很简单,但消息的发送过程还是有点复杂的。图 1-1 展示了向Kafka 发送消息的主要步骤。在这里插入图片描述图1-1
  • 从创建一个 ProducerRecord 对象开始,ProducerRecord 对象需要包含目标主题和要发送的内容。我们还可以指定键或分区。在发ProducerRecord 对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
  • 接下来,数据被传给分区器。如果之前在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据ProducerRecord 对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。
  • 紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的broker上。
  • 服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录在分区里的偏移量。
  • 如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。

二、Kafka的使用

2.1 创建Kafka生产者

要往 Kafka 写入消息,首先要创建一个生产者对象,并设置一些属性。Kafka 生产者有 3个必选的属性。

  • bootstrap.servers 该属性指定 broker 的地址清单,地址的格式为 host:port。清单里不需要包含所有的broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。
  • key.serializer Kafka broker 期望接收到的消息的键和值都是字节数组。sarama 生产者接口支持灵活的编码方式,因此可以将 Go 语言中的任意数据类型作为键和值发送给 broker。为了实现这一点,生产者需要知道如何将这些数据类型转换为字节数组。在 sarama 中,你可以通过实现 sarama.Encoder 接口来自定义键或值的序列化方式。sarama 默认提供了 StringEncoder 和 ByteEncoder,如果你只使用常见的类型(如字符串或字节数组),可以直接使用这些默认的序列化器,而无需自定义序列化器。不过,即使你不打算发送键,只发送值内容,也需要为消息的 Key 字段设置一个序列化器,例如使用 nil 或空字符串的 StringEncoder。我们可以自定义序列化器,只需要实现以下的两个接口。在这里插入图片描述
  • value.serializer 与 key.serializer 一样,value.serializer 指定的类会将值序列化。如果键和值都是字符串,可以使用与 key.serializer 一样的序列化器。如果键是整数类型而值是字符串,那么需要使用不同的序列化器。 下面的代码片段演示了如何创建一个新的生产者,这里只指定了必要的属性,其他使用默认设置。
// Kafka broker 地址
    brokers :=[]string{"localhost:9092"}// 要发送的 topic
    topic :="test_topic"// 创建一个同步生产者配置
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有的 replica ack
    config.Producer.Retry.Max =5// 重试次数
    config.Producer.Return.Successes =true// 成功交付的消息将返回在 success channel// 创建同步生产者
    producer, err := sarama.NewSyncProducer(brokers, config)if err !=nil{
        log.Fatalln("Failed to start Sarama producer:", err)}defer producer.Close()// 要发送的消息内容
    msg :=&sarama.ProducerMessage{
        Topic: topic,
        Key:   sarama.StringEncoder("key"),
        Value: sarama.StringEncoder("Hello Kafka"),}// 发送消息
    partition, offset, err := producer.SendMessage(msg)if err !=nil{
        log.Fatalln("Failed to send message:", err)}

    fmt.Printf("Message is stored in partition %d, offset %d\n", partition, offset)

2.2 发送消息到Kafka

kafka客户端有两种发送消息的方式:同步发送和异步发送

  • 同步发送:使用 NewSyncProducer 创建生产者,调用 SendMessage 方法阻塞发送消息,并等待 Kafka 确认。
  • 异步发送:使用 NewAsyncProducer 创建异步生产者,消息通过 producer.Input()管道异步发送,成功的消息会通过 producer.Successes() 通道返回,失败的消息通过 producer.Errors()通道返回。(在Java中是通过回调函数实现异步处理)

异步发送适用于需要更高的吞吐量和非阻塞的生产消息场景,而同步发送则适合需要确保消息可靠传递的情况。 在 sarama
中,异步发送本身并不直接支持回调函数,但可以通过使用 Goroutine 或其他方式来模拟回调函数的行为。可以通过监听
producer.Successes() 和 producer.Errors()
通道来处理发送成功和失败的情况,这样的处理方式类似于回调机制。 在 Kafka的异步生产过程中,当消息发送成功或失败时,生产者会将结果发送到相应的通道,你可以定义回调函数来处理这些结果。虽然 sarama没有直接支持回调的功能,但可以通过以下方式实现类似回调的效果。

package main

import("fmt""log""time""github.com/Shopify/sarama")// 定义发送成功的回调函数funconSuccess(message *sarama.ProducerMessage){
    fmt.Printf("Message sent successfully to partition %d at offset %d\n", message.Partition, message.Offset)}// 定义发送失败的回调函数funconError(err *sarama.ProducerError){
    fmt.Printf("Failed to send message: %v\n", err.Err)}funcmain(){
    brokers :=[]string{"localhost:9092"}
    topic :="test_topic"// 配置生产者
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有的 replica ack
    config.Producer.Retry.Max =5// 发送失败时重试次数
    config.Producer.Return.Successes =true// 成功消息返回
    config.Producer.Return.Errors =true// 错误消息返回// 创建异步生产者
    producer, err := sarama.NewAsyncProducer(brokers, config)if err !=nil{
        log.Fatalln("Failed to start Sarama async producer:", err)}defer producer.Close()// 使用 Goroutine 处理成功和失败的消息,模拟回调函数gofunc(){for{select{case success :=<-producer.Successes():onSuccess(success)// 成功时调用回调函数case err :=<-producer.Errors():onError(err)// 失败时调用回调函数}}}()// 异步发送消息for i :=0; i <10; i++{
        msg :=&sarama.ProducerMessage{
            Topic: topic,
            Key:   sarama.StringEncoder(fmt.Sprintf("key-%d", i)),
            Value: sarama.StringEncoder(fmt.Sprintf("this is message %d", i)),}
        producer.Input()<- msg // 异步发送
        time.Sleep(500* time.Millisecond)// 模拟发送间隔}// 等待所有消息发送完成
    producer.AsyncClose()// 等待所有回调函数处理完
    time.Sleep(2* time.Second)}

2.3 生产者的配置

生产者还有很多可配置的参数,在 Kafka 文档里都有说明,它们大部分都有合理的默认值,所以没有必要去修改它们。不过有几个参数在内存使用、性能和可靠性方面对生产者影响比较大,接下来我们会一一说明。

  1. acks acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。该参数有如下选项。 • 如果 acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。 • 如果 acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应,显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用异步方式,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息由max.in.flight.requests.per.connection参数决定)。 • 如果 acks=all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行不过,它的延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。
  2. buffer.memory 该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,send() 方法调用要么被阻塞,要么抛出异常,取决于如何设置 block.on.buffer.full 参数(在 0.9.0.0 版本里被替换成了 max.block.ms,表示在抛出异常之前可以阻塞一段时间)。
  3. compression.type 默认情况下,消息发送时不会被压缩。该参数可以设置为 snappy、gzip 或 lz4,它指定了消息被发送给 broker 之前使用哪一种压缩算法进行压缩。snappy 压缩算法由 Google 发明,它占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
    config := sarama.NewConfig()
    config.Producer.Compression = sarama.CompressionGZIP // 使用 GZIP 压缩
  1. retries 生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过retry.backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间),让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。
  2. batch.size 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。
  3. linger.ms 该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。把 linger.ms 设置成比 0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。
  4. client.id 该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。
  5. max.in.flight.requests.per.connection 该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
  6. timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间,metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与asks 的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回一个错误。
  7. max.block.ms 该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。
  8. max.request.size 该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1MB,那么可以发送的单个最大消息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1KB。另外,broker 对可接收的消息最大值也有自己的限制(message.max.bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。所以batch.size(批次大小)<=max.request.size(请求大小)<=message.max.bytes(Kafka服务端能够一次接受的消息大小)
  9. receive.buffer.bytes 和 send.buffer.bytes 这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

2.4 分区

默认分区算法
在之前的例子里,ProducerRecord 对象包含了目标主题、键和值。Kafka 的消息是一个个键值对,ProducerRecord 对象可以只包含目标主题和值,键可以设置为默认的 null,不过大多数应用程序会用到键。键有两个用途:可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。也就是说,如果一个进程只从一个主题的分区读取数据,那么具有相同键的所有记录都会被该进程读取。

  • 键为 null 且使用默认分区器:客户端使用轮询算法决定分区。
  • 键不为 null 且使用默认分区器:客户端根据 key 的散列值选择分区。
  • 键不为 null 且使用自定义分区器:客户端根据自定义分区器决定分区。

注意只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。举个例子,在分区数量保持不变的情况下,可以保证用户 045189
的记录总是被写到分区 34。在从分区读取数据时,可以进行各种优化。不过,一旦主题增加了新的分区,这些就无法保证了——旧数据仍然留在分区34,但新的记录可能被写到其他分区上。如果要使用键来映射分区,那么最好在创建主题的时候就把分区规划好,而且永远不要增加新分区。

自定义分区
可以通过实现 sarama.Partitioner 接口来自定义分区算法。这个接口需要实现以下几个方法:

  • Partition(message *ProducerMessage, numPartitions int32) (int32, error):该方法返回消息的目标分区编号。
  • RequiresConsistency() bool:该方法指示是否对同一个 key 保持分区一致性。如果返回 true,则相同的 key会一直被分配到相同的分区中。在这里插入图片描述 假设你想实现一个简单的分区器算法,按照消息 key 的长度进行分区。
package main

import("fmt""log""hash/crc32""github.com/Shopify/sarama")// 自定义分区器type CustomPartitioner struct{}// 这个方法实现了具体的分区逻辑func(p *CustomPartitioner)Partition(message *sarama.ProducerMessage, numPartitions int32)(int32,error){if message.Key ==nil{// 如果消息没有 key,随机选择一个分区return sarama.PartitionRandom.Partition(message, numPartitions)}// 根据 key 的哈希值分区
    keyBytes, err := message.Key.Encode()if err !=nil{return-1, err
    }
    partition := crc32.ChecksumIEEE(keyBytes)%uint32(numPartitions)returnint32(partition),nil}// 如果这个方法返回 true,相同的 key 会被分配到相同的分区中func(p *CustomPartitioner)RequiresConsistency()bool{returntrue}// 创建一个新的自定义分区器funcCustomPartitionerConstructor(topic string) sarama.Partitioner {return&CustomPartitioner{}}funcmain(){
    brokers :=[]string{"localhost:9092"}
    topic :="test_topic"// 配置生产者
    config := sarama.NewConfig()
    config.Producer.Partitioner = CustomPartitionerConstructor // 使用自定义分区器
    config.Producer.RequiredAcks = sarama.WaitForAll            // 等待所有的 replica ack
    config.Producer.Retry.Max =5// 发送失败时重试次数
    config.Producer.Return.Successes =true// 成功交付的消息将返回在 success channel// 创建同步生产者
    producer, err := sarama.NewSyncProducer(brokers, config)if err !=nil{
        log.Fatalln("Failed to start Sarama producer:", err)}defer producer.Close()// 构建要发送的消息
    msg :=&sarama.ProducerMessage{
        Topic: topic,
        Key:   sarama.StringEncoder("my_custom_key"),
        Value: sarama.StringEncoder("this is a message with custom partitioner"),}// 同步发送消息
    partition, offset, err := producer.SendMessage(msg)if err !=nil{
        log.Fatalln("Failed to send message:", err)}

    fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)}

总结

我们先从生产者从创建到发送消息到Kafka服务端整体流程为引入,然后详细介绍了生产者的创建,同步发送,异步发送,异步发送之后如何处理成功或者失败的消息,Kafka生产者的各参数含义及序列化器,分区器,并给出了相应的示例代码。读者要理解客户端的消息的发送是按批次发送的,并懂得如何根据自己的业务场景设置合理的参数和实现自定义的序列化器和分区器。感谢您的关注,期待您的点赞和评论。

标签: kafka 分布式 java

本文转载自: https://blog.csdn.net/qq_41705360/article/details/143061157
版权归原作者 花箫乱 所有, 如有侵权,请联系我们删除。

“【Kafka系列二-生产者】”的评论:

还没有评论