0


flink启动报错Failed to construct kafka producer

flink local模式下启动 sink2kafka报错,具体报错如下

apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:56)
......................
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)

提取报错信息

Failed to construct kafka producer

class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

代码

flink版本是14.6

kafkaProperties里存的是kafka的信息

   println(s"========kafka properties========\r\n$kafkaProperties");
    val broker: String = kafkaProperties.getProperty("broker")
    val topic: String = kafkaProperties.getProperty("topic")
    val kafkaSink: KafkaSink[String] = KafkaSink.builder()
      .setBootstrapServers(broker)
      .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
          .setTopic(topic)
          .setValueSerializationSchema(new SimpleStringSchema())
          .build()
      )
      .setKafkaProducerConfig(kafkaProperties)
      .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
      .build();
    data.map(record=>JacksonManager.mapper.writeValueAsString(record))
      .sinkTo(kafkaSink).name("sink2kafka")

本地起了一个sink2kafka的demo 也没问题,但是在服务器启动的时候就报错了,试了多次无果,开始分析报错原因。

我们要sink2kafka,那么flink肯定根据我们的kafka信息创建一个kafkaProducer

对应的报错,这里是kafkaProducer的构造器init失败了

org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)

那么为什么init失败了呢?因为这个类ByteArraySerializer 不是Serializer 的实例

class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

那么这个为什么不是实例呢?我们在idea里看下

package org.apache.kafka.common.serialization;

public class ByteArraySerializer implements Serializer<byte[]> {
    @Override
    public byte[] serialize(String topic, byte[] data) {
        return data;
    }
}

这里明明就是,为啥说不是啊。。。需要思考下。

当时我最开始就考虑是jar包冲突,再看下是否冲突,突然想到一个问题,项目中的有两个人

a喜欢打非依赖的jar的包,也就是flink的jar都不打进去,全放到服务器的flink_home/jar里

b喜欢打全依赖的jar包,也就是所有flink的jar都打进去,然后执行。

目前是b的工程,那么会不会是jar冲突了,是自己工程冲突了 还是打的jar和flink_home/jar里的jar冲突了?

先看工程

然后我看了服务器的

那么原因就出来的,排除多余的jar。就正常启动了

标签: flink 大数据

本文转载自: https://blog.csdn.net/cclovezbf/article/details/131695813
版权归原作者 cclovezbf 所有, 如有侵权,请联系我们删除。

“flink启动报错Failed to construct kafka producer”的评论:

还没有评论