Preface
Overview
IT moves fast these days. Blink and Flink is already at 1.14.4, and FineBI can even do real-time BI now... So I picked up the classic Sogou-logs mini project to practice on and enjoy the thrill of stepping into a pit at every turn.
Thanks to Flink 1.14 delivering true stream-batch unification at the API level, batch jobs can now be run the streaming way, and a Kappa architecture is much easier to operate than a Lambda architecture that keeps stream and batch separate. Of course, some vendors went the other way and moved their old Spark and Spark Streaming jobs to offline Hive (running MapReduce, of course, not even Tez) just to free up some RAM for a Flink cluster to run real-time streaming. To each their own.
This little project reads logs from a CSV file and sinks them into Kafka to simulate data collection, then reads from the MQ, processes the data, and sinks the results into MySQL. Finally, FineBI is used to build a real-time dashboard for visualization. It suits apprentices like me who have used older Flink versions, know SQL, and are not much good at Echarts.
Preview
Direct link on Bilibili.
Video on CSDN.
Hands-on Sogou-logs BI mini project with Flink + Kafka + FineBI
Component versions
Running on the USDP 2.0 cluster built earlier.
Flink: 1.14.3, upgraded manually;
Kafka: the bundled 2.0.1, plus the bundled KafkaEagle;
FineBI: 5.1 free personal edition.
Preparation
Create the Kafka topic
[root@zhiyong2 lib]# kafka-topics.sh --zookeeper zhiyong2:2181 --delete --topic sougoulogs
Topic sougoulogs is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@zhiyong2 ~]# kafka-topics.sh --zookeeper zhiyong2:2181 --list
[root@zhiyong2 ~]# kafka-topics.sh --zookeeper zhiyong2:2181 --create --topic sougoulogs --replication-factor 3 --partitions 3
Created topic "sougoulogs".
[root@zhiyong2 ~]# kafka-topics.sh --zookeeper zhiyong2:2181 --list
sougoulogs
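If you prefer managing the topic from code rather than the CLI, the AdminClient that ships with kafka-clients can do the same thing. A minimal sketch, not part of the original project (the class name is made up; brokers assumed to be the cluster's zhiyong2/3/4 on 9092):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateSougoulogsTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "zhiyong2:9092,zhiyong3:9092,zhiyong4:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3, matching the CLI command above
            NewTopic topic = new NewTopic("sougoulogs", 3, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get(); // fails if the topic already exists
            // List topics to confirm creation
            System.out.println(admin.listTopics().names().get());
        }
    }
}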
Using Kafka
Kafka is still the most widely used MQ for real-time work, with Pulsar catching up lately. For us apprentices, having a GUI makes life a lot easier.
Start Kafka and KafkaEagle
Just start it from USDP's web UI, which is very convenient. You can also open the KafkaEagle visualization:
http://zhiyong4:8048/
admin,123456
Test that Kafka works
In KafkaEagle's Mock page, pick the sougoulogs topic and generate some data:
00:00:01123456[哈哈\]23 www.baidu.com
After hitting Send, you can run SQL in KSQL to see the data.
Type into the box:
select * from sougoulogs where `partition` in (0,1,2) limit 5
Or start a consumer from the command line to watch the data:
[root@zhiyong2 ~]# kafka-console-consumer.sh --bootstrap-server zhiyong2:9092 --topic sougoulogs
00:00:01 123456[哈哈\]23 www.baidu.com
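If KafkaEagle's Mock page is not at hand, a plain Java producer does the same job. A minimal sketch, not from the original project (the class name is made up; the record follows the 6-field comma-separated layout that the Flink job later expects):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MockOneRecord {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "zhiyong2:9092,zhiyong3:9092,zhiyong4:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One record in the 6-field comma-separated format the Flink job filters on later
            producer.send(new ProducerRecord<>("sougoulogs", "00:00:01,123456,[哈哈],2,3,www.baidu.com"));
            producer.flush();
        }
    }
}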
These days every big-data component has gone SQL-first, which is a bargain for the SQL Boys: knowing only SQL is enough for a data-warehouse job. That mostly works because the sources are tidy, structured data; for semi-structured or unstructured data, or for building a platform, SQL alone leaves the skill set rather thin.
Create the MySQL database and tables
These MySQL tables hold the computed results that the BI tool will display.
create database sougoulogs;
use sougoulogs;

create table if not exists newscount (
    name varchar(50) not null,
    count int(11) not null
);

create table if not exists periodcount (
    logtime varchar(50) not null,
    count int(11) not null
);
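Before wiring Flink in, a quick JDBC smoke test against these tables does not hurt, using the same driver, URL and credentials that the ConstConfig class further down hard-codes. A hypothetical check, not part of the original project:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class MySqlSmokeTest {
    public static void main(String[] args) throws Exception {
        // Same driver/URL/credentials as the ConstConfig class below
        Class.forName("com.mysql.cj.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.88.100:3306/sougoulogs", "root", "123456");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select count(1) from newscount")) {
            while (rs.next()) {
                System.out.println("newscount rows: " + rs.getInt(1));
            }
        }
    }
}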
Real-time BI
There are only 3 dashboard components in total, backed by the 2 tables and 3 SQL queries:

SELECT `name`, `count` FROM newscount ORDER BY `count` DESC LIMIT 10   -- newscount
SELECT count(1) FROM newscount                                         -- counts
SELECT `logtime`, `count` FROM periodcount ORDER BY logtime DESC LIMIT 10   -- periodcount
The newer FineBI feels a bit like Tableau and is friendly to us apprentices; the official site has detailed tutorials, so I won't repeat them. The rough flow is:
Add a connection → add a real-time data source → add datasets → configure components and build the dashboard.
Even without knowing Spring or Echarts you can still get the data on screen.
Writing the Flink program
pom dependencies
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>sougoulogs</artifactId><groupId>com.zhiyong</groupId><version>1.0.0</version></parent><modelVersion>4.0.0</modelVersion><!-- 指定仓库位置,依次为aliyun、cloudera、apache仓库 --><repositories><repository><id>aliyun</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url></repository></repositories><artifactId>flinkDemo</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.14.3</flink.version><scala.version>2.12.14</scala.version><scala.binary.version>2.12</scala.binary.version></properties><dependencies><!-- Apache Flink 的依赖, 这些依赖项,生产环境可以不打包到JAR文件中. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- flink操作hadoop--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-filesystems</artifactId><version>${flink.version}</version><type>pom</type></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version><!-- <version>1.14.4</version>--><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!-- 
<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version></dependency><!-- 可以使用Lombok的@注解--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.20</version></dependency><!-- MySQL驱动包--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency></dependencies><build><sourceDirectory>src/main/java</sourceDirectory><testSourceDirectory>src/test/java</testSourceDirectory><plugins><!-- 编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target><!--<encoding>${project.build.sourceEncoding}</encoding>--></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><!-- 打jar包插件(会包含所有依赖) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><!--
zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF --><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- 可以设置jar包的入口类(可选) --><!--<mainClass>com.aa.flink.StreamWordCount</mainClass>--></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project>
Mock data
package com.zhiyong.flinkDemo;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;

/**
 * @program: sougoulogs
 * @description: Generates the mock data
 * @author: zhiyong
 * @create: 2022-04-13 18:01
 **/
public class MockData {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> data = env.addSource(new CsvSource());
        data.print("自定义数据源产生的数据:");
        env.execute("模拟数据");
    }

    static class CsvSource extends RichSourceFunction<String> {
        FileInputStream fis = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        String line = null;
        String csvPath = ConstConfig.csvPath;
        String lastStr = ""; // previous record
        boolean needRun = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (needRun) { // keeps the source alive so the job looks like a stream
                while (null != (line = br.readLine())) {
                    // System.out.println("read line: " + line);
                    ctx.collect(line);
                    // Throttle: sleep for the gap between the timestamps of consecutive records.
                    // Note: compare the fields with equals(); the original used !=, which compares references.
                    if (StringUtils.isNoneBlank(lastStr)
                            && !line.split(",")[0].equals(lastStr.split(",")[0])) {
                        Thread.sleep((Long.parseLong(line.split(",")[0])
                                - Long.parseLong(lastStr.split(",")[0])) * 1000);
                    }
                    lastStr = line;
                }
            }
        }

        @Override
        public void cancel() {
            needRun = false;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            fis = new FileInputStream(csvPath);
            isr = new InputStreamReader(fis, "UTF-8");
            br = new BufferedReader(isr);
        }

        @Override
        public void close() throws Exception {
            super.close();
            // Always close the streams so the file is not held open
            if (null != br) {
                br.close();
            }
            if (null != isr) {
                isr.close();
            }
            if (null != fis) {
                fis.close();
            }
        }
    }
}
The point of the custom source is to stop Flink from reading the whole CSV batch-style in one go and exiting immediately, which would hide the streaming behavior. Throttling also keeps resource usage down during development and avoids out-of-memory problems.
Sink the data to Kafka
package com.zhiyong.flinkDemo;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * @program: sougoulogs
 * @description: Read the log data and write it to Kafka
 * @author: zhiyong
 * @create: 2022-04-13 12:02
 **/
public class File2Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String inputPath = "E:/study/flink/data/test1";
        // DataStreamSource<String> data = env.readTextFile(inputPath); // effectively batch: finishes immediately, too fast
        DataStreamSource<String> data = env.addSource(new MockData.CsvSource());

        // Build the Kafka producer config
        Properties kafkaProperties = new Properties();
        // Must be < 15 minutes (the broker's transaction.max.timeout.ms)
        kafkaProperties.setProperty("transaction.timeout.ms", 1000 * 60 * 10 + "");

        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("192.168.88.101:9092,192.168.88.102:9092,192.168.88.103:9092")
                // Fixes: Unexpected error in InitProducerIdResponse; The transaction timeout is larger
                // than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
                .setKafkaProducerConfig(kafkaProperties)
                //.setTransactionalIdPrefix("")
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("sougoulogs")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        data.sinkTo(kafkaSink);
        env.execute("File2Kafka");
    }
}
Sink the data to MySQL
package com.zhiyong.flinkDemo;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * @program: sougoulogs
 * @description: Read from Kafka with Flink and write the results to MySQL
 * @author: zhiyong
 * @create: 2022-04-06 22:49
 **/
public class Kafka2MySQL_Flink implements Serializable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Properties prop = new Properties();
        // prop.setProperty("bootstrap.servers", "zhiyong2:9092,zhiyong3:9092,zhiyong4:9092");
        // prop.setProperty("group.id", "sougoulogs");
        // FlinkKafkaConsumer<String> kafkaConsumer =
        //         new FlinkKafkaConsumer<>("sougoulogs", new SimpleStringSchema(), prop); // deprecated API
        // DataStreamSource<String> data = env.addSource(kafkaConsumer);

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder() // the generic type goes right after the dot
                .setBootstrapServers("zhiyong2:9092,zhiyong3:9092,zhiyong4:9092")
                .setTopics("sougoulogs")
                .setGroupId("sougoulogs")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // .setValueOnlyDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringSerializer.class))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> source = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
        System.out.println("构建环境完成");
        source.print();

        // Drop dirty records: keep only lines with exactly 6 comma-separated fields
        SingleOutputStreamOperator<String> cleanStream = source
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return value.split(",").length == 6;
                    }
                });

        // Per-keyword counts, keyed by the search keyword (field 2)
        SingleOutputStreamOperator<Tuple2<String, Integer>> newsCountsStream = cleanStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        out.collect(Tuple2.of(value.toLowerCase().split(",")[2], 1));
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);

        // Per-time-period counts, keyed by the log time (field 0)
        SingleOutputStreamOperator<Tuple2<String, Integer>> periodCountsStream = cleanStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        out.collect(new Tuple2<>(value.toLowerCase().split(",")[0], 1));
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);

        newsCountsStream.print("新闻计数流数据:");
        periodCountsStream.print("阶段汇总流数据:");

        // Write the results to MySQL
        newsCountsStream.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
            private Connection conn = null;
            private Statement stmt = null; // the streaming job keeps writing through this single connection

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                Class.forName(ConstConfig.JDBC_Driver);
                conn = DriverManager.getConnection(ConstConfig.URL, ConstConfig.user, ConstConfig.password);
            }

            // Auto-generated, not needed:
            // @Override
            // public void setRuntimeContext(RuntimeContext t) { super.setRuntimeContext(t); }

            // Called once per record to write the data
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                try {
                    String name = value.f0.replaceAll("[\\[\\]]", "");
                    Integer count = value.f1;
                    String sql1 = "select 1 from newscount where name ='" + name + "'";
                    String sql2 = "update newscount set count =" + count + " where name ='" + name + "'";
                    String sql3 = "insert into newscount(name,count) values('" + name + "'," + count + ")";
                    stmt = conn.createStatement();
                    ResultSet resultSet = stmt.executeQuery(sql1);
                    if (resultSet.next()) {
                        stmt.execute(sql2); // row exists: update
                    } else {
                        stmt.execute(sql3); // no row yet: insert
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            // A streaming job normally never reaches close()
            @Override
            public void close() throws Exception {
                super.close();
                if (null != stmt) {
                    stmt.close();
                }
                if (null != conn) {
                    conn.close();
                }
            }
        });

        periodCountsStream.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
            private Connection conn = null;
            private Statement stmt = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                Class.forName(ConstConfig.JDBC_Driver);
                conn = DriverManager.getConnection(ConstConfig.URL, ConstConfig.user, ConstConfig.password);
            }

            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                try {
                    String logtime = value.f0;
                    Integer count = value.f1;
                    String sql1 = "select 1 from periodcount where logtime ='" + logtime + "'";
                    String sql2 = "update periodcount set count =" + count + " where logtime ='" + logtime + "'";
                    String sql3 = "insert into periodcount(logtime,count) values('" + logtime + "'," + count + ")";
                    stmt = conn.createStatement();
                    ResultSet resultSet = stmt.executeQuery(sql1);
                    if (resultSet.next()) {
                        stmt.execute(sql2);
                    } else {
                        stmt.execute(sql3);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void close() throws Exception {
                super.close();
                if (null != stmt) {
                    stmt.close();
                }
                if (null != conn) {
                    conn.close();
                }
            }
        });

        env.execute("kafka2MySQL");
    }
}
Config class
package com.zhiyong.flinkDemo;

import java.io.Serializable;

/**
 * @program: sougoulogs
 * @description: Hard-coded configuration class
 * @author: zhiyong
 * @create: 2022-04-06 22:43
 **/
public class ConstConfig implements Serializable {
    public static final String JDBC_Driver = "com.mysql.cj.jdbc.Driver";
    public static final String URL = "jdbc:mysql://192.168.88.100:3306/sougoulogs";
    public static final String user = "root";
    public static final String password = "123456";
    public static final String csvPath = "E:/study/studyBigDataProj/sougouLogs/data/sougou500w_2.txt";
}
This class keeps the configuration in one place so it is easy to change. A streaming job is typically started once and runs for a long time, and the metrics it computes are mostly decided up front and rarely change, so hard-coding is fine in most cases. Dynamic parameters, as in offline batch jobs, matter more when you build reusable components or a platform.
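If the job ever does need to take parameters at submit time instead of constants, Flink's ParameterTool is a lightweight way to do it. A small sketch under that assumption (the property names are made up for illustration):

import org.apache.flink.api.java.utils.ParameterTool;

public class ConfigFromArgs {
    public static void main(String[] args) {
        // e.g. --mysql.url jdbc:mysql://192.168.88.100:3306/sougoulogs --kafka.topic sougoulogs
        ParameterTool params = ParameterTool.fromArgs(args);
        String url = params.get("mysql.url", ConstConfig.URL); // fall back to the hard-coded constant
        String topic = params.get("kafka.topic", "sougoulogs");
        System.out.println("url=" + url + ", topic=" + topic);
    }
}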
Problems encountered
Flink 1.14's brand-new Kafka connector
The version is so new that you have to dig through the source code to figure out how to use it, with pitfalls everywhere. Older versions would probably do something like this:
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "zhiyong2:9092,zhiyong3:9092,zhiyong4:9092");
prop.setProperty("group.id", "sougoulogs");

FlinkKafkaConsumer<String> kafkaConsumer =
        new FlinkKafkaConsumer<>("sougoulogs", new SimpleStringSchema(), prop); // deprecated API

DataStreamSource<String> data = env.addSource(kafkaConsumer);
But of course, deprecated APIs should be avoided whenever possible!!!
Looking at the source code:
package org.apache.flink.connector.kafka.source;

/**
* The Source implementation of Kafka. Please use a {@link KafkaSourceBuilder} to construct a {@link
* KafkaSource}. The following example shows how to create a KafkaSource emitting records of <code>
* String</code> type.
*
* <pre>{@code
* KafkaSource<String> source = KafkaSource
* .<String>builder()
* .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
* .setGroupId("MyGroup")
* .setTopics(Arrays.asList(TOPIC1, TOPIC2))
* .setDeserializer(new TestingKafkaRecordDeserializationSchema())
* .setStartingOffsets(OffsetsInitializer.earliest())
* .build();
* }</pre>
*
* <p>See {@link KafkaSourceBuilder} for more details.
*
* @param <OUT> the output type of the source.
 */
public class KafkaSource<OUT>
        implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>, ResultTypeQueryable<OUT> {
}
Luckily the Javadoc shows how to build the object, so just copy it over and tweak it:
KafkaSource<String> kafkaSource = KafkaSource.<String>builder() // the generic type goes right after the dot
        .setBootstrapServers("zhiyong2:9092,zhiyong3:9092,zhiyong4:9092")
        .setTopics("sougoulogs")
        .setGroupId("sougoulogs")
        .setStartingOffsets(OffsetsInitializer.earliest())
        // .setValueOnlyDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringSerializer.class))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
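The commented-out line above hints at the other way to plug in the deserializer. It takes a Kafka Deserializer class such as StringDeserializer rather than a StringSerializer, which is presumably why that attempt stayed commented out. Roughly, as a sketch rather than the code actually used here:

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaSourceWithValueOnly {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("zhiyong2:9092,zhiyong3:9092,zhiyong4:9092")
                .setTopics("sougoulogs")
                .setGroupId("sougoulogs")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // valueOnly wraps a plain Kafka Deserializer class for the record value
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();
    }
}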
With the new connector wired up, Kafka offers exactly-once semantics, which of course I wanted to use, and then this error appeared:
Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
The broker's transaction timeout defaults to 15 minutes, so for now just set the producer's transaction timeout to 10 minutes to avoid the error:
Properties kafkaProperties = new Properties();
// Must be < 15 minutes (the broker's transaction.max.timeout.ms)
kafkaProperties.setProperty("transaction.timeout.ms", 1000 * 60 * 10 + "");

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("192.168.88.101:9092,192.168.88.102:9092,192.168.88.103:9092")
        // Fixes: Unexpected error in InitProducerIdResponse; The transaction timeout is larger
        // than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
        .setKafkaProducerConfig(kafkaProperties)
        //.setTransactionalIdPrefix("")
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("sougoulogs")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();
data.sinkTo(kafkaSink);
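Two more things are worth knowing about EXACTLY_ONCE on this sink, which the snippet above does not show: the transactional producer only commits on checkpoints, so checkpointing must be enabled, and the 1.14 KafkaSink also wants a transactional id prefix so that ids from parallel jobs do not collide. Roughly, inside File2Kafka's main() (the prefix string here is arbitrary):

// Enable checkpointing, e.g. every 30 s; the transactional KafkaSink commits on checkpoints
env.enableCheckpointing(30 * 1000L);

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("192.168.88.101:9092,192.168.88.102:9092,192.168.88.103:9092")
        .setKafkaProducerConfig(kafkaProperties)
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // Arbitrary but stable prefix, so transactional ids of different jobs do not clash
        .setTransactionalIdPrefix("file2kafka-sougoulogs")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("sougoulogs")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();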
MySQL character encoding
At runtime this shows up:
构建环境完成
00:00:01,123456,[哈哈],2,3,www.baidu.com
新闻计数流数据:> ([哈哈],1)
阶段汇总流数据:> ([哈哈],1)
java.sql.SQLException: Illegal mix of collations (latin1_swedish_ci,IMPLICIT) and (utf8mb4_general_ci,COERCIBLE) for operation '='
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1201)
at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$7.invoke(Kafka2MySQL_Flink.java:184)
at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$7.invoke(Kafka2MySQL_Flink.java:161)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:67)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
java.sql.SQLException: Illegal mix of collations (latin1_swedish_ci,IMPLICIT) and (utf8mb4_general_ci,COERCIBLE) for operation '='
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1201)
at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$6.invoke(Kafka2MySQL_Flink.java:137)
at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$6.invoke(Kafka2MySQL_Flink.java:107)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:67)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
This happens because the MySQL instance bundled with USDP mixes latin1 with UTF-8/GBK. Obviously the cluster's stability should not be put at risk for this little project, so just run:
[root@zhiyong1 usdp]# mysql -uroot -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 171
Server version: 5.7.30 MySQL Community Server (GPL)

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> SHOW VARIABLES LIKE 'character_set_%';
+--------------------------+----------------------------+
| Variable_name            | Value                      |
+--------------------------+----------------------------+
| character_set_client     | utf8                       |
| character_set_connection | utf8                       |
| character_set_database   | latin1                     |
| character_set_filesystem | binary                     |
| character_set_results    | utf8                       |
| character_set_server     | latin1                     |
| character_set_system     | utf8                       |
| character_sets_dir       | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+
8 rows in set (0.00 sec)

mysql> SHOW VARIABLES LIKE 'collation_%';
+----------------------+-------------------+
| Variable_name        | Value             |
+----------------------+-------------------+
| collation_connection | utf8_general_ci   |
| collation_database   | latin1_swedish_ci |
| collation_server     | latin1_swedish_ci |
+----------------------+-------------------+
3 rows in set (0.00 sec)

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| db_hive_metastore  |
| db_hue             |
| db_ranger          |
| db_udp             |
| dolphinscheduler   |
| mysql              |
| performance_schema |
| sougoulogs         |
| sys                |
+--------------------+
10 rows in set (0.00 sec)

mysql> use sougoulogs;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+----------------------+
| Tables_in_sougoulogs |
+----------------------+
| newscount            |
| periodcount          |
+----------------------+
2 rows in set (0.00 sec)

mysql> alter table `newscount` charset=utf8;
Query OK, 0 rows affected (0.00 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> alter table `periodcount` charset=utf8;
Query OK, 0 rows affected (0.00 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> alter table `newscount` convert to character set utf8;
Query OK, 17 rows affected (0.03 sec)
Records: 17  Duplicates: 0  Warnings: 0

mysql> alter table `periodcount` convert to character set utf8;
Query OK, 12 rows affected (0.03 sec)
Records: 12  Duplicates: 0  Warnings: 0
Changing the character set of these 2 ADS tables is all that is needed.
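On the client side, a belt-and-braces option is to pin the JDBC connection itself to UTF-8 so the strings the Flink sinks send are not tagged with a latin1 collation. A hypothetical variant of the URL constant in ConstConfig:

// Hypothetical tweak to ConstConfig: force the JDBC session to UTF-8
public static final String URL =
        "jdbc:mysql://192.168.88.100:3306/sougoulogs?useUnicode=true&characterEncoding=utf8";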
FineBI does not auto-refresh
I assumed that with a real-time data source FineBI would refresh on its own... too naive. Following the official guide:
Local path:
C:\FineBI5.1\webapps\webroot\WEB-INF\lib\fine-bi-adapter-5.1.jar
fine-bi-adapter-5.1.jar\com\finebi\web\html\show.html
Insert at the end of that file inside the jar:
</body>
<!-- insert here -->
<!-- add auto-refresh -->
<script type="text/javascript" src="/webroot/refresh.js"></script>
<!-- insert here -->
</html>
Create refresh.js under C:\FineBI5.1\webapps\webroot:
setTimeout(function () {
    var b = document.title;
    var a = BI.designConfigure.reportId; // get the dashboard id
    // only refresh the specified dashboard
    if (a == "7fcbb0a362314ca4b015ee26f39f5a79") {
        setInterval(function () {
            BI.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
            // Data.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
            BI.Utils.broadcastAllWidgets2Refresh(true);
        }, 5000); // refresh interval, in ms
    }
}, 2000)
Refreshing too often eats a fair amount of memory and CPU; my GT730 display-only card just about copes.
The old-school way
Plenty of older approaches can achieve the same effect, but they will fade away sooner or later.
What follows is a record of approaches whose time has passed.
Flume
It may still be useful for log collection, but the latest release dates from 2018-12-18, which is very mature, to put it kindly...
Configuration
Write a Flume config file File2Avro.properties that tails the file and ships it, Avro-serialized, to the other 2 Flume nodes.
[root@zhiyong2 flume]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/flume
[root@zhiyong2 flume]# ll
总用量 164
drwxrwxrwx. 2 hadoop hadoop 628月 102021 bin
-rwxrwxrwx. 1 hadoop hadoop 8560212月 182018 CHANGELOG
drwxrwxrwx. 2 hadoop hadoop 1974月 121:27 conf
-rwxrwxrwx. 1 hadoop hadoop 568112月 182018 DEVNOTES
-rwxrwxrwx. 1 hadoop hadoop 287312月 182018 doap_Flume.rdf
drwxrwxrwx. 2 hadoop hadoop 81928月 102021 lib
-rwxrwxrwx. 1 hadoop hadoop 4340512月 182018 LICENSE
-rwxrwxrwx. 1 hadoop hadoop 24912月 182018 NOTICE
drwxr-xr-x 3 root root 244月 1220:30 plugins.d
-rwxrwxrwx. 1 hadoop hadoop 248312月 182018 README.md
-rwxrwxrwx. 1 hadoop hadoop 195812月 182018 RELEASE-NOTES
drwxrwxrwx. 2 hadoop hadoop 63月 123:10 run
drwxrwxrwx. 2 hadoop hadoop 687月 262021 tools
[root@zhiyong2 flume]# cd conf/
[root@zhiyong2 conf]# ll
总用量 28
-rwxr-xr-x 1 root root 16614月 121:25 flume-conf.properties
-rwxrwxrwx. 1 hadoop hadoop 166112月 182018 flume-conf.properties.template
-rwxrwxrwx. 1 hadoop hadoop 14553月 123:10 flume-env.ps1
-rwxrwxrwx. 1 hadoop hadoop 145512月 182018 flume-env.ps1.template
-rwxrwxrwx. 1 hadoop hadoop 15933月 123:10 flume-env.sh
-rwxrwxrwx. 1 hadoop hadoop 156812月 182018 flume-env.sh.template
-rwxrwxrwx. 1 hadoop hadoop 31173月 123:10 log4j.properties
[root@zhiyong2 conf]# touch File2Avro.properties
[root@zhiyong2 conf]# ll
总用量 28
-rw-r--r-- 1 root root 04月 13 00:00 File2Avro.properties
-rwxr-xr-x 1 root root 16614月 121:25 flume-conf.properties
-rwxrwxrwx. 1 hadoop hadoop 166112月 182018 flume-conf.properties.template
-rwxrwxrwx. 1 hadoop hadoop 14553月 123:10 flume-env.ps1
-rwxrwxrwx. 1 hadoop hadoop 145512月 182018 flume-env.ps1.template
-rwxrwxrwx. 1 hadoop hadoop 15933月 123:10 flume-env.sh
-rwxrwxrwx. 1 hadoop hadoop 156812月 182018 flume-env.sh.template
-rwxrwxrwx. 1 hadoop hadoop 31173月 123:10 log4j.properties
[root@zhiyong2 conf]# vim File2Avro.properties
# the contents of the properties file are shown below
[root@zhiyong2 conf]# chmod 777 ./File2Avro.properties
Contents of this properties file:
agent1.sources = source1
agent1.channels = fileChannel
agent1.sinkgroups = sinkgroup1
agent1.sinks = sink1 sink2
agent1.sources.source1.type = TAILDIR
# track the read offsets in a JSON file
agent1.sources.source1.positionFile = /bigdataproj/data/source/logs/taildir_position.json
agent1.sources.source1.filegroups = f1
agent1.sources.source1.filegroups.f1 = /bigdataproj/data/source/logs/sogou.log
agent1.sources.source1.channels = fileChannel
# file channel with checkpoint directory
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /bigdataproj/data/channel/checkpointDir
agent1.channels.fileChannel.dataDirs = /bigdataproj/data/channel/dataDirs
# sink group: load-balanced, round robin
agent1.sinkgroups.sinkgroup1.sinks = sink1 sink2
agent1.sinkgroups.sinkgroup1.processor.type = load_balance
agent1.sinkgroups.sinkgroup1.processor.backoff = true
agent1.sinkgroups.sinkgroup1.processor.selector = round_robin
agent1.sinkgroups.sinkgroup1.processor.selector.maxTimeOut=10000
# ship to zhiyong3, Avro-serialized
agent1.sinks.sink1.type = avro
agent1.sinks.sink1.channel = fileChannel
agent1.sinks.sink1.batchSize = 1
agent1.sinks.sink1.hostname = zhiyong3
agent1.sinks.sink1.port = 12345
# ship to zhiyong4, Avro-serialized
agent1.sinks.sink2.type = avro
agent1.sinks.sink2.channel = fileChannel
agent1.sinks.sink2.batchSize = 1
agent1.sinks.sink2.hostname = zhiyong4
agent1.sinks.sink2.port = 12345
Write a Flume config file Avro2HBaseAndKafka.properties that reads the Avro stream and writes the data to both HBase and Kafka.
[root@zhiyong3 conf]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/flume/conf
[root@zhiyong3 conf]# touch Avro2HBaseAndKafka.properties
[root@zhiyong3 conf]# chmod 777 ./Avro2HBaseAndKafka.properties
[root@zhiyong3 conf]# ll
总用量 24
-rwxrwxrwx 1 root root 04月 13 00:04 Avro2HBaseAndKafka.properties
-rw-rw-r--. 1 hadoop hadoop 166112月 182018 flume-conf.properties.template
-rwxr-xr-x. 1 hadoop hadoop 14553月 123:10 flume-env.ps1
-rw-rw-r--. 1 hadoop hadoop 145512月 182018 flume-env.ps1.template
-rwxr-xr-x. 1 hadoop hadoop 15933月 123:10 flume-env.sh
-rw-rw-r--. 1 hadoop hadoop 156812月 182018 flume-env.sh.template
-rwxrwxr-x. 1 hadoop hadoop 31173月 123:10 log4j.properties
[root@zhiyong3 conf]# vim Avro2HBaseAndKafka.properties
# the contents of the properties file are shown below
[root@zhiyong3 conf]# scp ./Avro2HBaseAndKafka.properties root@zhiyong4:$PWD
Avro2HBaseAndKafka.properties                 100% 1617   466.5KB/s   00:00
[root@zhiyong3 conf]#
Contents of this properties file:
agent1.sources = source1
agent1.channels = HBaseChannel KafkaChannel
agent1.sinks = HBaseSink kafkaSink
# Define and configure an avro
agent1.sources.source1.type = avro
agent1.sources.source1.channels = KafkaChannel HBaseChannel
# listen on all hosts
agent1.sources.source1.bind = 0.0.0.0
agent1.sources.source1.port = 12345
agent1.sources.source1.selector.type = replicating
# channel for the HBase sink
agent1.channels.HBaseChannel.type = memory
agent1.channels.HBaseChannel.capacity = 10000
agent1.channels.HBaseChannel.transactionCapacity = 10000
agent1.channels.HBaseChannel.keep-alive = 5
# Define and configure a sink
agent1.sinks.HBaseSink.type = asynchbase
agent1.sinks.HBaseSink.channel = HBaseChannel
agent1.sinks.HBaseSink.table = sougoulogs
agent1.sinks.HBaseSink.serializer = org.apache.flume.sink.hbase.SougoulogsAsyncHbaseEventSerializer
agent1.sinks.HBaseSink.zookeeperQuorum = zhiyong2:2181,zhiyong3:2181,zhiyong4:2181
agent1.sinks.HBaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
agent1.sinks.HBaseSink.znodeParent = /hbase
agent1.sinks.HBaseSink.columnFamily = info
# channel for the Kafka sink
agent1.channels.KafkaChannel.type = memory
agent1.channels.KafkaChannel.capacity = 10000
agent1.channels.KafkaChannel.transactionCapacity = 10000
agent1.channels.KafkaChannel.keep-alive = 5
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.topic = sougoulogs
agent1.sinks.kafkaSink.brokerList = zhiyong2:9092,zhiyong3:9092,zhiyong4:9092
agent1.sinks.kafkaSink.serializer.class=kafka.serializer.StringEncoder
agent1.sinks.kafkaSink.producer.acks = 1
agent1.sinks.kafkaSink.producer.requiredAcks = 1
agent1.sinks.kafkaSink.channel = KafkaChannel
With that in place, it starts up successfully:
[root@zhiyong3 ~]# flume-ng agent -n agent1 -c conf -f /opt/usdp-srv/srv/udp/2.0.0.0/flume/conf/Avro2HBaseAndKafka.properties -Dflume.root.logger=INFO,console
Info: 。。。
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: kafkaSink: Successfully registered new MBean.
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: kafkaSink started
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started
2022-04-13 00:18:08 INFO source.AvroSource: Avro source source1 started.
Start the other Flume agent the same way:
[root@zhiyong3 ~]# ssh zhiyong4
Last login: Tue Apr 12 20:39:23 2022 from zhiyong3
[root@zhiyong4 ~]# date
2022年 04月 13日 星期三 00:11:17 CST
[root@zhiyong4 ~]# flume-ng agent -n agent1 -c conf -f /opt/usdp-srv/srv/udp/2.0.0.0/flume/conf/Avro2HBaseAndKafka.properties -Dflume.root.logger=INFO,console
Info: 。。。
2022-04-13 00:19:57 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: kafkaSink: Successfully registered new MBean.
2022-04-13 00:19:57 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: kafkaSink started
2022-04-13 00:19:58 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2022-04-13 00:19:58 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started
2022-04-13 00:19:58 INFO source.AvroSource: Avro source source1 started.
Then start the Flume agent that collects the data:
[root@zhiyong2 ~]# mkdir -p /bigdataproj/data/source/logs
[root@zhiyong2 ~]# cd /bigdataproj/data/source/logs
[root@zhiyong2 logs]# touch sogou.log
[root@zhiyong2 logs]# chmod 777 ./sogou.log
[root@zhiyong2 logs]# cd
[root@zhiyong2 ~]# flume-ng agent -n agent1 -c conf -f /opt/usdp-srv/srv/udp/2.0.0.0/flume/conf/File2Avro.properties -Dflume.root.logger=INFO,console
Info: 。。。
2022-04-13 00:25:59 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
2022-04-13 00:25:59 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
2022-04-13 00:25:59 INFO sink.AbstractRpcSink: Rpc sink sink1: Building RpcClient with hostname: zhiyong3, port: 12345
2022-04-13 00:25:59 INFO sink.AvroSink: Attempting to create Avro Rpc client.
2022-04-13 00:25:59 INFO api.NettyAvroRpcClient: Using default maxIOWorkers
2022-04-13 00:25:59 INFO sink.AbstractRpcSink: Rpc sink sink1 started.
Writing to HBase directly from Flink actually looks simpler than customizing Flume. Flume-to-HBase is mostly used with HBase 1; only Flume 1.9 can be patched for HBase 2, and HBase 3 has been on the horizon for quite a while. Let the old wave wash up on the beach. The tail-a-log-into-Kafka part is still genuinely useful, though.
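To back that claim up, here is a rough sketch of what writing the raw log lines into HBase straight from Flink could look like, using the plain HBase 2.x client inside a RichSinkFunction. It assumes an hbase-client dependency on the classpath and the sougoulogs table with an info column family from the Flume config above; the row-key layout is made up:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Minimal sketch of a Flink sink writing each raw log line into HBase.
public class HBaseLogSink extends RichSinkFunction<String> {
    private transient Connection conn;
    private transient Table table;

    @Override
    public void open(Configuration parameters) throws Exception {
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zhiyong2,zhiyong3,zhiyong4");
        conn = ConnectionFactory.createConnection(conf);
        table = conn.getTable(TableName.valueOf("sougoulogs"));
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        String[] fields = value.split(",");
        if (fields.length != 6) { return; } // same dirty-data filter as the MySQL job
        // Row-key layout is made up for illustration: logtime + userid
        Put put = new Put(Bytes.toBytes(fields[0] + "_" + fields[1]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("searchname"), Bytes.toBytes(fields[2]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("cliurl"), Bytes.toBytes(fields[5]));
        table.put(put);
    }

    @Override
    public void close() throws Exception {
        if (table != null) { table.close(); }
        if (conn != null) { conn.close(); }
    }
}

Flink also ships a Table/SQL HBase connector (flink-connector-hbase-2.2), which may need even less code if you prefer the Table API.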
Spark
Spark is still used a lot for batch. Back in 2018 people were also doing simple micro-batch jobs with Spark Streaming, but that was 4 years ago... and Structured Streaming, at least with the foreach sink used below, only gives at-least-once rather than exactly-once. For standalone batch work, Spark of course remains the first choice.
pom dependencies
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>sougoulogs</artifactId><groupId>com.zhiyong</groupId><version>1.0.0</version></parent><modelVersion>4.0.0</modelVersion><artifactId>sparkDemo</artifactId><!-- 指定仓库位置,依次为aliyun、cloudera、apache仓库 --><repositories><repository><id>aliyun</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url></repository></repositories><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><scala.version>2.12.12</scala.version><scala.binary.version>2.12</scala.binary.version><spark.version>3.0.1</spark.version><encoding>UTF-8</encoding></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><!-- 添加spark依赖 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><!-- 可以使用Lombok的@注解--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.20</version></dependency><!-- MySQL驱动包--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency></dependencies><build><sourceDirectory>src/main/java</sourceDirectory><testSourceDirectory>src/test/java</testSourceDirectory><plugins><!-- 编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target><!--<encoding>${project.build.sourceEncoding}</encoding>--></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><!-- 打jar包插件(会包含所有依赖) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><!--
zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF --><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- 可以设置jar包的入口类(可选) --><!--<mainClass>com.aa.flink.StreamWordCount</mainClass>--></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project>
Config class
package com.zhiyong.sparkDemo

// Holds the configuration constants
object Constants {
  val url = "jdbc:mysql://192.168.88.100/sougoulogs"
  val userName = "root"
  val passWord = "123456"
  val kafkaServer = "zhiyong2:9092,zhiyong3:9092,zhiyong4:9092"
  val groupId = "sougoulogs"
  val offset = "earliest"
  val topic = "sougoulogs"
  val mysql8driver = "com.mysql.cj.jdbc.Driver"
}
Define a connection pool
package com.zhiyong.sparkDemo;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;

/**
 * @program: sougoulogs
 * @description: MySQL connection pool
 * @author: zhiyong
 * @create: 2022-04-06 19:46
 **/
public class MySQLPool implements Serializable {
    private String url = "";
    private String user = "";
    private String password = "";
    private int maxNum = 5;      // max connections in use at the same time
    private int currentNum = 0;  // connections created so far
    private LinkedList<Connection> connections = new LinkedList<>(); // idle connections

    /**
     * Initialize with the connection parameters
     * @param url      JDBC URL
     * @param user     user name
     * @param password password
     */
    public void init(String url, String user, String password) {
        this.url = url;
        this.user = user;
        this.password = password;
    }

    /**
     * Get a connection from the pool
     * @return the connection object
     * @throws Exception on failure
     */
    public synchronized Connection getConn() throws Exception {
        if (connections.isEmpty()) {
            addDrive();
            Connection connection = DriverManager.getConnection(url, user, password);
            connections.push(connection); // push onto the pool
            currentNum++;
        }
        return connections.poll(); // take one from the pool
    }

    /**
     * Load the driver (waiting if the pool is busy)
     */
    private void addDrive() throws Exception {
        if (maxNum > currentNum && (!connections.isEmpty())) {
            Thread.sleep(1000); // wait for a connection to be freed
            addDrive();
        } else {
            Class.forName(Constants.mysql8driver()); // load the driver
        }
    }

    /**
     * Return a connection to the pool
     * @param conn the connection object
     */
    private void clearConn(Connection conn) {
        connections.push(conn);
    }
}
Since Spark itself is written in Scala, Java and Scala can in fact be mixed in the same module.
The Spark Streaming (DStream) approach
package com.zhiyong.sparkDemo

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, DriverManager, Statement}

// Spark Streaming: read from Kafka, process, write to MySQL
object Kafka2MySQL_Streaming {

  // Count views per news keyword
  def newsCount(records: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null       // JDBC connection
    var statement: Statement = null   // JDBC statement
    try {
      conn = DriverManager.getConnection(Constants.url, Constants.userName, Constants.passWord)
      records.foreach(data => {
        val name = data._1.replaceAll("[\\[\\]]", "")
        val count = data._2
        val sql1 = "select 1 from newscount where name = '" + name + "'" // does the row already exist?
        val sql2 = "update newscount set count = " + count + " where name = '" + name + "'"
        val sql3 = "insert into newscount(name,count) values('" + name + "'," + count + ")"
        statement = conn.createStatement()
        val resultSet = statement.executeQuery(sql1)
        if (resultSet.next()) {
          statement.executeUpdate(sql2) // row exists: update
        } else {
          statement.execute(sql3)       // no row: insert
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // always release the JDBC resources
      if (null != statement) {
        statement.close()
      }
      if (null != conn) {
        conn.close()
      }
    }
  }

  // Count views per time period
  def periodCount(records: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var statement: Statement = null
    try {
      conn = DriverManager.getConnection(Constants.url, Constants.userName, Constants.passWord)
      records.foreach(data => {
        val logtime = data._1
        val count = data._2
        val sql1 = "select 1 from periodcount where logtime = '" + logtime + "'"
        val sql2 = "update periodcount set count = " + count + " where logtime='" + logtime + "'"
        val sql3 = "insert into periodcount(logtime,count) values('" + logtime + "'," + count + ")"
        statement = conn.createStatement()
        val resultSet = statement.executeQuery(sql1)
        if (resultSet.next()) {
          statement.execute(sql2)
        } else {
          statement.execute(sql3)
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (null != statement) {
        statement.close()
      }
      if (null != conn) {
        conn.close()
      }
    }
  }

  // Entry point
  def main(args: Array[String]): Unit = {
    print("start:Kafka2MySQL_Streaming")
    val sparkConf = new SparkConf().setAppName("sougoulogs_Streaming").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(1)) // one micro-batch per second

    // Common Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> Constants.kafkaServer,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> Constants.groupId,
      "auto.offset.reset" -> Constants.offset,
      "enable.auto.commit" -> java.lang.Boolean.TRUE
    )
    val topics = Array(Constants.topic)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    // val lines = stream.map(record => record.value)
    // Drop dirty records
    val cleanStream = stream.map(_.value()).map(_.split(",")).filter(_.length == 6)

    // Views per news keyword
    val newsCountDStream = cleanStream
      .map(x => (x(2), 1)).reduceByKey(_ + _)
    // Views per time period
    val periodCountDStream = cleanStream
      .map(x => (x(0), 1)).reduceByKey(_ + _)

    // Write each partition of each RDD to MySQL
    newsCountDStream.foreachRDD(rdd => {
      print("每个分区都执行读取Kafka数据计算新闻话题浏览量并将结果写入MySQL")
      rdd.foreachPartition(newsCount)
    })
    periodCountDStream.foreachRDD(rdd => {
      print("每个分区都执行读取Kafka数据计算时段浏览量并将结果写入MySQL")
      rdd.foreachPartition(periodCount)
    })

    // Start the streaming job
    ssc.start()
    // Required for DStream jobs, otherwise they exit with an error
    ssc.awaitTermination()
  }
}
The Structured Streaming approach
package com.zhiyong.sparkDemo

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Spark Structured Streaming: read from Kafka, process, write to MySQL
object Kafka2MySQL_StructuredStreaming {

  // Entry point
  def main(args: Array[String]): Unit = {
    print("start:Kafka2MySQL_StructuredStreaming")
    val spark = SparkSession.builder().appName("").master("local[2]").getOrCreate()
    import spark.implicits._ // implicit encoders, needed to avoid compile errors

    val cleanSs = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", Constants.kafkaServer)
      .option("subscribe", Constants.topic)
      .load()
      .selectExpr("cast(key as string)", "cast(value as string)") // the Java API can take a String[]
      .as[(String, String)]
      .map(_._2)
      .map(_.split(","))
      .filter(6 == _.length)

    val newsCountsSs = cleanSs.map(_ (2)).groupBy("value").count().toDF("name", "count")
    val periodCountsSs = cleanSs.map(_ (0)).groupBy("value").count().toDF("logtime", "count")

    val sink1 = new Sink1(Constants.url, Constants.userName, Constants.passWord)
    val sink2 = new Sink2(Constants.url, Constants.userName, Constants.passWord)

    val query1 = newsCountsSs.writeStream
      .foreach(sink1)
      .outputMode(OutputMode.Update())
      .trigger(Trigger.ProcessingTime("1 seconds"))
      .start()
    val query2 = periodCountsSs.writeStream
      .foreach(sink2)
      .outputMode(OutputMode.Update())
      .trigger(Trigger.ProcessingTime("1 seconds"))
      .start()

    query1.awaitTermination()
    query2.awaitTermination()
  }
}
The 2 corresponding sinks:
package com.zhiyong.sparkDemo

import org.apache.spark.sql.{ForeachWriter, Row}

import java.sql.{Connection, ResultSet, Statement}

/**
 * Custom sink that writes the rows to MySQL
 *
 * @param url      JDBC URL
 * @param user     user name
 * @param password password
 */
class Sink1(url: String, user: String, password: String) extends ForeachWriter[Row] {
  var statement: Statement = _
  var resultset: ResultSet = _
  var conn: Connection = _

  /**
   * Initialize the connection, taken from the connection pool
   *
   * @param partitionId
   * @param epochId
   * @return
   */
  override def open(partitionId: Long, epochId: Long): Boolean = {
    Class.forName(Constants.mysql8driver)
    val pool = new MySQLPool()
    pool.init(url, user, password)
    conn = pool.getConn
    statement = conn.createStatement()
    true
  }

  override def process(value: Row): Unit = {
    // strip the brackets; replaceAll takes a regex (the original used replace(), which is literal)
    val name = value.getAs[String]("name").replaceAll("[\\[\\]]", "")
    val count = value.getAs[Long]("count").asInstanceOf[Int]
    val sql1 = "select 1 from newscount where name = '" + name + "'"
    val sql2 = "insert into newscount(name,count) values('" + name + "'," + count + ")"
    val sql3 = "update newscount set count = " + count + " where name = '" + name + "'"
    try {
      resultset = statement.executeQuery(sql1)
      if (resultset.next()) {
        statement.execute(sql3) // row exists: update
      } else {
        statement.execute(sql2) // no row: insert
      }
    } catch {
      case e: Exception => print("出现异常:" + e.getMessage)
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (null != statement) {
      statement.close()
    }
    if (null != conn) {
      conn.close()
    }
  }
}
The other one is much the same:
package com.zhiyong.sparkDemo

import org.apache.spark.sql.{ForeachWriter, Row}

import java.sql.{Connection, ResultSet, Statement}

class Sink2(url: String, user: String, password: String) extends ForeachWriter[Row] {
  var statement: Statement = _
  var resultset: ResultSet = _
  var conn: Connection = _

  /**
   * Initialize the connection, taken from the connection pool
   *
   * @param partitionId
   * @param epochId
   * @return
   */
  override def open(partitionId: Long, epochId: Long): Boolean = {
    Class.forName(Constants.mysql8driver)
    val pool = new MySQLPool()
    pool.init(url, user, password)
    conn = pool.getConn
    statement = conn.createStatement()
    true
  }

  override def process(value: Row): Unit = {
    val logtime = value.getAs[String]("logtime")
    val count = value.getAs[Long]("count").asInstanceOf[Int]
    val sql1 = "select 1 from periodcount where logtime = '" + logtime + "'"
    val sql2 = "insert into periodcount(logtime,count) values('" + logtime + "'," + count + ")"
    val sql3 = "update periodcount set count = " + count + " where logtime = '" + logtime + "'"
    try {
      resultset = statement.executeQuery(sql1)
      if (resultset.next()) {
        statement.execute(sql3) // row exists: update
      } else {
        statement.execute(sql2) // no row: insert
      }
    } catch {
      case e: Exception => print("出现异常:" + e.getMessage)
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (null != statement) {
      statement.close()
    }
    if (null != conn) {
      conn.close()
    }
  }
}
A helper that appends data
package com.zhiyong.sparkDemo;

import java.io.*;

/**
 * @program: sougoulogs
 * @description: Mock data generator
 * @author: zhiyong
 * @create: 2022-04-04 23:43
 **/
public class MockData {
    // Entry point
    public static void main(String[] args) {
        String inputPath = args[0];
        String outputPath = args[1];
        String interval = args[2];
        System.out.println("读数路径:" + inputPath); // the original printed interval here by mistake
        System.out.println("写log路径:" + outputPath);
        System.out.println("产生数据间隔:" + interval + "ms");
        try {
            mock(inputPath, outputPath, interval);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Generate mock data
     *
     * @param inputPath  file to read from
     * @param outputPath file to append to
     * @param interval   delay between records, in ms
     */
    private static void mock(String inputPath, String outputPath, String interval) throws IOException {
        FileInputStream fis = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        String line = null;
        try {
            int counter = 1; // line number
            fis = new FileInputStream(inputPath);
            isr = new InputStreamReader(fis, "GBK");
            br = new BufferedReader(isr);
            while (null != (line = br.readLine())) {
                System.out.println("产生第" + counter + "条数据:" + line);
                wrieData(outputPath, line);
                counter++;
                Thread.sleep(Long.parseLong(interval));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // always close the streams so the file is not held open
            if (null != br) {
                br.close();
            }
            if (null != isr) {
                isr.close();
            }
            if (null != fis) {
                fis.close();
            }
        }
    }

    /**
     * Append one line to the output file
     *
     * @param outputPath output path
     * @param line       the line to write
     */
    private static void wrieData(String outputPath, String line) throws IOException {
        FileOutputStream fos = null;
        OutputStreamWriter osr = null;
        BufferedWriter bw = null;
        try {
            fos = new FileOutputStream(outputPath, true); // append mode
            osr = new OutputStreamWriter(fos);
            bw = new BufferedWriter(osr);
            bw.write(line);
            bw.write("\n");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != bw) {
                bw.close();
            }
            if (null != osr) {
                osr.close();
            }
            if (null != fos) {
                fos.close();
            }
        }
    }
}
To feed Flume's taildir source, something has to append lines to the file automatically instead of echoing and redirecting by hand, hence this helper.
In Flink, the various rich functions make it easy to just override the methods you need, with none of this hassle. Tools like Flume and Sqoop mainly make life easier for SQL Boys who don't write Java. These days there are mature products such as Alibaba Cloud DataPhin and Huawei Cloud FusionInsight, plus open-source components like Kyuubi where data integration runs off configuration alone, so the barrier to entry for big data has dropped a lot.