0


开源大数据案例(第1章 通话记录数据分析)思路,操作,及执行ct-common

  1. ** 通话记录数据分析1. ** 项目背景

通信运营商每时每刻会产生大量的通信数据,例如通话记录,短信记录,彩信记录,第三方服务资费等等繁多信息。数据量如此巨大,除了要满足用户的实时查询和展示之外,还需要定时定期的对已有数据进行离线的分析处理。例如,当日话单,月度话单,季度话单,年度话单,通话详情,通话记录等等+。我们以此为背景,寻找一个切入点,学习其中的方法论。当前我们的需求是:统计每天、每月以及每年的每个人的通话次数及时长。

    1. ** 项目架构**
      1. ** 整体架构**

      1. ** 数据分析流程**

      1. ** 数据展示流程**

      1. ** 消费模型**

    1. ** 项目实现**

系统环境:

表1

系统

版本

windows

10 专业版

linux

CentOS 6.8

开发工具

表2

工具

版本

idea

2017.2.5旗舰版

maven

3.3.9

JDK

1.8+

提示:idea2017.2.5必须使用maven3.3.9,不要使用maven3.5,有部分兼容性问题

集群环境:

表3

框架

版本

hadoop

2.7.2

zookeeper

3.4.10

hbase

1.3.1

flume

1.7.0

kafka

2.11-0.11.0.0

硬件环境:

表4

hadoop102

hadoop103

hadoop104

内存

4G

2G

2G

CPU

2核

1核

1核

硬盘

50G

50G

50G

      1. ** 数据生产**

此情此景,对于该模块的业务,即数据生产过程,一般并不会让你来进行操作,数据生产是一套完整且严密的体系,这样可以保证数据的鲁棒性。但是如果涉及到项目的一体化方案的设计(数据的产生、存储、分析、展示),则必须清楚每一个环节是如何处理的,包括其中每个环境可能隐藏的问题;数据结构,数据内容可能出现的问题。

        1. 数据结构

我们将在HBase中存储两个电话号码,以及通话建立的时间和通话持续时间,最后再加上一个flag作为判断第一个电话号码是否为主叫。姓名字段的存储我们可以放置于另外一张表做关联查询,当然也可以插入到当前表中。

表5

列名

解释

举例

call1

第一个手机号码

15369468720

call1_name

第一个手机号码人姓名(非必须)

李雁

call2

第二个手机号码

19920860202

call2_name

第二个手机号码人姓名(非必须)

卫艺

date_time

建立通话的时间

20171017081520

date_time_ts

建立通话的时间(时间戳形式)

duration

通话持续时间(秒)

0600

        1. 编写代码
思路

1.创建Java集合类存放模拟的电话号码和联系人;

2.随机选取两个手机号码当作“主叫”与“被叫”(注意判断两个手机号不能重复),产出call1与call2字段数据;

3.创建随机生成通话建立时间的方法,可指定随机范围,最后生成通话建立时间,产出date_time字段数据;

4.随机一个通话时长,单位:秒,产出duration字段数据;

5.将产出的一条数据拼接封装到一个字符串中;

6.使用IO操作将产出的一条通话数据写入到本地文件中;

代码

首先新建一个maven项目 qingtaishuju-project-ct 作为整个工程的父项目,因为所有的业务不可能通过一个项目来实现,会有很多模块,比如有生产、消费、统计、展示等等。

**1.创建通用基础模块 **

新建ct-common,创建好之后导入项目所需要的依赖,如下所示。

<dependencies>
    <dependency>

        <groupId>org.apache.hbase</groupId>

        <artifactId>hbase-server</artifactId>

        <version>1.3.1</version>

    </dependency>

    <dependency>

        <groupId>org.apache.hbase</groupId>

        <artifactId>hbase-client</artifactId>

        <version>1.3.1</version>

    </dependency>

</dependencies>

依赖导入成功之后,接下来创建相应的包、接口和类,搭建基础的环境,模块结构如下

将对应的包复制到对应的

