

Flink: Reading a Data Stream from Kafka and Writing It to MySQL

An end-to-end example

Today we build an end-to-end example: Flink reads a data stream from a standalone Kafka deployment, processes it, and writes the results to MySQL through a custom sink function.


Setup

References

FLINK -1 WordCount

FLINK -2 Reading from Kafka

FLINK -3 Writing to MySQL

Kafka

Deploy a Kafka service that continuously sends messages to the topic named kafka.

Reference

Spring Boot integration with Kafka
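For context, a producer along those lines can be sketched as below. This is only an illustration, not code from the original article: it assumes the spring-kafka dependency, scheduling enabled via @EnableScheduling, and a hypothetical buildRandomUserJson helper that serializes a User object to JSON.

package org.example.kafka;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class UserProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public UserProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Send one User JSON message per second to the "kafka" topic.
    // Requires @EnableScheduling on a configuration class.
    @Scheduled(fixedRate = 1000)
    public void produce() {
        kafkaTemplate.send("kafka", buildRandomUserJson());
    }

    // Hypothetical helper: a real producer would serialize a fully populated
    // User object (all fields in the table are NOT NULL) to a JSON string.
    private String buildRandomUserJson() {
        return "{\"id\":\"1\",\"username\":\"demo\"}"; // placeholder only
    }
}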

Result


Create the data table

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `password` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `address` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `email` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `profile` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `birthday` date NOT NULL,
  `register_day` date NOT NULL,
  `login_day` date NOT NULL,
  `status` int NOT NULL,
  `account` decimal(10,2) NOT NULL,
  `balance` decimal(10,2) NOT NULL,
  `age` int NOT NULL,
  `sex` int NOT NULL,
  `avatar` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `level` int NOT NULL
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

Flink

The idea: the data pulled continuously from Kafka consists of JSON strings representing the User entity. Each JSON string has to be parsed into an object and then saved to MySQL through a custom sink. So we need a Kafka deserialization schema, a JSON-to-entity utility class, and a custom sink class. Let's look at each of them in turn.

User entity

package org.example.flink.user;

import lombok.Data;

import java.math.BigDecimal;
import java.util.Date;

@Data
public class User {
    private String id;
    private String username;
    private String password;
    private String address;
    private String email;
    private String profile;
    private Date birthday;
    private Date registerDay;
    private Date loginDay;
    private Integer status;
    private BigDecimal account;
    private BigDecimal balance;
    private Integer age;
    private Integer sex;
    private String avatar;
    private Integer level;
}

UserInfoSchema

Its job is to deserialize the records fetched by FlinkKafkaConsumer into User objects.

package org.example.flink.user;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.example.flink.util.JSONUtil;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class UserInfoSchema implements KafkaDeserializationSchema<User> {

    public static final Charset UTF_8 = StandardCharsets.UTF_8;

    private static User buildMsg(String jsonString) {
        User user = JSONUtil.toBean(jsonString, User.class);
        return user;
    }

    @Override
    public boolean isEndOfStream(User user) {
        return false;
    }

    @Override
    public User deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        String value = new String(consumerRecord.value(), UTF_8.name());
        return buildMsg(value);
    }

    @Override
    public TypeInformation<User> getProducedType() {
        return TypeInformation.of(User.class);
    }
}
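The JSONUtil class imported above is not shown in the article. A minimal Jackson-based sketch with the same toBean signature could look like the following; the original utility may well be a different implementation (for example Hutool's JSONUtil), and the date fields may need an explicitly configured format depending on how the producer serializes them.

package org.example.flink.util;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JSONUtil {

    // One shared ObjectMapper; it is thread-safe once configured.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Parse a JSON string into an instance of the given class.
    public static <T> T toBean(String jsonString, Class<T> clazz) {
        try {
            return MAPPER.readValue(jsonString, clazz);
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse JSON: " + jsonString, e);
        }
    }
}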

Custom sink

It uses plain JDBC to write the data into MySQL.

package org.example.flink.user;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class UserSink extends RichSinkFunction<User> {

    Connection connection = null;
    PreparedStatement statement = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Open the JDBC connection and prepare the statement once per parallel subtask
        String url = "jdbc:mysql://localhost:3306/flink?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false";
        connection = DriverManager.getConnection(url, "root", "root");
        statement = connection.prepareStatement("replace into user (id,username,password,address,email,profile,birthday,register_day,login_day,status,account,balance,age,sex,avatar,`level`) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
    }

    @Override
    public void invoke(User user, Context context) throws Exception {
        // Bind the fields of the incoming User and execute the statement directly
        statement.setString(1, user.getId());
        statement.setString(2, user.getUsername());
        statement.setString(3, user.getPassword());
        statement.setString(4, user.getAddress());
        statement.setString(5, user.getEmail());
        statement.setString(6, user.getProfile());
        statement.setDate(7, new Date(user.getBirthday().getTime()));
        statement.setDate(8, new Date(user.getRegisterDay().getTime()));
        statement.setDate(9, new Date(user.getLoginDay().getTime()));
        statement.setInt(10, user.getStatus());
        statement.setBigDecimal(11, user.getAccount());
        statement.setBigDecimal(12, user.getBalance());
        statement.setInt(13, user.getAge());
        statement.setInt(14, user.getSex());
        statement.setString(15, user.getAvatar());
        statement.setInt(16, user.getLevel());
        statement.execute();
    }

    @Override
    public void close() throws Exception {
        statement.close();
        connection.close();
    }
}

UserRunner

This class contains the main method; when it runs, it wires together the classes above to read the data, process it, and write it out.

Note: the Flink program is a Maven project (besides the Flink streaming dependency, the code above needs the Flink Kafka connector, the MySQL JDBC driver, and Lombok on the classpath).

package org.example.flink.user;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class UserRunner {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Kafka configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "121.5.160.142:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // Read the stream from Kafka, using UserInfoSchema to deserialize the records
        DataStreamSource<User> stream = env.addSource(new FlinkKafkaConsumer<User>(
                "kafka",
                new UserInfoSchema(),
                properties
        )).setParallelism(1);

        stream.print("kafka").setParallelism(1);

        // Write to MySQL
        stream.addSink(new UserSink()).setParallelism(1);

        env.execute();
    }
}

Summary

The data is written to MySQL continuously, completing an end-to-end streaming pipeline. Of course, the sink could just as well target any other database that Flink supports, as sketched below.
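For example (not part of the original article), the hand-written UserSink could be swapped for the JDBC connector's JdbcSink, which handles the connection lifecycle and batches writes. This is only a sketch, assuming the flink-connector-jdbc dependency and the same table and credentials as above:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

// Inside UserRunner.main, replacing stream.addSink(new UserSink()):
stream.addSink(JdbcSink.sink(
        "replace into user (id,username,password,address,email,profile,birthday,register_day,login_day,status,account,balance,age,sex,avatar,`level`) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
        (statement, user) -> {
            // Bind the User fields to the prepared statement, as in UserSink.invoke
            statement.setString(1, user.getId());
            statement.setString(2, user.getUsername());
            statement.setString(3, user.getPassword());
            statement.setString(4, user.getAddress());
            statement.setString(5, user.getEmail());
            statement.setString(6, user.getProfile());
            statement.setDate(7, new java.sql.Date(user.getBirthday().getTime()));
            statement.setDate(8, new java.sql.Date(user.getRegisterDay().getTime()));
            statement.setDate(9, new java.sql.Date(user.getLoginDay().getTime()));
            statement.setInt(10, user.getStatus());
            statement.setBigDecimal(11, user.getAccount());
            statement.setBigDecimal(12, user.getBalance());
            statement.setInt(13, user.getAge());
            statement.setInt(14, user.getSex());
            statement.setString(15, user.getAvatar());
            statement.setInt(16, user.getLevel());
        },
        JdbcExecutionOptions.builder().withBatchSize(100).build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/flink?serverTimezone=GMT%2B8&useSSL=false")
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("root")
                .withPassword("root")
                .build()
));

Compared with the custom RichSinkFunction, the connector flushes records in batches instead of executing one statement per record.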


Tags: kafka, mysql, flink

This article is reproduced from: https://blog.csdn.net/weixin_41405524/article/details/125766266
Copyright belongs to the original author, BirdMan98. If there is any infringement, please contact us for removal.
