- ** 通话记录数据分析1. ** 项目背景
通信运营商每时每刻会产生大量的通信数据,例如通话记录,短信记录,彩信记录,第三方服务资费等等繁多信息。数据量如此巨大,除了要满足用户的实时查询和展示之外,还需要定时定期的对已有数据进行离线的分析处理。例如,当日话单,月度话单,季度话单,年度话单,通话详情,通话记录等等+。我们以此为背景,寻找一个切入点,学习其中的方法论。当前我们的需求是:统计每天、每月以及每年的每个人的通话次数及时长。
- ** 项目架构**
- ** 整体架构**
- ** 数据分析流程**
- ** 数据展示流程**
- ** 消费模型**
- ** 项目实现**
系统环境:
表1
系统
版本
windows
10 专业版
linux
CentOS 6.8
开发工具:
表2
工具
版本
idea
2017.2.5旗舰版
maven
3.3.9
JDK
1.8+
提示:idea2017.2.5必须使用maven3.3.9,不要使用maven3.5,有部分兼容性问题
集群环境:
表3
框架
版本
hadoop
2.7.2
zookeeper
3.4.10
hbase
1.3.1
flume
1.7.0
kafka
2.11-0.11.0.0
硬件环境:
表4
hadoop102
hadoop103
hadoop104
内存
4G
2G
2G
CPU
2核
1核
1核
硬盘
50G
50G
50G
- ** 数据生产**
此情此景,对于该模块的业务,即数据生产过程,一般并不会让你来进行操作,数据生产是一套完整且严密的体系,这样可以保证数据的鲁棒性。但是如果涉及到项目的一体化方案的设计(数据的产生、存储、分析、展示),则必须清楚每一个环节是如何处理的,包括其中每个环境可能隐藏的问题;数据结构,数据内容可能出现的问题。
- 数据结构
我们将在HBase中存储两个电话号码,以及通话建立的时间和通话持续时间,最后再加上一个flag作为判断第一个电话号码是否为主叫。姓名字段的存储我们可以放置于另外一张表做关联查询,当然也可以插入到当前表中。
表5
列名
解释
举例
call1
第一个手机号码
15369468720
call1_name
第一个手机号码人姓名(非必须)
李雁
call2
第二个手机号码
19920860202
call2_name
第二个手机号码人姓名(非必须)
卫艺
date_time
建立通话的时间
20171017081520
date_time_ts
建立通话的时间(时间戳形式)
duration
通话持续时间(秒)
0600
- 编写代码
思路
1.创建Java集合类存放模拟的电话号码和联系人;
2.随机选取两个手机号码当作“主叫”与“被叫”(注意判断两个手机号不能重复),产出call1与call2字段数据;
3.创建随机生成通话建立时间的方法,可指定随机范围,最后生成通话建立时间,产出date_time字段数据;
4.随机一个通话时长,单位:秒,产出duration字段数据;
5.将产出的一条数据拼接封装到一个字符串中;
6.使用IO操作将产出的一条通话数据写入到本地文件中;
代码
首先新建一个maven项目 qingtaishuju-project-ct 作为整个工程的父项目,因为所有的业务不可能通过一个项目来实现,会有很多模块,比如有生产、消费、统计、展示等等。
**1.创建通用基础模块 **
新建ct-common,创建好之后导入项目所需要的依赖,如下所示。
<dependencies> <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
</dependencies>
依赖导入成功之后,接下来创建相应的包、接口和类,搭建基础的环境,模块结构如下
将对应的包复制到对应的
2.数据生产模块
- 通用模块创建好之后,新建数据生产模块:ct-producer
模块包结构如下:
- 打开Bootstrap类,修改代码如下
- 将cantact.log复制到Datas目录下(需要新建Datas目录)
运行Bootstrap的main方法
此时在Datas目录下自动生成call.log文件,并持续写入数据(点击可停止运行)
3.上传至Linux生成数据
- 将主函数的输入输出路径写成动态的
2.打jar包(注意将程序打包成运行包,不是依赖包)
3.将数据包和jar包上传至Linux
运行jar包
- ** 数据采集/消费(存储)**
欢迎来到数据采集模块(消费),在企业中你要清楚流式数据采集框架flume和kafka的定位是什么。我们在此需要将实时数据通过flume采集到kafka然后供给给hbase消费。
flume:cloudera公司研发
适合下游数据消费者不多的情况;
适合数据安全性要求不高的操作;
适合与Hadoop生态圈对接的操作。
kafka:linkedin公司研发
适合数据下游消费众多的情况;
适合数据安全性要求较高的操作(支持replication);
因此我们常用的一种模型是:
线上数据 --> flume --> kafka --> flume(根据情景增删该流程) --> HDFS
- 数据采集
思路
配置kafka,启动zookeeper和kafka集群;
创建kafka主题;
启动kafka控制台消费者(此消费者只用于测试使用);
配置flume,监控日志文件;
启动flume监控任务;
观察测试。
操作
启动zookeeper,kafka集群
zkServer.sh start
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
创建kafka主题
kafka-topics.sh --zookeeper master:2181 --topic calllog --create --replication-factor 1 --partitions 3
启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties #建议使用这种方式,不需要启动多个窗口
启动kafka
bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties #建议使用这种方式,不需要启动多个窗口
检查一下是否创建主题成功:
kafka-topics.sh --zookeeper master:2181 --list
启动kafka控制台消费者,等待flume信息的输入
kafka-console-consumer.sh --bootstrap-server master:9092 -topic calllog --from-beginning
配置flume-env.sh
export JAVA_HOME=/home/software/jdk1.8
export HADOOP_HOME=/home/software/hadoop
解决版本冲突 flume-ng
配置flume(flume-kafka.conf)
define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/open/call.log
sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave01:9092,slave02:9092
a1.sinks.k1.kafka.topic = calllog
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume
flume-ng agent --conf $FLUME_HOME/conf --name a1 --conf-file $FLUME_HOME/conf/flume-kafka.conf -Dflume.root.logger=INFO,console
在flume所在的文件夹即安装目录
./bin/flume-ng agent --conf ./conf --name a1 --conf-file ./conf/flume-kafka.conf -Dflume.root.logger=INFO,console
观察kafka控制台消费者是否成功显示产生的数据
- 数据消费
如果以上操作均成功,则开始编写操作HBase的代码,用于消费数据,将产生的数据实时存储在HBase中。
思路
- 编写kafka消费者,读取kafka集群中缓存的消息,并打印到控制台以观察是否成功;
- 既然能够读取到kafka中的数据了,就可以将读取出来的数据写入到HBase中,所以编写调用HBaseAPI相关方法,将从Kafka中读取出来的数据写入到HBase;
- 以上两步已经足够完成消费数据,存储数据的任务,但是涉及到解耦,所以过程中需要将一些属性文件外部化,HBase通用性方法封装到某一个类中。
代码
创建新的module项目:ct_consumer
模块目录如下
执行
1.虚拟机执行jar包生产数据
java -jar ct-producer.jar contact.log call.log
2.flume采集
bin/flume-ng agent -c conf/ -n a1 -f /opt/open-bigdata-project/flume-2-kafka.conf
3.启动Hadoop(否则无法启动hbase)
4.启动虚拟机HBASE
./start-hbase.sh
4.idea消费者消费
消费完之后查看HBASE数据库
- 数据查询方式一
使用scan查看HBase中是否正确存储了数据,同时尝试使用过滤器查询扫描指定通话时长的数据。进行该单元测试前,需要先运行数据采集任务,确保HBase中已有数据存在。
新建工具过滤器工具类:HBaseFilterUtil
新建单元测试类:**HBaseScanTest1 **
运行单元测试
- 数据消费测试
项目成功后,则将项目打包后在linux中运行测试。
**1) **打包HBase消费者代码
- ** 数据分析**
我们的数据已经完整的采集到了HBase集群中,这次我们需要对采集到的数据进行分析,统计出我们想要的结果。注意,在分析的过程中,我们不一定会采取一个业务指标对应一个mapreduce-job的方式,如果情景允许,我们会采取一个mapreduce分析多个业务指标的方式来进行任务。具体何时采用哪种方式,我们后续会详细探讨。
分析模块流程如图所示:
业务指标:
a) 用户每天主叫通话个数统计,通话时间统计。
b) 用户每月通话记录统计,通话时间统计。
c) 用户之间亲密关系统计。(通话次数与通话时间体现用户亲密关系)
- 需求分析
根据需求目标,设计出下述(3.2.2)表结构。我们需要按照时间范围(年月日),结合MapReduce统计出所属时间范围内所有手机号码的通话次数总和以及通话时长总和。
思路:
**a) *维度,即某个角度,某个视角,按照时间维度来统计通话,比如我想统计2017年所有月份所有日子的通话记录,那这个维度我们大概可以表述为2017年月*日
**b) **通过Mapper将数据按照不同维度聚合给Reducer
c) 通过Reducer拿到按照各个维度聚合过来的数据,进行汇总,输出
d) 根据业务需求,将Reducer的输出通过Outputformat把数据
数据输入:HBase
数据输出:Mysql
HBase****中数据源结构:
标签
举例&说明
rowkey
hashregion_call1_datetime_call2_flag_duration
01_15837312345_20170527081033_13766889900_1_0180
family
f1列族:存放主叫信息
f2列族:存放被叫信息
call1
第一个手机号码
call2
第二个手机号码
date_time
通话建立的时间,例如:20171017081520
date_time_ts
date_time对应的时间戳形式
duration
通话时长(单位:秒)
flag
标记call1是主叫还是被叫(call1的身份与call2的身份互斥)
**a) **已知目标,那么需要结合目标思考已有数据是否能够支撑目标实现;
**b) **根据目标数据结构,构建Mysql表结构,建表;
**c) **思考代码需要涉及到哪些功能模块,建立不同功能模块对应的包结构。
**d) **描述数据,一定是基于某个维度(视角)的,所以构建维度类。比如按照“年”与“手机号码”的组合作为key聚合所有的数据,便可以统计这个手机号码,这一年的相关结果。
**e) **自定义OutputFormat用于对接Mysql,使数据输出。
**f) **创建相关工具类。
- MySQL数据表结构设计
我们将分析的结果数据保存到Mysql中,以方便Web端进行查询展示。
**1) **:db_telecom.tb_contacts
用于存放用户手机号码与联系人姓名。
列
备注
类型
id
自增主键
int(11) NOT NULL
telephone
手机号码
varchar(255) NOT NULL
name
联系人姓名
varchar(255) NOT NULL
**2) **:db_telecom.tb_call
用于存放某个时间维度下通话次数与通话时长的总和。
列
备注
类型
id_date_contact
复合主键(联系人维度id,时间维度id)
varchar(255) NOT NULL
id_date_dimension
时间维度id
int(11) NOT NULL
id_contact
查询人的电话号码
int(11) NOT NULL
call_sum
通话次数总和
int(11) NOT NULL DEFAULT 0
call_duration_sum
通话时长总和
int(11) NOT NULL DEFAULT 0
**3) **:db_telecom.tb_dimension_date
用于存放时间维度的相关数据
列
备注
类型
id
自增主键
int(11) NOT NULL
year
年,当前通话信息所在年
int(11) NOT NULL
month
月,当前通话信息所在月,如果按照年来统计信息,则month为-1。
int(11) NOT NULL
day
日,当前通话信息所在日,如果是按照月来统计信息,则day为-1。
int(11) NOT NULL
**4)****:db_telecom.tb_**intimacy
用于存放所有用户用户关系的结果数据。
列
备注
类型
id
自增主键
int(11) NOT NULL
intimacy_rank
好友亲密度排名
int(11) NOT NULL
id_contact1
联系人1,当前所查询人
int(11) NOT NULL
id_contact2
联系人2,与联系人为好友
int(11) NOT NULL
call_count
两联系人通话次数
int(11) NOT NULL DEFAULT 0
call_duration_count
两联系人通话持续时间
int(11) NOT NULL DEFAULT 0
- 环境准备
*1) 新建module:ct_analysis*
**2) **创建包结构,如下图
**3) **类表
类名
备注
CountDurationMapper
数据分析的Mapper类,继承自TableMapper
CountDurationReducer
数据分析的Reducer类,继承自Reduccer
CountDurationRunner
数据分析的驱动类,组装Job
MySQLOutputFormat
自定义Outputformat,对接Mysql
BaseDimension
维度(key)基类
BaseValue
值(value)基类
ComDimension
时间维度+联系人维度的组合维度
ContactDimension
联系人维度
DateDimension
时间维度
CountDurationValue
通话次数与通话时长的封装
JDBCUtil
连接Mysql的工具类
JDBCCacheBean
单例JDBCConnection
IConverter
转化接口,用于根据传入的维度对象,得到该维度对象对应的数据库主键id
DimensionConverter
IConverter实现类,负责实际的维度转id功能
LRUCache
用于缓存已知的维度id,减少对mysql的操作次数,提高效率
Constants
常量类
- 需求实现
将代码导入对应位置
- 运行测试
- 将mysql驱动包放入到hadoop根目录的$HADOOP_HOME/share/hadoop/common/目录下
- 将打包好的程序上传到master中
**3) **提交任务
cd 程序包目录
$ HADOOP_HOME/yarn jar ct_analysis-1.0-SNAPSHOT.jar
观察Mysql中的结果。
- ** 数据展示**
- 环境准备
新建module或项目:ct_web
项目结构如下:
pom.xml配置文件:
**2) **创建包结构
bean
contants
controller
dao
entries
**3) **类表
类名
备注
CallLog
用于封装数据分析结果的JavaBean
Contact
用于封装联系人的JavaBean
Contants
常量类
CallLogHandler
用于处理请求的Controller
CallLogDAO
查询某人某个维度通话记录的DAO
ContactDAO
查询联系人的DAO
QueryInfo
用于封装向服务器发来的请求参数
4) web****目录结构,web部分的根目录:webapp
文件夹名
备注
css
存放css静态资源的文件夹
html
存放html静态资源的文件夹
images
存放图片静态资源文件夹
js
存放js静态资源的文件夹
jsp
存放jsp页面的文件夹
WEB-INF
存放web相关配置的文件夹
5) resources****目录下创建spring相关配置文件
6) WEB-INF****目录下创建web相关配置
**7) **拷贝js框架到js目录下
- 编写代码
思路:
a) 首先测试数据通顺以及完整性,写一个联系人的测试用例。
b) 测试通过后,通过输入手机号码以及时间参数,查询指定维度的数据,并以图表展示。
代码:将代码复制到对应包中
- 最终预览
查询人通话时长与通话次数统计大概如下所示:
折线图如图所示:
柱状图如图所示:
统一展示如图所示:
- ** 项目总结**
重新总结梳理整个项目流程和方法论。
版权归原作者 长岛山没有雪 所有, 如有侵权,请联系我们删除。