Comprehensive Example
Today we'll build a complete example: Flink reads a data stream from a standalone Kafka deployment, processes it, and writes the results to MySQL through a custom Sink function.
References
FLINK -1 WordCount
FLINK -2 Reading from Kafka
FLINK -3 Writing to MySQL
Kafka
Deploy a Kafka service that continuously sends data to the topic `kafka` (a minimal producer sketch is shown after the reference below).
Reference
Spring Boot + Kafka integration
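The referenced post covers the Spring Boot producer in detail. As a minimal stand-in, a plain KafkaProducer that pushes User JSON strings to the `kafka` topic could look like the sketch below; the broker address is taken from the consumer configuration later in this post, while the class name and the payload are hypothetical.

```java
// Minimal producer sketch (hypothetical class, not the Spring Boot producer from the
// referenced post): pushes User JSON strings to the "kafka" topic read by the Flink job.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class UserProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "121.5.160.142:9092"); // same broker the Flink consumer uses
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Abbreviated, hypothetical payload; a real message must carry every field of the
            // User entity below, since all columns of the MySQL table are NOT NULL.
            String json = "{\"id\":\"1\",\"username\":\"tom\",\"password\":\"123456\"}";
            producer.send(new ProducerRecord<>("kafka", json));
            producer.flush();
        }
    }
}
```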
Result
Create the data table
```sql
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `password` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `address` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `email` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `profile` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `birthday` date NOT NULL,
  `register_day` date NOT NULL,
  `login_day` date NOT NULL,
  `status` int NOT NULL,
  `account` decimal(10,2) NOT NULL,
  `balance` decimal(10,2) NOT NULL,
  `age` int NOT NULL,
  `sex` int NOT NULL,
  `avatar` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `level` int NOT NULL,
  -- A primary key on id is needed so the REPLACE INTO used by the sink upserts instead of duplicating rows
  PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;
```
Flink
Approach: the data continuously pulled from Kafka consists of JSON strings representing User entities. Each JSON string has to be parsed into an object and then saved to MySQL through a custom Sink. We therefore need a Kafka deserialization schema, a JSON-to-entity utility class, and a custom Sink class; let's walk through them in turn.
User entity
```java
package org.example.flink.user;

import lombok.Data;

import java.math.BigDecimal;
import java.util.Date;

@Data
public class User {
    private String id;
    private String username;
    private String password;
    private String address;
    private String email;
    private String profile;
    private Date birthday;
    private Date registerDay;
    private Date loginDay;
    private Integer status;
    private BigDecimal account;
    private BigDecimal balance;
    private Integer age;
    private Integer sex;
    private String avatar;
    private Integer level;
}
```
UserInfoSchema
Its main job is to let FlinkKafkaConsumer deserialize the records it fetches into User objects.
```java
package org.example.flink.user;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.example.flink.util.JSONUtil;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class UserInfoSchema implements KafkaDeserializationSchema<User> {

    public static final Charset UTF_8 = StandardCharsets.UTF_8;

    private static User buildMsg(String jsonString) {
        return JSONUtil.toBean(jsonString, User.class);
    }

    @Override
    public boolean isEndOfStream(User user) {
        return false;
    }

    @Override
    public User deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        String value = new String(consumerRecord.value(), UTF_8.name());
        return buildMsg(value);
    }

    @Override
    public TypeInformation<User> getProducedType() {
        return TypeInformation.of(User.class);
    }
}
```
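The JSONUtil imported above lives in the project's own util package and is not shown in the original post. A minimal sketch of a compatible toBean helper, assuming Jackson is used for the JSON parsing, could look like this:

```java
package org.example.flink.util;

// Sketch only: the original JSONUtil is not shown in the post; this version assumes Jackson.
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JSONUtil {

    private static final ObjectMapper MAPPER = new ObjectMapper()
            // Tolerate extra fields in incoming JSON instead of failing the whole job
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    /** Parse a JSON string into an instance of the given class. */
    public static <T> T toBean(String jsonString, Class<T> clazz) {
        try {
            return MAPPER.readValue(jsonString, clazz);
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse JSON: " + jsonString, e);
        }
    }
}
```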
Custom Sink
It writes data into the MySQL database using plain JDBC.
```java
package org.example.flink.user;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class UserSink extends RichSinkFunction<User> {

    Connection connection = null;
    PreparedStatement statement = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        String url = "jdbc:mysql://localhost:3306/flink?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false";
        connection = DriverManager.getConnection(url, "root", "root");
        statement = connection.prepareStatement(
                "replace into user (id,username,password,address,email,profile,birthday,register_day,login_day,"
                        + "status,account,balance,age,sex,avatar,`level`) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
    }

    @Override
    public void invoke(User user, Context context) throws Exception {
        // Execute the upsert statement directly for every incoming record
        statement.setString(1, user.getId());
        statement.setString(2, user.getUsername());
        statement.setString(3, user.getPassword());
        statement.setString(4, user.getAddress());
        statement.setString(5, user.getEmail());
        statement.setString(6, user.getProfile());
        statement.setDate(7, new Date(user.getBirthday().getTime()));
        statement.setDate(8, new Date(user.getRegisterDay().getTime()));
        statement.setDate(9, new Date(user.getLoginDay().getTime()));
        statement.setInt(10, user.getStatus());
        statement.setBigDecimal(11, user.getAccount());
        statement.setBigDecimal(12, user.getBalance());
        statement.setInt(13, user.getAge());
        statement.setInt(14, user.getSex());
        statement.setString(15, user.getAvatar());
        statement.setInt(16, user.getLevel());
        statement.execute();
    }

    @Override
    public void close() throws Exception {
        statement.close();
        connection.close();
    }
}
```
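As an aside, newer Flink versions also ship a ready-made JdbcSink in the flink-connector-jdbc module that batches writes and manages the connection for you. A rough sketch of how it could stand in for the hand-written sink is shown below; the connection settings and batch sizes are assumptions, and the column list is abbreviated for readability.

```java
package org.example.flink.user;

// Sketch only: requires the flink-connector-jdbc dependency; settings below are assumptions.
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class UserJdbcSinkSketch {

    /** Builds a batching JDBC sink that could replace new UserSink() in the runner below. */
    public static SinkFunction<User> build() {
        return JdbcSink.<User>sink(
                // Abbreviated to three columns; a real statement would list all sixteen columns
                // exactly as UserSink does.
                "replace into user (id, username, `level`) values (?, ?, ?)",
                (statement, user) -> {
                    statement.setString(1, user.getId());
                    statement.setString(2, user.getUsername());
                    statement.setInt(3, user.getLevel());
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(100)       // flush after 100 buffered records
                        .withBatchIntervalMs(200) // or after 200 ms, whichever comes first
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/flink?useSSL=false&serverTimezone=GMT%2B8")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("root")
                        .build()
        );
    }
}
```

The lambda fills the PreparedStatement the same way UserSink.invoke does, and the returned SinkFunction<User> would simply be passed to stream.addSink(...) in the runner below.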
UserRunner
This class contains the main method. When run, it wires up the classes declared above to read the data, process it, and write it out.
Note: the Flink job is a Maven project; judging from the imports above, it needs flink-streaming-java, the flink-connector-kafka connector, the MySQL JDBC driver, Lombok, and a JSON library on the classpath.
```java
package org.example.flink.user;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class UserRunner {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "121.5.160.142:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // Read the stream from the "kafka" topic, deserializing each record into a User via UserInfoSchema
        DataStreamSource<User> stream = env.addSource(
                new FlinkKafkaConsumer<User>("kafka", new UserInfoSchema(), properties)
        ).setParallelism(1);

        stream.print("kafka").setParallelism(1);

        // Write to MySQL through the custom sink
        stream.addSink(new UserSink()).setParallelism(1);

        env.execute();
    }
}
```
Summary
Data is written into the MySQL database continuously, completing an end-to-end streaming pipeline. Of course, the output could just as well go to any other store that Flink supports as a sink.