0


Flink 之 Kafka连接器

依赖

Flink附带了一个通用的Kafka连接器,它试图跟踪Kafka客户端的最新版本。Kafka的客户端版本会在Flink不同版本间发生变化。现代Kafka客户端向后兼容broker 0.10.0版本及以后的版本。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.14.4</version></dependency>

Kafka Source

用法

Kafka Source 提供了一个构造器类来构建KafkaSource的实例。下面代码展示如何构建一个KafkaSource来消费主题“input-topic”最早偏移量的消息,使用“my-group”消费组并将消息的值反序列化为字符串。

KafkaSource<String> source =KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics("input-topic").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(newSimpleStringSchema()).build();

env.fromSource(source,WatermarkStrategy.noWatermarks(),"Kafka Source");

构建Kafka Source需要以下属性:

  1. Bootstrap servers,通过setBootstrapServers配置(字符串)
  2. 要订阅的主题和分区
  3. 解析Kafka消息的序列化器

主题分区订阅

Kafka Source提供了3种主题分区订阅的方式

  1. 主题列表,从主题列表中的所有分区订阅消息。eg:KafkaSource.builder().setTopics("topic-a", "topic-b")
  2. 主题模式,订阅名称与提供正则表达式匹配的所有主题消息。eg:KafkaSource.builder().setTopicPattern("topic.*")
  3. 分区集合,订阅提供的分区集合的分区。eg:finalHashSet<TopicPartition> partitionSet =newHashSet<>(Arrays.asList(newTopicPartition("topic-a",0),// 主题"topic-a"的第0个分区newTopicPartition("topic-b",5)));// 主题"topic-b"的第5个分区KafkaSource.builder().setPartitions(partitionSet);

反序列化器

解析Kafka消息需要一个反序列化器。反序列化器(反序列化模式)可以通过setDeserializer(KafkaRecordDeserializationSchema)配置,其中KafkaRecordDeserializationSchema定义了如何反序列一个Kafka消费记录(ConsumerRecord)。

如果只需要Kafka ConsumerRecord的值,你可以在构建器中使用setValueOnlyDeserializer(DeserializationSchema),其中DeserializationSchema定义了如何反序列化K阿发消息值的二进制文件。

你也可以使用Kafka反序列化器来反序列化Kafka消息值。例如使用StringDeserializer将Kafka消息值序列化为字符串:

importorg.apache.kafka.common.serialization.StringDeserializer;KafkaSource.<String>builder().setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));

起始偏移量

Kafka Source可以通过指定OffsetsInitializer从不同偏移量开始消费消息。内置的初始值包括:

  1. 从消费组的已提交偏移量开始,没有重置策略.setStartingOffsets(OffsetsInitializer.committedOffsets())
  2. 从提交的偏移量开始,如果提交的偏移量不存在,使用early作为重置策略 .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
  3. 从第一个时间戳大于或等于时间戳(毫秒)的记录开始.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
  4. 从最早的偏移量开始.setStartingOffsets(OffsetsInitializer.earliest())
  5. 从最新的偏移量开始 .setStartingOffsets(OffsetsInitializer.latest())

如果上面的内置初始化器不能满足你的需求,你可以实现自定义偏移量初始化器。

如果你没有指定初始化器,默认使用 OffsetsInitializer.earliest()

有界性

Kafka Source被设计成同时支持流和批处理运行模式。默认情况下,Kafka Source被设置为以流的方式运行,因此在Flink作业失败或取消之前从不停止。你可以使用setBounded(OffsetsInitializer)来指定停止偏移量,并设置Source以批量模式运行。当所有分区都达到停止偏移量时,Source将退出。

你也可以使用setUnbounded(OffsetsInitializer)来设置KafkaSource在流模式下运行,但仍然在停止偏移量处停止。

当所有分区达到指定的停止偏移量时,Source将会退出。

其他属性

