

A Real-Time End-to-End Case with Flume + Kafka + HBase + Flink + FineBI (Part 3): Offline Analysis


08: Offline Analysis: HBase Table Design and Creation

  • Goal: master the design of the HBase table and how to create it
  • Path
    - step1: Basic design
    - step2: Rowkey design
    - step3: Partition (pre-split) design
    - step4: Create the table
  • Implementation
    - Basic design
      - Namespace: MOMO_CHAT
      - Table: MOMO_MSG
      - Family: C1
      - Qualifier: keep the same names as the fields in the data
    - Rowkey design
      - Query requirement: look up a chat history by sender id + receiver id + message date
        - sender account
        - receiver account
        - time
      - Design rules: business meaning, uniqueness, length, hashing, combination
      - Design implementation
        - Salting options: CRC, Hash, MD5, MUR => 8-, 16-, or 32-character prefix
        - MD5Hash[sender_account_receiver_account_message_time => first 8 characters]_sender_account_receiver_account_message_time
    - Partition design
      - Rowkey prefix: MD5-encoded, made up of letters and digits
      - Write concurrency: high
      - Partitioning: use HexStringSplit to pre-split the table into multiple regions (a short sketch after this list shows how the salted prefix spreads rowkeys across those regions)
    - Create the table
      - Start HBase: start-hbase.sh
      - Enter the client: hbase shell

        # create the namespace
        create_namespace 'MOMO_CHAT'
        # create the table with 6 pre-split regions and GZ compression
        create 'MOMO_CHAT:MOMO_MSG', {NAME => "C1", COMPRESSION => "GZ"}, {NUMREGIONS => 6, SPLITALGO => 'HexStringSplit'}

  • Summary: master the design of the HBase table and how to create it
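To see why the MD5 salt works well with HexStringSplit pre-splitting, here is a minimal Java sketch; the sender/receiver/time suffixes are made-up sample values, and MD5Hash and Bytes are the same HBase utility classes used later in this case:

```java
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;

public class RowkeySaltDemo {
    public static void main(String[] args) {
        // hypothetical sender_receiver_time suffixes, for illustration only
        String[] suffixes = {
                "13280256412_15260978785_1632888342000",
                "15260978785_13280256412_1632888350000",
                "18600000001_18600000002_1632888360000"
        };
        for (String suffix : suffixes) {
            // same rule as the table design: first 8 hex characters of the MD5 of the suffix
            String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0, 8);
            System.out.println(prefix + "_" + suffix);
        }
        // the prefixes are roughly uniform hex strings, so the 6 region
        // boundaries produced by HexStringSplit split the write load fairly evenly
    }
}
```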

09: Offline Analysis: Building the Kafka Consumer

  • Goal: implement the offline Kafka consumer
  • Path: overall outline of the implementation

      // entry point: consume Kafka and write the data into HBase
      public void main(){
          // step1: consume Kafka
          consumerKafka();
      }

      // consume the Kafka data
      public void consumerKafka(){
          prop = new Properties()
          KafkaConsumer consumer = new KafkaConsumer(prop)
          consumer.subscribe("MOMO_MSG")
          ConsumerRecords records = consumer.poll
          // consume and process the data partition by partition
          // each record carries: Topic, Partition, Offset, Key, Value
          // step2: write the value into HBase
          writeToHbase(value)
          // commit the offset of this partition
          commitSync(offset + 1)
      }

      // write one value into HBase
      public void writeToHbase(){
          // step1: build the connection
          // step2: build the Table object
          // step3: build the Put object
          // get the rowkey
          rowkey = getRowkey(value)
          Put put = new Put(rowkey)
          put.addColumn(...) for every column
          table.put(put)
      }

      public String getRowkey(){
          value.getSender
          value.getReceiver
          value.getTime
          rowkey = MD5 + sender + receiver + time
          return rowkey
      }

  • Implementation

      /**
       * Consume data from Kafka and write valid records into HBase
       */
      private static void consumerKafkaToHbase() throws Exception {
          // build the configuration object
          Properties props = new Properties();
          // broker addresses
          props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
          // consumer group id
          props.setProperty("group.id", "momo");
          // disable auto commit
          props.setProperty("enable.auto.commit", "false");
          // deserializer types for K and V
          props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          // build the consumer connection
          KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
          // subscribe to the topic
          consumer.subscribe(Arrays.asList("MOMO_MSG"));
          // keep pulling data
          while (true) {
              // ask Kafka for data and wait up to 100 ms for a response;
              // if nothing arrives within 100 ms, send the next request
              // everything pulled by one call is held in a ConsumerRecords object, similar to a collection of KV records
              ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
              // process the pulled data partition by partition
              Set<TopicPartition> partitions = records.partitions(); // all partitions present in this batch
              for (TopicPartition partition : partitions) {
                  // all records of this partition
                  List<ConsumerRecord<String, String>> partRecords = records.records(partition);
                  // process the records of this partition
                  long offset = 0;
                  for (ConsumerRecord<String, String> record : partRecords) {
                      // topic
                      String topic = record.topic();
                      // partition
                      int part = record.partition();
                      // offset
                      offset = record.offset();
                      // key
                      String key = record.key();
                      // value
                      String value = record.value();
                      System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
                      // write the value into HBase if it is a valid record
                      if (value != null && !"".equals(value) && value.split("\001").length == 20) {
                          writeToHbase(value);
                      }
                  }
                  // manually commit the offset of this partition
                  Map<TopicPartition, OffsetAndMetadata> offsets =
                          Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1));
                  consumer.commitSync(offsets);
              }
          }
      }

  • Summary: implement the offline Kafka consumer (a minimal entry-point sketch follows below)
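For reference, the entry point from the outline can be as small as the following sketch; the class name MomoKafkaToHbase is an assumption, and the writeToHbase/getMomoRowkey helpers plus the HBase connection are filled in by the next sections:

```java
public class MomoKafkaToHbase { // hypothetical class name

    public static void main(String[] args) throws Exception {
        // step1: consume Kafka and write every valid record into HBase
        consumerKafkaToHbase();
    }

    // consumerKafkaToHbase() as shown above;
    // the HBase connection, getMomoRowkey() and writeToHbase() follow in sections 10-12
}
```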

10: Offline Analysis: Building the HBase Connection

  • Goal: build the HBase connection
  • Implementation

      private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      private static Connection conn;
      private static Table table;
      private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG"); // table name
      private static byte[] family = Bytes.toBytes("C1"); // column family

      // static block: runs once when the class is loaded, so only one connection
      // is built instead of one per record, which would hurt performance
      static {
          try {
              // build the configuration object
              Configuration conf = HBaseConfiguration.create();
              conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
              // build the connection
              conn = ConnectionFactory.createConnection(conf);
              // get the table object
              table = conn.getTable(tableName);
          } catch (IOException e) {
              e.printStackTrace();
          }
      }

  • Summary: build the HBase connection (an optional clean-shutdown sketch follows below)
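The static block above never closes the connection. If you want a clean shutdown, one option (an assumption, not part of the original code) is to register a JVM shutdown hook that closes the static table and conn fields:

```java
// a minimal sketch: close the Table and Connection when the JVM exits
static {
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        try {
            if (table != null) table.close();
            if (conn != null) conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }));
}
```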

11: Offline Analysis: Building the Rowkey

  • Goal: build the rowkey
  • Implementation

      private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {
          // convert the time string to a timestamp
          long time = format.parse(stime).getTime();
          String suffix = sender_accounter + "_" + receiver_accounter + "_" + time;
          // build the 8-character MD5 prefix
          String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0, 8);
          // concatenate prefix and suffix and return
          return prefix + "_" + suffix;
      }

  • Summary: build the rowkey (a sample call is shown below)
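As a quick sanity check, a call with illustrative values (the accounts and time are sample data; the actual 8-character prefix depends on the MD5 result) produces a rowkey of the expected shape:

```java
// illustrative call, not part of the original code
String rowkey = getMomoRowkey("2021-09-29 11:25:42", "13280256412", "15260978785");
System.out.println(rowkey);
// expected shape: <8-char-md5-prefix>_13280256412_15260978785_<13-digit-timestamp>
```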

12: Offline Analysis: Building the Put Columns

  • Goal: build the columns of the Put object
  • Implementation

      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("msg_time"), Bytes.toBytes(items[0]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_nickyname"), Bytes.toBytes(items[1]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_account"), Bytes.toBytes(items[2]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_sex"), Bytes.toBytes(items[3]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_ip"), Bytes.toBytes(items[4]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_os"), Bytes.toBytes(items[5]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_phone_type"), Bytes.toBytes(items[6]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_network"), Bytes.toBytes(items[7]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("sender_gps"), Bytes.toBytes(items[8]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_nickyname"), Bytes.toBytes(items[9]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_ip"), Bytes.toBytes(items[10]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_account"), Bytes.toBytes(items[11]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_os"), Bytes.toBytes(items[12]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_phone_type"), Bytes.toBytes(items[13]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_network"), Bytes.toBytes(items[14]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_gps"), Bytes.toBytes(items[15]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("receiver_sex"), Bytes.toBytes(items[16]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("msg_type"), Bytes.toBytes(items[17]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("distance"), Bytes.toBytes(items[18]));
      put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("message"), Bytes.toBytes(items[19]));

  • Summary: build the columns of the Put object (a sketch of the complete writeToHbase helper follows at the end of this section)
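Putting sections 10-12 together, the writeToHbase helper that the consumer calls might look like the sketch below. It follows the outline from section 09 and the column order above; the elided addColumn calls are exactly the ones listed in this section, and the static conn/table/family fields come from section 10:

```java
// a minimal sketch of writeToHbase, assuming the static fields from section 10
// and the getMomoRowkey helper from section 11
private static void writeToHbase(String value) throws Exception {
    // split the \001-separated message into its 20 fields
    String[] items = value.split("\001");
    // build the rowkey from message time, sender account and receiver account
    String rowkey = getMomoRowkey(items[0], items[2], items[11]);
    // build the Put object for this rowkey
    Put put = new Put(Bytes.toBytes(rowkey));
    // add every column under family C1 (full list shown above)
    put.addColumn(family, Bytes.toBytes("msg_time"), Bytes.toBytes(items[0]));
    // ... the remaining addColumn calls from this section ...
    put.addColumn(family, Bytes.toBytes("message"), Bytes.toBytes(items[19]));
    // write the record into the HBase table
    table.put(put);
}
```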

13: Offline Analysis: Storage Run Test

  • Goal: test the end-to-end run: consume data from Kafka and write it into HBase on the fly
  • Implementation
    - Start the consumer program
    - Start the Flume agent

      cd /export/server/flume-1.9.0-bin
      bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console

    - Start the data simulator

      java -jar /export/data/momo_init/MoMo_DataGen.jar \
      /export/data/momo_init/MoMo_Data.xlsx \
      /export/data/momo_data/ \
      10

    - Check the result in HBase (a small Java scan sketch for verification follows at the end of this section)
  • Summary: test consuming data from Kafka and writing it into HBase on the fly
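If you prefer to check from code rather than the HBase shell, a minimal scan sketch (an assumption for verification only, reusing the static table object from section 10 and the org.apache.hadoop.hbase.client Scan/Result/ResultScanner classes) could be:

```java
// print the rowkeys of the first few stored messages
private static void printSampleRowkeys() throws IOException {
    Scan scan = new Scan();
    scan.setLimit(10); // only look at a handful of rows
    try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
            System.out.println(Bytes.toString(result.getRow()));
        }
    }
}
```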

14: Offline Analysis: Hive Integration Test

  • Goal: use Hive mapped onto HBase for offline analysis
  • Path
    - step1: Create the mapping
    - step2: Query
  • Implementation
    - Start Hive and YARN

      start-yarn.sh
      hive-daemon.sh metastore
      hive-daemon.sh hiveserver2
      start-beeline.sh

    - Create the mapping

      create database MOMO_CHAT;
      use MOMO_CHAT;
      create external table if not exists MOMO_CHAT.MOMO_MSG (
        id string,
        msg_time string,
        sender_nickyname string,
        sender_account string,
        sender_sex string,
        sender_ip string,
        sender_os string,
        sender_phone_type string,
        sender_network string,
        sender_gps string,
        receiver_nickyname string,
        receiver_ip string,
        receiver_account string,
        receiver_os string,
        receiver_phone_type string,
        receiver_network string,
        receiver_gps string,
        receiver_sex string,
        msg_type string,
        distance string,
        message string
      )
      stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
      with serdeproperties('hbase.columns.mapping'=':key,C1:msg_time,C1:sender_nickyname,C1:sender_account,C1:sender_sex,C1:sender_ip,C1:sender_os,C1:sender_phone_type,C1:sender_network,C1:sender_gps,C1:receiver_nickyname,C1:receiver_ip,C1:receiver_account,C1:receiver_os,C1:receiver_phone_type,C1:receiver_network,C1:receiver_gps,C1:receiver_sex,C1:msg_type,C1:distance,C1:message')
      tblproperties('hbase.table.name'='MOMO_CHAT:MOMO_MSG');

    - Analytical queries (a JDBC sketch for running them from Java follows at the end of this section)

      -- basic query
      select msg_time, sender_nickyname, receiver_nickyname, distance from momo_msg limit 10;

      -- query one chat history: sender id + receiver id + date, e.g. rowkey 1f300e5d_13280256412_15260978785_1632888342000
      select * from momo_msg
      where sender_account = '13280256412'
        and receiver_account = '15260978785'
        and substr(msg_time, 0, 10) = '2021-09-29';

      -- count the number of messages per hour
      select substr(msg_time, 0, 13) as hour, count(*) as cnt
      from momo_msg
      group by substr(msg_time, 0, 13);

  • Summary: use Hive mapped onto HBase for offline analysis
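The same analysis can also be run programmatically through HiveServer2's JDBC interface. The following is a minimal sketch, assuming HiveServer2 listens on node1:10000, the Hive JDBC driver is on the classpath, and the user name is a placeholder for this environment:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveHourlyCount {
    public static void main(String[] args) throws Exception {
        // assumed HiveServer2 address and database for this environment
        String url = "jdbc:hive2://node1:10000/momo_chat";
        try (Connection conn = DriverManager.getConnection(url, "root", "");
             Statement stmt = conn.createStatement();
             // the hourly message count query from above
             ResultSet rs = stmt.executeQuery(
                     "select substr(msg_time,0,13) as hour, count(*) as cnt "
                     + "from momo_msg group by substr(msg_time,0,13)")) {
            while (rs.next()) {
                System.out.println(rs.getString("hour") + "\t" + rs.getLong("cnt"));
            }
        }
    }
}
```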

15: Offline Analysis: Phoenix Integration Test

  • Goal: use Phoenix on top of HBase for ad-hoc queries
  • Path
    - step1: Create the mapping
    - step2: Query
  • Implementation
    - Start the Phoenix client

      cd /export/server/phoenix-5.0.0-HBase-2.0-bin/
      bin/sqlline.py node1:2181

    - Create the mapping

      create view if not exists MOMO_CHAT.MOMO_MSG (
        "id" varchar primary key,
        C1."msg_time" varchar,
        C1."sender_nickyname" varchar,
        C1."sender_account" varchar,
        C1."sender_sex" varchar,
        C1."sender_ip" varchar,
        C1."sender_os" varchar,
        C1."sender_phone_type" varchar,
        C1."sender_network" varchar,
        C1."sender_gps" varchar,
        C1."receiver_nickyname" varchar,
        C1."receiver_ip" varchar,
        C1."receiver_account" varchar,
        C1."receiver_os" varchar,
        C1."receiver_phone_type" varchar,
        C1."receiver_network" varchar,
        C1."receiver_gps" varchar,
        C1."receiver_sex" varchar,
        C1."msg_type" varchar,
        C1."distance" varchar,
        C1."message" varchar
      );

    - Ad-hoc queries (a JDBC sketch for running them from Java follows at the end of this section)

      -- basic query
      select "id", c1."sender_account", c1."receiver_account" from momo_chat.momo_msg limit 10;

      -- number of messages sent by each sender
      select c1."sender_account", count(*) as cnt
      from momo_chat.momo_msg
      group by c1."sender_account";

      -- number of distinct receivers each sender has chatted with
      select c1."sender_account", count(distinct c1."receiver_account") as cnt
      from momo_chat.momo_msg
      group by c1."sender_account"
      order by cnt desc;

  • Summary: use Phoenix on top of HBase for ad-hoc queries
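Phoenix also ships a standard JDBC driver, so the view can be queried from Java as well. A minimal sketch, assuming the Phoenix client jar is on the classpath and the ZooKeeper quorum is node1:2181 as above (the account value is a sample from the data):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PhoenixQueryDemo {
    public static void main(String[] args) throws Exception {
        // Phoenix thick-client URL: jdbc:phoenix:<zookeeper quorum>
        String url = "jdbc:phoenix:node1:2181";
        // ad-hoc lookup of messages sent by one account
        String sql = "select \"id\", c1.\"receiver_account\", c1.\"message\" "
                + "from momo_chat.momo_msg where c1.\"sender_account\" = ? limit 10";
        try (Connection conn = DriverManager.getConnection(url);
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, "13280256412");
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    System.out.println(rs.getString(1) + "\t" + rs.getString(2) + "\t" + rs.getString(3));
                }
            }
        }
    }
}
```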
Tags: flume kafka hbase

This article is reposted from: https://blog.csdn.net/xianyu120/article/details/133811744
Copyright belongs to the original author AI_Maynor. If there is any infringement, please contact us to remove it.
