根据提示,在右侧编辑器补充代码,对数据按照一定规则进行清洗。
数据说明如下:
a.txt
数据切分方式:
,
数据所在位置:
/user/test/input/a.txt
15733218050,15778423030,1542457633,1542457678,450000,530000
157332180501577842303015424576331542457678450000530000呼叫者手机号接受者手机号开始时间戳(s)接受时间戳(s)呼叫者地址省份编码接受者地址省份编码
Mysql
数据库:
用户名:
root
密码:
123123
数据库名:
mydb
用户表:
userphone
列名类型非空是否自增介绍idint(11)√√用户IDphonevarchar(255)手机号trueNamevarchar(255)真实姓名
地址省份表:
allregion
列名类型非空是否自增介绍idint(11)√√用户IDCodeNumvarchar(255)编号Addressvarchar(255)地址
清洗规则:
- 处理数据中的时间戳(秒级)将其转化为
2017-06-21 07:01:58
,年-月-日 时:分:秒 这种格式; - 处理数据中的省份编码,结合
mysql
的表数据对应,将其转换成省份名称; - 处理用户手机号,与
mysql
的表数据对应,关联用户的真实姓名; - 处理数据中的开始时间与结束时间并计算通信时长(以秒为单位);
- 设置数据来源文件路径及清洗后的数据存储路径: 数据来源路径为:
/user/test/input/a.txt (HDFS)
; 清洗后的数据存放于:/user/test/output (HDFS)
。
数据清洗后如下:
邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市
邓二张倩13666666666151518896012018-03-29 10:58:122018-03-29 10:58:4230黑龙江省上海市用户名A用户名B用户A的手机号用户B的手机号开始时间结束时间
step/com/LogMR.java
package com;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogMR {
/********** begin **********/
static class MyMapper extends Mapper<LongWritable, Text, PhoneLog, NullWritable> {
Map<String, String> userMap = new HashMap<>();
Map<String, String> addressMap = new HashMap<>();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
PhoneLog pl = new PhoneLog();
Text text = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Connection connection = DBHelper.getConnection();
try {
Statement statement = connection.createStatement();
String sql = "select * from userphone";
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
String phone = resultSet.getString(2);
String trueName = resultSet.getString(3);
userMap.put(phone, trueName);
}
String sql2 = "select * from allregion";
ResultSet resultSetA = statement.executeQuery(sql2);
while (resultSetA.next()) {
String phone = resultSetA.getString(2);
String trueName = resultSetA.getString(3);
addressMap.put(phone, trueName);
}
} catch (SQLException e) {
e.printStackTrace();
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String str = value.toString();
String[] split = str.split(",");
if (split.length == 6) {
String trueName1 = userMap.get(split[0]);
String trueName2 = userMap.get(split[1]);
String address1 = addressMap.get(split[4]);
String address2 = addressMap.get(split[5]);
long startTimestamp = Long.parseLong(split[2]);
String startTime = sdf.format(startTimestamp * 1000);
long endTimestamp = Long.parseLong(split[3]);
String endTime = sdf.format(endTimestamp * 1000);
long timeLen = endTimestamp - startTimestamp;
pl.SetPhoneLog(trueName1, trueName2, split[0], split[1], startTime, endTime, timeLen, address1,
address2);
context.write(pl, NullWritable.get());
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(LogMR.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(PhoneLog.class);
job.setMapOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
Path inPath = new Path("/user/test/input/a.txt");
Path out = new Path("/user/test/output");
FileInputFormat.setInputPaths(job, inPath);
FileOutputFormat.setOutputPath(job, out);
job.waitForCompletion(true);
}
/********** end **********/
}
step/com/DBHelper.java
package com;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class DBHelper {
/********** begin **********/
private static final String driver = "com.mysql.jdbc.Driver";
private static final String url = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
private static final String username = "root";// 数据库的用户名
private static final String password = "123123";// 数据库的密码:这个是自己安装数据库的时候设置的,每个人不同。
private static Connection conn = null; // 声明数据库连接对象
static {
try {
Class.forName(driver);
} catch (Exception ex) {
ex.printStackTrace();
}
}
public static Connection getConnection() {
if (conn == null) {
try {
conn = DriverManager.getConnection(url, username, password);
} catch (SQLException e) {
e.printStackTrace();
} // 连接数据库
return conn;
}
return conn;
}
/********** end **********/
}
step/com/phonelog.java
package com;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
public class PhoneLog implements WritableComparable<PhoneLog> {
private String userA;
private String userB;
private String userA_Phone;
private String userB_Phone;
private String startTime;
private String endTime;
private Long timeLen;
private String userA_Address;
private String userB_Address;
public PhoneLog() {
}
public void SetPhoneLog(String userA, String userB, String userA_Phone, String userB_Phone, String startTime,
String endTime, Long timeLen, String userA_Address, String userB_Address) {
this.userA = userA;
this.userB = userB;
this.userA_Phone = userA_Phone;
this.userB_Phone = userB_Phone;
this.startTime = startTime;
this.endTime = endTime;
this.timeLen = timeLen;
this.userA_Address = userA_Address;
this.userB_Address = userB_Address;
}
public String getUserA_Phone() {
return userA_Phone;
}
public void setUserA_Phone(String userA_Phone) {
this.userA_Phone = userA_Phone;
}
public String getUserB_Phone() {
return userB_Phone;
}
public void setUserB_Phone(String userB_Phone) {
this.userB_Phone = userB_Phone;
}
public String getUserA() {
return userA;
}
public void setUserA(String userA) {
this.userA = userA;
}
public String getUserB() {
return userB;
}
public void setUserB(String userB) {
this.userB = userB;
}
public String getStartTime() {
return startTime;
}
public void setStartTime(String startTime) {
this.startTime = startTime;
}
public String getEndTime() {
return endTime;
}
public void setEndTime(String endTime) {
this.endTime = endTime;
}
public Long getTimeLen() {
return timeLen;
}
public void setTimeLen(Long timeLen) {
this.timeLen = timeLen;
}
public String getUserA_Address() {
return userA_Address;
}
public void setUserA_Address(String userA_Address) {
this.userA_Address = userA_Address;
}
public String getUserB_Address() {
return userB_Address;
}
public void setUserB_Address(String userB_Address) {
this.userB_Address = userB_Address;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(userA);
out.writeUTF(userB);
out.writeUTF(userA_Phone);
out.writeUTF(userB_Phone);
out.writeUTF(startTime);
out.writeUTF(endTime);
out.writeLong(timeLen);
out.writeUTF(userA_Address);
out.writeUTF(userB_Address);
}
@Override
public void readFields(DataInput in) throws IOException {
userA = in.readUTF();
userB = in.readUTF();
userA_Phone = in.readUTF();
userB_Phone = in.readUTF();
startTime = in.readUTF();
endTime = in.readUTF();
timeLen = in.readLong();
userA_Address = in.readUTF();
userB_Address = in.readUTF();
}
@Override
public String toString() {
return userA + "," + userB + "," + userA_Phone + "," + userB_Phone + "," + startTime + "," + endTime + ","
+ timeLen + "," + userA_Address + "," + userB_Address;
}
@Override
public int compareTo(PhoneLog pl) {
if(this.hashCode() == pl.hashCode()) {
return 0;
}
return -1;
}
}
最后重启hadoop#start-all.sh 完成评测
版权归原作者 是草莓熊吖 所有, 如有侵权,请联系我们删除。