需求
在kafka发送plainText消息,以逗号分割。逗号前的作为key,逗号后的作为value。
然后把kafka发过来的东西以Redis的HashMap结构存入flink这个主Key中去。
进入开发
为了解决这个问题,我们需要在前两个的范围内解决掉以下三个问题:
- flink如何接入kafka
- flink如何不作统计(前两课我们用的是烂网上的wordcount例子)只接入流和折分
- flink如何sink到Redis
flink如何接入kafka
pom.xml
<!-- redis特性-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- kafka特性-->
<!--kafka-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.2</version>
</dependency>
kafka在flink内核心API的用法
KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092")
.setTopics("test").setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema()).build();
DataStream<String> testDataStreamSource =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
以上代码相当的简单。
有一处需要注意,如果我把以上代码改成了如下那么它的效果就是每次这个flink应用重启,都会把kafka从test这个topic发过来的第一条消息全部重新读一遍,区别就在于这个“OffsetsInitializer.earliest()”。我们取的是最近一条kafka消息,因此我们才用了:OffsetsInitializer.latest()。
KafkaSource<String> source =
KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("test")
.setGroupId("test01").setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema()).build();
flink如何不作统计(前两几篇我们用的是烂网上的wordcount例子)只接入流和折分数据
DataStream<Tuple2<String, String>> data = testDataStreamSource.flatMap(new LineSplitter());
我们接着来看LineSplitter这个类
LineSplitter.java
/* 系统项目名称 com.aldi.flink.demo LineSplitter.java
*
* 2022年9月23日-下午4:31:36 2022XX公司-版权所有
*
*/
package com.aldi.com.cnflink.demo;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
*
* LineSplitter
*
*
* 2022年9月23日 下午4:31:36
*
* @version 1.0.0
*
*/
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, String>> {
public void flatMap(String s, Collector<Tuple2<String, String>> collector) throws Exception {
String[] tokens = s.toLowerCase().split(",");
if (tokens != null && tokens.length > 0) {
collector.collect(new Tuple2<String, String>(tokens[0], tokens[1]));
//System.out.println(">>>>>>key->" + tokens[0] + " value->" + tokens[1]+" into redis...");
}
}
}
非常简单,只读入流核心起作用的就是这个collector.collect,看,它按照逗号对读入的流进行折分。
flink如何sink到Redis
DataStream<Tuple2<String, String>> data = testDataStreamSource.flatMap(new LineSplitter());
FlinkJedisPoolConfig conf =
new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(7002).setPassword("111111").build();
data.addSink(new RedisSink<>(conf, new SinkRedisMapper()));
env.execute();
我们来看这边的SinkRedisMapper这个类
SinkRedisMapper.java
/* 系统项目名称 com.aldi.flink.demo SinkRedisMapper.java
*
* 2022年9月23日-下午3:52:49 2022XX公司-版权所有
*
*/
package com.aldi.com.cnflink.demo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
*
* SinkRedisMapper
*
*
* 2022年9月23日 下午3:52:49
*
* @version 1.0.0
*
*/
public class SinkRedisMapper implements RedisMapper<Tuple2<String, String>> {
@Override
public RedisCommandDescription getCommandDescription() {
// hset
return new RedisCommandDescription(RedisCommand.HSET, "flink");
}
@Override
public String getKeyFromData(Tuple2<String, String> stringIntegerTuple2) {
return stringIntegerTuple2.f0;
}
@Override
public String getValueFromData(Tuple2<String, String> stringIntegerTuple2) {
return stringIntegerTuple2.f1.toString();
}
}
它的作用就是使用Redis HashMap结构,把读入的流Sink到Redis里以flink这个key开头的内容中去。
所以整个SimpleKafka内容如下
完整SimpleKafka.java
/* 系统项目名称 com.aldi.com.cnflink.demo SimpleKafka.java
*
* 2022年9月26日-下午12:28:31 2022XX公司-版权所有
*
*/
package com.aldi.com.cnflink.demo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
/**
*
* SimpleKafka
*
*
* 2022年9月26日 下午12:28:31
*
* @version 1.0.0
*
*/
public class SimpleKafka {
/**
* main(这里用一句话描述这个方法的作用) (这里描述这个方法适用条件 – 可选)
*
* @param args
* void
* @exception
* @since 1.0.0
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
* OffsetsInitializer.earliest()会导至每次启动这个应用进行全量刷新kafka最早一条消息生成起至今
*/
// KafkaSource<String> source =
// KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("test")
// .setGroupId("test01").setStartingOffsets(OffsetsInitializer.earliest())
// .setValueOnlyDeserializer(new SimpleStringSchema()).build();
/**
* 因此以下使用OffsetsInitializer.latest(),这样只消息最近一条消息不会每次启动进行全量刷新
*/
KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092")
.setTopics("test").setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema()).build();
DataStream<String> testDataStreamSource =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
DataStream<Tuple2<String, String>> data = testDataStreamSource.flatMap(new LineSplitter());
FlinkJedisPoolConfig conf =
new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(7002).setPassword("111111").build();
data.addSink(new RedisSink<>(conf, new SinkRedisMapper()));
env.execute();
}
}
项目运行
第一步:把zk运行起来
第二步:把kafka运行起来
第三步:在kafka上创建一条command窗口的producer
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
第四步:把SimpleKafka运行起来
第五步:在kafka的producer内输入点东西如下:
然后在eclipse工程中我们看到了这样的内容
我们来看我们的Redis里
看,sink成功。
版权归原作者 TGITCIC 所有, 如有侵权,请联系我们删除。