

A Real-Time End-to-End Case Study with Flume + Kafka + HBase + Flink + FineBI (Part 5): FineBI Visualization


22: Configuring the FineBI Dataset

  • Goal: configure FineBI to access the MySQL result dataset
  • Implementation
    - Install FineBI: follow 《FineBI Windows版本安装手册.docx》 (the FineBI Windows installation manual)
    - Configure the data connection
      - Connection name: Momo
      - Username: root
      - Password: your own MySQL password
      - Connection URL: jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8
    - Prepare the dataset with the following SQL query (a JDBC sketch for verifying the connection and query follows this list):

```sql
SELECT
    id,
    momo_totalcount,
    momo_province,
    momo_username,
    momo_msgcount,
    CASE momo_grouptype
        WHEN '1' THEN '总消息量'
        WHEN '2' THEN '各省份发送量'
        WHEN '3' THEN '各省份接收量'
        WHEN '4' THEN '各用户发送量'
        WHEN '5' THEN '各用户接收量'
    END AS momo_grouptype
FROM momo_count
```
  • Summary: configured FineBI to access the MySQL result dataset
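To sanity-check the connection settings outside FineBI, the same URL and query can be run over plain JDBC. The sketch below is an optional check, not part of the original case: the class name MomoCountCheck and the password "123456" are placeholders, and it assumes the momo_count result table already exists in the momo database and that mysql-connector-java (see Appendix 1) is on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class MomoCountCheck {
    public static void main(String[] args) throws Exception {
        // Same JDBC URL as the FineBI data connection above
        String url = "jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8";
        // "123456" is a placeholder: use your own MySQL password
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT id, momo_totalcount, momo_province, momo_username, momo_msgcount, "
                   + "CASE momo_grouptype WHEN '1' THEN '总消息量' WHEN '2' THEN '各省份发送量' "
                   + "WHEN '3' THEN '各省份接收量' WHEN '4' THEN '各用户发送量' "
                   + "WHEN '5' THEN '各用户接收量' END AS momo_grouptype FROM momo_count")) {
            // Print a few rows to confirm the dataset is reachable
            int shown = 0;
            while (rs.next() && shown < 10) {
                System.out.println(rs.getString("momo_grouptype") + "\t"
                        + rs.getString("momo_province") + "\t"
                        + rs.getString("momo_username") + "\t"
                        + rs.getString("momo_totalcount") + "\t"
                        + rs.getString("momo_msgcount"));
                shown++;
            }
        }
    }
}
```

If a few rows print, FineBI should be able to read the same dataset with the connection configured above.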

23: Building the FineBI Report

  • Goal: build the real-time report in FineBI
  • Steps
    - step1: build the real-time report
    - step2: configure the real-time report
    - step3: test real-time refresh
  • Implementation: build the real-time report
    - Create a new dashboard
    - Add a title
    - Real-time total message count
    - Top 10 users by messages sent
    - Top 10 users by messages received
    - Top 10 provinces by messages sent
    - Top 10 provinces by messages received
    - Total message volume by province
  • Summary: built the real-time report in FineBI

24: FineBI Real-Time Configuration and Testing

  • Goal: test the real-time report
  • Implementation
    - Configure real-time refresh
      - Official documentation: https://help.fanruan.com/finebi/doc-view-363.html
      - Add the jar package: place the jar into the webapps\webroot\WEB-INF\lib directory under the FineBI installation directory
        - Note: if prompted that the file already exists, choose to overwrite it
      - Add the JS file
        - Create a js file named refresh.js:

```javascript
setTimeout(function () {
    var b = document.title;
    var a = BI.designConfigure.reportId; // get the dashboard id
    // Replace this with the id of your own dashboard
    if (a == "d574631848bd4e33acae54f986d34e69") {
        setInterval(function () {
            BI.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
            //Data.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
            BI.Utils.broadcastAllWidgets2Refresh(true);
        }, 3000); // the second argument is the refresh interval in ms (3000 here)
    }
}, 2000)
```

        - Place the created refresh.js into %FineBI%/webapps/webroot under the FineBI installation directory
      - Disable the FineBI cache, then shut down FineBI
      - Modify the jar to load the js by adding:

```html
<!-- enable auto refresh -->
<script type="text/javascript" src="/webroot/refresh.js"></script>
```

      - Restart FineBI
  • Real-time refresh test (a JDBC sketch for the first step is shown after this list)
    - Clear the MySQL result table
    - Start the Flink program: run MoMoFlinkCount
    - Start the Flume agent:

```shell
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
```

    - Start the simulated data generator:

```shell
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
10
```

    - Observe the report
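The original does not show a command for the "clear the MySQL result table" step. The sketch below is one hedged way to do it over JDBC, assuming the Flink job writes its aggregates into the momo_count table of the momo database (the table queried in section 22); the class name ClearMomoCount and the password "123456" are placeholders.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class ClearMomoCount {
    public static void main(String[] args) throws Exception {
        // Same JDBC URL as the FineBI data connection in section 22
        String url = "jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8";
        // "123456" is a placeholder: use your own MySQL password
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             Statement stmt = conn.createStatement()) {
            // Assumption: the Flink job writes its aggregates into momo_count
            stmt.executeUpdate("TRUNCATE TABLE momo_count");
            System.out.println("momo_count cleared");
        }
    }
}
```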



  • Summary: completed the FineBI real-time refresh test

## Appendix 1: Maven Dependencies

```xml
  <!-- Remote repository -->
  <repositories>
      <repository>
          <id>aliyun</id>
          <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
          <releases><enabled>true</enabled></releases>
          <snapshots>
              <enabled>false</enabled>
              <updatePolicy>never</updatePolicy>
          </snapshots>
      </repository>
  </repositories>
  <dependencies>
      <!-- HBase client -->
      <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-client</artifactId>
          <version>2.1.0</version>
      </dependency>
      <!-- Kafka client -->
      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.4.1</version>
      </dependency>
      <!-- JSON parsing library -->
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.62</version>
      </dependency>
      <!-- Flink dependencies -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>1.10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.11</artifactId>
          <version>1.10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-runtime-web_2.11</artifactId>
          <version>1.10.0</version>
      </dependency>
      <!-- Required for Flink to access HDFS, Kafka, MySQL, and Redis -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-shaded-hadoop-2-uber</artifactId>
          <version>2.7.5-10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka_2.11</artifactId>
          <version>1.10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-jdbc_2.11</artifactId>
          <version>1.10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>flink-connector-redis_2.11</artifactId>
          <version>1.0</version>
      </dependency>
      <!-- HTTP client dependency -->
      <dependency>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpclient</artifactId>
          <version>4.5.4</version>
      </dependency>
      <!-- MySQL JDBC driver -->
      <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.38</version>
      </dependency>
  </dependencies>

  <build>
      <plugins>
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.1</version>
              <configuration>
                  <target>1.8</target>
                  <source>1.8</source>
              </configuration>
          </plugin>
      </plugins>
  </build>
```

## Appendix 2: Complete Offline Consumer Code

```java
package bigdata.itcast.cn.momo.offline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;

/**
 * @ClassName MomoKafkaToHbase
 * @Description TODO Offline scenario: consume data from Kafka and write it into HBase
 * @Create By     Maynor
 */
public class MomoKafkaToHbase {

    private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static Connection conn;
    private static Table table;
    private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG"); // table name
    private static byte[] family = Bytes.toBytes("C1");                           // column family

    // todo:2 - build the HBase connection
    // Static block: runs once when the class is loaded, avoiding the cost of building multiple connections
    static {
        try {
            // Build the configuration object
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
            // Build the connection
            conn = ConnectionFactory.createConnection(conf);
            // Get the table object
            table = conn.getTable(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        // todo:1 - build the consumer and fetch the data
        consumerKafkaToHbase();
//        String momoRowkey = getMomoRowkey("2020-08-13 12:30:00", "13071949728", "17719988692");
//        System.out.println(momoRowkey);
    }

    /**
     * Consume data from Kafka and write valid records into HBase
     */
    private static void consumerKafkaToHbase() throws Exception {
        // Build the configuration object
        Properties props = new Properties();
        // Specify the broker addresses
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        // Specify the consumer group id
        props.setProperty("group.id", "momo1");
        // Disable auto commit
        props.setProperty("enable.auto.commit", "false");
        // Specify the key and value deserializer types
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Build the consumer connection
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Specify which topics to subscribe to
        consumer.subscribe(Arrays.asList("MOMO_MSG"));
        // Keep polling for data
        while (true) {
            // Request data from Kafka and wait up to 100 ms for a response; if nothing arrives within 100 ms, issue the next request
            // All fetched records (multiple KV pairs) are returned in a ConsumerRecords object, similar to a collection
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // todo:3 - process the fetched data: print it
            // Get all partitions contained in this batch and process them one by one
            Set<TopicPartition> partitions = records.partitions();
            for (TopicPartition partition : partitions) {
                // Take all records of this partition and process them
                List<ConsumerRecord<String, String>> partRecords = records.records(partition);
                long offset = 0;
                for (ConsumerRecord<String, String> record : partRecords) {
                    // Get the topic
                    String topic = record.topic();
                    // Get the partition
                    int part = record.partition();
                    // Get the offset
                    offset = record.offset();
                    // Get the key
                    String key = record.key();
                    // Get the value
                    String value = record.value();
                    System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
                    // Write the value into HBase
                    if (value != null && !"".equals(value) && value.split("\001").length == 20) {
                        writeToHbase(value);
                    }
                }
                // Manually commit the offset of this partition
                Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1));
                consumer.commitSync(offsets);
            }
        }
    }

    /**
     * Implements the actual write into HBase
     * @param value
     */
    private static void writeToHbase(String value) throws Exception {
        // todo:3 - write into HBase
        // Split the record
        String[] items = value.split("\001");
        String stime = items[0];
        String sender_accounter = items[2];
        String receiver_accounter = items[11];
        // Build the rowkey
        String rowkey = getMomoRowkey(stime, sender_accounter, receiver_accounter);
        // Build the Put
        Put put = new Put(Bytes.toBytes(rowkey));
        // Add the columns
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));//执行写入
        table.put(put);}/**
     * Build the rowkey from the message time, sender account, and receiver account
     * @param stime
     * @param sender_accounter
     * @param receiver_accounter
     * @return
     * @throws Exception
     */
    private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {
        // Convert the time string to a timestamp
        long time = format.parse(stime).getTime();
        String suffix = sender_accounter + "_" + receiver_accounter + "_" + time;
        // Build an 8-character MD5 prefix
        String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0, 8);
        // Concatenate prefix and suffix and return
        return prefix + "_" + suffix;
    }
}
```

Reposted from: https://blog.csdn.net/xianyu120/article/details/133922455
Copyright belongs to the original author, AI_Maynor. If there is any infringement, please contact us for removal.
