flink local模式下启动 sink2kafka报错,具体报错如下
apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:56)
......................
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)
提取报错信息
Failed to construct kafka producer
class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
代码
flink版本是14.6
kafkaProperties里存的是kafka的信息
println(s"========kafka properties========\r\n$kafkaProperties");
val broker: String = kafkaProperties.getProperty("broker")
val topic: String = kafkaProperties.getProperty("topic")
val kafkaSink: KafkaSink[String] = KafkaSink.builder()
.setBootstrapServers(broker)
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic(topic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setKafkaProducerConfig(kafkaProperties)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
data.map(record=>JacksonManager.mapper.writeValueAsString(record))
.sinkTo(kafkaSink).name("sink2kafka")
本地起了一个sink2kafka的demo 也没问题,但是在服务器启动的时候就报错了,试了多次无果,开始分析报错原因。
我们要sink2kafka,那么flink肯定根据我们的kafka信息创建一个kafkaProducer
对应的报错,这里是kafkaProducer的构造器init失败了
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
那么为什么init失败了呢?因为这个类ByteArraySerializer 不是Serializer 的实例
class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
那么这个为什么不是实例呢?我们在idea里看下
package org.apache.kafka.common.serialization;
public class ByteArraySerializer implements Serializer<byte[]> {
@Override
public byte[] serialize(String topic, byte[] data) {
return data;
}
}
这里明明就是,为啥说不是啊。。。需要思考下。
当时我最开始就考虑是jar包冲突,再看下是否冲突,突然想到一个问题,项目中的有两个人
a喜欢打非依赖的jar的包,也就是flink的jar都不打进去,全放到服务器的flink_home/jar里
b喜欢打全依赖的jar包,也就是所有flink的jar都打进去,然后执行。
目前是b的工程,那么会不会是jar冲突了,是自己工程冲突了 还是打的jar和flink_home/jar里的jar冲突了?
先看工程
然后我看了服务器的
那么原因就出来的,排除多余的jar。就正常启动了
版权归原作者 cclovezbf 所有, 如有侵权,请联系我们删除。