🚀 本文章实现了基于MapReduce的手机浏览日志分析
🚀 文章简介:主要包含了数据生成部分,数据处理部分,数据存储部分与数据可视化部分
🚀 【本文仅供参考】其中需求实现的方式有多种,提供的代码并非唯一写法,选择适合的方式即可。
目录
手机日志分析需求
- 本文主要实现以下需求
- 编写数据生成器生成1G~10G大小的数据,字段必须包括id,日期,手机号码、型号、操作系统字段。
- 需要将手机号码的第4~9位做掩码处理。
- 分析2021年、2022年操作系统市场占比、手机型号市场占比情况
- 分析2022年手机运营商市场占比情况
- 分析数据存储到HDFS集群/ana/phone节点下面
- 将分析结果存储到Mysql,并进行数据可视化
数据生成工具类
- 手机号码随机生成
- 可以采用随机数生成的方式,以三大运营商号段的前三位作为规则生成号码,代码如下:
import java.util.Random;

/**
 * Generates random, masked mobile numbers for the three major Chinese carriers.
 *
 * <p>Known number segments, for reference:
 * <ul>
 *   <li>China Mobile: 134, 135, 136, 137, 138, 139, 147, 150, 151, 152, 157, 158, 159, 172, 178,
 *       182, 183, 184, 187, 188, 198, 1703, 1705, 1706</li>
 *   <li>China Unicom: 130, 131, 132, 145, 155, 156, 166, 171, 175, 176, 185, 186, 1704, 1707,
 *       1708, 1709</li>
 *   <li>China Telecom: 133, 153, 173, 177, 180, 181, 189, 191, 193, 199, 1700, 1701, 1702</li>
 * </ul>
 * Only the subset of prefixes listed in the constants below is actually used.
 *
 * <p>Tencent Cloud API: https://market.cloud.tencent.com/products/31101
 *
 * @author 湧哥
 * @version 1.0
 */
public class PhoneNumberGenerator {

    /** China Mobile prefixes used by this generator. */
    private static final String[] CHINA_MOBILE_PREFIX = {"134", "139", "150", "151", "182"};
    /** China Unicom prefixes used by this generator. */
    private static final String[] CHINA_UNICOM_PREFIX = {"130", "155", "186"};
    /** China Telecom prefixes used by this generator. */
    private static final String[] CHINA_TELECOM_PREFIX = {"133", "153", "180", "181", "189"};

    /** Total number of characters in a generated phone number. */
    private static final int PHONE_NUMBER_LENGTH = 11;
    /** First zero-based index that is masked (the 4th digit). */
    private static final int MASK_START_INDEX = 3;
    /** Last zero-based index that is masked (the 9th digit), inclusive. */
    private static final int MASK_END_INDEX = 8;

    public static void main(String[] args) {
        String phoneNumbers = generatePhoneNumbers(1);
        System.out.println(phoneNumbers);
    }

    /**
     * Generates {@code count} masked phone numbers.
     *
     * <p>Each number is 11 characters long: a three-digit carrier prefix, six {@code '*'}
     * characters covering digits 4 to 9, and two trailing random digits. When more than one
     * number is requested they are separated by {@code '\n'}.
     *
     * @param count how many numbers to generate; {@code 0} yields an empty string
     * @return the generated numbers, one per line (a single number for {@code count == 1})
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public static String generatePhoneNumbers(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must not be negative: " + count);
        }
        Random random = new Random();
        StringBuilder numbers = new StringBuilder();
        for (int i = 0; i < count; i++) {
            if (i > 0) {
                numbers.append('\n');
            }
            String prefix = pickPrefix(random);
            String raw = prefix + generateRandomNumber(random, PHONE_NUMBER_LENGTH - prefix.length());
            numbers.append(replaceCharacters(raw, MASK_START_INDEX, MASK_END_INDEX));
        }
        return numbers.toString();
    }

    /** Picks one of the three carriers at random, then one of that carrier's prefixes. */
    private static String pickPrefix(Random random) {
        switch (random.nextInt(3)) {
            case 0:
                return CHINA_MOBILE_PREFIX[random.nextInt(CHINA_MOBILE_PREFIX.length)];
            case 1:
                return CHINA_UNICOM_PREFIX[random.nextInt(CHINA_UNICOM_PREFIX.length)];
            default:
                return CHINA_TELECOM_PREFIX[random.nextInt(CHINA_TELECOM_PREFIX.length)];
        }
    }