2.数据生产模块
  1. 通用模块创建好之后,新建数据生产模块:ct-producer
    模块包结构如下:

  1. 打开Bootstrap类,修改代码如下

  1. 将cantact.log复制到Datas目录下(需要新建Datas目录)

  1. 运行Bootstrap的main方法

  2. 此时在Datas目录下自动生成call.log文件,并持续写入数据(点击可停止运行)

3.上传至Linux生成数据
  1. 将主函数的输入输出路径写成动态的

2.打jar包(注意将程序打包成运行包,不是依赖包)

3.将数据包和jar包上传至Linux

运行jar包

      1. ** 数据采集/消费(存储)**

欢迎来到数据采集模块(消费),在企业中你要清楚流式数据采集框架flume和kafka的定位是什么。我们在此需要将实时数据通过flume采集到kafka然后供给给hbase消费。

flume:cloudera公司研发

适合下游数据消费者不多的情况;

适合数据安全性要求不高的操作;

适合与Hadoop生态圈对接的操作。

kafka:linkedin公司研发

适合数据下游消费众多的情况;

适合数据安全性要求较高的操作(支持replication);

因此我们常用的一种模型是:

线上数据 --> flume --> kafka --> flume(根据情景增删该流程) --> HDFS

        1. 数据采集
思路
  1. 配置kafka,启动zookeeper和kafka集群;

  2. 创建kafka主题;

  3. 启动kafka控制台消费者(此消费者只用于测试使用);

  4. 配置flume,监控日志文件;

  5. 启动flume监控任务;

  6. 观察测试。

操作

启动zookeeper,kafka集群

zkServer.sh start

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

创建kafka主题

kafka-topics.sh --zookeeper master:2181 --topic calllog --create --replication-factor 1 --partitions 3

启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties #建议使用这种方式,不需要启动多个窗口

启动kafka

bin/kafka-server-start.sh config/server.properties

bin/kafka-server-start.sh -daemon config/server.properties #建议使用这种方式,不需要启动多个窗口

检查一下是否创建主题成功:

kafka-topics.sh --zookeeper master:2181 --list

启动kafka控制台消费者,等待flume信息的输入

kafka-console-consumer.sh --bootstrap-server master:9092 -topic calllog --from-beginning

配置flume-env.sh

export JAVA_HOME=/home/software/jdk1.8

export HADOOP_HOME=/home/software/hadoop

解决版本冲突 flume-ng

配置flume(flume-kafka.conf)

define

a1.sources = r1

a1.sinks = k1

a1.channels = c1

source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F -c +0 /opt/open/call.log

sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave01:9092,slave02:9092

a1.sinks.k1.kafka.topic = calllog

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

bind

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

启动flume

flume-ng agent --conf $FLUME_HOME/conf --name a1 --conf-file $FLUME_HOME/conf/flume-kafka.conf -Dflume.root.logger=INFO,console

在flume所在的文件夹即安装目录
./bin/flume-ng agent --conf ./conf --name a1 --conf-file ./conf/flume-kafka.conf -Dflume.root.logger=INFO,console

观察kafka控制台消费者是否成功显示产生的数据

        1. 数据消费

如果以上操作均成功,则开始编写操作HBase的代码,用于消费数据,将产生的数据实时存储在HBase中。

思路
  1. 编写kafka消费者,读取kafka集群中缓存的消息,并打印到控制台以观察是否成功;
  2. 既然能够读取到kafka中的数据了,就可以将读取出来的数据写入到HBase中,所以编写调用HBaseAPI相关方法,将从Kafka中读取出来的数据写入到HBase;
  3. 以上两步已经足够完成消费数据,存储数据的任务,但是涉及到解耦,所以过程中需要将一些属性文件外部化,HBase通用性方法封装到某一个类中。
代码

创建新的module项目:ct_consumer

模块目录如下

执行

1.虚拟机执行jar包生产数据

java -jar ct-producer.jar contact.log call.log

2.flume采集

bin/flume-ng agent -c conf/ -n a1 -f /opt/open-bigdata-project/flume-2-kafka.conf

3.启动Hadoop(否则无法启动hbase)

4.启动虚拟机HBASE

