

Producing Data from Flink to Kafka


Preface

I recently started learning Flink application development in Java. This post uses producing data to Kafka as an example to record the problems I ran into and the code I ended up with. If you spot a mistake, please point it out.

I. Versions

Flink version: 1.15.4
Kafka version: 3.0.0

II. Steps

1. Maven dependencies

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
    <flink.version>1.15.4</flink.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.12</scala.binary.version> <!-- Flink 1.15 dropped Scala 2.11; only 2.12 artifacts are published -->
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j2.version>2.14.1</log4j2.version>
</properties>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- The sample code below also uses fastjson and Lombok (@Slf4j); these two
     entries are my addition, and the versions are only illustrative. -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.24</version>
    <scope>provided</scope>
</dependency>

2. The code

The code below puts Flink environment initialization, configuration, and producing to Kafka into a standalone class because the business I was working on recently needed several instances producing data to Kafka; the two entry classes further down both call this class.


import com.alibaba.fastjson.JSON;
import com.dogame.data.report.comsume.bean.KafkaEnvParameter;
import com.dogame.data.report.comsume.bean.Order;
import com.dogame.data.report.comsume.utils.EnvironmentUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Random;
import java.util.UUID;

public class WorkerService {

    public void run(String[] args, String bizType) throws Exception {

        StreamExecutionEnvironment env = EnvironmentUtils.initEnvironmentConfig(args, bizType);
        DataStream<String> orderDataStream = env.addSource(new ParallelSourceFunction<String>() {
            // volatile: cancel() may be called from a different thread than run()
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                String[] channels = {"www.xxx.com", "www.sdfs.com", "www.qqq.com", "www.dfff.com"};
                Random random = new Random(); // reuse one Random instead of creating a new one per field
                while (running) {
                    Order order = new Order();
                    order.setOrderId(String.valueOf(System.currentTimeMillis() + random.nextInt()));
                    order.setTime(System.currentTimeMillis());
                    order.setFee(1000 * random.nextDouble());
                    order.setChannel(channels[random.nextInt(channels.length)]);
                    ctx.collect(JSON.toJSONString(order)); // emit the generated order downstream
                    Thread.sleep(10);
                }
            }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });
        orderDataStream.print();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setKafkaProducerConfig(KafkaEnvParameter.productProperties)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(KafkaEnvParameter.responseTopics)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // mandatory with EXACTLY_ONCE when several jobs write to the same cluster; see the note below
                //.setTransactionalIdPrefix(UUID.randomUUID().toString())
                .build();

        orderDataStream.sinkTo(sink);
        env.execute("SourceFunctionTest");
    }
}
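
The sample references three project-specific classes (Order, KafkaEnvParameter, EnvironmentUtils) that this post never shows. For readers who want to run it, here is a minimal sketch of what they might look like; everything in it, including the topic name and the transaction.timeout.ms value, is my assumption rather than the original code.

import java.util.Properties;

import lombok.Data;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical POJO matching the setters used by the source function.
@Data
public class Order {
    private String orderId;
    private long time;
    private double fee;
    private String channel;
}

// Hypothetical holder for the producer settings and target topic.
class KafkaEnvParameter {
    public static final Properties productProperties = new Properties();
    public static String responseTopics = "order-topic"; // illustrative topic name

    static {
        productProperties.setProperty("bootstrap.servers", "localhost:9092");
        // With EXACTLY_ONCE the sink uses Kafka transactions; this timeout must not
        // exceed the broker's transaction.max.timeout.ms (15 minutes by default).
        productProperties.setProperty("transaction.timeout.ms", "600000");
    }
}

// Hypothetical environment bootstrap; args and bizType would drive per-job
// configuration in the real project.
class EnvironmentUtils {
    public static StreamExecutionEnvironment initEnvironmentConfig(String[] args, String bizType) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // EXACTLY_ONCE transactions are committed only when a checkpoint completes,
        // so checkpointing must be enabled for the data to become visible.
        env.enableCheckpointing(5_000);
        return env;
    }
}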
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SourceFunctionTest {

    public static void main(String[] args) {
        try {
            String bizType = "t0";
            WorkerService workerService = new WorkerService();
            workerService.run(args, bizType);
        } catch (Exception e) {
            log.error("job failed", e);
        }
    }
}

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SourceFunctionTest1 {

    public static void main(String[] args) {
        try {
            String bizType = "t1";
            WorkerService workerService = new WorkerService();
            workerService.run(args, bizType);
        } catch (Exception e) {
            log.error("job failed", e);
        }
    }
}

One point worth emphasizing: testing revealed the following problem. With a single instance the code runs stably, but with two or more instances a job starts failing after a few minutes, until only one instance is left running normally. After two days of troubleshooting (asking ChatGPT combined with walking through the relevant source code), it turned out that this KafkaSinkBuilder method was never being called (it is commented out in the WorkerService class above; just uncomment it):


    /**
     * Sets the prefix for all created transactionalIds if {@link DeliveryGuarantee#EXACTLY_ONCE} is
     * configured.
     *
     * <p>It is mandatory to always set this value with {@link DeliveryGuarantee#EXACTLY_ONCE} to
     * prevent corrupted transactions if multiple jobs using the KafkaSink run against the same
     * Kafka Cluster. The default prefix is {@link #transactionalIdPrefix}.
     *
     * <p>The size of the prefix is capped by {@link #MAXIMUM_PREFIX_BYTES} formatted with UTF-8.
     *
     * <p>It is important to keep the prefix stable across application restarts. If the prefix
     * changes it might happen that lingering transactions are not correctly aborted and newly
     * written messages are not immediately consumable until the transactions timeout.
     *
     * @param transactionalIdPrefix
     * @return {@link KafkaSinkBuilder}
     */
    public KafkaSinkBuilder<IN> setTransactionalIdPrefix(String transactionalIdPrefix) {
        this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, "transactionalIdPrefix");
        checkState(
                transactionalIdPrefix.getBytes(StandardCharsets.UTF_8).length
                        <= MAXIMUM_PREFIX_BYTES,
                "The configured prefix is too long and the resulting transactionalId might exceed Kafka's transactionalIds size.");
        return this;
    }
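
Two follow-up notes on the Javadoc above. First, it says the prefix must stay stable across application restarts, so the commented-out UUID.randomUUID() gives every restart a fresh prefix, and lingering transactions from the previous run may not be aborted promptly. A safer pattern, sketched below on the assumption that bizType is unique per job (as with "t0" and "t1" in the launchers above), is a fixed per-instance prefix:

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setKafkaProducerConfig(KafkaEnvParameter.productProperties)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(KafkaEnvParameter.responseTopics)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // stable across restarts and distinct for each job instance
        .setTransactionalIdPrefix("order-producer-" + bizType)
        .build();

Second, with EXACTLY_ONCE the records only become visible once a checkpoint completes, and downstream consumers need isolation.level=read_committed to avoid reading data from transactions that are later aborted.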
Tags: kafka flink java

Reposted from: https://blog.csdn.net/gaozhi0/article/details/130304854
Copyright belongs to the original author 橄榄丝. If there is any infringement, please contact us for removal.