    /**
     * Replaces the characters in {@code [startIndex, endIndex]} (both inclusive) with
     * {@code '*'}.
     *
     * @return the masked string, or {@code input} unchanged when it is {@code null} or the
     *     range does not fit inside it
     */
    private static String replaceCharacters(String input, int startIndex, int endIndex) {
        // endIndex is inclusive, so it must be strictly smaller than the length.
        if (input == null || startIndex < 0 || endIndex >= input.length()) {
            return input;
        }
        StringBuilder sb = new StringBuilder(input);
        for (int i = startIndex; i <= endIndex; i++) {
            sb.setCharAt(i, '*');
        }
        return sb.toString();
    }

    /** Returns a string of {@code length} random decimal digits. */
    private static String generateRandomNumber(Random random, int length) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < length; i++) {
            sb.append(random.nextInt(10));
        }
        return sb.toString();
    }
}
- 运营商解析的其中一种方式 【采用接口分析】
- 这里可以使用鹅厂或者其他厂商开发的接口进行运营商识别 申请获取对应的秘钥即可 例子如下
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.Key;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.Base64;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/**
 * Example client that calls a cloud-market HTTP API to look up the carrier of a mobile number.
 * Requests are authenticated with an HMAC-SHA1 signature over the {@code x-date} and
 * {@code x-source} headers.
 *
 * @author 湧哥
 * @version 1.0
 */
public class PhoneOperator {

    /**
     * Builds the value of the {@code Authorization} header.
     *
     * @param source value of the {@code X-Source} header
     * @param secretId key id issued by the cloud market
     * @param secretKey secret key issued by the cloud market, used as the HMAC key
     * @param datetime value of the {@code X-Date} header
     * @return the header value containing the Base64-encoded HMAC-SHA1 signature
     */
    public static String calcAuthorization(String source, String secretId, String secretKey, String datetime)
            throws NoSuchAlgorithmException, UnsupportedEncodingException, InvalidKeyException {
        String signStr = "x-date: " + datetime + "\n" + "x-source: " + source;
        Mac mac = Mac.getInstance("HmacSHA1");
        Key sKey = new SecretKeySpec(secretKey.getBytes(StandardCharsets.UTF_8), mac.getAlgorithm());
        mac.init(sKey);
        byte[] hash = mac.doFinal(signStr.getBytes(StandardCharsets.UTF_8));
        // java.util.Base64 replaces the internal sun.misc.BASE64Encoder, which newer JDKs no longer ship.
        String sig = Base64.getEncoder().encodeToString(hash);
        return "hmac id=\"" + secretId + "\", algorithm=\"hmac-sha1\", headers=\"x-date x-source\", signature=\""
                + sig + "\"";
    }

