Flink Exactly-Once in Practice: Kafka to Kafka
1. Kafka Input/Output Stream Utility Class
The code is as follows (example):
// Obtain a Kafka DataStream
public static <T> DataStream<T> getKafkaDataStream(ParameterTool parameterTool,
                                                   Class<? extends DeserializationSchema> clazz,
                                                   StreamExecutionEnvironment env)
        throws IllegalAccessException, InstantiationException {
    // Add the ParameterTool to Flink's global job parameters so it can later be
    // retrieved from the runtime context to look up configuration values
    env.getConfig().setGlobalJobParameters(parameterTool);

    // Kafka consumer configuration
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", parameterTool.get("bootstrap.servers"));
    properties.setProperty("group.id", parameterTool.get("group.idsource"));
    properties.setProperty("auto.offset.reset", parameterTool.get("auto.offset.reset"));
    properties.setProperty("enable.auto.commit", parameterTool.get("enable.auto.commit", String.valueOf(false)));

    String topics = parameterTool.get("Consumertopics");

    // Instantiate the deserialization schema
    DeserializationSchema<T> deserializationSchema = clazz.newInstance();
    FlinkKafkaConsumer<T> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics, deserializationSchema, properties);
    flinkKafkaConsumer.setStartFromEarliest();
    // Bind Kafka offset commits to Flink checkpoints
    flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);

    return env.addSource(flinkKafkaConsumer);
}

// Generic method for obtaining a Kafka producer
/**
 * offsets.topic.replication.factor          replication factor of the internal topic that stores consumer offsets
 * transaction.state.log.replication.factor  replication factor of the transaction state topic
 * transaction.state.log.min.isr             overrides min.insync.replicas for the transaction state topic
 *
 * num.partitions                            default number of partitions for newly created topics
 *
 * default.replication.factor                default replication factor for auto-created topics
 *
 * Note: set these parameters higher to ensure high availability!
 *
 * default.replication.factor is what actually determines the number of replicas of a topic.
 *
 * @param parameterTool
 * @param kafkaSerializationSchema
 * @param <T>
 * @return
 */
public static <T> FlinkKafkaProducer<T> getFlinkKafkaProducer(ParameterTool parameterTool,
                                                              KafkaSerializationSchema<T> kafkaSerializationSchema) {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", parameterTool.get("bootstrap.servers"));
    properties.setProperty("group.id", parameterTool.get("group.idsink"));
    // properties.setProperty("transaction.max.timeout.ms", parameterTool.get("transaction.max.timeout.ms"));
    properties.setProperty("transaction.timeout.ms", parameterTool.get("transaction.timeout.ms"));
    properties.setProperty("client.id", "flinkOutputTopicClient");

    String topics = parameterTool.get("Producetopice");

    return new FlinkKafkaProducer<T>(topics,
            kafkaSerializationSchema,
            properties,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
Notes

1. Consumer notes
1. flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true): together with enable.auto.commit=false, this turns off Kafka's automatic offset commits and ties offset commits to Flink checkpoints.
2. bootstrap.servers: the host list of the Kafka brokers.
3. setStartFromEarliest(): starts consuming Kafka messages from the earliest offset.

2. Producer notes
1. transaction.timeout.ms: by default the Kafka broker sets transaction.max.timeout.ms to 15 minutes, so the producer's transaction.timeout.ms must be set below 15 minutes.
2. FlinkKafkaProducer.Semantic.EXACTLY_ONCE: configures the Kafka sink for exactly-once (transactional) writes.
2. Example: Counting Occurrences of Each String
The code is as follows (example):
public static void main(String[] args) throws Exception {
    // 1. Create the streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 2. Set the parallelism
    env.setParallelism(4);

    // 3. Configure checkpointing and the state backend
    CkAndStateBacked.setCheckPointAndStateBackend(env, "FS");

    // 4. Build the Kafka source stream
    InputStream kafkaPropertiesStream = KafkaToKafkaExacitly.class.getClassLoader().getResourceAsStream("kafka.properties");
    ParameterTool parameterTool = ParameterTool.fromPropertiesFile(kafkaPropertiesStream);
    // Put the configuration into Flink's global runtime parameters
    env.getConfig().setGlobalJobParameters(parameterTool);

    SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
    Class<? extends SimpleStringSchema> stringSchemaClass = simpleStringSchema.getClass();
    DataStream<String> kafkaDataStream = KafkaUtil.getKafkaDataStream(parameterTool, stringSchemaClass, env);
    System.out.println("==================================================");
    kafkaDataStream.print();

    // 5. Map each value to (value, 1)
    SingleOutputStreamOperator<Tuple2<String, Integer>> tupleStream = kafkaDataStream.map(
            new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    if ("error".equals(value)) {
                        throw new RuntimeException("An exception occurred!!!");
                    }
                    return new Tuple2<>(value, 1);
                }
            });
    tupleStream.print();

    // 6. Key by the value and count its occurrences
    SingleOutputStreamOperator<Tuple2<String, Integer>> reduceStream = tupleStream
            .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> value) throws Exception {
                    return value.f0;
                }
            })
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                                      Tuple2<String, Integer> value2) throws Exception {
                    return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                }
            });
    System.out.println("=====================================================");
    reduceStream.print();

    // 7. Write the results back to Kafka
    FlinkKafkaProducer<Tuple2<String, Integer>> flinkKafkaProducer = KafkaUtil.getFlinkKafkaProducer(
            parameterTool,
            new KafkaSerializationSchema<Tuple2<String, Integer>>() {
                @Override
                public void open(SerializationSchema.InitializationContext context) throws Exception {
                    System.out.println("========= Writing data to the Kafka producer!!! =============");
                }

                @Override
                public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, Integer> element, @Nullable Long timestamp) {
                    String producetopics = parameterTool.get("Producetopice");
                    String result = element.toString();
                    return new ProducerRecord<byte[], byte[]>(producetopics, result.getBytes(StandardCharsets.UTF_8));
                }
            });
    reduceStream.addSink(flinkKafkaProducer).name("kafkasinktest").uid("kafkasink");

    // Execute the job
    env.execute("KafkaToKafkaTest");
}
Notes:
A local FsStateBackend is used here. Pay attention to how the path is set: it must start with an hdfs:// or file:// scheme, otherwise Flink's file system cannot resolve it.
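The CkAndStateBacked.setCheckPointAndStateBackend helper called in step 3 of the example is not shown in the original article. The following is a minimal sketch of what such a helper could look like, assuming the legacy FsStateBackend API, EXACTLY_ONCE checkpointing, and an illustrative checkpoint interval and path:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CkAndStateBacked {

    // Hypothetical reconstruction of the helper used in the example:
    // enables exactly-once checkpointing and, for type "FS", a file-system state backend.
    public static void setCheckPointAndStateBackend(StreamExecutionEnvironment env, String type) {
        // Checkpoint every 5 seconds in EXACTLY_ONCE mode (the interval is an assumption)
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        // Keep externalized checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        if ("FS".equals(type)) {
            // The scheme (file:// or hdfs://) is required, as noted above
            env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));
        }
    }
}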
3. Consuming Kafka Transactional Data
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties sourceProperties = new Properties();
    sourceProperties.setProperty("bootstrap.servers", "*****");
    sourceProperties.setProperty("group.id", "****");
    // End-to-end consistency: the consumer must set isolation.level=read_committed
    // (the default is read_uncommitted)
    sourceProperties.put("isolation.level", "read_committed");

    FlinkKafkaConsumer<String> ConsumerKafka = new FlinkKafkaConsumer<>("*****", new SimpleStringSchema(), sourceProperties);
    ConsumerKafka.setStartFromEarliest();

    DataStreamSource<String> dataStreamSource = env.addSource(ConsumerKafka);
    dataStreamSource.print();

    env.execute();
}
isolation.level is set to read_committed here (the default is read_uncommitted).
You can observe that the Kafka producer's messages become visible in batches, at the interval configured for your checkpoints.
Summary and Possible Problems
The above is a test example of achieving exactly-once from Kafka to Kafka with Flink. One more thing to watch out for is the following settings in your Kafka broker configuration:
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
default.replication.factor=1
Among these four parameters, default.replication.factor is what actually determines the number of replicas of each Kafka topic. However, when transactions are enabled, that is, when the Flink sink (addSink) uses the default two-phase commit approach in EXACTLY_ONCE mode, transaction.state.log.replication.factor must be greater than or equal to transaction.state.log.min.isr. Otherwise the Kafka cluster cannot satisfy the replication requirements of the transaction state topic, transactions keep failing, your checkpoints time out, and the whole job eventually fails.
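As an illustration (not from the original), a cluster with three or more brokers would typically raise these values so that the transaction state topic can tolerate a broker failure while still satisfying the constraint above:

# example broker settings for a 3-broker cluster (illustrative)
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
default.replication.factor=3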
When a consumer consumes messages from a Kafka cluster for the first time, the internal topic __consumer_offsets is created automatically. Its replication factor is constrained by the offsets.topic.replication.factor parameter (default 3; note that the enforcement of this parameter changed in version 0.11.0.0), and its number of partitions can be set via offsets.topic.num.partitions (default 50). With transactions enabled, the producer first obtains a global TransactionCoordinator id and a transactional producer, generates unique sequence numbers, and so on,
similar to the following log entry, which uniquely identifies the current transaction and the offsets of its messages:
[2022-03-24 21:07:40,022] INFO [TransactionCoordinator id=0] Initialized transactionalId Keyed Reduce -> (Sink: Print to Std. Out, Sink: kafkasinktest)-b0c5e26be6392399cc3c8a38581a81c2-8 with producerId 11101 and producer epoch 8 on partition __transaction_state-18 (kafka.coordinator.transaction.TransactionCoordinator)
If the Flink job fails, Kafka discards the data that has already been written but not yet marked as consumable; in the normal case, the transaction is formally committed (essentially by changing a marker on the messages), after which consumers that set isolation.level=read_committed can read the messages that have been marked as readable.
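To double-check this behaviour outside of Flink, the output topic can be read with a plain kafka-clients consumer in read_committed mode. The sketch below is only illustrative; the broker address and topic name are placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.setProperty("group.id", "read-committed-check");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "earliest");
        // Only messages from committed transactions become visible to this consumer
        props.setProperty("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("flink_output_topic")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}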