

Producing Data from Flink to Kafka


Preface

I recently started learning Flink application development in Java. This post uses producing data to Kafka as an example to record the problems I ran into, along with the code; if anything is wrong, please point it out.

1. Versions

Flink version: 1.15.4
Kafka version: 3.0.0

2. Usage steps

2.1 Maven dependencies

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
        <flink.version>1.15.4</flink.version>
        <target.java.version>1.8</target.java.version>
        <!-- Flink 1.15 only ships Scala 2.12 artifacts -->
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j2.version>2.14.1</log4j2.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
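
The pom above declares a log4j2.version property but stops at the Flink artifacts, and the code below also uses fastjson (JSON.toJSONString) and Lombok (@Slf4j). A minimal sketch of the additional dependencies this implies; the versions are my assumptions, not taken from the original post:

    <!-- Assumed versions; adjust to match your project -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j2.version}</version>
        <scope>runtime</scope>
    </dependency>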

2.2 The code

The code below puts the Flink environment initialization, configuration, and the Kafka-producing logic into a standalone class, mainly because the business I have been working on recently needs multiple instances producing data to Kafka; the two launcher classes further down both call this class.

    import com.alibaba.fastjson.JSON;
    import com.dogame.data.report.comsume.bean.KafkaEnvParameter;
    import com.dogame.data.report.comsume.bean.Order;
    import com.dogame.data.report.comsume.utils.EnvironmentUtils;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

    import java.util.Random;
    import java.util.UUID;

    public class WorkerService {

        public void run(String[] args, String bizType) throws Exception {
            StreamExecutionEnvironment env = EnvironmentUtils.initEnvironmentConfig(args, bizType);

            // Mock source: every parallel subtask emits a random Order as JSON every 10 ms.
            DataStream<String> orderDataStream = env.addSource(new ParallelSourceFunction<String>() {
                // volatile so that cancel(), called from another thread, is seen by run()
                private volatile boolean running = true;

                @Override
                public void run(SourceContext<String> ctx) throws Exception {
                    String[] channels = {"www.xxx.com", "www.sdfs.com", "www.qqq.com", "www.dfff.com"};
                    Random random = new Random();
                    while (running) {
                        Order order = new Order();
                        order.setOrderId(String.valueOf(System.currentTimeMillis() + random.nextInt()));
                        order.setTime(System.currentTimeMillis());
                        order.setFee(1000 * random.nextDouble());
                        order.setChannel(channels[random.nextInt(channels.length)]);
                        ctx.collect(JSON.toJSONString(order)); // emit downstream
                        Thread.sleep(10);
                    }
                }

                @Override
                public void cancel() {
                    running = false;
                }
            });
            orderDataStream.print();

            KafkaSink<String> sink = KafkaSink.<String>builder()
                    .setKafkaProducerConfig(KafkaEnvParameter.productProperties)
                    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                            .setTopic(KafkaEnvParameter.responseTopics)
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    // Mandatory with EXACTLY_ONCE when several jobs write to the same
                    // Kafka cluster; see the discussion below.
                    //.setTransactionalIdPrefix(UUID.randomUUID().toString())
                    .build();
            orderDataStream.sinkTo(sink);

            env.execute("SourceFunctionTest");
        }
    }
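
The Order, KafkaEnvParameter, and EnvironmentUtils classes above are part of my business code and are not shown here. For completeness, a rough sketch (as separate classes) of what they might look like; every field, property value, and setting below is an assumption, not the original implementation:

    import java.util.Properties;
    import lombok.Data;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    @Data // Lombok generates the getters/setters WorkerService calls
    public class Order {
        private String orderId;
        private long time;
        private double fee;
        private String channel;
    }

    public class KafkaEnvParameter {
        public static final String responseTopics = "order-topic"; // assumed topic name

        public static final Properties productProperties = new Properties();
        static {
            productProperties.setProperty("bootstrap.servers", "localhost:9092"); // assumed brokers
            // EXACTLY_ONCE rides on Kafka transactions; keep the transaction timeout
            // at or below the broker's transaction.max.timeout.ms (15 minutes by default).
            productProperties.setProperty("transaction.timeout.ms", "600000");
        }
    }

    public class EnvironmentUtils {
        public static StreamExecutionEnvironment initEnvironmentConfig(String[] args, String bizType) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // With EXACTLY_ONCE the KafkaSink only commits transactions on checkpoints,
            // so checkpointing must be enabled or the data never becomes visible
            // to read_committed consumers.
            env.enableCheckpointing(10_000);
            return env;
        }
    }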
    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    public class SourceFunctionTest {
        public static void main(String[] args) {
            try {
                String bizType = "t0";
                WorkerService workerService = new WorkerService();
                workerService.run(args, bizType);
            } catch (Exception e) {
                log.error(" fail ", e);
            }
        }
    }

    @Slf4j
    public class SourceFunctionTest1 {
        public static void main(String[] args) {
            try {
                String bizType = "t1";
                WorkerService workerService = new WorkerService();
                workerService.run(args, bizType);
            } catch (Exception e) {
                log.error(" fail ", e);
            }
        }
    }
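
To reproduce the multi-instance setup, the two entry points can be submitted as separate jobs against the same cluster, for example (the jar path here is a placeholder):

    flink run -c SourceFunctionTest  target/data-report.jar
    flink run -c SourceFunctionTest1 target/data-report.jar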

One point worth emphasizing: testing surfaced the following problem. If only one instance is started, the code runs stably; with two or more instances, errors start appearing after a few minutes of running, and in the end only one instance keeps running normally. After two days of troubleshooting this (asking ChatGPT combined with walking through the relevant code), it turned out that the following KafkaSinkBuilder method was never being called (the call is commented out in the WorkerService class above; uncommenting it fixes the problem):

    /**
     * Sets the prefix for all created transactionalIds if {@link DeliveryGuarantee#EXACTLY_ONCE} is
     * configured.
     *
     * <p>It is mandatory to always set this value with {@link DeliveryGuarantee#EXACTLY_ONCE} to
     * prevent corrupted transactions if multiple jobs using the KafkaSink run against the same
     * Kafka Cluster. The default prefix is {@link #transactionalIdPrefix}.
     *
     * <p>The size of the prefix is capped by {@link #MAXIMUM_PREFIX_BYTES} formatted with UTF-8.
     *
     * <p>It is important to keep the prefix stable across application restarts. If the prefix
     * changes it might happen that lingering transactions are not correctly aborted and newly
     * written messages are not immediately consumable until the transactions timeout.
     *
     * @param transactionalIdPrefix
     * @return {@link KafkaSinkBuilder}
     */
    public KafkaSinkBuilder<IN> setTransactionalIdPrefix(String transactionalIdPrefix) {
        this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, "transactionalIdPrefix");
        checkState(
                transactionalIdPrefix.getBytes(StandardCharsets.UTF_8).length <= MAXIMUM_PREFIX_BYTES,
                "The configured prefix is too long and the resulting transactionalId might exceed Kafka's transactionalIds size.");
        return this;
    }
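
Note that the commented-out call in WorkerService used UUID.randomUUID() as the prefix, which conflicts with the javadoc's advice to keep the prefix stable across restarts: a random prefix changes on every restart, so lingering transactions from the previous run may not be aborted promptly. A sketch of a stable, per-job prefix built from bizType, inside WorkerService.run where bizType is in scope (the prefix naming is my own choice, not from the original code):

    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setKafkaProducerConfig(KafkaEnvParameter.productProperties)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(KafkaEnvParameter.responseTopics)
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            // Distinct per job instance ("t0", "t1", ...) but stable across restarts,
            // which is exactly what EXACTLY_ONCE needs when several jobs share a cluster.
            .setTransactionalIdPrefix("order-producer-" + bizType)
            .build();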
Tags: kafka flink java

Reposted from: https://blog.csdn.net/gaozhi0/article/details/130304854
Copyright belongs to the original author, 橄榄丝. In case of infringement, please contact us for removal.