    /**
     * Encodes the map as an {@code application/x-www-form-urlencoded} string
     * ({@code k1=v1&k2=v2}), URL-encoding keys and values as UTF-8.
     *
     * @param map parameters to encode; iteration order of the map is preserved
     * @return the encoded string, empty for an empty map
     */
    public static String urlencode(Map<?, ?> map) throws UnsupportedEncodingException {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<?, ?> entry : map.entrySet()) {
            if (sb.length() > 0) {
                sb.append("&");
            }
            sb.append(String.format("%s=%s",
                    URLEncoder.encode(entry.getKey().toString(), "UTF-8"),
                    URLEncoder.encode(entry.getValue().toString(), "UTF-8")));
        }
        return sb.toString();
    }

    public static void main(String[] args)
            throws NoSuchAlgorithmException, UnsupportedEncodingException, InvalidKeyException {
        // Secret id issued by the cloud market.
        String secretId = "xx";
        // Secret key issued by the cloud market.
        String secretKey = "xx";
        String source = "market";

        Calendar cd = Calendar.getInstance();
        SimpleDateFormat sdf = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.US);
        sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
        String datetime = sdf.format(cd.getTime());

        // Signature.
        String auth = calcAuthorization(source, secretId, secretKey, datetime);

        // Request method.
        String method = "POST";

        // Request headers.
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("X-Source", source);
        headers.put("X-Date", datetime);
        headers.put("Authorization", auth);

        // Query parameters.
        Map<String, String> queryParams = new HashMap<String, String>();
        queryParams.put("mobile", "XXX");

        // Body parameters.
        Map<String, String> bodyParams = new HashMap<String, String>();

        // Append the query string to the URL.
        String url = "https://service-8c43o60c-1253285064.gz.apigw.tencentcs.com/release/sms";
        if (!queryParams.isEmpty()) {
            url += "?" + urlencode(queryParams);
        }

        HttpURLConnection conn = null;
        try {
            conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setConnectTimeout(5000);
            conn.setReadTimeout(5000);
            conn.setRequestMethod(method);

            for (Map.Entry<String, String> entry : headers.entrySet()) {
                conn.setRequestProperty(entry.getKey(), entry.getValue());
            }

            // Only these methods carry a request body.
            boolean hasBody = "POST".equals(method) || "PUT".equals(method) || "PATCH".equals(method);
            if (hasBody) {
                conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
                conn.setDoOutput(true);
                try (OutputStream out = conn.getOutputStream()) {
                    out.write(urlencode(bodyParams).getBytes(StandardCharsets.UTF_8));
                    out.flush();
                }
            }

            // Read the response line by line; line terminators are dropped.
            StringBuilder result = new StringBuilder();
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    result.append(line);
                }
            }
            System.out.println(result);
        } catch (IOException e) {
            System.out.println(e);
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }
}
结果如下 (另一种方式为:直接根据前三位手机号进行判断)
模拟数据生成类
- 数据生成器 id,日期,手机号码、型号、操作系统
/**
* @Description
* 数据生成器 id,日期,手机号码、型号、操作系统
* id:UUID 随机生成 日期:2021 2022 手机号码:三大运营商 型号:Apple HuaWei Oppo Vivo Meizu Nokia 操作系统:Apple ios Harmony Samsung
* 1.分析2021年、2022年操作系统市场占比、手机型号市场占比情况
* 2.分析2022年手机运营商市场占比情况
* 3.分析数据存储到HDFS集群/ana/phone节点下面
* 4.将分析结果存储到Mysql,并进行数据可视化
* @Author 湧哥
* @Version 1.0
*/publicclassDataGenerator{publicstaticvoidmain(String[] args){try{BufferedWriter writer =newBufferedWriter(newFileWriter("data/phone.log"));for(int i =0; i <1000; i++){//UUID随机生成 id,日期,手机号码、型号、操作系统String id = UUID.randomUUID().toString();String date =getRandomDate();String phoneNumber =PhoneNumberGenerator.generatePhoneNumbers(1);String model =getRandomModel();String operatingSystem =getRandomOperatingSystem();String line = id +","+ date +","+ phoneNumber +","+ model +","+ operatingSystem;
writer.write(line);
writer.newLine();}
writer.close();}catch(IOException e){
e.printStackTrace();}}privatestaticStringgetRandomDate(){Random random =newRandom();int year = random.nextInt(2)==0?2021:2022;int month = random.nextInt(12)+1;int dayOfMonth;if(month ==2){
dayOfMonth = random.nextInt(28)+1;}elseif(month ==4|| month ==6|| month ==9|| month ==11){
dayOfMonth= random.nextInt(30)+1;}else{
dayOfMonth= random.nextInt(31)+1;}return year +"-"+(month <10?"0":"")+
month+"-"+(dayOfMonth<10?"0":"")+
dayOfMonth ;}privatestaticStringgetRandomPhoneNumber(){Random random =newRandom();StringBuilder phoneNumber =newStringBuilder("1");for(int i =0; i <10; i++){
phoneNumber.append(random.nextInt(10));}return phoneNumber.toString();}privatestaticStringgetRandomModel(){String[] models ={"Apple","HuaWei","Oppo","Vivo","Meizu","Nokia"};return models[newRandom().nextInt(models.length)];}privatestaticStringgetRandomOperatingSystem(){String[] operatingSystems ={"Apple","HarmonyOS","Samsung","iOS"};return operatingSystems[newRandom().nextInt(operatingSystems.length)];}}
结果如下
MapReduce程序需求编写
- 分析2021年、2022年操作系统市场占比、手机型号市场占比情况
/**
* @Description
* @Author 湧哥
* @Version 1.0
*/importjava.io.BufferedReader;importjava.io.IOException;importjava.io.InputStreamReader;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.FSDataInputStream;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.DoubleWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Partitioner;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;publicclassPhoneOSAnalysis{privatestaticint totalCount;privatestaticint lineCount2021 =0;privatestaticint lineCount2022 =0;publicstaticclassTokenizerMapperextendsMapper<Object,Text,Text,DoubleWritable>{privatefinalstaticDoubleWritable one =newDoubleWritable(1);privateText word =newText();publicvoidmap(Object key,Text value,Context context)throwsIOException,InterruptedException{String[] fields = value.toString().split(",");if(fields.length >=5){// 操作系统市场占比
word.set(fields[1].substring(0,4)+"-OS-"+ fields[4]);
context.write(word, one);// 手机型号市场占比
word.set(fields[1].substring(0,4)+"-Model-"+ fields[3]);
context.write(word, one);}}}publicstaticclassMarketShareReducerextendsReducer<Text,DoubleWritable,Text,DoubleWritable>{privateDoubleWritable result =newDoubleWritable();publicvoidreduce(Text key,Iterable<DoubleWritable> values,Context context)throwsIOException,InterruptedException{double sum =0;for(DoubleWritable val : values){//这里会根据分组的key来计算sum
sum += val.get();}int yearTotalCount = key.toString().contains("2021")? lineCount2021 : lineCount2022;double percentage = sum / yearTotalCount;
result.set(percentage);
context.write(key, result);}}publicstaticvoidmain(String[] args)throwsException{Configuration conf =newConfiguration();FileSystem fs =FileSystem.get(conf);Path inputPath =newPath("data/phone.log");FSDataInputStream inputStream = fs.open(inputPath);try(BufferedReader reader =newBufferedReader(newInputStreamReader(fs.open(inputPath)))){String line;while((line = reader.readLine())!=null){if(line.contains("2021")){
lineCount2021++;}elseif(line.contains("2022")){
lineCount2022++;}}}// totalCount = Math.max(lineCount2021, lineCount2022);Job job =Job.getInstance(conf,"market share analysis");
job.setJarByClass(PhoneOSAnalysis.class);
job.setMapperClass(TokenizerMapper.class);// 设置自定义分区器
job.setPartitionerClass(CustomPartitioner.class);
job.setNumReduceTasks(2);
job.setReducerClass(MarketShareReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);Path outputPath =newPath("data/result");if(fs.exists(outputPath)){
fs.delete(outputPath,true);}FileInputFormat.addInputPath(job,newPath("data/phone.log"));FileOutputFormat.setOutputPath(job,newPath(String.valueOf(outputPath)));// TextInputFormat.addInputPath(job, new Path("hdfs://192.168.192.100:8020/"));// TextInputFormat.outInputPath(job, new Path("hdfs://192.168.192.100:8020/"));System.exit(job.waitForCompletion(true)?0:1);}publicstaticclassCustomPartitionerextendsPartitioner<Text,DoubleWritable>{@OverridepublicintgetPartition(Text key,DoubleWritable value,int numPartitions){// 根据年份进行分区if(key.toString().contains("2021")){return0;}else{return1;}}}}
- 分析2022年手机运营商市场占比情况
/**
* @Description
* @Author 湧哥
* @Version 1.0
*/importjava.io.BufferedReader;importjava.io.IOException;importjava.io.InputStreamReader;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.DoubleWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;publicclassOperatorMR{privatestaticint lineCount2022 =0;publicstaticclassTokenizerMapperextendsMapper<Object,Text,Text,DoubleWritable>{privatefinalstaticDoubleWritable one =newDoubleWritable(1);privateText word =newText();publicvoidmap(Object key,Text value,Context context)throwsIOException,InterruptedException{String[] fields = value.toString().split(",");if(fields.length >=3&& fields[1].contains("2022")){// 手机运营商市场占比
word.set(fields[1].substring(0,4)+"-Operator-"+getCarrier(fields[2]));
context.write(word, one);}}privateStringgetCarrier(String phoneNumber){String prefix = phoneNumber.substring(0,3);switch(prefix){//"133","153","180","181","189"case"133":case"153":case"180":case"181":case"189":return"电信";//"130","155","186"case"130":case"155":case"186":return"联通";default:return"移动";}}}publicstaticclassMarketShareReducerextendsReducer<Text,DoubleWritable,Text,DoubleWritable>{privateDoubleWritable result =newDoubleWritable();publicvoidreduce(Text key,Iterable<DoubleWritable> values,Context context)throwsIOException,InterruptedException{double sum =0;for(DoubleWritable val : values){
sum += val.get();}double percentage = sum / lineCount2022;
result.set(percentage);
context.write(key, result);}}publicstaticvoidmain(String[] args)throwsException{Configuration conf =newConfiguration();FileSystem fs =FileSystem.get(conf);Path inputPath =newPath("data/phone.log");try(BufferedReader reader =newBufferedReader(newInputStreamReader(fs.open(inputPath)))){String line;while((line = reader.readLine())!=null){if(line.contains("2022")){
lineCount2022++;}}}Job job =Job.getInstance(conf,"PhoneOperator");
job.setJarByClass(OperatorMR.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(MarketShareReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);Path outputPath =newPath("data/result-phone");if(fs.exists(outputPath)){
fs.delete(outputPath,true);}// TextInputFormat.addInputPath(job, new Path("hdfs://192.168.192.100:8020/"));// TextInputFormat.outInputPath(job, new Path("hdfs://192.168.192.100:8020/"));FileInputFormat.addInputPath(job,newPath("data/phone.log"));FileOutputFormat.setOutputPath(job, outputPath);System.exit(job.waitForCompletion(true)?0:1);}}
结果如下
-将分析结果存储到Mysql,并进行数据可视化
package com.yopai.mrmysql;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * MapReduce job that computes the 2022 carrier market share from {@code data/phone.log} and
 * writes the result rows into the MySQL table {@code operator_market_share}.
 *
 * <p>The driver counts the 2022 records up front and passes the total to the reducers through
 * the job {@link Configuration}. Static fields set in the driver are not visible to tasks that
 * run in a different JVM.
 *
 * @author 湧哥
 * @version 1.0
 */
public class OPMysqlMR {

    /** Configuration key holding the number of 2022 records. */
    private static final String COUNT_2022_KEY = "phone.operator.record.count.2022";

    /**
     * Tells whether a split record is a usable 2022 record: at least three fields, a date
     * (index 1) starting with {@code 2022}, and a phone number (index 2) long enough to carry a
     * three-digit prefix.
     */
    private static boolean is2022Record(String[] fields) {
        return fields.length >= 3 && fields[1].startsWith("2022") && fields[2].length() >= 3;
    }

    /** Emits {@code 2022-Operator-<carrier>} with value 1 for every 2022 record. */
    public static class TokenizerMapper extends Mapper<Object, Text, Text, DoubleWritable> {
        private final static DoubleWritable one = new DoubleWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (is2022Record(fields)) {
                // Carrier market share.
                word.set(fields[1].substring(0, 4) + "-Operator-" + getCarrier(fields[2]));
                context.write(word, one);
            }
        }

        /**
         * Maps the three-digit prefix of a phone number to its carrier name. Prefixes that are
         * neither China Telecom nor China Unicom fall through to China Mobile.
         */
        private String getCarrier(String phoneNumber) {
            String prefix = phoneNumber.substring(0, 3);
            switch (prefix) {
                case "133":
                case "153":
                case "180":
                case "181":
                case "189":
                    return "电信";
                case "130":
                case "155":
                case "186":
                    return "联通";
                default:
                    return "移动";
            }
        }
    }

    /** Turns each carrier key into one database row holding its share of the 2022 records. */
    public static class MarketShareReducer extends Reducer<Text, DoubleWritable, DBOutputWritable, NullWritable> {
        private long recordCount2022;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            recordCount2022 = context.getConfiguration().getLong(COUNT_2022_KEY, 0L);
        }

        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            for (DoubleWritable val : values) {
                sum += val.get();
            }
            if (recordCount2022 <= 0) {
                throw new IllegalStateException("No 2022 record total configured for key " + key);
            }
            double percentage = sum / recordCount2022;
            context.write(new DBOutputWritable(key.toString(), percentage), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Database connection settings.
        // NOTE(review): credentials are hard-coded for this demo; load them from configuration
        // or the environment before using this anywhere real.
        String dbUrl = "jdbc:mysql://localhost:3306/blog";
        String dbUsername = "root";
        String dbPassword = "Admin2022!";
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", dbUrl, dbUsername, dbPassword);

        // Make sure the target table exists; both JDBC resources are closed on exit.
        String createTableSql =
                "CREATE TABLE IF NOT EXISTS operator_market_share(operator VARCHAR(255), market_share DOUBLE)";
        try (Connection connection = DriverManager.getConnection(dbUrl, dbUsername, dbPassword);
                PreparedStatement preparedStatement = connection.prepareStatement(createTableSql)) {
            preparedStatement.executeUpdate();
        }

        FileSystem fs = FileSystem.get(conf);
        Path inputPath = new Path("data/phone.log");

        // Count the 2022 records, applying the same filter as the mapper.
        long recordCount2022 = 0;
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(inputPath), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (is2022Record(line.split(","))) {
                    recordCount2022++;
                }
            }
        }
        // Must be set before Job.getInstance, which takes its own copy of the configuration.
        conf.setLong(COUNT_2022_KEY, recordCount2022);

        Job job = Job.getInstance(conf, "PhoneOperator");
        job.setJarByClass(OPMysqlMR.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setReducerClass(MarketShareReducer.class);
        job.setOutputKeyClass(DBOutputWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // Send the reducer output to the database table.
        DBOutputFormat.setOutput(job, "operator_market_share", "operator", "market_share");
        FileInputFormat.addInputPath(job, inputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
package com.yopai.mrmysql;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * One row of the {@code operator_market_share} table: a carrier key and its market share.
 * Column order for the JDBC methods is (operator, market_share).
 *
 * @author 湧哥
 * @version 1.0
 */
public class DBOutputWritable implements Writable, DBWritable {
    private String operator;
    private double marketShare;

    /** No-arg constructor so the instance can be created reflectively and then populated. */
    public DBOutputWritable() {
    }

    /**
     * @param operator carrier key
     * @param market_share share of the records attributed to that carrier
     */
    public DBOutputWritable(String operator, double market_share) {
        this.operator = operator;
        this.marketShare = market_share;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        operator = in.readUTF();
        marketShare = in.readDouble();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // writeUTF rejects null, which is the field's value after the no-arg constructor.
        out.writeUTF(operator == null ? "" : operator);
        out.writeDouble(marketShare);
    }

    /** Populates this row from the first two columns of the current result-set row. */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        operator = resultSet.getString(1);
        marketShare = resultSet.getDouble(2);
    }

    /** Binds this row to the insert statement: parameter 1 is the operator, 2 the share. */
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, operator);
        preparedStatement.setDouble(2, marketShare);
    }
}
运行结果如下
- 可视化操作
package com.yopai.draw;

import java.awt.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import javax.swing.JFrame;
import javax.swing.SwingUtilities;

import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartPanel;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.plot.PiePlot;
import org.jfree.data.general.DefaultPieDataset;

/**
 * Swing window that shows the rows of the MySQL table {@code operator_market_share} as a pie
 * chart.
 *
 * @author 湧哥
 * @version 1.0
 */
public class PieChartExample extends JFrame {

    /** Loads the data from the database and builds the chart. */
    public PieChartExample() {
        this(loadDataset());
    }

    /** Builds the chart from an already loaded dataset. */
    private PieChartExample(DefaultPieDataset dataset) {
        // Create the pie chart.
        JFreeChart pieChart = ChartFactory.createPieChart(
                "运营商市场占比", // chart title
                dataset,          // data
                true,             // show legend
                true,             // generate tooltips
                false             // generate URLs
        );

        // Use a font that can render the Chinese labels.
        Font font = new Font("宋体", Font.PLAIN, 12);
        pieChart.getTitle().setFont(font);
        pieChart.getLegend().setItemFont(font);
        PiePlot plot = (PiePlot) pieChart.getPlot();
        plot.setLabelFont(font);

        // Put the chart into the frame.
        ChartPanel chartPanel = new ChartPanel(pieChart);
        setContentPane(chartPanel);
    }

    /**
     * Reads every (operator, market_share) row into a pie dataset. On a database error the
     * stack trace is printed and the rows read so far are returned.
     */
    private static DefaultPieDataset loadDataset() {
        DefaultPieDataset dataset = new DefaultPieDataset();
        // NOTE(review): credentials are hard-coded for this demo.
        String dbUrl = "jdbc:mysql://localhost:3306/blog";
        String dbUsername = "root";
        String dbPassword = "Admin2022!";
        // try-with-resources closes the result set, statement and connection in that order.
        try (Connection connection = DriverManager.getConnection(dbUrl, dbUsername, dbPassword);
                Statement statement = connection.createStatement();
                ResultSet resultSet = statement.executeQuery(
                        "SELECT operator, market_share FROM operator_market_share")) {
            while (resultSet.next()) {
                String operator = resultSet.getString("operator");
                double marketShare = resultSet.getDouble("market_share");
                dataset.setValue(operator, marketShare);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return dataset;
    }

    public static void main(String[] args) {
        // Query the database on the main thread, then build and show the UI on the Swing
        // event dispatch thread.
        final DefaultPieDataset dataset = loadDataset();
        SwingUtilities.invokeLater(new Runnable() {
            @Override
            public void run() {
                PieChartExample pieChartExample = new PieChartExample(dataset);
                pieChartExample.setSize(600, 600);
                pieChartExample.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
                pieChartExample.setVisible(true);
            }
        });
    }
}
结果如下
本篇文章到这里结束。需要注意的是,每个人的环境不同,调用的API也会有所差异。
版权归原作者 大数据小禅 所有, 如有侵权,请联系我们删除。