./start-hbase.sh

4.idea消费者消费

消费完之后查看HBASE数据库

        1. 数据查询方式一

使用scan查看HBase中是否正确存储了数据,同时尝试使用过滤器查询扫描指定通话时长的数据。进行该单元测试前,需要先运行数据采集任务,确保HBase中已有数据存在。

新建工具过滤器工具类:HBaseFilterUtil

新建单元测试类:**HBaseScanTest1 **

运行单元测试

        1. 数据消费测试

项目成功后,则将项目打包后在linux中运行测试。

**1) **打包HBase消费者代码

      1. ** 数据分析**

我们的数据已经完整的采集到了HBase集群中,这次我们需要对采集到的数据进行分析,统计出我们想要的结果。注意,在分析的过程中,我们不一定会采取一个业务指标对应一个mapreduce-job的方式,如果情景允许,我们会采取一个mapreduce分析多个业务指标的方式来进行任务。具体何时采用哪种方式,我们后续会详细探讨。

分析模块流程如图所示:

业务指标:

a) 用户每天主叫通话个数统计,通话时间统计。

b) 用户每月通话记录统计,通话时间统计。

c) 用户之间亲密关系统计。(通话次数与通话时间体现用户亲密关系)

        1. 需求分析

根据需求目标,设计出下述(3.2.2)表结构。我们需要按照时间范围(年月日),结合MapReduce统计出所属时间范围内所有手机号码的通话次数总和以及通话时长总和。

思路:

**a) *维度,即某个角度,某个视角,按照时间维度来统计通话,比如我想统计2017年所有月份所有日子的通话记录,那这个维度我们大概可以表述为2017年月*日

**b) **通过Mapper将数据按照不同维度聚合给Reducer

c) 通过Reducer拿到按照各个维度聚合过来的数据,进行汇总,输出

d) 根据业务需求,将Reducer的输出通过Outputformat把数据

数据输入:HBase

数据输出:Mysql

HBase****中数据源结构:

标签

举例&说明

rowkey

hashregion_call1_datetime_call2_flag_duration

01_15837312345_20170527081033_13766889900_1_0180

family

f1列族:存放主叫信息

f2列族:存放被叫信息

call1

第一个手机号码

call2

第二个手机号码

date_time

通话建立的时间,例如:20171017081520

date_time_ts

date_time对应的时间戳形式

duration

通话时长(单位:秒)

flag

标记call1是主叫还是被叫(call1的身份与call2的身份互斥)

**a) **已知目标,那么需要结合目标思考已有数据是否能够支撑目标实现;

**b) **根据目标数据结构,构建Mysql表结构,建表;

**c) **思考代码需要涉及到哪些功能模块,建立不同功能模块对应的包结构。

**d) **描述数据,一定是基于某个维度(视角)的,所以构建维度类。比如按照“年”与“手机号码”的组合作为key聚合所有的数据,便可以统计这个手机号码,这一年的相关结果。

**e) **自定义OutputFormat用于对接Mysql,使数据输出。

**f) **创建相关工具类。

        1. MySQL数据表结构设计

我们将分析的结果数据保存到Mysql中,以方便Web端进行查询展示。

**1) **:db_telecom.tb_contacts

用于存放用户手机号码与联系人姓名。

备注

类型

id

自增主键

int(11) NOT NULL

telephone

手机号码

varchar(255) NOT NULL

name

联系人姓名

varchar(255) NOT NULL

**2) **:db_telecom.tb_call

用于存放某个时间维度下通话次数与通话时长的总和。

备注

类型

id_date_contact

复合主键(联系人维度id,时间维度id)

varchar(255) NOT NULL

id_date_dimension

时间维度id

int(11) NOT NULL

id_contact

查询人的电话号码

int(11) NOT NULL

call_sum

通话次数总和

int(11) NOT NULL DEFAULT 0

call_duration_sum

通话时长总和

int(11) NOT NULL DEFAULT 0

**3) **:db_telecom.tb_dimension_date

用于存放时间维度的相关数据

备注

类型

id

自增主键

int(11) NOT NULL

