依赖
Flink附带了一个通用的Kafka连接器,它试图跟踪Kafka客户端的最新版本。Kafka的客户端版本会在Flink不同版本间发生变化。现代Kafka客户端向后兼容broker 0.10.0版本及以后的版本。
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.14.4</version></dependency>
Kafka Source
用法
Kafka Source 提供了一个构造器类来构建KafkaSource的实例。下面代码展示如何构建一个KafkaSource来消费主题“input-topic”最早偏移量的消息,使用“my-group”消费组并将消息的值反序列化为字符串。
KafkaSource<String> source =KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics("input-topic").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(newSimpleStringSchema()).build();
env.fromSource(source,WatermarkStrategy.noWatermarks(),"Kafka Source");
构建Kafka Source需要以下属性:
- Bootstrap servers,通过setBootstrapServers配置(字符串)
- 要订阅的主题和分区
- 解析Kafka消息的序列化器
主题分区订阅
Kafka Source提供了3种主题分区订阅的方式
- 主题列表,从主题列表中的所有分区订阅消息。eg:
KafkaSource.builder().setTopics("topic-a", "topic-b")
- 主题模式,订阅名称与提供正则表达式匹配的所有主题消息。eg:
KafkaSource.builder().setTopicPattern("topic.*")
- 分区集合,订阅提供的分区集合的分区。eg:
finalHashSet<TopicPartition> partitionSet =newHashSet<>(Arrays.asList(newTopicPartition("topic-a",0),// 主题"topic-a"的第0个分区newTopicPartition("topic-b",5)));// 主题"topic-b"的第5个分区KafkaSource.builder().setPartitions(partitionSet);
反序列化器
解析Kafka消息需要一个反序列化器。反序列化器(反序列化模式)可以通过setDeserializer(KafkaRecordDeserializationSchema)配置,其中KafkaRecordDeserializationSchema定义了如何反序列一个Kafka消费记录(ConsumerRecord)。
如果只需要Kafka ConsumerRecord的值,你可以在构建器中使用setValueOnlyDeserializer(DeserializationSchema),其中DeserializationSchema定义了如何反序列化K阿发消息值的二进制文件。
你也可以使用Kafka反序列化器来反序列化Kafka消息值。例如使用StringDeserializer将Kafka消息值序列化为字符串:
importorg.apache.kafka.common.serialization.StringDeserializer;KafkaSource.<String>builder().setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
起始偏移量
Kafka Source可以通过指定OffsetsInitializer从不同偏移量开始消费消息。内置的初始值包括:
- 从消费组的已提交偏移量开始,没有重置策略
.setStartingOffsets(OffsetsInitializer.committedOffsets())
- 从提交的偏移量开始,如果提交的偏移量不存在,使用early作为重置策略
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
- 从第一个时间戳大于或等于时间戳(毫秒)的记录开始
.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
- 从最早的偏移量开始
.setStartingOffsets(OffsetsInitializer.earliest())
- 从最新的偏移量开始
.setStartingOffsets(OffsetsInitializer.latest())
如果上面的内置初始化器不能满足你的需求,你可以实现自定义偏移量初始化器。
如果你没有指定初始化器,默认使用 OffsetsInitializer.earliest()
有界性
Kafka Source被设计成同时支持流和批处理运行模式。默认情况下,Kafka Source被设置为以流的方式运行,因此在Flink作业失败或取消之前从不停止。你可以使用setBounded(OffsetsInitializer)来指定停止偏移量,并设置Source以批量模式运行。当所有分区都达到停止偏移量时,Source将退出。
你也可以使用setUnbounded(OffsetsInitializer)来设置KafkaSource在流模式下运行,但仍然在停止偏移量处停止。
当所有分区达到指定的停止偏移量时,Source将会退出。
其他属性
除了上述描述的的属性,你可以通过使用setProperties(properties)和setProperty(String, String)为Kafka Source和Kafka Consumer设置任意的属性。
- client.id.prefix 定义了Kafka消费者客户端ID的前缀
- partition.discovery.interval.ms 定义了Kafka Source发现新分区的间隔毫秒数。
- register.consumer.metrics 指定是否在Flink度量组中注册KafkaConsumer的度量
- commit.offsets.on.checkpoint 指定是否在检查点上将消费偏移量提交给Kafka Broker
请注意,以下键将被构建器覆盖,即使它被设置了:
key.deserializer
通常设置为ByteArrayDeserializer
value.deserializer
通常设置为ByteArrayDeserializer
auto.offset.reset.strategy
被OffsetsInitializer#getAutoOffsetResetStrategy()
覆盖,用于初始偏移量- 当
setBounded(OffsetsInitializer)
被调用时,partition.discovery.interval.ms
被重写为-1
自动发现分区
为了在不重启Flink作业情况下处理主题向外扩展和主题创建等场景,Kafka Source可以配置为在提供主题分区订阅模式下定期发现新分区。如果要启动分区发现,请为
partition.discovery.interval.ms
属性设置一个非负值:
KafkaSource.builder().setProperty("partition.discovery.interval.ms","10000")// 每10秒查看一次新分区
默认情况下,分区发现是被禁止的。你需要显示地设置分区发现间隔以启动此功能。
事件时间和水位线
默认情况下,记录将使用Kafka ConsumerRecord中嵌入的时间戳作为事件时间。 你可以定义自己的WatermarkStrategy(水位策略)从记录本身提取事件时间,并向下游发送水印:
env.fromSource(kafkaSource,newCustomWatermarkStrategy(),"Kafka Source With Custom Watermark Strategy")
空闲性
如果并行度高于分区的数量,Kafka Source不会进入空闲状态。你需要降低并行度,或者向水印策略添加空闲超时。如果在同一段时间内流的一个分区中没有任何记录流,那么该分区就被认为是“空闲的”,并且不会阻碍下游算子的水印进程。
消费者偏移量提交
Kafka Source在检查点完成时提交当前消费的偏移量,以确保Flink的检查点状态与Kafka Broker上提交的偏移量之间的一致性。
如果检查点没有启用,Kafka Source依赖Kafka消费者内部自动周期偏移量提交逻辑,通过Kafka Consumer属性中的
enable.auto.commit
和
auto.commit.interval.ms
配置。
注意,Kafka Source不依赖提交的偏移量实现容错。提交偏移量只是为了暴露消费者和消费者组的进度以便监控。
监控
TODO
安全
为了启用包含加密和身份验证在内的 安全配置,你只需要将安全配置设置为Kafka Source的附加属性。
下面代码片段展示了如何配置Kafka Source使用PLAIN作为SASL机制,并提供JAAS配置:
KafkaSource.builder().setProperty("security.protocol","SASL_PLAINTEXT").setProperty("sasl.mechanism","PLAIN").setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
对于一个更复杂的例子,使用SASL_SSL作为安全协议,使用SCRAM-SHA-256作为SASL机制:
KafkaSource.builder().setProperty("security.protocol","SASL_SSL")// SSL configurations// Configure the path of truststore (CA) provided by the server.setProperty("ssl.truststore.location","/path/to/kafka.client.truststore.jks").setProperty("ssl.truststore.password","test1234")// Configure the path of keystore (private key) if client authentication is required.setProperty("ssl.keystore.location","/path/to/kafka.client.keystore.jks").setProperty("ssl.keystore.password","test1234")// SASL configurations// Set SASL mechanism as SCRAM-SHA-256.setProperty("sasl.mechanism","SCRAM-SHA-256")// Set JAAS configurations.setProperty("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");
请注意,如果你在作业JAR中重新定位Kafka客户端依赖,那么登陆模块的类路径可能会不同,所以你可能需要用Jar中模块实际类路径重写它。
版权归原作者 豪仔数据之路 所有, 如有侵权,请联系我们删除。