0


FLINK 基于1.15.2的Java开发-连接kafka并把内容sink到redis

需求

在kafka发送plainText消息,以逗号分割。逗号前的作为key,逗号后的作为value。

然后把kafka发过来的东西以Redis的HashMap结构存入flink这个主Key中去。

进入开发

为了解决这个问题,我们需要在前两个的范围内解决掉以下三个问题:

  1. flink如何接入kafka
  2. flink如何不作统计(前两课我们用的是烂网上的wordcount例子)只接入流和折分
  3. flink如何sink到Redis

flink如何接入kafka

pom.xml

  1. <!-- redis特性-->
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-connector-redis_2.11</artifactId>
  5. <version>1.1.5</version>
  6. <exclusions>
  7. <exclusion>
  8. <groupId>org.slf4j</groupId>
  9. <artifactId>slf4j-log4j12</artifactId>
  10. </exclusion>
  11. </exclusions>
  12. </dependency>
  13. <!-- kafka特性-->
  14. <!--kafka-->
  15. <dependency>
  16. <groupId>org.apache.flink</groupId>
  17. <artifactId>flink-connector-kafka</artifactId>
  18. <version>1.15.2</version>
  19. </dependency>

kafka在flink内核心API的用法

  1. KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092")
  2. .setTopics("test").setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
  3. .setValueOnlyDeserializer(new SimpleStringSchema()).build();
  4. DataStream<String> testDataStreamSource =
  5. env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

以上代码相当的简单。

有一处需要注意,如果我把以上代码改成了如下那么它的效果就是每次这个flink应用重启,都会把kafka从test这个topic发过来的第一条消息全部重新读一遍,区别就在于这个“OffsetsInitializer.earliest()”。我们取的是最近一条kafka消息,因此我们才用了:OffsetsInitializer.latest()。

  1. KafkaSource<String> source =
  2. KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("test")
  3. .setGroupId("test01").setStartingOffsets(OffsetsInitializer.earliest())
  4. .setValueOnlyDeserializer(new SimpleStringSchema()).build();

flink如何不作统计(前两几篇我们用的是烂网上的wordcount例子)只接入流和折分数据

  1. DataStream<Tuple2<String, String>> data = testDataStreamSource.flatMap(new LineSplitter());

我们接着来看LineSplitter这个类

LineSplitter.java

  1. /* 系统项目名称 com.aldi.flink.demo LineSplitter.java
  2. *
  3. * 2022年9月23日-下午4:31:36 2022XX公司-版权所有
  4. *
  5. */
  6. package com.aldi.com.cnflink.demo;
  7. import org.apache.flink.api.common.functions.FlatMapFunction;
  8. import org.apache.flink.api.java.tuple.Tuple2;
  9. import org.apache.flink.util.Collector;
  10. /**
  11. *
  12. * LineSplitter
  13. *
  14. *
  15. * 2022年9月23日 下午4:31:36
  16. *
  17. * @version 1.0.0
  18. *
  19. */
  20. public class LineSplitter implements FlatMapFunction<String, Tuple2<String, String>> {
  21. public void flatMap(String s, Collector<Tuple2<String, String>> collector) throws Exception {
  22. String[] tokens = s.toLowerCase().split(",");
  23. if (tokens != null && tokens.length > 0) {
  24. collector.collect(new Tuple2<String, String>(tokens[0], tokens[1]));
  25. //System.out.println(">>>>>>key->" + tokens[0] + " value->" + tokens[1]+" into redis...");
  26. }
  27. }
  28. }

非常简单,只读入流核心起作用的就是这个collector.collect,看,它按照逗号对读入的流进行折分。

flink如何sink到Redis

  1. DataStream<Tuple2<String, String>> data = testDataStreamSource.flatMap(new LineSplitter());
  2. FlinkJedisPoolConfig conf =
  3. new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(7002).setPassword("111111").build();
  4. data.addSink(new RedisSink<>(conf, new SinkRedisMapper()));
  5. env.execute();

我们来看这边的SinkRedisMapper这个类

