0


Flink之Kafka Sink

  • 代码内容
  1. packagecom.jin.demo;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.connector.base.DeliveryGuarantee;importorg.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;importorg.apache.flink.connector.kafka.sink.KafkaSink;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.kafka.clients.producer.ProducerConfig;importjava.util.Properties;/**
  2. * @Author: J
  3. * @Version: 1.0
  4. * @CreateTime: 2023/6/29
  5. * @Description: 测试
  6. **/publicclassFlinkKafkaSink{publicstaticvoidmain(String[] args)throwsException{// 创建流环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1
  7. env.setParallelism(1);// 添加数据源(CustomizeSource为自定义数据源,便于测试)SingleOutputStreamOperator<String> mapStream = env.addSource(newCustomizeSource()).map(bean -> bean.toString());// 设置生产者事务超时时间Properties properties =newProperties();
  8. properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"10000");// 构建KafkaSinkKafkaSink<String> kafkaSink =KafkaSink.<String>builder()// 配置Kafka服务.setBootstrapServers("lx01:9092")// 配置消息序列化类型.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()// 配置kafka topic信息.setTopic("tpc-02")// 配置value序列化类型.setValueSerializationSchema(newSimpleStringSchema()).build())// 设置语义.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)// 设置事务ID前缀.setTransactionalIdPrefix("JL-").build();// 将结果输出到kafka
  9. mapStream.sinkTo(kafkaSink);
  10. env.execute("Kafka Sink");}}

结果数据

  1. [root@lx01 bin]# ./kafka-console-consumer.sh --bootstrap-server lx01:9092 --topic tpc-02
  2. CustomizeBean(name=AAA-274, age=64, gender=W, hobbit=钓鱼爱好者)
  3. CustomizeBean(name=AAA-973, age=45, gender=W, hobbit=钓鱼爱好者)
  4. CustomizeBean(name=AAA-496, age=71, gender=W, hobbit=非遗文化爱好者)
  5. CustomizeBean(name=AAA-263, age=45, gender=M, hobbit=天文知识爱好者)
  6. CustomizeBean(name=AAA-790, age=77, gender=W, hobbit=书法爱好者)
  7. CustomizeBean(name=AAA-806, age=38, gender=M, hobbit=非遗文化爱好者)
  8. CustomizeBean(name=AAA-498, age=58, gender=M, hobbit=篮球运动爱好者)
  9. CustomizeBean(name=AAA-421, age=63, gender=M, hobbit=书法爱好者)
  10. CustomizeBean(name=AAA-938, age=56, gender=W, hobbit=乒乓球运动爱好者)
  11. CustomizeBean(name=AAA-278, age=18, gender=M, hobbit=乒乓球运动爱好者)
  12. CustomizeBean(name=AAA-614, age=74, gender=W, hobbit=钓鱼爱好者)
  13. CustomizeBean(name=AAA-249, age=67, gender=W, hobbit=天文知识爱好者)
  14. CustomizeBean(name=AAA-690, age=72, gender=W, hobbit=网吧战神)
  15. CustomizeBean(name=AAA-413, age=69, gender=M, hobbit=美食爱好者)
标签: flink kafka linq

本文转载自: https://blog.csdn.net/AnameJL/article/details/131923237
版权归原作者 飞天小老头 所有, 如有侵权,请联系我们删除。

“Flink之Kafka Sink”的评论:

还没有评论