0


Apache Flink简单示例以及连接kafka消费数据

在 Apache Flink 的 Local 模式下,通过 IDE(如 IntelliJ IDEA 或 Eclipse)来运行 Flink 程序非常方便。Local 模式指的是在本地环境下执行 Flink 程序,而不是在分布式集群中运行。下面是具体的步骤和一个简单的 Flink 示例:

1. 配置开发环境

  • 下载 Flink 依赖: 在 Maven 项目中,你需要在 pom.xml 文件中添加 Flink 依赖:<dependencies><!-- Apache Flink 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.16.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.16.0</version></dependency></dependencies>

或者

<!-- Apache Flink 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.6</version></dependency>
  • IDE 配置: 选择 IntelliJ IDEA 或 Eclipse,并确保项目的 JDK 和 Maven 正确配置。如果使用 IntelliJ IDEA,建议安装 Flink 插件来增强对 Flink 项目的支持。

2. Flink计算从socket输入的词频

2.1. 编写 Flink 程序

在 IDE 中创建一个 Java 类,编写一个简单的 Flink 程序,计算从 socket 输入的词频。

importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;publicclassWordCount{publicstaticvoidmain(String[] args)throwsException{// 获取执行环境finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 从 socket 读取数据 (假设本地 socket 服务运行在端口 9999 上)DataStream<String> text = env.socketTextStream("localhost",9999);// 分割字符串,计算单词出现次数DataStream<Tuple2<String,Integer>> counts = text
            .flatMap(newTokenizer()).keyBy(value -> value.f0).sum(1);// 打印结果
        counts.print();// 执行程序
        env.execute("Flink Streaming WordCount");}// Tokenizer: 将每行文本分割成单词publicstaticfinalclassTokenizerimplementsFlatMapFunction<String,Tuple2<String,Integer>>{@OverridepublicvoidflatMap(String value,Collector<Tuple2<String,Integer>> out){// 按空格分割字符串String[] tokens = value.toLowerCase().split("\\W+");// 输出每个单词和1的Tuplefor(String token : tokens){if(token.length()>0){
                    out.collect(newTuple2<>(token,1));}}}}}

2.2. 运行 Flink 程序

  • 启动 socket 服务:在运行程序之前,打开终端并启动一个简单的 socket 服务:nc -lk 9999这将创建一个监听 9999 端口的 socket 服务,用于接收输入。
  • 运行 Flink 程序:在 IDE 中运行主类 WordCount,然后在终端的 socket 服务中输入一些文本。例如:hello flinkflink is awesomehello world
  • 查看输出:Flink 程序将计算词频并输出到控制台。例如:(hello,1)(flink,1)(is,1)(awesome,1)(hello,2)

3. Flink连接kafka从中消费数据

使用 Apache Flink 从 Kafka 中消费数据并进行处理是一个常见的场景,尤其是在实时流处理的应用中。下面是一个简单的 Flink 示例,从 Kafka 中读取数据,处理后输出到控制台。

3.1. 配置 Kafka 和 Flink

你需要确保 Kafka 已经在本地或服务器上运行,并且有一个 Kafka 主题准备好用于消费数据。

3.2. Flink 程序示例

importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;importorg.apache.kafka.clients.consumer.ConsumerConfig;importjava.util.Properties;publicclassKafkaFlinkExample{publicstaticvoidmain(String[] args)throwsException{// 创建执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// Kafka 配置Properties properties =newProperties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"flink-consumer-group");// 从 Kafka 读取数据,使用 SimpleStringSchema 解码器FlinkKafkaConsumer<String> kafkaConsumer =newFlinkKafkaConsumer<>("test-topic",// Kafka 主题名称newSimpleStringSchema(),
                properties
        );// 让 Kafka 从最新的 offset 开始消费
        kafkaConsumer.setStartFromLatest();// 添加 Kafka 作为数据源DataStream<String> stream = env.addSource(kafkaConsumer);// 简单处理:把每条消息转换为大写DataStream<String> processedStream = stream.map(String::toUpperCase);// 打印处理结果到控制台
        processedStream.print();// 启动程序
        env.execute("Flink Kafka Consumer Example");}}

3.3. 运行步骤

  1. 启动 Kafka:确保 Kafka 和 Zookeeper 服务已经启动。可以参考以下命令:# 启动 Zookeeperzookeeper-server-start.sh /usr/local/etc/kafka/zookeeper.properties# 启动 Kafkakafka-server-start.sh /usr/local/etc/kafka/server.properties
  2. 创建 Kafka 主题: 使用以下命令创建一个名为 test-topic 的 Kafka 主题:kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  3. 发送消息到 Kafka: 打开一个生产者终端,向 test-topic 发送一些测试消息:kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092输入一些消息,比如:hello flinkkafka integrationflink kafka example
  4. 运行 Flink 程序: 在 IDE 中运行 KafkaFlinkExample,你会看到程序从 Kafka 中消费的消息,并将其转换为大写后输出到控制台:HELLO FLINKKAFKA INTEGRATIONFLINK KAFKA EXAMPLE

小结

这个示例展示了如何使用 Flink 连接 Kafka,从中消费数据并进行简单的处理。你可以根据实际需求扩展处理逻辑,比如复杂的流计算、窗口操作等。

标签: apache flink kafka

本文转载自: https://blog.csdn.net/weixin_44594317/article/details/142868378
版权归原作者 weixin_44594317 所有, 如有侵权,请联系我们删除。

“Apache Flink简单示例以及连接kafka消费数据”的评论:

还没有评论