year

年,当前通话信息所在年

int(11) NOT NULL

month

月,当前通话信息所在月,如果按照年来统计信息,则month为-1。

int(11) NOT NULL

day

日,当前通话信息所在日,如果是按照月来统计信息,则day为-1。

int(11) NOT NULL

**4)****:db_telecom.tb_**intimacy

用于存放所有用户用户关系的结果数据。

备注

类型

id

自增主键

int(11) NOT NULL

intimacy_rank

好友亲密度排名

int(11) NOT NULL

id_contact1

联系人1,当前所查询人

int(11) NOT NULL

id_contact2

联系人2,与联系人为好友

int(11) NOT NULL

call_count

两联系人通话次数

int(11) NOT NULL DEFAULT 0

call_duration_count

两联系人通话持续时间

int(11) NOT NULL DEFAULT 0

        1. 环境准备

*1) 新建module:ct_analysis*

**2) **创建包结构,如下图

**3) **类表

类名

备注

CountDurationMapper

数据分析的Mapper类,继承自TableMapper

CountDurationReducer

数据分析的Reducer类,继承自Reduccer

CountDurationRunner

数据分析的驱动类,组装Job

MySQLOutputFormat

自定义Outputformat,对接Mysql

BaseDimension

维度(key)基类

BaseValue

值(value)基类

ComDimension

时间维度+联系人维度的组合维度

ContactDimension

联系人维度

DateDimension

时间维度

CountDurationValue

通话次数与通话时长的封装

JDBCUtil

连接Mysql的工具类

JDBCCacheBean

单例JDBCConnection

IConverter

转化接口,用于根据传入的维度对象,得到该维度对象对应的数据库主键id

DimensionConverter

IConverter实现类,负责实际的维度转id功能

LRUCache

用于缓存已知的维度id,减少对mysql的操作次数,提高效率

Constants

常量类

        1. 需求实现

将代码导入对应位置

        1. 运行测试
  1. 将mysql驱动包放入到hadoop根目录的$HADOOP_HOME/share/hadoop/common/目录下
  2. 将打包好的程序上传到master中

**3) **提交任务

cd 程序包目录

$ HADOOP_HOME/yarn jar ct_analysis-1.0-SNAPSHOT.jar

观察Mysql中的结果。

      1. ** 数据展示**
        1. 环境准备
  1. 新建module或项目:ct_web

项目结构如下:

pom.xml配置文件:

**2) **创建包结构

bean

contants

controller

dao

entries

**3) **类表

类名

备注

CallLog

用于封装数据分析结果的JavaBean

Contact

用于封装联系人的JavaBean

Contants

常量类

CallLogHandler

用于处理请求的Controller

CallLogDAO

查询某人某个维度通话记录的DAO

ContactDAO

查询联系人的DAO

QueryInfo

用于封装向服务器发来的请求参数

4) web****目录结构,web部分的根目录:webapp

文件夹名

备注

css

存放css静态资源的文件夹

html

存放html静态资源的文件夹

images

存放图片静态资源文件夹

js

存放js静态资源的文件夹

jsp

存放jsp页面的文件夹

WEB-INF

存放web相关配置的文件夹

5) resources****目录下创建spring相关配置文件

6) WEB-INF****目录下创建web相关配置

**7) **拷贝js框架到js目录下

        1. 编写代码

思路:

a) 首先测试数据通顺以及完整性,写一个联系人的测试用例。

b) 测试通过后,通过输入手机号码以及时间参数,查询指定维度的数据,并以图表展示。

代码:将代码复制到对应包中

        1. 最终预览

查询人通话时长与通话次数统计大概如下所示:

折线图如图所示:

柱状图如图所示:

统一展示如图所示:

    1. ** 项目总结**

重新总结梳理整个项目流程和方法论。


本文转载自: https://blog.csdn.net/D32618666604/article/details/132089993
版权归原作者 长岛山没有雪 所有, 如有侵权,请联系我们删除。

“开源大数据案例(第1章 通话记录数据分析)思路,操作,及执行ct-common”的评论:

还没有评论