Preface
I recently started learning Flink program development in Java. This article uses producing data to Kafka as an example to record the problems I ran into, along with the code. Corrections are welcome.
I. Versions
Flink version: 1.15.4
Kafka version: 3.0.0
II. Usage
1. Maven dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
    <flink.version>1.15.4</flink.version>
    <target.java.version>1.8</target.java.version>
    <!-- Flink 1.15 only ships Scala 2.12 artifacts -->
    <scala.binary.version>2.12</scala.binary.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j2.version>2.14.1</log4j2.version>
</properties>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
</dependency>
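The code below also uses fastjson (for JSON serialization) and Lombok (for @Slf4j), which are not pulled in by the Flink artifacts above. If your project does not already have them, dependencies along these lines are needed (the version numbers here are examples, not from the original setup):

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.24</version>
    <scope>provided</scope>
</dependency>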
2. The code
The Flink environment initialization, configuration, and Kafka-producing code below are placed in a standalone class because the business I have been working on recently needs multiple instances producing data to Kafka; the two launcher classes further down both call this class.
import com.alibaba.fastjson.JSON;
import com.dogame.data.report.comsume.bean.KafkaEnvParameter;
import com.dogame.data.report.comsume.bean.Order;
import com.dogame.data.report.comsume.utils.EnvironmentUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;
import java.util.UUID;
public class WorkerService {

    public void run(String[] args, String bizType) throws Exception {
        StreamExecutionEnvironment env = EnvironmentUtils.initEnvironmentConfig(args, bizType);
        DataStream<String> orderDataStream = env.addSource(new ParallelSourceFunction<String>() {
            private volatile boolean running = true;
            private final Random random = new Random(); // reuse one instance instead of allocating per record

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                String[] channels = {"www.xxx.com", "www.sdfs.com", "www.qqq.com", "www.dfff.com"};
                while (running) {
                    Order order = new Order();
                    order.setOrderId(String.valueOf(System.currentTimeMillis() + random.nextInt()));
                    order.setTime(System.currentTimeMillis());
                    order.setFee(1000 * random.nextDouble());
                    order.setChannel(channels[random.nextInt(channels.length)]);
                    ctx.collect(JSON.toJSONString(order)); // emit the order as a JSON string
                    Thread.sleep(10);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });
        orderDataStream.print();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setKafkaProducerConfig(KafkaEnvParameter.productProperties)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(KafkaEnvParameter.responseTopics)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // Uncommenting this fixes the multi-instance failure described below;
                // see the note after the Javadoc for a prefix that is stable across restarts.
                //.setTransactionalIdPrefix(UUID.randomUUID().toString())
                .build();
        orderDataStream.sinkTo(sink);
        env.execute("SourceFunctionTest");
    }
}
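WorkerService references three helper classes (Order, KafkaEnvParameter, EnvironmentUtils) that this post does not show. Purely as a minimal sketch of what they might look like, each in its own file: the field names follow the setters called above, while the topic name, bootstrap servers, and checkpoint interval are all placeholder assumptions.

import lombok.Data;

@Data // Lombok generates the setters used above (setOrderId, setTime, setFee, setChannel)
public class Order {
    private String orderId;
    private long time;
    private double fee;
    private String channel;
}

import java.util.Properties;

public class KafkaEnvParameter {
    public static final String responseTopics = "order-topic"; // placeholder topic name
    public static final Properties productProperties = new Properties();
    static {
        productProperties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        // With EXACTLY_ONCE the producer opens Kafka transactions; the producer's
        // transaction timeout must not exceed the broker's transaction.max.timeout.ms
        // (15 minutes by default).
        productProperties.setProperty("transaction.timeout.ms", "600000");
    }
}

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvironmentUtils {
    public static StreamExecutionEnvironment initEnvironmentConfig(String[] args, String bizType) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // EXACTLY_ONCE requires checkpointing: the KafkaSink commits its Kafka
        // transactions only when a checkpoint completes, so without this call the
        // output would never become visible to read_committed consumers.
        env.enableCheckpointing(10_000);
        return env;
    }
}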
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SourceFunctionTest {
    public static void main(String[] args) {
        try {
            String bizType = "t0";
            WorkerService workerService = new WorkerService();
            workerService.run(args, bizType);
        } catch (Exception e) {
            log.error(" fail ", e);
        }
    }
}
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SourceFunctionTest1 {
    public static void main(String[] args) {
        try {
            String bizType = "t1";
            WorkerService workerService = new WorkerService();
            workerService.run(args, bizType);
        } catch (Exception e) {
            log.error(" fail ", e);
        }
    }
}
One thing worth emphasizing: testing surfaced the following problem. A single instance runs stably, but when two or more instances are started, errors appear after a few minutes of running and eventually only one instance keeps working. After two days of troubleshooting (asking ChatGPT combined with reading the relevant source code), the cause turned out to be that KafkaSinkBuilder#setTransactionalIdPrefix was never called (it is commented out in the WorkerService class above; uncommenting it fixes the problem). Without an explicit prefix every job uses the default one, so under EXACTLY_ONCE multiple jobs writing to the same Kafka cluster generate clashing transactionalIds and corrupt each other's transactions, exactly as the Javadoc warns:
/**
* Sets the prefix for all created transactionalIds if {@link DeliveryGuarantee#EXACTLY_ONCE} is
* configured.
*
* <p>It is mandatory to always set this value with {@link DeliveryGuarantee#EXACTLY_ONCE} to
* prevent corrupted transactions if multiple jobs using the KafkaSink run against the same
* Kafka Cluster. The default prefix is {@link #transactionalIdPrefix}.
*
* <p>The size of the prefix is capped by {@link #MAXIMUM_PREFIX_BYTES} formatted with UTF-8.
*
* <p>It is important to keep the prefix stable across application restarts. If the prefix
* changes it might happen that lingering transactions are not correctly aborted and newly
* written messages are not immediately consumable until the transactions timeout.
*
* @param transactionalIdPrefix
* @return {@link KafkaSinkBuilder}
*/
public KafkaSinkBuilder<IN> setTransactionalIdPrefix(String transactionalIdPrefix) {
    this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, "transactionalIdPrefix");
    checkState(
            transactionalIdPrefix.getBytes(StandardCharsets.UTF_8).length <= MAXIMUM_PREFIX_BYTES,
            "The configured prefix is too long and the resulting transactionalId might exceed Kafka's transactionalIds size.");
    return this;
}
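For reference, here is a minimal sketch of the corrected builder call inside WorkerService.run. A random UUID (as in the commented-out line above) does make each instance unique, but the Javadoc notes the prefix should also stay stable across restarts so lingering transactions can be correctly aborted. Deriving it from bizType, which is already unique per launcher (t0 vs. t1), satisfies both; the exact prefix string here is my own choice, not from the original post:

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setKafkaProducerConfig(KafkaEnvParameter.productProperties)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(KafkaEnvParameter.responseTopics)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // Stable and unique per job instance: "order-producer-t0", "order-producer-t1", ...
        // Unlike UUID.randomUUID(), this does not change across restarts, so
        // lingering transactions from a previous run can be aborted on recovery.
        .setTransactionalIdPrefix("order-producer-" + bizType)
        .build();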