SinkRedisMapper.java

  1. /* 系统项目名称 com.aldi.flink.demo SinkRedisMapper.java
  2. *
  3. * 2022年9月23日-下午3:52:49 2022XX公司-版权所有
  4. *
  5. */
  6. package com.aldi.com.cnflink.demo;
  7. import org.apache.flink.api.java.tuple.Tuple2;
  8. import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
  9. import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
  10. import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
  11. /**
  12. *
  13. * SinkRedisMapper
  14. *
  15. *
  16. * 2022年9月23日 下午3:52:49
  17. *
  18. * @version 1.0.0
  19. *
  20. */
  21. public class SinkRedisMapper implements RedisMapper<Tuple2<String, String>> {
  22. @Override
  23. public RedisCommandDescription getCommandDescription() {
  24. // hset
  25. return new RedisCommandDescription(RedisCommand.HSET, "flink");
  26. }
  27. @Override
  28. public String getKeyFromData(Tuple2<String, String> stringIntegerTuple2) {
  29. return stringIntegerTuple2.f0;
  30. }
  31. @Override
  32. public String getValueFromData(Tuple2<String, String> stringIntegerTuple2) {
  33. return stringIntegerTuple2.f1.toString();
  34. }
  35. }

它的作用就是使用Redis HashMap结构,把读入的流Sink到Redis里以flink这个key开头的内容中去。

所以整个SimpleKafka内容如下

完整SimpleKafka.java

  1. /* 系统项目名称 com.aldi.com.cnflink.demo SimpleKafka.java
  2. *
  3. * 2022年9月26日-下午12:28:31 2022XX公司-版权所有
  4. *
  5. */
  6. package com.aldi.com.cnflink.demo;
  7. import org.apache.flink.api.java.tuple.Tuple2;
  8. import org.apache.flink.connector.kafka.source.KafkaSource;
  9. import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
  10. import org.apache.flink.streaming.api.datastream.DataStream;
  11. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  12. import org.apache.flink.streaming.connectors.redis.RedisSink;
  13. import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
  14. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  15. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  16. /**
  17. *
  18. * SimpleKafka
  19. *
  20. *
  21. * 2022年9月26日 下午12:28:31
  22. *
  23. * @version 1.0.0
  24. *
  25. */
  26. public class SimpleKafka {
  27. /**
  28. * main(这里用一句话描述这个方法的作用) (这里描述这个方法适用条件 – 可选)
  29. *
  30. * @param args
  31. * void
  32. * @exception
  33. * @since 1.0.0
  34. */
  35. public static void main(String[] args) throws Exception {
  36. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  37. /**
  38. * OffsetsInitializer.earliest()会导至每次启动这个应用进行全量刷新kafka最早一条消息生成起至今
  39. */
  40. // KafkaSource<String> source =
  41. // KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("test")
  42. // .setGroupId("test01").setStartingOffsets(OffsetsInitializer.earliest())
  43. // .setValueOnlyDeserializer(new SimpleStringSchema()).build();
  44. /**
  45. * 因此以下使用OffsetsInitializer.latest(),这样只消息最近一条消息不会每次启动进行全量刷新
  46. */
  47. KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092")
  48. .setTopics("test").setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
  49. .setValueOnlyDeserializer(new SimpleStringSchema()).build();
  50. DataStream<String> testDataStreamSource =
  51. env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
  52. DataStream<Tuple2<String, String>> data = testDataStreamSource.flatMap(new LineSplitter());
  53. FlinkJedisPoolConfig conf =
  54. new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(7002).setPassword("111111").build();
  55. data.addSink(new RedisSink<>(conf, new SinkRedisMapper()));
  56. env.execute();
  57. }
  58. }

项目运行

第一步:把zk运行起来

第二步:把kafka运行起来

第三步:在kafka上创建一条command窗口的producer

  1. ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

第四步:把SimpleKafka运行起来

第五步:在kafka的producer内输入点东西如下:

然后在eclipse工程中我们看到了这样的内容

我们来看我们的Redis里

看,sink成功。

标签: kafka java flink

本文转载自: https://blog.csdn.net/lifetragedy/article/details/127139476
版权归原作者 TGITCIC 所有, 如有侵权,请联系我们删除。

“FLINK 基于1.15.2的Java开发-连接kafka并把内容sink到redis”的评论:

还没有评论