在 Apache Flink 的 Local 模式下,通过 IDE(如 IntelliJ IDEA 或 Eclipse)来运行 Flink 程序非常方便。Local 模式指的是在本地环境下执行 Flink 程序,而不是在分布式集群中运行。下面是具体的步骤和一个简单的 Flink 示例:
1. 配置开发环境
- 下载 Flink 依赖: 在 Maven 项目中,你需要在
pom.xml
文件中添加 Flink 依赖:<dependencies><!-- Apache Flink 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.16.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.16.0</version></dependency></dependencies>
或者
<!-- Apache Flink 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.6</version></dependency>
- IDE 配置: 选择 IntelliJ IDEA 或 Eclipse,并确保项目的 JDK 和 Maven 正确配置。如果使用 IntelliJ IDEA,建议安装 Flink 插件来增强对 Flink 项目的支持。
2. Flink计算从socket输入的词频
2.1. 编写 Flink 程序
在 IDE 中创建一个 Java 类,编写一个简单的 Flink 程序,计算从 socket 输入的词频。
importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;publicclassWordCount{publicstaticvoidmain(String[] args)throwsException{// 获取执行环境finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 从 socket 读取数据 (假设本地 socket 服务运行在端口 9999 上)DataStream<String> text = env.socketTextStream("localhost",9999);// 分割字符串,计算单词出现次数DataStream<Tuple2<String,Integer>> counts = text
.flatMap(newTokenizer()).keyBy(value -> value.f0).sum(1);// 打印结果
counts.print();// 执行程序
env.execute("Flink Streaming WordCount");}// Tokenizer: 将每行文本分割成单词publicstaticfinalclassTokenizerimplementsFlatMapFunction<String,Tuple2<String,Integer>>{@OverridepublicvoidflatMap(String value,Collector<Tuple2<String,Integer>> out){// 按空格分割字符串String[] tokens = value.toLowerCase().split("\\W+");// 输出每个单词和1的Tuplefor(String token : tokens){if(token.length()>0){
out.collect(newTuple2<>(token,1));}}}}}
2.2. 运行 Flink 程序
- 启动 socket 服务:在运行程序之前,打开终端并启动一个简单的 socket 服务:
nc -lk 9999
这将创建一个监听 9999 端口的 socket 服务,用于接收输入。 - 运行 Flink 程序:在 IDE 中运行主类
WordCount
,然后在终端的 socket 服务中输入一些文本。例如:hello flinkflink is awesomehello world
- 查看输出:Flink 程序将计算词频并输出到控制台。例如:
(hello,1)(flink,1)(is,1)(awesome,1)(hello,2)
3. Flink连接kafka从中消费数据
使用 Apache Flink 从 Kafka 中消费数据并进行处理是一个常见的场景,尤其是在实时流处理的应用中。下面是一个简单的 Flink 示例,从 Kafka 中读取数据,处理后输出到控制台。
3.1. 配置 Kafka 和 Flink
你需要确保 Kafka 已经在本地或服务器上运行,并且有一个 Kafka 主题准备好用于消费数据。
3.2. Flink 程序示例
importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;importorg.apache.kafka.clients.consumer.ConsumerConfig;importjava.util.Properties;publicclassKafkaFlinkExample{publicstaticvoidmain(String[] args)throwsException{// 创建执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// Kafka 配置Properties properties =newProperties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"flink-consumer-group");// 从 Kafka 读取数据,使用 SimpleStringSchema 解码器FlinkKafkaConsumer<String> kafkaConsumer =newFlinkKafkaConsumer<>("test-topic",// Kafka 主题名称newSimpleStringSchema(),
properties
);// 让 Kafka 从最新的 offset 开始消费
kafkaConsumer.setStartFromLatest();// 添加 Kafka 作为数据源DataStream<String> stream = env.addSource(kafkaConsumer);// 简单处理:把每条消息转换为大写DataStream<String> processedStream = stream.map(String::toUpperCase);// 打印处理结果到控制台
processedStream.print();// 启动程序
env.execute("Flink Kafka Consumer Example");}}
3.3. 运行步骤
- 启动 Kafka:确保 Kafka 和 Zookeeper 服务已经启动。可以参考以下命令:
# 启动 Zookeeperzookeeper-server-start.sh /usr/local/etc/kafka/zookeeper.properties# 启动 Kafkakafka-server-start.sh /usr/local/etc/kafka/server.properties
- 创建 Kafka 主题: 使用以下命令创建一个名为
test-topic
的 Kafka 主题:kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
- 发送消息到 Kafka: 打开一个生产者终端,向
test-topic
发送一些测试消息:kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
输入一些消息,比如:hello flinkkafka integrationflink kafka example
- 运行 Flink 程序: 在 IDE 中运行
KafkaFlinkExample
,你会看到程序从 Kafka 中消费的消息,并将其转换为大写后输出到控制台:HELLO FLINKKAFKA INTEGRATIONFLINK KAFKA EXAMPLE
小结
这个示例展示了如何使用 Flink 连接 Kafka,从中消费数据并进行简单的处理。你可以根据实际需求扩展处理逻辑,比如复杂的流计算、窗口操作等。
版权归原作者 weixin_44594317 所有, 如有侵权,请联系我们删除。