除了上述描述的的属性,你可以通过使用setProperties(properties)和setProperty(String, String)为Kafka Source和Kafka Consumer设置任意的属性。

  1. client.id.prefix 定义了Kafka消费者客户端ID的前缀
  2. partition.discovery.interval.ms 定义了Kafka Source发现新分区的间隔毫秒数。
  3. register.consumer.metrics 指定是否在Flink度量组中注册KafkaConsumer的度量
  4. commit.offsets.on.checkpoint 指定是否在检查点上将消费偏移量提交给Kafka Broker

请注意,以下键将被构建器覆盖,即使它被设置了:

  1. key.deserializer通常设置为ByteArrayDeserializer
  2. value.deserializer 通常设置为 ByteArrayDeserializer
  3. auto.offset.reset.strategyOffsetsInitializer#getAutoOffsetResetStrategy()覆盖,用于初始偏移量
  4. setBounded(OffsetsInitializer)被调用时,partition.discovery.interval.ms被重写为-1

自动发现分区

为了在不重启Flink作业情况下处理主题向外扩展和主题创建等场景,Kafka Source可以配置为在提供主题分区订阅模式下定期发现新分区。如果要启动分区发现,请为

partition.discovery.interval.ms

属性设置一个非负值:

KafkaSource.builder().setProperty("partition.discovery.interval.ms","10000")// 每10秒查看一次新分区

默认情况下,分区发现是被禁止的。你需要显示地设置分区发现间隔以启动此功能。

事件时间和水位线

默认情况下,记录将使用Kafka ConsumerRecord中嵌入的时间戳作为事件时间。 你可以定义自己的WatermarkStrategy(水位策略)从记录本身提取事件时间,并向下游发送水印:

env.fromSource(kafkaSource,newCustomWatermarkStrategy(),"Kafka Source With Custom Watermark Strategy")

空闲性

如果并行度高于分区的数量,Kafka Source不会进入空闲状态。你需要降低并行度,或者向水印策略添加空闲超时。如果在同一段时间内流的一个分区中没有任何记录流,那么该分区就被认为是“空闲的”,并且不会阻碍下游算子的水印进程。

消费者偏移量提交

Kafka Source在检查点完成时提交当前消费的偏移量,以确保Flink的检查点状态与Kafka Broker上提交的偏移量之间的一致性。

如果检查点没有启用,Kafka Source依赖Kafka消费者内部自动周期偏移量提交逻辑,通过Kafka Consumer属性中的

enable.auto.commit

auto.commit.interval.ms

配置。

注意,Kafka Source不依赖提交的偏移量实现容错。提交偏移量只是为了暴露消费者和消费者组的进度以便监控。

监控

TODO

安全

为了启用包含加密和身份验证在内的 安全配置,你只需要将安全配置设置为Kafka Source的附加属性。

下面代码片段展示了如何配置Kafka Source使用PLAIN作为SASL机制,并提供JAAS配置:

KafkaSource.builder().setProperty("security.protocol","SASL_PLAINTEXT").setProperty("sasl.mechanism","PLAIN").setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");

对于一个更复杂的例子,使用SASL_SSL作为安全协议,使用SCRAM-SHA-256作为SASL机制:

KafkaSource.builder().setProperty("security.protocol","SASL_SSL")// SSL configurations// Configure the path of truststore (CA) provided by the server.setProperty("ssl.truststore.location","/path/to/kafka.client.truststore.jks").setProperty("ssl.truststore.password","test1234")// Configure the path of keystore (private key) if client authentication is required.setProperty("ssl.keystore.location","/path/to/kafka.client.keystore.jks").setProperty("ssl.keystore.password","test1234")// SASL configurations// Set SASL mechanism as SCRAM-SHA-256.setProperty("sasl.mechanism","SCRAM-SHA-256")// Set JAAS configurations.setProperty("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");

请注意,如果你在作业JAR中重新定位Kafka客户端依赖,那么登陆模块的类路径可能会不同,所以你可能需要用Jar中模块实际类路径重写它。

标签: kafka flink java

本文转载自: https://blog.csdn.net/weixin_42454473/article/details/127130703
版权归原作者 豪仔数据之路 所有, 如有侵权,请联系我们删除。

“Flink 之 Kafka连接器”的评论:

还没有评论