【Flink-Kafka-To-Hive】使用 Flink 实现 Kafka 数据写入 Hive
需求描述:
1、数据从 Kafka 写入 Hive。
2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。
3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。
4、Flink 集成 Kafka 写入 Hive 需要进行 checkpoint 才能落盘至 HDFS。
5、先在 Hive 中创建表然后动态获取 Hive 的表结构。
6、Kafka 数据为 Json 格式,通过 FlatMap 扁平化处理后,根据表结构封装到 Row 中后完成写入。
7、写入时转换成临时视图模式,利用 Flink-Sql 实现数据写入。
8、本地测试时 Hive 相关文件要放置到 resources 目录下。
9、本地测试时可以编辑 resources.flink_backup_local.yml 通过 ConfigTools.initConf 方法获取配置。
1)导入相关依赖
这里的依赖比较冗余,大家可以根据各自需求做删除或保留。
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>example.cn.test</groupId><artifactId>kafka2hive</artifactId><version>1.0.0</version><properties><hbase.version>2.3.3</hbase.version><hadoop.version>3.1.1</hadoop.version><spark.version>3.0.2</spark.version><scala.version>2.12.10</scala.version><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.14.0</flink.version><scala.binary.version>2.12</scala.binary.version><target.java.version>1.8</target.java.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.2</log4j.version><hadoop.version>3.1.2</hadoop.version><hive.version>3.1.2</hive.version></properties><dependencies><dependency><groupId>gaei.cn.x5l.bigdata.common</groupId><artifactId>x5l-bigdata-common</artifactId><version>1.1-SNAPSHOT</version><exclusions><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId></exclusion><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></exclusion><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId></exclusion></exclusions></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-dist --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-dist_2.12</artifactId><version>1.14.0-csa1.7.0.0</version><scope>provided</scope><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.jyaml</groupId><artifactId>jyaml</artifactId><version>1.3</version></dependency><dependency><groupId>gaei.cn.x5l</groupId><artifactId>tsp-gb-decode</artifactId><version>1.0.0</version><exclusions><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId></exclusion><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId></exclusion><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></exclusion></exclusions></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version><scope>runtime</scope></dependency><dependency><groupId>gaei.cn.x5l.flink.common</groupId><artifactId>x5l-flink-common</artifactId><version>1.2-SNAPSHOT</version><scope>compile</scope><exclusions><exclusion><artifactId>slf4j-api</artifactId><groupId>org.slf4j</groupId></exclusion><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId></exclusion><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId></exclusion><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></exclusion><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-1.2-api</artifactId></exclusion></exclusions></dependency><!-- Flink Dependency --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>1.14.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-3</artifactId><version>3.1.1.7.2.8.0-224-9.0</version><scope>provided</scope><exclusions><exclusion><artifactId>slf4j-log4j12</artifactId><groupId>org.slf4j</groupId></exclusion><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId></exclusion><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId></exclusion><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></exclusion><exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion></exclusions></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.10</version></dependency><dependency><groupId>com.alibaba.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>1.4.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>1.11.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- 基础依赖 开始--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- 基础依赖 结束--><!-- TABLE 开始--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>1.14.0</version><scope>provided</scope></dependency><!-- 使用 hive sql时注销,其他时候可以放开 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- TABLE 结束--><!-- sql 开始--><!-- sql解析 开始 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- sql解析 结束 --><!-- sql连接 kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- sql 结束--><!-- 检查点 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-state-processor-api_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version><scope>compile</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- 本地监控任务 结束 --><!-- DataStream 开始 --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><!-- hdfs --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.1</version></dependency><!-- 重点,容易被忽略的jar --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>${hadoop.version}</version></dependency><!-- rocksdb_2 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- 其他 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency><!-- kafka2mongo 离线任务 --><dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver</artifactId><version>3.12.6</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.0.0</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude><exclude>org.apache.flink:flink-runtime-web_2.11</exclude></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass></transformer><!-- flink sql 需要 --><!-- The service transformer is needed to merge META-INF/services files --><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><!-- ... --></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.0.0,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build><repositories><repository><id>cdh.releases.repo</id><url>https://repository.cloudera.com/artifactory/libs-release-local/</url><name>Releases Repository</name></repository></repositories></project>
2)代码实现
2.1.resources
2.1.1.appconfig.yml
mysql.url:"jdbc:mysql://1.1.1.1:3306/test?useSSL=false"mysql.username:"test"mysql.password:"123456"mysql.driver:"com.mysql.jdbc.Driver"
2.1.2.log4j.properties
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
2.1.3.log4j2.xml
<?xml version="1.0"encoding="UTF-8"?><configuration monitorInterval="5"><Properties><property name="LOG_PATTERN"value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" /><property name="LOG_LEVEL"value="ERROR" /></Properties><appenders><console name="console"target="SYSTEM_OUT"><PatternLayout pattern="${LOG_PATTERN}"/><ThresholdFilter level="${LOG_LEVEL}"onMatch="ACCEPT"onMismatch="DENY"/></console><File name="log"fileName="tmp/log/job.log"append="false"><PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/></File></appenders><loggers><root level="${LOG_LEVEL}"><appender-ref ref="console"/><appender-ref ref="log"/></root></loggers></configuration>
2.1.4.flink_backup_local.yml
hdfs:checkPointPath:'hdfs://nameserver/user/flink/rocksdbcheckpoint'checkpointTimeout:360000checkpointing:300000maxConcurrentCheckpoints:1minPauseBetweenCheckpoints:10000restartInterval:60restartStrategy:3hive:defaultDatabase:'ods'hiveConfDir:'D:/WorkSpace/bigdata-flink-backup/kafka2hive/src/main/resources/'sourceTopic:'topicA,topicB'tableName:'table_name'kafka-consumer:prop:auto.offset.reset:'earliest'bootstrap.servers:'kfk01:9092,kfk02:9092,kfk03:9092'enable.auto.commit:'false'fetch.max.bytes:'52428700'group.id:'test'isKerberized:'1'keytab:'D:/keytab/test.keytab'krb5Conf:'D:/keytab/krb5.conf'max.poll.interval.ms:'300000'max.poll.records:'1000'principal:'[email protected]'security_protocol:'SASL_PLAINTEXT'serviceName:'kafka'session.timeout.ms:'600000'useTicketCache:'false'topics:'topicA,topicB'kafka-producer:defaultTopic:'kafka2hive_error'prop:acks:'all'batch.size:'1048576'bootstrap.servers:'kfk01:9092,kfk02:9092,kfk03:9092'compression.type:'lz4'key.serializer:'org.apache.kafka.common.serialization.StringSerializer'retries:'3'value.serializer:'org.apache.kafka.common.serialization.StringSerializer'
2.2.utils
2.2.1.DBConn
importjava.sql.*;publicclassDBConn{privatestaticfinalString driver ="com.mysql.jdbc.Driver";//mysql驱动privatestaticConnection conn =null;privatestaticPreparedStatement ps =null;privatestaticResultSet rs =null;privatestaticfinalCallableStatement cs =null;/**
* 连接数据库
* @return
*/publicstaticConnectionconn(String url,String username,String password){Connection conn =null;try{Class.forName(driver);//加载数据库驱动try{
conn =DriverManager.getConnection(url, username, password);//连接数据库}catch(SQLException e){
e.printStackTrace();}}catch(ClassNotFoundException e){
e.printStackTrace();}return conn;}/**
* 关闭数据库链接
* @return
*/publicstaticvoidclose(){if(conn !=null){try{
conn.close();//关闭数据库链接}catch(SQLException e){
e.printStackTrace();}}}}
2.2.2.CommonUtils
@Slf4jpublicclassCommonUtils{publicstaticStreamExecutionEnvironmentsetCheckpoint(StreamExecutionEnvironment env)throwsIOException{// ConfigTools.initConf("local");Map hdfsMap =(Map)ConfigTools.mapConf.get("hdfs");
env.enableCheckpointing(((Integer) hdfsMap.get("checkpointing")).longValue(),CheckpointingMode.EXACTLY_ONCE);//这里会造成offset提交的延迟
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(((Integer) hdfsMap.get("minPauseBetweenCheckpoints")).longValue());
env.getCheckpointConfig().setCheckpointTimeout(((Integer) hdfsMap.get("checkpointTimeout")).longValue());
env.getCheckpointConfig().setMaxConcurrentCheckpoints((Integer) hdfsMap.get("maxConcurrentCheckpoints"));
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart((Integer) hdfsMap.get("restartStrategy"),// 尝试重启的次数,不宜过小,分布式任务很容易出问题(正常情况),建议3-5次Time.of(((Integer) hdfsMap.get("restartInterval")).longValue(),TimeUnit.SECONDS)// 延时));//设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);//设置状态后端存储方式
env.setStateBackend(newRocksDBStateBackend((String) hdfsMap.get("checkPointPath"),true));// env.setStateBackend(new FsStateBackend((String) hdfsMap.get("checkPointPath"), true));// env.setStateBackend(new HashMapStateBackend(());return env;}publicstaticFlinkKafkaConsumer<ConsumerRecord<String,String>>getKafkaConsumer(Map<String,Object> kafkaConf)throwsIOException{String[] topics =((String) kafkaConf.get("topics")).split(",");
log.info("监听的topic: {}", topics);Properties properties =newProperties();Map<String,String> kafkaProp =(Map<String,String>) kafkaConf.get("prop");for(String key : kafkaProp.keySet()){
properties.setProperty(key, kafkaProp.get(key).toString());}if(!StringUtils.isBlank((String) kafkaProp.get("isKerberized"))&&"1".equals(kafkaProp.get("isKerberized"))){System.setProperty("java.security.krb5.conf", kafkaProp.get("krb5Conf"));
properties.put("security.protocol", kafkaProp.get("security_protocol"));
properties.put("sasl.jaas.config","com.sun.security.auth.module.Krb5LoginModule required "+"useTicketCache="+ kafkaProp.get("useTicketCache")+" "+"serviceName=\""+ kafkaProp.get("serviceName")+"\" "+"useKeyTab=true "+"keyTab=\""+ kafkaProp.get("keytab").toString()+"\" "+"principal=\""+ kafkaProp.get("principal").toString()+"\";");}
properties.put("key.serializer","org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
properties.put("value.serializer","org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");FlinkKafkaConsumer<ConsumerRecord<String,String>> consumerRecordFlinkKafkaConsumer =newFlinkKafkaConsumer<ConsumerRecord<String,String>>(Arrays.asList(topics),newKafkaDeserializationSchema<ConsumerRecord<String,String>>(){@OverridepublicTypeInformation<ConsumerRecord<String,String>>getProducedType(){returnTypeInformation.of(newTypeHint<ConsumerRecord<String,String>>(){});}@OverridepublicbooleanisEndOfStream(ConsumerRecord<String,String> stringStringConsumerRecord){returnfalse;}@OverridepublicConsumerRecord<String,String>deserialize(ConsumerRecord<byte[],byte[]> record)throwsException{returnnewConsumerRecord<String,String>(
record.topic(),
record.partition(),
record.offset(),
record.timestamp(),
record.timestampType(),
record.checksum(),
record.serializedKeySize(),
record.serializedValueSize(),newString(record.key()==null?"".getBytes(StandardCharsets.UTF_8): record.key(),StandardCharsets.UTF_8),newString(record.value()==null?"{}".getBytes(StandardCharsets.UTF_8): record.value(),StandardCharsets.UTF_8));}}, properties);return consumerRecordFlinkKafkaConsumer;}}
2.3.conf
2.3.1.ConfigTools
@Slf4jpublicclassConfigTools{publicstaticMap<String,Object> mapConf;/**
* 获取对应的配置文件
*
* @param option
*/publicstaticvoidinitConf(String option){String confFile ="/flink_backup_"+ option +".yml";try{InputStream dumpFile =ConfigTools.class.getResourceAsStream(confFile);
mapConf =Yaml.loadType(dumpFile,HashMap.class);}catch(Exception e){
e.printStackTrace();}}/**
* 获取对应的配置文件
*
* @param option
*/publicstaticvoidinitMySqlConf(String option,Class clazz){String className = clazz.getName();String confFile ="/appconfig.yml";Map<String,String> mysqlConf;try{InputStream dumpFile =ConfigTools.class.getResourceAsStream(confFile);
mysqlConf =Yaml.loadType(dumpFile,HashMap.class);String username = mysqlConf.get("mysql.username");String password = mysqlConf.get("mysql.password");String url = mysqlConf.get("mysql.url");Connection conn =DBConn.conn(url, username, password);Map<String,Object> config =getConfig(conn, className, option);if(config ==null|| config.size()==0){
log.error("获取配置文件失败");return;}
mapConf = config;}catch(Exception e){
e.printStackTrace();}}privatestaticMap<String,Object>getConfig(Connection conn,String className,String option)throwsSQLException{PreparedStatement preparedStatement =null;try{String sql ="select config_context from app_config where app_name = '%s' and config_name = '%s'";
preparedStatement = conn.prepareStatement(String.format(sql, className, option));ResultSet rs = preparedStatement.executeQuery();Map<String,String> map =newLinkedHashMap<>();String config_context ="";while(rs.next()){
config_context = rs.getString("config_context");}System.out.println("配置信息config_context:"+config_context);// if(StringUtils.isNotBlank(config_context)){// System.out.println(JSONObject.toJSONString(JSONObject.parseObject(config_context), SerializerFeature.PrettyFormat));// }Map<String,Object> mysqlConfMap =JSON.parseObject(config_context,Map.class);return mysqlConfMap;}finally{if(preparedStatement !=null){
preparedStatement.close();}if(conn !=null){
conn.close();}}}publicstaticvoidmain(String[] args){// initMySqlConf("local", TboxPeriodBackoutA3K.class);initConf("local");String s =JSON.toJSONString(mapConf);System.out.println(s);}}
2.4.po
2.4.1.SchemaPo
/**
* 字段属性对象
*/@Data@AllArgsConstructor@NoArgsConstructorpublicclassSchemaPoimplementsSerializable{privateString signal;privateString type;}
2.5.kafka2hive
2.5.1.Kafka2Hive-ODS
从 Kafka 中获取到的数据不做任何处理直接写入到 Hive 的 ODS 层
publicclassKafka2Hive_ODS{publicstaticLogger logger =Logger.getLogger(Kafka2Hive_ODS.class);publicstaticvoidmain(String[] args)throwsException{ConfigTools.initMySqlConf(args[0],AcpBackoutAll_X9E_ORIGINAL.class);Map<String,Object> mapConf =ConfigTools.mapConf;Map<String,Object> kafkaConsumerConf =(Map<String,Object>) mapConf.get("kafka-consumer");Map<String,Object> hiveConf =(Map<String,Object>) mapConf.get("hive");StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();CommonUtils.setCheckpoint(env);EnvironmentSettings fsSettings =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env, fsSettings);StreamStatementSet statementSet = tableEnv.createStatementSet();// 使用Hive的sql方言
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);//自定义一个名字(没有限制随意取)String name ="myhive";//这里需要配置hive表中的默认库(不是mysql的hive元数据库)String defaultDatabase ="ods";//hive-site.xml文件目录String hiveConfDir =(String) hiveConf.get("hiveConfDir");//创建HiveCatalogHiveCatalog hive =newHiveCatalog(name, defaultDatabase, hiveConfDir,"3.1.2");//注册HiveCatalog
tableEnv.registerCatalog("myhive", hive);//使用HiveCatalog
tableEnv.useCatalog("myhive");// FlinkKafkaConsumer<String> myConsumer = CommonUtils.getKafkaConsumer();FlinkKafkaConsumer<ConsumerRecord<String,String>> myConsumer =CommonUtils.getKafkaConsumer(kafkaConsumerConf);DataStream<ConsumerRecord<String,String>> stream = env.addSource(myConsumer);String tableName =(String) hiveConf.get("tableName");List<FieldSchema> schemas = hive.getHiveTable(newObjectPath(defaultDatabase, tableName)).getSd().getCols();List<FieldSchema> partitionKeys = hive.getHiveTable(newObjectPath(defaultDatabase, tableName)).getPartitionKeys();
schemas.addAll(partitionKeys);List<SchemaPo> schemaPos =newArrayList<>();List<String> fieldLists =newArrayList<>();List<TypeInformation> typeList =newArrayList<>();for(FieldSchema schema : schemas){SchemaPo schemaPo =newSchemaPo();
schemaPo.setSignal(schema.getName());
schemaPo.setType(schema.getType());
schemaPos.add(schemaPo);
fieldLists.add(schema.getName());String type = schema.getType();if(type.equalsIgnoreCase("bigint")){
typeList.add(Types.LONG);}else{
typeList.add(Types.STRING);}}String[] fieldNames = fieldLists.toArray(newString[fieldLists.size()]);TypeInformation[] types = typeList.toArray(newTypeInformation[typeList.size()]);SingleOutputStreamOperator<Row> originalRow = stream.flatMap(newKafkaMsgFormatFunction(schemaPos),newRowTypeInfo(types, fieldNames)).uid("ORIGINAL");
tableEnv.createTemporaryView("originalRow", originalRow);StringBuilder sql =newStringBuilder();
tableEnv.executeSql("alter table table_name set TBLPROPERTIES ('sink.partition-commit.policy.kind'='metastore')");
sql.append("insert into `table_name` select ");
sql.append(" * ");
sql.append(" from `myhive`.`ods`.originalRow");
tableEnv.executeSql(sql.toString());// env.execute();}staticclassKafkaMsgFormatFunctionextendsRichFlatMapFunction<ConsumerRecord<String,String>,Row>{privateList<SchemaPo> schemaPos;publicKafkaMsgFormatFunction(List<SchemaPo> schemaPos){this.schemaPos = schemaPos;}@Overridepublicvoidopen(Configuration parameters){}@OverridepublicvoidflatMap(ConsumerRecord<String,String> record,Collector<Row> out){String key =null;try{HashMap<String,Object> infoMap =JSON.parseObject((String) record.value(),HashMap.class);for(String signalkey : infoMap.keySet()){
resultMap.put(signalkey.toLowerCase(),String.valueOf(infoMap.get(signalkey)));}Row row =newRow(schemaPos.size());for(int i =0; i < schemaPos.size(); i++){SchemaPo schemaPo = schemaPos.get(i);String v = resultMap.get(schemaPo.getSignal());if(StringUtils.isBlank(v)){
row.setField(i,null);continue;}if("bigint".equalsIgnoreCase(schemaPo.getType())){Long svalue =Long.valueOf(resultMap.get(schemaPo.getSignal()));
row.setField(i, svalue);}else{String svalue = resultMap.get(schemaPo.getSignal());
row.setField(i, svalue);}}
out.collect(row);}catch(Exception e){
e.printStackTrace();}}}}
版权归原作者 bmyyyyyy 所有, 如有侵权,请联系我们删除。