文章目录
22:FineBI配置数据集
- 目标:实现FineBI访问MySQL结果数据集的配置
- 实施- 安装FineBI- 参考《FineBI Windows版本安装手册.docx》安装FineBI- 配置连接
数据连接名称:Momo用户名:root密码:自己MySQL的密码数据连接URL:jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8
- 数据准备SELECT id, momo_totalcount,momo_province,momo_username,momo_msgcount,CASE momo_grouptype WHEN'1'THEN'总消息量'WHEN'2'THEN'各省份发送量'WHEN'3'THEN'各省份接收量'WHEN'4'THEN'各用户发送量'WHEN'5'THEN'各用户接收量'ENDAS momo_grouptypeFROM momo_count
- 小结- 实现FineBI访问MySQL结果数据集的配置
23:FineBI构建报表
- 目标:实现FineBI实时报表构建
- 路径- step1:实时报表构建- step2:实时报表配置- step3:实时刷新测试
- 实施- 实时报表构建- 新建仪表盘- 添加标题- 实时总消息数- 发送消息最多的Top10用户- 接受消息最多的Top10用户- 各省份发送消息Top10- 各省份接收消息Top10- 各省份总消息量
- 小结- 实现FineBI实时报表构建
24:FineBI实时配置测试
- 目标:实现实时报表测试
- 实施- 实时报表配置- 官方文档:https://help.fanruan.com/finebi/doc-view-363.html- 添加jar包:将jar包放入FineBI安装目录的 webapps\webroot\WEB-INF\lib目录下- 注意:如果提示已存在,就选择覆盖- 添加JS文件- 创建js文件:refresh.js
setTimeout(function(){var b =document.title;var a =BI.designConfigure.reportId;//获取仪表板id//这里要指定自己仪表盘的idif(a=="d574631848bd4e33acae54f986d34e69"){setInterval(function(){BI.SharingPool.put("controlFilters",BI.Utils.getControlCalculations());//Data.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());BI.Utils.broadcastAllWidgets2Refresh(true);},3000);//5000000为定时刷新的频率,单位ms}},2000)
- 将创建好的refresh.js文件放至 FineBI 安装目录%FineBI%/webapps/webroot中- 关闭FineBI缓存,然后关闭FineBI- 修改jar包,添加js<!-- 增加刷新功能 --> <script type="text/javascript" src="/webroot/refresh.js"></script>
- 重启FineBI - 实时刷新测试- 清空MySQL结果表- 启动Flink程序:运行MoMoFlinkCount- 启动Flume程序
cd /export/server/flume-1.9.0-binbin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
- 启动模拟数据java-jar /export/data/momo_init/MoMo_DataGen.jar \/export/data/momo_init/MoMo_Data.xlsx \/export/data/momo_data/ \10`````` - 观察报表
- 小结- 实现FineBI实时测试
## 附录一:Maven依赖
```xml
<!--远程仓库-->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases><enabled>true</enabled></releases>
<snapshots>
<enabled>false</enabled>
<updatePolicy>never</updatePolicy>
</snapshots>
</repository>
</repositories>
<dependencies>
<!--Hbase 客户端-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
</dependency>
<!--kafka 客户端-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<!--JSON解析工具包-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<!--Flink依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<!-- flink操作hdfs、Kafka、MySQL、Redis,所需要导入该包-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<!--HTTP请求的的依赖-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.4</version>
</dependency>
<!--MySQL连接驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<target>1.8</target>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>
附录二:离线消费者完整代码
packagebigdata.itcast.cn.momo.offline;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.hbase.HBaseConfiguration;importorg.apache.hadoop.hbase.TableName;importorg.apache.hadoop.hbase.client.Connection;importorg.apache.hadoop.hbase.client.ConnectionFactory;importorg.apache.hadoop.hbase.client.Put;importorg.apache.hadoop.hbase.client.Table;importorg.apache.hadoop.hbase.util.Bytes;importorg.apache.hadoop.hbase.util.MD5Hash;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.clients.consumer.OffsetAndMetadata;importorg.apache.kafka.common.TopicPartition;importjava.io.IOException;importjava.text.ParseException;importjava.text.SimpleDateFormat;importjava.time.Duration;importjava.util.*;/**
* @ClassName MomoKafkaToHbase
* @Description TODO 离线场景:消费Kafka的数据写入Hbase
* @Create By Maynor
*/publicclassMomoKafkaToHbase{privatestaticSimpleDateFormat format =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");privatestaticConnection conn;privatestaticTable table;privatestaticTableName tableName =TableName.valueOf("MOMO_CHAT:MOMO_MSG");//表名privatestaticbyte[] family =Bytes.toBytes("C1");//列族//todo:2-构建Hbase连接//静态代码块: 随着类的加载而加载,一般只会加载一次,避免构建多个连接影响性能static{try{//构建配置对象Configuration conf =HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");//构建连接
conn =ConnectionFactory.createConnection(conf);//获取表对象
table = conn.getTable(tableName);}catch(IOException e){
e.printStackTrace();}}publicstaticvoidmain(String[] args)throwsException{//todo:1-构建消费者,获取数据consumerKafkaToHbase();// String momoRowkey = getMomoRowkey("2020-08-13 12:30:00", "13071949728", "17719988692");// System.out.println(momoRowkey);}/**
* 用于消费Kafka的数据,将合法数据写入Hbase
*/privatestaticvoidconsumerKafkaToHbase()throwsException{//构建配置对象Properties props =newProperties();//指定服务端地址
props.setProperty("bootstrap.servers","node1:9092,node2:9092,node3:9092");//指定消费者组的id
props.setProperty("group.id","momo1");//关闭自动提交
props.setProperty("enable.auto.commit","false");//指定K和V反序列化的类型
props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//构建消费者的连接KafkaConsumer<String,String> consumer =newKafkaConsumer<>(props);//指定订阅哪些Topic
consumer.subscribe(Arrays.asList("MOMO_MSG"));//持续拉取数据while(true){//向Kafka请求拉取数据,等待Kafka响应,在100ms以内如果响应,就拉取数据,如果100ms内没有响应,就提交下一次请求: 100ms为等待Kafka响应时间//拉取到的所有数据:多条KV数据都在ConsumerRecords对象,类似于一个集合ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));//todo:3-处理拉取到的数据:打印//取出每个分区的数据进行处理Set<TopicPartition> partitions = records.partitions();//获取本次数据中所有分区//对每个分区的数据做处理for(TopicPartition partition : partitions){List<ConsumerRecord<String,String>> partRecords = records.records(partition);//取出这个分区的所有数据//处理这个分区的数据long offset =0;for(ConsumerRecord<String,String> record : partRecords){//获取TopicString topic = record.topic();//获取分区int part = record.partition();//获取offset
offset = record.offset();//获取KeyString key = record.key();//获取ValueString value = record.value();System.out.println(topic +"\t"+ part +"\t"+ offset +"\t"+ key +"\t"+ value);//将Value数据写入Hbaseif(value !=null&&!"".equals(value)&& value.split("\001").length ==20){writeToHbase(value);}}//手动提交分区的commit offsetMap<TopicPartition,OffsetAndMetadata> offsets =Collections.singletonMap(partition,newOffsetAndMetadata(offset+1));
consumer.commitSync(offsets);}}}/**
* 用于实现具体的写入Hbase的方法
* @param value
*/privatestaticvoidwriteToHbase(String value)throwsException{//todo:3-写入Hbase//切分数据String[] items = value.split("\001");String stime = items[0];String sender_accounter = items[2];String receiver_accounter = items[11];//构建rowkeyString rowkey =getMomoRowkey(stime,sender_accounter,receiver_accounter);//构建PutPut put =newPut(Bytes.toBytes(rowkey));//添加列
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));//执行写入
table.put(put);}/**
* 基于消息时间、发送人id、接受人id构建rowkey
* @param stime
* @param sender_accounter
* @param receiver_accounter
* @return
* @throws Exception
*/privatestaticStringgetMomoRowkey(String stime,String sender_accounter,String receiver_accounter)throwsException{//转换时间戳long time = format.parse(stime).getTime();String suffix = sender_accounter+"_"+receiver_accounter+"_"+time;//构建MD5String prefix =MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0,8);//合并返回return prefix+"_"+suffix;}}
本文转载自: https://blog.csdn.net/xianyu120/article/details/133922455
版权归原作者 AI_Maynor 所有, 如有侵权,请联系我们删除。
版权归原作者 AI_Maynor 所有, 如有侵权,请联系我们删除。