问题描述
由于 flink / kafka 的版本不断更新,创建项目的时候就应当考虑清楚这几个依赖库的版本问题,尽可能地与实际场景保持一致,比如服务器上部署的 kafka 是哪个版本,flink 是哪个版本,从而确定我们需要开发的是哪个版本,并且在真正的开发工作开始之前,应当先测试一下保证 kafka 的版本 、 flink 的版本一致,至少大版本一致,不存在冲突问题,不要为以后的部署埋坑。
解决方案
步骤 1 确定 flink / scala / flink-connect-kafka 的版本
比如 flink 选择的是
1.12.7
这个版本,我们前去 maven 仓库查看 flink-connect-kafka 的版本。首先访问
链接1
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka 找到对应的
1.12.7
这个版本,也就是根据 flink 的版本去寻找 flink-connect-kafka 的版本,记作
链接2
即 https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka_2.12/1.12.7。
进入
链接2
对应的地址后,可以发现提供的 maven 地址如下:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.12.7</version></dependency>
这个地方已经明确地指出,
kafka_2.12
指的是对应的是使用
scala
的版本是
2.12
编写的 kakfa ,也就是对应的是 scala 2.12 的版本。为了确保无误,请确保安装的
kafka
也是这个版本。
类似地
,如果是其他版本的
flink
也要找到对应的
flink-connector-kafka
版本,确保 kafka 的版本的 scala 是一致的。
pom.xml 案例
为了规范,我们把版本号写在前面,然后再引用这些依赖。
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.smileyan.demo</groupId><artifactId>flink-kafka</artifactId><version>1.0-SNAPSHOT</version><packaging>pom</packaging><properties><java.version>8</java.version><!-- flink 的版本 --><flink.version>1.12.7</flink.version><!-- scala 的版本(也就是 kafka 的源码的版本)--><scala.binary.version>2.12</scala.binary.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven-shade-plugin.version>3.2.4</maven-shade-plugin.version><slf4j.version>2.0.7</slf4j.version></properties><profiles><profile><id>local</id><activation><activeByDefault>true</activeByDefault></activation><properties><flink.scope>compile</flink.scope></properties></profile><profile><id>prod</id><activation><activeByDefault>false</activeByDefault></activation><properties><flink.scope>provided</flink.scope></properties></profile></profiles><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope><exclusions><exclusion><groupId>log4j</groupId><artifactId>*</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope><exclusions><exclusion><groupId>log4j</groupId><artifactId>*</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope><exclusions><exclusion><groupId>log4j</groupId><artifactId>*</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope><exclusions><exclusion><groupId>log4j</groupId><artifactId>*</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>${maven-shade-plugin.version}</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${java.version}</source><target>${java.version}</target></configuration></plugin></plugins></build></project>
扩展
根据实际需要,调整 flink 的版本以及 scala 的版本。一定要确保最终我们在 maven 仓库中能找到对应的版本。
JAVA 代码示例
再次强调
:一定要找到对应的版本的示例。新版本的 flink 不再支持 new FlinkKafkaProducer 以及 new FlinkKafkaConsumer 这类操作,所以一定要结合实际情况进行调整。
flink version <= 1.13
消费者
finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();Properties properties =newProperties();
properties.setProperty("bootstrap.servers","localhost:9092");
properties.setProperty("group.id","test");FlinkKafkaConsumer<String> myConsumer =newFlinkKafkaConsumer<>(java.util.regex.Pattern.compile("test-topic-[0-9]"),newSimpleStringSchema(),
properties);DataStream<String> stream = env.addSource(myConsumer);
生产者
DataStream<String> stream =...Properties properties =newProperties();
properties.setProperty("bootstrap.servers","localhost:9092");KafkaSerializationSchema<String> serializationSchema =newKafkaSerializationSchema<String>(){@OverridepublicProducerRecord<byte[],byte[]>serialize(String element,@NullableLong timestamp){returnnewProducerRecord<>("my-topic",// target topic
element.getBytes(StandardCharsets.UTF_8));// record contents}};FlinkKafkaProducer<String> myProducer =newFlinkKafkaProducer<>("my-topic",// target topic
serializationSchema,// serialization schema
properties,// producer configFlinkKafkaProducer.Semantic.EXACTLY_ONCE);// fault-tolerance
stream.addSink(myProducer);
flink version > 1.13
消费者
KafkaSource<String> source =KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics("input-topic").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(newSimpleStringSchema()).build();
env.fromSource(source,WatermarkStrategy.noWatermarks(),"Kafka Source");
生产者
DataStream<String> stream =...KafkaSink<String> sink =KafkaSink.<String>builder().setBootstrapServers(brokers).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("topic-name").setValueSerializationSchema(newSimpleStringSchema()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build()).build();
stream.sinkTo(sink);
依赖<scope> 设置为 provided
flink 的 job 开发完成以后,需要打包上传到服务器端运行,实际上
flink-java
flink-stream
等等依赖包在服务器端的 flink 都提供了这些依赖,所以打包的时候可以去除这些依赖,以减小打包后的 jar 文件的大小。
所以如上面的 pom.xml 文件所示,我们把很多依赖的 scope 设置为
provided
,但是带来的新问题就是运行 flink job 的main方法时会出现报错提示这些依赖找不到:
这时我们需要进行配置,步骤如下:
再次运行就不会报错了。
参考链接
- https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
- https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
总结
flink-kafka 的项目创建本身应当是一件很容易的事情,但是为了避免为以后的开发埋雷,一定要规范地编写依赖,并结合实际情况对版本进行调整,并非一切都应当用最新版本的。
Smileyan
2023-03-25 00:29
版权归原作者 smile-yan 所有, 如有侵权,请联系我们删除。