

Practicing a Real-Time Sogou Log BI Mini-Project with Flink 1.14.3, Kafka, and FineBI

Preface

Overview

IT moves fast these days: blink and Flink is already at 1.14.4, and FineBI can now do real-time BI... So I picked up the classic Sougoulogs mini-project to practice on and enjoy the experience of stepping into one pit after another.

Thanks to Flink 1.14 delivering true stream-batch unification at the API level, batch jobs can also be written in the streaming style, and a Kappa architecture is much easier to operate than a Lambda architecture that keeps streaming and batch separate. Of course, some vendors are moving their old Spark and Spark Streaming jobs back to offline Hive (running MapReduce, not even Tez) just to free up memory for a Flink cluster to run real-time streaming. To each their own.

This mini-project reads log records from a CSV file and sinks them into Kafka to simulate data collection, then consumes from Kafka, processes the records, and sinks the results into MySQL. Finally, FineBI builds a real-time dashboard for visualization. It suits apprentices like me who have used older Flink versions, know SQL, and are not much good at ECharts.

Preview

Direct link on Bilibili.

Video on CSDN.

"Practicing a Sogou log BI mini-project with Flink + Kafka + FineBI"

Component Versions

Using the USDP 2.0 cluster built earlier.
Flink: manually upgraded to 1.14.3;
Kafka: the bundled 2.0.1, plus the bundled Kafka Eagle;
FineBI: 5.1 free personal edition.

Preparation

Create the Kafka Topic

[root@zhiyong2 lib]# kafka-topics.sh --zookeeper zhiyong2:2181 --delete --topic sougoulogs
Topic sougoulogs is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@zhiyong2 ~]# kafka-topics.sh --zookeeper zhiyong2:2181 --list
[root@zhiyong2 ~]# kafka-topics.sh --zookeeper zhiyong2:2181 --create --topic sougoulogs --replication-factor 3 --partitions 3
Created topic "sougoulogs".
[root@zhiyong2 ~]# kafka-topics.sh --zookeeper zhiyong2:2181 --list
sougoulogs
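For reference, the same topic could also be created from Java with Kafka's AdminClient. This is a sketch I have not run against this cluster, so treat the exact properties as assumptions:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "zhiyong2:9092,zhiyong3:9092,zhiyong4:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3, matching the CLI command above
            NewTopic topic = new NewTopic("sougoulogs", 3, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}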

Using Kafka

Kafka is the most widely used message queue for real-time work, and Pulsar has also been gaining popularity lately. For us apprentices, having a GUI makes life much friendlier.

Start Kafka and Kafka Eagle

Just start them from USDP's web UI, which is very convenient. You can also open the Kafka Eagle visual console:

http://zhiyong4:8048/
admin,123456

Test Kafka

In Kafka Eagle's Mock page, pick the sougoulogs topic and generate some data:

00:00:01,123456,[哈哈],2,3,www.baidu.com

After clicking Send, you can also see the data by writing SQL in KSQL:
Type into the query box:

select * from sougoulogs where `partition` in (0,1,2) limit 5

Of course you can also start a console consumer to check the data:

[root@zhiyong2 ~]# kafka-console-consumer.sh --bootstrap-server zhiyong2:9092 --topic sougoulogs
00:00:01,123456,[哈哈],2,3,www.baidu.com

Big data components are all SQL-ified these days, which is a gift to the SQL Boys: knowing only SQL is enough to land a data-warehouse job. That mostly works because the data sources are tidy, structured data. Once you need to handle semi-structured or unstructured data, or build a platform, SQL alone quickly runs out of road.

Create the MySQL Database and Tables

The MySQL database and tables hold the computed results that the BI tool displays.

create database sougoulogs;
use sougoulogs;

create table if not exists newscount (
    name varchar(50) not null,
    count int(11) not null
);

create table if not exists periodcount (
    logtime varchar(50) not null,
    count int(11) not null
);

Real-Time BI

Three dashboard components in total, using only the two tables and three SQL queries:

SELECT `name`, `count` FROM newscount ORDER BY `count` DESC LIMIT 10;        -- newscount
SELECT COUNT(1) FROM newscount;                                              -- counts
SELECT `logtime`, `count` FROM periodcount ORDER BY logtime DESC LIMIT 10;   -- periodcount

The newer FineBI feels a bit like Tableau and is friendly to apprentices, and the official site has detailed tutorials, so I will not repeat them. The rough flow:
add a connection → add a real-time data source → add datasets → configure components and build the dashboard.

Even without knowing Spring or ECharts you can still get the data on screen.

Writing the Flink Program

pom Dependencies

<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>sougoulogs</artifactId><groupId>com.zhiyong</groupId><version>1.0.0</version></parent><modelVersion>4.0.0</modelVersion><!-- 指定仓库位置,依次为aliyun、cloudera、apache仓库 --><repositories><repository><id>aliyun</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url></repository></repositories><artifactId>flinkDemo</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.14.3</flink.version><scala.version>2.12.14</scala.version><scala.binary.version>2.12</scala.binary.version></properties><dependencies><!-- Apache Flink 的依赖, 这些依赖项,生产环境可以不打包到JAR文件中. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- flink操作hadoop--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-filesystems</artifactId><version>${flink.version}</version><type>pom</type></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version><!--            <version>1.14.4</version>--><!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!--            
<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version></dependency><!--        可以使用Lombok的@注解--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.20</version></dependency><!--        MySQL驱动包--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency></dependencies><build><sourceDirectory>src/main/java</sourceDirectory><testSourceDirectory>src/test/java</testSourceDirectory><plugins><!-- 编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target><!--<encoding>${project.build.sourceEncoding}</encoding>--></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><!-- 打jar包插件(会包含所有依赖) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF --><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- 可以设置jar包的入口类(可选) --><!--<mainClass>com.aa.flink.StreamWordCount</mainClass>--></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project>

Mock Data

package com.zhiyong.flinkDemo;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;

/**
 * @program: sougoulogs
 * @description: Mock data generator
 * @author: zhiyong
 * @create: 2022-04-13 18:01
 **/
public class MockData {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> data = env.addSource(new CsvSource());
        data.print("Data from the custom source");

        env.execute("MockData");
    }

    static class CsvSource extends RichSourceFunction<String> {
        FileInputStream fis = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        String line = null;
        String csvPath = ConstConfig.csvPath;
        String lastStr = ""; // previous record
        boolean needRun = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (needRun) {
                while (null != (line = br.readLine())) {
                    // System.out.println("read line: " + line);
                    ctx.collect(line);
                    // sleep for the gap between the timestamps of two records to throttle the stream
                    if (StringUtils.isNoneBlank(lastStr)
                            && !line.split(",")[0].equals(lastStr.split(",")[0])) {
                        Thread.sleep((Long.parseLong(line.split(",")[0])
                                - Long.parseLong(lastStr.split(",")[0])) * 1000);
                    }
                    lastStr = line;
                }
            }
        }

        @Override
        public void cancel() {
            needRun = false;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            fis = new FileInputStream(csvPath);
            isr = new InputStreamReader(fis, "UTF-8");
            br = new BufferedReader(isr);
        }

        @Override
        public void close() throws Exception {
            super.close();
            // close the streams at the end so the file is not held open
            if (null != br) {
                br.close();
            }
            if (null != isr) {
                isr.close();
            }
            if (null != fis) {
                fis.close();
            }
        }
    }
}

The point of the custom source is to stop Flink from reading the whole CSV at once in batch style and then exiting automatically, which would hide the streaming behaviour. Throttling the rate also avoids running out of memory or other resources during development.

Sink Data to Kafka

package com.zhiyong.flinkDemo;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * @program: sougoulogs
 * @description: Read data from the log file and write it to Kafka
 * @author: zhiyong
 * @create: 2022-04-13 12:02
 **/
public class File2Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String inputPath = "E:/study/flink/data/test1";
        // DataStreamSource<String> data = env.readTextFile(inputPath); // effectively batch: finishes as soon as the file is read, too fast
        DataStreamSource<String> data = env.addSource(new MockData.CsvSource());

        // build the Kafka producer
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("transaction.timeout.ms", 1000 * 60 * 10 + ""); // must be < 15 minutes

        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("192.168.88.101:9092,192.168.88.102:9092,192.168.88.103:9092")
                // fixes: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than
                // the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
                .setKafkaProducerConfig(kafkaProperties)
                //.setTransactionalIdPrefix("")
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("sougoulogs")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        data.sinkTo(kafkaSink);

        env.execute("File2Kafka");
    }
}

Sink Data to MySQL

package com.zhiyong.flinkDemo;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * @program: sougoulogs
 * @description: Read from Kafka with Flink and write the results to MySQL
 * @author: zhiyong
 * @create: 2022-04-06 22:49
 **/
public class Kafka2MySQL_Flink implements Serializable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

//        Properties prop = new Properties();
//        prop.setProperty("bootstrap.servers", "zhiyong2:9092,zhiyong3:9092,zhiyong4:9092");
//        prop.setProperty("group.id", "sougoulogs");
//        FlinkKafkaConsumer<String> kafkaConsumer =
//                new FlinkKafkaConsumer<>("sougoulogs", new SimpleStringSchema(), prop); // deprecated
//        DataStreamSource<String> data = env.addSource(kafkaConsumer);

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder() // the generic type goes right after the dot
                .setBootstrapServers("zhiyong2:9092,zhiyong3:9092,zhiyong4:9092")
                .setTopics("sougoulogs")
                .setGroupId("sougoulogs")
                .setStartingOffsets(OffsetsInitializer.earliest())
//                .setValueOnlyDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringSerializer.class))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> source = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
        System.out.println("Environment built");

        source.print();

        SingleOutputStreamOperator<String> cleanStream = source
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return value.split(",").length == 6;
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Integer>> newsCountsStream = cleanStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        out.collect(Tuple2.of(value.toLowerCase().split(",")[2], 1));
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> periodCountsStream = cleanStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        out.collect(new Tuple2<>(value.toLowerCase().split(",")[0], 1));
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);

        newsCountsStream.print("News count stream");
        periodCountsStream.print("Period count stream");

        // persist the results
        newsCountsStream.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
            private Connection conn = null;
            private Statement stmt = null;

            // a streaming job keeps writing through the same connection
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                Class.forName(ConstConfig.JDBC_Driver);
                conn = DriverManager.getConnection(ConstConfig.URL, ConstConfig.user, ConstConfig.password);
            }

            // called once per record to write it out
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                try {
                    String name = value.f0.replaceAll("[\\[\\]]", "");
                    Integer count = value.f1;
                    String sql1 = "select 1 from newscount where name ='" + name + "'";
                    String sql2 = "update newscount set count =" + count + " where name ='" + name + "'";
                    String sql3 = "insert into newscount(name,count) values('" + name + "'," + count + ")";

                    stmt = conn.createStatement();
                    ResultSet resultSet = stmt.executeQuery(sql1);
                    if (resultSet.next()) {
                        stmt.execute(sql2);
                    } else {
                        stmt.execute(sql3);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            // a streaming job normally never reaches close()
            @Override
            public void close() throws Exception {
                super.close();
                if (null != stmt) {
                    stmt.close();
                }
                if (null != conn) {
                    conn.close();
                }
            }
        });

        periodCountsStream.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
            private Connection conn = null;
            private Statement stmt = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                Class.forName(ConstConfig.JDBC_Driver);
                conn = DriverManager.getConnection(ConstConfig.URL, ConstConfig.user, ConstConfig.password);
            }

            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                super.invoke(value, context);
                try {
                    String logtime = value.f0;
                    Integer count = value.f1;
                    String sql1 = "select 1 from periodcount where logtime ='" + logtime + "'";
                    String sql2 = "update periodcount set count =" + count + " where logtime ='" + logtime + "'";
                    String sql3 = "insert into periodcount(logtime,count) values('" + logtime + "'," + count + ")";

                    stmt = conn.createStatement();
                    ResultSet resultSet = stmt.executeQuery(sql1);
                    if (resultSet.next()) {
                        stmt.execute(sql2);
                    } else {
                        stmt.execute(sql3);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void close() throws Exception {
                super.close();
                if (null != stmt) {
                    stmt.close();
                }
                if (null != conn) {
                    conn.close();
                }
            }
        });

        env.execute("kafka2MySQL");
    }
}
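The sinks above issue a SELECT and then either an UPDATE or an INSERT for every record. A leaner pattern is a MySQL upsert through a PreparedStatement; the rough sketch below assumes a UNIQUE key is added on newscount.name (the DDL above does not create one), and the parameter binding also avoids concatenating values into SQL strings:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class UpsertSketch {
    public static void main(String[] args) throws Exception {
        Class.forName(ConstConfig.JDBC_Driver);
        try (Connection conn = DriverManager.getConnection(ConstConfig.URL, ConstConfig.user, ConstConfig.password);
             PreparedStatement ps = conn.prepareStatement(
                     // requires: alter table newscount add unique key uk_name (name);
                     "insert into newscount(name, count) values(?, ?) "
                             + "on duplicate key update count = values(count)")) {
            ps.setString(1, "[哈哈]".replaceAll("[\\[\\]]", "")); // same bracket stripping as the sink
            ps.setInt(2, 1);
            ps.executeUpdate();
        }
    }
}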

Configuration Class

package com.zhiyong.flinkDemo;

import java.io.Serializable;

/**
 * @program: sougoulogs
 * @description: Hard-coded configuration
 * @author: zhiyong
 * @create: 2022-04-06 22:43
 **/
public class ConstConfig implements Serializable {
    public static final String JDBC_Driver = "com.mysql.cj.jdbc.Driver";
    public static final String URL = "jdbc:mysql://192.168.88.100:3306/sougoulogs";
    public static final String user = "root";
    public static final String password = "123456";
    public static final String csvPath = "E:/study/studyBigDataProj/sougouLogs/data/sougou500w_2.txt";
}

This class centralizes the configuration so it is easy to change. A streaming job is usually started once and left running for a long time, and the metrics are mostly fixed in advance and rarely change, so hard-coding is fine in most cases. Dynamic parameter passing, as offline batch jobs need, matters more when you build reusable components or a platform.
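If you do want to pass parameters in at submit time instead of hard-coding them, a minimal sketch (the key names below are my own choice) can read them from the command line with Flink's ParameterTool and fall back to the constants above:

import org.apache.flink.api.java.utils.ParameterTool;

public class ArgsConfigDemo {
    public static void main(String[] args) {
        // e.g. --mysql.url jdbc:mysql://192.168.88.100:3306/sougoulogs --mysql.user root
        ParameterTool params = ParameterTool.fromArgs(args);

        // fall back to the hard-coded values when an argument is missing
        String url = params.get("mysql.url", ConstConfig.URL);
        String user = params.get("mysql.user", ConstConfig.user);
        String password = params.get("mysql.password", ConstConfig.password);

        System.out.println("Using MySQL URL: " + url);
    }
}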

Problems Encountered

Flink 1.14's New Kafka Connector

The connector is so new that I had to dig through the source code to figure out how to use it, with pits everywhere. Older versions were probably used like this:

Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "zhiyong2:9092,zhiyong3:9092,zhiyong4:9092");
prop.setProperty("group.id", "sougoulogs");

FlinkKafkaConsumer<String> kafkaConsumer =
        new FlinkKafkaConsumer<>("sougoulogs", new SimpleStringSchema(), prop); // deprecated

DataStreamSource<String> data = env.addSource(kafkaConsumer);

That said, deprecated methods should of course be avoided whenever possible!!!

Looking at the source code:

package org.apache.flink.connector.kafka.source;

/**
 * The Source implementation of Kafka. Please use a {@link KafkaSourceBuilder} to construct a {@link
 * KafkaSource}. The following example shows how to create a KafkaSource emitting records of <code>
 * String</code> type.
 *
 * <pre>{@code
 * KafkaSource<String> source = KafkaSource
 *     .<String>builder()
 *     .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
 *     .setGroupId("MyGroup")
 *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
 *     .setDeserializer(new TestingKafkaRecordDeserializationSchema())
 *     .setStartingOffsets(OffsetsInitializer.earliest())
 *     .build();
 * }</pre>
 *
 * <p>See {@link KafkaSourceBuilder} for more details.
 *
 * @param <OUT> the output type of the source.
 */
public class KafkaSource<OUT>
        implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>, ResultTypeQueryable<OUT> {
}

Fortunately the Javadoc shows how to build the object, so I copied it over and tweaked it:

KafkaSource<String> kafkaSource = KafkaSource.<String>builder() // the generic type goes right after the dot
        .setBootstrapServers("zhiyong2:9092,zhiyong3:9092,zhiyong4:9092")
        .setTopics("sougoulogs")
        .setGroupId("sougoulogs")
        .setStartingOffsets(OffsetsInitializer.earliest())
//        .setValueOnlyDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringSerializer.class))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

On the producer side, Kafka offers exactly-once delivery, so of course I enabled it, which immediately triggered this error:
Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
Since the broker caps the transaction timeout at 15 minutes by default, I set the producer's transaction timeout to 10 minutes for now to stay under the limit:

Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("transaction.timeout.ms", 1000 * 60 * 10 + ""); // must be < 15 minutes

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("192.168.88.101:9092,192.168.88.102:9092,192.168.88.103:9092")
        // fixes: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than
        // the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
        .setKafkaProducerConfig(kafkaProperties)
        //.setTransactionalIdPrefix("")
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("sougoulogs")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();

data.sinkTo(kafkaSink);

MySQL Character Encoding

Running the job produces:

Environment built
00:00:01,123456,[哈哈],2,3,www.baidu.com
News count stream> ([哈哈],1)
Period count stream> ([哈哈],1)

java.sql.SQLException: Illegal mix of collations (latin1_swedish_ci,IMPLICIT) and (utf8mb4_general_ci,COERCIBLE) for operation '='
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
    at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1201)
    at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$7.invoke(Kafka2MySQL_Flink.java:184)
    at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$7.invoke(Kafka2MySQL_Flink.java:161)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:67)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:750)

java.sql.SQLException: Illegal mix of collations (latin1_swedish_ci,IMPLICIT) and (utf8mb4_general_ci,COERCIBLE) for operation '='
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
    at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1201)
    at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$6.invoke(Kafka2MySQL_Flink.java:137)
    at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$6.invoke(Kafka2MySQL_Flink.java:107)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:67)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:750)

This happens because the MySQL instance bundled with USDP mixes latin1 with UTF-8/GBK encodings. Obviously I am not going to jeopardize cluster stability for this little project, so I only touched the two result tables.

Run:

[root@zhiyong1 usdp]# mysql -uroot -p
Enter password:
Welcome to the MySQL monitor.  Commands endwith;or \g.
Your MySQL connection id is171
Server version: 5.7.30 MySQL Community Server (GPL)

Copyright (c)2000,2020, Oracle and/or its affiliates.All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> SHOW VARIABLES LIKE 'character_set_%';
+--------------------------+----------------------------+
| Variable_name            | Value                      |
+--------------------------+----------------------------+
| character_set_client     | utf8                       |
| character_set_connection | utf8                       |
| character_set_database   | latin1                     |
| character_set_filesystem | binary                     |
| character_set_results    | utf8                       |
| character_set_server     | latin1                     |
| character_set_system     | utf8                       |
| character_sets_dir       | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+
8 rows in set (0.00 sec)

mysql> SHOW VARIABLES LIKE 'collation_%';
+----------------------+-------------------+
| Variable_name        | Value             |
+----------------------+-------------------+
| collation_connection | utf8_general_ci   |
| collation_database   | latin1_swedish_ci |
| collation_server     | latin1_swedish_ci |
+----------------------+-------------------+
3 rows in set (0.00 sec)

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| db_hive_metastore  |
| db_hue             |
| db_ranger          |
| db_udp             |
| dolphinscheduler   |
| mysql              |
| performance_schema |
| sougoulogs         |
| sys                |
+--------------------+
10 rows in set (0.00 sec)

mysql>use sougoulogs;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql>
mysql> show tables;
+----------------------+
| Tables_in_sougoulogs |
+----------------------+
| newscount            |
| periodcount          |
+----------------------+
2 rows in set (0.00 sec)

mysql>
mysql> alter table `newscount` charset = utf8;
Query OK, 0 rows affected (0.00 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> alter table `periodcount` charset = utf8;
Query OK, 0 rows affected (0.00 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> alter table `newscount` convert to character set utf8;
Query OK, 17 rows affected (0.03 sec)
Records: 17  Duplicates: 0  Warnings: 0

mysql> alter table `periodcount` convert to character set utf8;
Query OK, 12 rows affected (0.03 sec)
Records: 12  Duplicates: 0  Warnings: 0

Changing the character set of these two ADS tables is all that is needed.
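A complementary option (not tested here, so treat it as an assumption) is to force the JDBC connection itself to UTF-8 with the standard MySQL Connector/J URL parameters, so that string literals sent by the Flink sink are not interpreted as latin1:

public class ConstConfigUtf8 {
    // hypothetical variant of ConstConfig.URL with the connection charset pinned to UTF-8
    public static final String URL =
            "jdbc:mysql://192.168.88.100:3306/sougoulogs?useUnicode=true&characterEncoding=utf8";
}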

FineBI Doesn't Refresh Automatically

I assumed that once FineBI had a real-time data source it would refresh by itself. Turns out... that was naive. Following the official guide, the local path is:

C:\FineBI5.1\webapps\webroot\WEB-INF\lib\fine-bi-adapter-5.1.jar
fine-bi-adapter-5.1.jar\com\finebi\web\html\show.html

Insert the following at the end of that file inside the jar:

</body>
<!-- insert here -->
<!-- add refresh support -->
<script type="text/javascript" src="/webroot/refresh.js"></script>
<!-- insert here -->
</html>

Create refresh.js under C:\FineBI5.1\webapps\webroot:

setTimeout(function () {
    var b = document.title;
    var a = BI.designConfigure.reportId; // get the dashboard id
    // check the id so that only the target dashboard refreshes
    if (a == "7fcbb0a362314ca4b015ee26f39f5a79") {
        setInterval(function () {
            BI.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
            // Data.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
            BI.Utils.broadcastAllWidgets2Refresh(true);
        }, 5000); // refresh interval in ms
    }
}, 2000)

Refreshing too frequently uses a fair amount of memory and CPU; my GT730 display-only card just about copes.

The Old-School Approaches

There are of course plenty of older ways to achieve the same effect, but sooner or later they will fade away.

What follows is a record of history whose moment has passed.

Flume

Flume might still come in handy for log collection, but its latest release here dates back to 2018-12-18, which is rather... mature (read: ancient).

Configuration

Write a Flume configuration file File2Avro.properties that reads the file and delivers the events, Avro-serialized, to two other Flume nodes.

[root@zhiyong2 flume]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/flume
[root@zhiyong2 flume]# ll
总用量 164
drwxrwxrwx. 2 hadoop hadoop    628月  102021 bin
-rwxrwxrwx. 1 hadoop hadoop 8560212月 182018 CHANGELOG
drwxrwxrwx. 2 hadoop hadoop   1974月   121:27 conf
-rwxrwxrwx. 1 hadoop hadoop  568112月 182018 DEVNOTES
-rwxrwxrwx. 1 hadoop hadoop  287312月 182018 doap_Flume.rdf
drwxrwxrwx. 2 hadoop hadoop  81928月  102021 lib
-rwxrwxrwx. 1 hadoop hadoop 4340512月 182018 LICENSE
-rwxrwxrwx. 1 hadoop hadoop   24912月 182018 NOTICE
drwxr-xr-x  3 root   root      244月  1220:30 plugins.d
-rwxrwxrwx. 1 hadoop hadoop  248312月 182018 README.md
-rwxrwxrwx. 1 hadoop hadoop  195812月 182018 RELEASE-NOTES
drwxrwxrwx. 2 hadoop hadoop     63月   123:10 run
drwxrwxrwx. 2 hadoop hadoop    687月  262021 tools
[root@zhiyong2 flume]# cd conf/
[root@zhiyong2 conf]# ll
总用量 28
-rwxr-xr-x  1 root   root   16614月   121:25 flume-conf.properties
-rwxrwxrwx. 1 hadoop hadoop 166112月 182018 flume-conf.properties.template
-rwxrwxrwx. 1 hadoop hadoop 14553月   123:10 flume-env.ps1
-rwxrwxrwx. 1 hadoop hadoop 145512月 182018 flume-env.ps1.template
-rwxrwxrwx. 1 hadoop hadoop 15933月   123:10 flume-env.sh
-rwxrwxrwx. 1 hadoop hadoop 156812月 182018 flume-env.sh.template
-rwxrwxrwx. 1 hadoop hadoop 31173月   123:10 log4j.properties
[root@zhiyong2 conf]# touch File2Avro.properties
[root@zhiyong2 conf]# ll
总用量 28
-rw-r--r--  1 root   root      04月  13 00:00 File2Avro.properties
-rwxr-xr-x  1 root   root   16614月   121:25 flume-conf.properties
-rwxrwxrwx. 1 hadoop hadoop 166112月 182018 flume-conf.properties.template
-rwxrwxrwx. 1 hadoop hadoop 14553月   123:10 flume-env.ps1
-rwxrwxrwx. 1 hadoop hadoop 145512月 182018 flume-env.ps1.template
-rwxrwxrwx. 1 hadoop hadoop 15933月   123:10 flume-env.sh
-rwxrwxrwx. 1 hadoop hadoop 156812月 182018 flume-env.sh.template
-rwxrwxrwx. 1 hadoop hadoop 31173月   123:10 log4j.properties
[root@zhiyong2 conf]# vim File2Avro.properties
# contents of the properties file are shown below
[root@zhiyong2 conf]# chmod 777 ./File2Avro.properties

Contents of this properties file:

agent1.sources = source1
agent1.channels = fileChannel
agent1.sinkgroups = sinkgroup1
agent1.sinks = sink1 sink2

agent1.sources.source1.type = TAILDIR
# track read offsets in a JSON file
agent1.sources.source1.positionFile = /bigdataproj/data/source/logs/taildir_position.json
agent1.sources.source1.filegroups = f1
agent1.sources.source1.filegroups.f1 = /bigdataproj/data/source/logs/sogou.log
agent1.sources.source1.channels = fileChannel

# configure the file channel's checkpoint and data directories
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /bigdataproj/data/channel/checkpointDir
agent1.channels.fileChannel.dataDirs = /bigdataproj/data/channel/dataDirs

# configure the sink group for round-robin load balancing
agent1.sinkgroups.sinkgroup1.sinks = sink1 sink2
agent1.sinkgroups.sinkgroup1.processor.type = load_balance
agent1.sinkgroups.sinkgroup1.processor.backoff = true
agent1.sinkgroups.sinkgroup1.processor.selector = round_robin
agent1.sinkgroups.sinkgroup1.processor.selector.maxTimeOut=10000

# deliver Avro-serialized events to zhiyong3
agent1.sinks.sink1.type = avro
agent1.sinks.sink1.channel = fileChannel
agent1.sinks.sink1.batchSize = 1
agent1.sinks.sink1.hostname = zhiyong3
agent1.sinks.sink1.port = 12345

# deliver Avro-serialized events to zhiyong4
agent1.sinks.sink2.type = avro
agent1.sinks.sink2.channel = fileChannel
agent1.sinks.sink2.batchSize = 1
agent1.sinks.sink2.hostname = zhiyong4
agent1.sinks.sink2.port = 12345

Write a Flume configuration file Avro2HBaseAndKafka.properties that reads the Avro stream and writes the data to both HBase and Kafka.

[root@zhiyong3 conf]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/flume/conf
[root@zhiyong3 conf]# touch Avro2HBaseAndKafka.properties
[root@zhiyong3 conf]# chmod 777 ./Avro2HBaseAndKafka.properties
[root@zhiyong3 conf]# ll
总用量 24
-rwxrwxrwx  1 root   root      04月  13 00:04 Avro2HBaseAndKafka.properties
-rw-rw-r--. 1 hadoop hadoop 166112月 182018 flume-conf.properties.template
-rwxr-xr-x. 1 hadoop hadoop 14553月   123:10 flume-env.ps1
-rw-rw-r--. 1 hadoop hadoop 145512月 182018 flume-env.ps1.template
-rwxr-xr-x. 1 hadoop hadoop 15933月   123:10 flume-env.sh
-rw-rw-r--. 1 hadoop hadoop 156812月 182018 flume-env.sh.template
-rwxrwxr-x. 1 hadoop hadoop 31173月   123:10 log4j.properties
[root@zhiyong3 conf]# vim Avro2HBaseAndKafka.properties
# contents of the properties file are shown below
[root@zhiyong3 conf]# scp ./Avro2HBaseAndKafka.properties root@zhiyong4:$PWD
Avro2HBaseAndKafka.properties                                               100% 1617   466.5KB/s   00:00
[root@zhiyong3 conf]#

Contents of this properties file:

agent1.sources = source1
agent1.channels = HBaseChannel KafkaChannel 
agent1.sinks =  HBaseSink kafkaSink

# Define and configure an avro
agent1.sources.source1.type = avro
agent1.sources.source1.channels = KafkaChannel HBaseChannel
# listen on all interfaces
agent1.sources.source1.bind = 0.0.0.0
agent1.sources.source1.port = 12345

agent1.sources.source1.selector.type = replicating

# channel feeding HBase
agent1.channels.HBaseChannel.type = memory
agent1.channels.HBaseChannel.capacity = 10000
agent1.channels.HBaseChannel.transactionCapacity = 10000
agent1.channels.HBaseChannel.keep-alive = 5

# Define and configure a  sink
agent1.sinks.HBaseSink.type = asynchbase
agent1.sinks.HBaseSink.channel = HBaseChannel
agent1.sinks.HBaseSink.table = sougoulogs
agent1.sinks.HBaseSink.serializer = org.apache.flume.sink.hbase.SougoulogsAsyncHbaseEventSerializer
agent1.sinks.HBaseSink.zookeeperQuorum = zhiyong2:2181,zhiyong3:2181,zhiyong4:2181
agent1.sinks.HBaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
agent1.sinks.HBaseSink.znodeParent = /hbase
agent1.sinks.HBaseSink.columnFamily = info

# channel feeding Kafka
agent1.channels.KafkaChannel.type = memory
agent1.channels.KafkaChannel.capacity = 10000
agent1.channels.KafkaChannel.transactionCapacity = 10000
agent1.channels.KafkaChannel.keep-alive = 5

agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.topic = sougoulogs
agent1.sinks.kafkaSink.brokerList = zhiyong2:9092,zhiyong3:9092,zhiyong4:9092
agent1.sinks.kafkaSink.serializer.class=kafka.serializer.StringEncoder
agent1.sinks.kafkaSink.producer.acks = 1
agent1.sinks.kafkaSink.producer.requiredAcks = 1
agent1.sinks.kafkaSink.channel = KafkaChannel

With that in place, it starts up successfully:

[root@zhiyong3 ~]# flume-ng agent -n agent1 -c conf -f /opt/usdp-srv/srv/udp/2.0.0.0/flume/conf/Avro2HBaseAndKafka.properties -Dflume.root.logger=INFO,console
Info: 。。。
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: kafkaSink: Successfully registered new MBean.
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: kafkaSink started
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started
2022-04-13 00:18:08 INFO source.AvroSource: Avro source source1 started.

Start the other Flume agent the same way:

[root@zhiyong3 ~]# ssh zhiyong4
Last login: Tue Apr 12 20:39:23 2022 from zhiyong3
[root@zhiyong4 ~]# date
2022年 04月 13日 星期三 00:11:17 CST
[root@zhiyong4 ~]# flume-ng agent -n agent1 -c conf -f /opt/usdp-srv/srv/udp/2.0.0.0/flume/conf/Avro2HBaseAndKafka.properties -Dflume.root.logger=INFO,console
Info: 。。。
2022-04-13 00:19:57 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: kafkaSink: Successfully registered new MBean.
2022-04-13 00:19:57 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: kafkaSink started
2022-04-13 00:19:58 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2022-04-13 00:19:58 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started
2022-04-13 00:19:58 INFO source.AvroSource: Avro source source1 started.

Then start the Flume agent that collects the data:

[root@zhiyong2 ~]# mkdir -p /bigdataproj/data/source/logs
[root@zhiyong2 ~]# cd /bigdataproj/data/source/logs
[root@zhiyong2 logs]# touch sogou.log
[root@zhiyong2 logs]# chmod 777 ./sogou.log
[root@zhiyong2 logs]# cd
[root@zhiyong2 ~]# flume-ng agent -n agent1 -c conf -f /opt/usdp-srv/srv/udp/2.0.0.0/flume/conf/File2Avro.properties -Dflume.root.logger=INFO,console
Info: 。。。
2022-04-13 00:25:59 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
2022-04-13 00:25:59 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
2022-04-13 00:25:59 INFO sink.AbstractRpcSink: Rpc sink sink1: Building RpcClient with hostname: zhiyong3, port: 12345
2022-04-13 00:25:59 INFO sink.AvroSink: Attempting to create Avro Rpc client.
2022-04-13 00:25:59 INFO api.NettyAvroRpcClient: Using default maxIOWorkers
2022-04-13 00:25:59 INFO sink.AbstractRpcSink: Rpc sink sink1 started.

Writing to HBase directly from Flink actually looks simpler than customizing Flume. Flume-to-HBase was mostly used with HBase 1; only Flume 1.9 can be customized for HBase 2, while HBase 3 has been on the way for a long time. Let the old waves crash on the beach. The tail-to-Kafka collection feature is still quite practical, though.
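For comparison, here is a minimal, untested sketch of writing the raw records to HBase straight from a Flink job with the HBase client API. The table name sougoulogs and column family info come from the Flume config above; the row key scheme and column order are my own assumptions:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseSinkSketch extends RichSinkFunction<String> {
    private transient Connection conn;
    private transient Table table;

    @Override
    public void open(Configuration parameters) throws Exception {
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zhiyong2:2181,zhiyong3:2181,zhiyong4:2181");
        conn = ConnectionFactory.createConnection(conf);
        table = conn.getTable(TableName.valueOf("sougoulogs"));
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        String[] fields = value.split(",");
        if (fields.length != 6) {
            return; // skip dirty records
        }
        // assumed row key: logtime + userid; columns mirror the Flume serializer's payloadColumn list
        Put put = new Put(Bytes.toBytes(fields[0] + "_" + fields[1]));
        String[] cols = {"datatime", "userid", "searchname", "retorder", "cliorder", "cliurl"};
        for (int i = 0; i < cols.length; i++) {
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(cols[i]), Bytes.toBytes(fields[i]));
        }
        table.put(put);
    }

    @Override
    public void close() throws Exception {
        if (null != table) {
            table.close();
        }
        if (null != conn) {
            conn.close();
        }
    }
}

It could be attached to the cleaned stream of a job like Kafka2MySQL_Flink with cleanStream.addSink(new HBaseSinkSketch()).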

Spark

Spark is still widely used for batch. Back in 2018 people were also doing simple micro-batch processing with Spark Streaming, but that was four years ago... And with a custom foreach JDBC sink like the one below, Structured Streaming only guarantees at-least-once rather than exactly-once... For standalone batch work, Spark is of course still the first choice.

pom Dependencies

<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>sougoulogs</artifactId><groupId>com.zhiyong</groupId><version>1.0.0</version></parent><modelVersion>4.0.0</modelVersion><artifactId>sparkDemo</artifactId><!-- 指定仓库位置,依次为aliyun、cloudera、apache仓库 --><repositories><repository><id>aliyun</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url></repository></repositories><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><scala.version>2.12.12</scala.version><scala.binary.version>2.12</scala.binary.version><spark.version>3.0.1</spark.version><encoding>UTF-8</encoding></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><!-- 添加spark依赖 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><!--        可以使用Lombok的@注解--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.20</version></dependency><!--        MySQL驱动包--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency></dependencies><build><sourceDirectory>src/main/java</sourceDirectory><testSourceDirectory>src/test/java</testSourceDirectory><plugins><!-- 编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target><!--<encoding>${project.build.sourceEncoding}</encoding>--></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><!-- 打jar包插件(会包含所有依赖) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF --><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- 可以设置jar包的入口类(可选) --><!--<mainClass>com.aa.flink.StreamWordCount</mainClass>--></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project>

Configuration Class

package com.zhiyong.sparkDemo

// this object stores the configuration
object Constants {
  val url ="jdbc:mysql://192.168.88.100/sougoulogs"
  val userName ="root"
  val passWord ="123456"
  val kafkaServer ="zhiyong2:9092,zhiyong3:9092,zhiyong4:9092"
  val groupId ="sougoulogs"
  val offset ="earliest"
  val topic ="sougoulogs"
  val mysql8driver = "com.mysql.cj.jdbc.Driver"
}

Define a Connection Pool

packagecom.zhiyong.sparkDemo;importjava.io.Serializable;importjava.sql.Connection;importjava.sql.DriverManager;importjava.sql.SQLException;importjava.util.LinkedList;/**
 * @program: sougoulogs
 * @description: MySQL的连接池
 * @author: zhiyong
 * @create: 2022-04-06 19:46
 **/publicclassMySQLPoolimplementsSerializable{privateString url ="";privateString user ="";privateString password ="";privateint maxNum =5;// 最大同时使用的连接数privateint currentNum =0;//当前产生的连接数privateLinkedList<Connection> connections =newLinkedList<>();//可用连接/**
     * 传参用的构造方法
     * @param url JDBC连接串
     * @param user  用户名
     * @param password 密码
     */publicvoidinit(String url,String user,String password){this.url=url;this.user=user;this.password=password;}/**
     * 获取连接池的连接对象
     * @return 返回连接对象
     * @throws SQLException 抛异常
     */publicsynchronizedConnectiongetConn()throwsException{if(connections.isEmpty()){addDrive();Connection connection =DriverManager.getConnection(url, user, password);
            connections.push(connection);//入栈
            currentNum++;}return connections.poll();//弹栈}/**
     * 加载驱动
     */privatevoidaddDrive()throwsException{if(maxNum>currentNum&&(!connections.isEmpty())){Thread.sleep(1000);//等待获取连接addDrive();}else{Class.forName(Constants.mysql8driver());//加载驱动}}/**
     * 清除连接
     * @param conn 传入连接对象
     */privatevoidclearConn(Connection conn){
        connections.push(conn);}}

Since Spark itself is written in Scala, Java and Scala can actually be mixed in one project.

The Spark Streaming Approach

packagecom.zhiyong.sparkDemoimportorg.apache.kafka.common.serialization.StringDeserializerimportorg.apache.spark.SparkConfimportorg.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeimportorg.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistentimportorg.apache.spark.streaming.kafka010.KafkaUtilsimportorg.apache.spark.streaming.{Seconds,StreamingContext}importjava.sql.{Connection,DriverManager,Statement}// 使用SparkStreaming:读取Kafka数据,处理后落入MySQL
object Kafka2MySQL_Streaming{// 统计新闻浏览量
  def newsCount(records:Iterator[(String,Int)]):Unit={var conn:Connection=null//声明SQL连接var statement:Statement=null//生命SQL执行对象try{
      conn =DriverManager.getConnection(Constants.url,Constants.userName,Constants.passWord)//获取连接对象

      records.foreach(data =>{
        val name = data._1.replaceAll("[\\[\\]]","")
        val count = data._2

        val sql1 ="select 1 from newscount where name = '"+ name +"'"//用于判断是否已经有这条数据
        val sql2 ="update newscount set count = "+ count +" where name = '"+ name +"'"
        val sql3 ="insert into newscount(name,count) values('"+ name +"',"+ count +")"

        statement = conn.createStatement()
        val resultSet = statement.executeQuery(sql1)if(resultSet.next()){
          statement.executeUpdate(sql2)//有数据就更新}else{
          statement.execute(sql3)//没有数据就插入}})}catch{case e:Exception=> e.printStackTrace()}finally{//最后要关流if(null!= statement){
        statement.close()}if(null!= conn){
        conn.close()}}}// 统计时段浏览量
  def periodCount(records:Iterator[(String,Int)]):Unit={var conn:Connection=nullvar statement:Statement=nulltry{
      conn =DriverManager.getConnection(Constants.url,Constants.userName,Constants.passWord)
      records.foreach(data =>{
        val logtime = data._1
        val count = data._2

        val sql1 ="select 1 from periodcount where logtime = '"+ logtime +"'"
        val sql2 ="update periodcount set count = "+ count +" where logtime='"+ logtime +"'"
        val sql3 ="insert into periodcount(logtime,count) values('"+ logtime +"',"+ count +")"

        statement = conn.createStatement()
        val resultSet = statement.executeQuery(sql1)if(resultSet.next()){
          statement.execute(sql2)}else{
          statement.execute(sql3)}})}catch{case e:Exception=> e.printStackTrace()}finally{if(null!= statement){
        statement.close()}if(null!= conn){
        conn.close()}}}// 主方法
  def main(args:Array[String]):Unit={print("start:Kafka2MySQL_Streaming")
    val sparkConf =newSparkConf().setAppName("sougoulogs_Streaming").setMaster("local[2]")
    val ssc =newStreamingContext(sparkConf,Seconds(1))// 每秒做一次micro-batch// 统一配置Kafka的参数
    val kafkaParams =Map[String,Object]{"bootstrap.servers"->Constants.kafkaServer
      "key.deserializer"-> classOf[StringDeserializer]"value.deserializer"-> classOf[StringDeserializer]"group.id"->Constants.groupId
      "auto.offset.reset"->Constants.offset
      "enable.auto.commit"->java.lang.Boolean.TRUE
    }

    val topics =Array(Constants.topic)
    val stream =KafkaUtils.createDirectStream[String,String](
      ssc,PreferConsistent,Subscribe[String,String](topics, kafkaParams))//    val lines = stream.map(record => record.value)// 去除脏数据
    val cleanStream = stream.map(_.value()).map(_.split(",")).filter(_.length ==6)// 获取新闻浏览量指标的流
    val newsCountDStream = cleanStream
      .map(x =>(x(2),1)).reduceByKey(_ + _)// 获取时段浏览量的流
    val periodCountDStream = cleanStream
      .map(x =>(x(0),1)).reduceByKey(_ + _)// 处理流
    newsCountDStream.foreachRDD(rdd =>{print("每个分区都执行读取Kafka数据计算新闻话题浏览量并将结果写入MySQL")
      rdd.foreachPartition(newsCount)})

    periodCountDStream.foreachRDD(rdd =>{print("每个分区都执行读取Kafka数据计算时段浏览量并将结果写入MySQL")
      rdd.foreachPartition(periodCount)})//启动流
    ssc.start()// Spark的Streaming(DStream)必须有这句,否则会报错
    ssc.awaitTermination()}}

The Structured Streaming Approach

packagecom.zhiyong.sparkDemoimportorg.apache.spark.sql.SparkSessionimportorg.apache.spark.sql.streaming.{OutputMode,Trigger}// 使用SparkStructuredStreaming:读取Kafka数据,处理后写入MySQL
object Kafka2MySQL_StructuredStreaming{//// 主方法
  def main(args:Array[String]):Unit={print("start:Kafka2MySQL_StructuredStreaming")
    val spark =SparkSession.builder().appName("").master("local[2]").getOrCreate()importspark.implicits._//导入隐式推测防止报错

    val cleanSs = spark.readStream
      .format("kafka").option("kafka.bootstrap.servers",Constants.kafkaServer).option("subscribe",Constants.topic).load().selectExpr("cast(key as string)","cast(value as string)")//Java版Spark可以传String[].as[(String,String)].map(_._2).map(_.split(",")).filter(6== _.length)

    val newsCountsSs = cleanSs.map(_ (2)).groupBy("value").count().toDF("name","count")

    val periodCountsSs = cleanSs.map(_ (0)).groupBy("value").count().toDF("logtime","count")

    val sink1 =newSink1(Constants.url,Constants.userName,Constants.passWord)
    val sink2 =newSink2(Constants.url,Constants.userName,Constants.passWord)

    val query1 = newsCountsSs.writeStream
      .foreach(sink1).outputMode(OutputMode.Update()).trigger(Trigger.ProcessingTime("1 seconds")).start()

    val query2 = periodCountsSs.writeStream
      .foreach(sink2).outputMode(OutputMode.Update()).trigger(Trigger.ProcessingTime("1 seconds")).start()

    query1.awaitTermination()
    query2.awaitTermination()}}

The two corresponding sinks:

packagecom.zhiyong.sparkDemoimportorg.apache.spark.sql.{ForeachWriter,Row}importjava.sql.{Connection,ResultSet,Statement}/**
 * 自定义Sink,将数据写入MySQL
 *
 * @param url      连接串
 * @param user     用户名
 * @param password 密码
 */classSink1(url:String, user:String, password:String)extendsForeachWriter[Row]{var statement:Statement= _
  var resultset:ResultSet= _
  var conn:Connection= _

  /**
   * 初始化连接,从连接池获取连接对象
   *
   * @param partitionId
   * @param epochId
   * @return
   */
  override def open(partitionId:Long, epochId:Long):Boolean={Class.forName(Constants.mysql8driver)
    val pool =newMySQLPool()
    pool.init(url, user, password)
    conn = pool.getConn
    statement = conn.createStatement()true}

  override def process(value:Row):Unit={
    val name = value.getAs[String]("name").replace("[\\[\\]]","")
    val count = value.getAs[Long]("count").asInstanceOf[Int]
    val sql1 ="select 1 from newscount where name = '"+ name +"'"
    val sql2 ="insert into newscount(name,count) values('"+ name +"',"+ count +")"
    val sql3 ="update newscount set count = "+ count +" where name = '"+ name +"'"try{
      resultset = statement.executeQuery(sql1)if(resultset.next()){
        statement.execute(sql3)}else{
        statement.execute(sql2)}}catch{case e:Exception=>print("出现异常“"+ e.getMessage)}}

  override def close(errorOrNull:Throwable):Unit={if(null!= statement){
      statement.close()}if(null!= conn){
      conn.close()}}}

The other one is much the same:

packagecom.zhiyong.sparkDemoimportorg.apache.spark.sql.{ForeachWriter,Row}importjava.sql.{Connection,ResultSet,Statement}classSink2(url:String, user:String, password:String)extendsForeachWriter[Row]{var statement:Statement= _
  var resultset:ResultSet= _
  var conn:Connection= _

  /**
   * 初始化连接,从连接池获取连接对象
   *
   * @param partitionId
   * @param epochId
   * @return
   */
  override def open(partitionId:Long, epochId:Long):Boolean={Class.forName(Constants.mysql8driver)
    val pool =newMySQLPool()
    pool.init(url, user, password)
    conn = pool.getConn
    statement = conn.createStatement()true}

  override def process(value:Row):Unit={
    val logtime = value.getAs[String]("logtime")
    val count = value.getAs[Long]("count").asInstanceOf[Int]
    val sql1 ="select 1 from periodcount where logtime = '"+ logtime +"'"
    val sql2 ="insert into periodcount(logtime,count) values('"+ logtime +"',"+ count +")"
    val sql3 ="update periodcount set count = "+ count +" where logtime = '"+ logtime +"'"try{
      resultset = statement.executeQuery(sql1)if(resultset.next()){
        statement.execute(sql3)}else{
        statement.execute(sql2)}}catch{case e:Exception=>print("出现异常“"+ e.getMessage)}}

  override def close(errorOrNull:Throwable):Unit={if(null!= statement){
      statement.close()}if(null!= conn){
      conn.close()}}}

A Method for Appending Data

packagecom.zhiyong.sparkDemo;importjava.io.*;/**
 * @program: sougoulogs
 * @description: 模拟数据生成
 * @author: zhiyong
 * @create: 2022-04-04 23:43
 **/publicclassMockData{// 主方法publicstaticvoidmain(String[] args){String inputPath = args[0];String outputPath = args[1];String interval = args[2];System.out.println("读数路径:"+ interval);System.out.println("写log路径“"+ outputPath);System.out.println("产生数据间隔:"+ interval +"ms");try{mock(inputPath, outputPath, interval);}catch(Exception e){
            e.printStackTrace();}}/**
     * 模拟生成数据
     *
     * @param inputPath  读取文件的路径
     * @param outputPath 写入数据的路径
     * @param interval   mock数据的时间间隔
     */privatestaticvoidmock(String inputPath,String outputPath,String interval)throwsIOException{FileInputStream fis =null;InputStreamReader isr =null;BufferedReader br =null;String line =null;try{int counter =1;//记录行号
            fis =newFileInputStream(inputPath);
            isr =newInputStreamReader(fis,"GBK");
            br =newBufferedReader(isr);while(null!=(line = br.readLine())){System.out.println("产生第"+ counter +"条数据:"+ line);wrieData(outputPath, line);
                counter++;Thread.sleep(Long.parseLong(interval));}}catch(Exception e){
            e.printStackTrace();}finally{// 最后要关流,防止文件持续占用if(null!= br){
                br.close();}if(null!= isr){
                isr.close();}if(null!= fis){
                fis.close();}}}/**
     * 写数据
     *
     * @param outputPath 输出路径
     * @param line       每行数据
     */privatestaticvoidwrieData(String outputPath,String line)throwsIOException{FileOutputStream fos =null;OutputStreamWriter osr =null;BufferedWriter bw =null;try{
            fos =newFileOutputStream(outputPath,true);//允许追加
            osr =newOutputStreamWriter(fos);
            bw =newBufferedWriter(osr);
            bw.write(line);
            bw.write("\n");}catch(Exception e){
            e.printStackTrace();}finally{if(null!=bw){
                bw.close();}if(null!=osr){
                osr.close();}if(null!=fos){
                fos.close();}}}}

To feed Flume's taildir source, we need an automated way to append records to the file instead of manually echoing lines into it.

In Flink, the various rich functions make this kind of thing easy: just override a method, no such hassle. Tools like Flume and Sqoop perhaps mainly exist for the convenience of SQL Boys who don't write Java. These days there are also mature products like Alibaba Cloud DataPhin and Huawei Cloud FusionInsight, plus open-source components like Kyuubi that can run data integration with little more than configuration, so the entry bar for big data has genuinely dropped a lot.

Tags: big data, Flink, FineBI

Reprinted from: https://blog.csdn.net/qq_41990268/article/details/124161096
Copyright belongs to the original author 虎鲸不是鱼. In case of infringement, please contact us for removal.
