Following the prompts, complete the code in the editor on the right so that the data is cleaned according to the rules below.

Data description: the input file is data.json, located at /root/data/data.json. A sample record:
{
"id":4,
"company_name":"智联招聘网/Zhaopin.com",
"eduLevel_name":"本科",
"emplType":"全职",
"jobName":"大数据工程师010",
"salary":"20K-30K",
"createDate":"2019-04-21T12:14:27.000+08:00",
"endDate":"2019-05-21T12:14:27.000+08:00",
"city_code":"530",
"companySize":"1000-9999人",
"welfare":"",
"responsibility":"岗位职责:1、负责体系大数据分析的ETL的代码开发及优化;2、...",
"place":"北京市朝阳区望京阜荣街10号首开广场5层",
"workingExp":"1-3年"
}
| Field | Meaning |
| --- | --- |
| id | record ID |
| company_name | company name |
| eduLevel_name | required education level |
| emplType | employment type |
| jobName | job title |
| salary | salary |
| createDate | posting date |
| endDate | closing date |
| city_code | city code |
| companySize | company size |
| welfare | benefits |
| responsibility | job responsibilities |
| place | work location |
| workingExp | required work experience |
MySQL database: username root, password 123123, database name mydb. The city-code table is province, with the following schema:
| Column | Type | Not null | Auto increment | Description |
| --- | --- | --- | --- | --- |
| city_code | varchar(255) | | | city code |
| city_name | varchar(255) | | | city name |
HBase: the final result table is job, with column family info.
Cleaning rules:
- If any field of a record is empty, drop that record.
- Process the salary field: 1) a value of the form mK-nK becomes (m+n)/2; 2) anything else becomes 0 (see the sketch after this list).
- Convert the city code to a city name using the MySQL table province.
- Store the result in the HBase table job.
- Set the source file path and the destination of the cleaned data: the source is /root/data/data.json, and the cleaned data is written to the HBase table job.
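As a worked example of the salary rule, "20K-30K" becomes (20+30)/2 = 25.0. The following is a minimal standalone sketch of that rule; the helper name parseSalary is made up for illustration, and the same logic appears inside the JsonMap mapper below:

public static String parseSalary(String salary) {
    // "mK-nK" -> (m+n)/2, anything else -> "0"
    if (salary != null && salary.contains("K-")) {
        double m = Double.parseDouble(salary.substring(0, salary.indexOf("K")));
        double n = Double.parseDouble(salary.substring(salary.indexOf("-") + 1, salary.lastIndexOf("K")));
        return String.valueOf((m + n) / 2);   // e.g. "20K-30K" -> "25.0"
    }
    return "0";
}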
(1) DBHelper class code:
package com;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class DBHelper {
/********** begin **********/
private static final String driver = "com.mysql.jdbc.Driver";
private static final String url = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
private static final String username = "root";
private static final String password = "123123";
private static Connection conn = null;
static {
try {
Class.forName(driver);
} catch (Exception ex) {
ex.printStackTrace();
}
}
public static Connection getConnection() {
// lazily create and cache a single shared connection
if (conn == null) {
try {
conn = DriverManager.getConnection(url, username, password);
} catch (SQLException e) {
e.printStackTrace();
}
}
return conn;
}
public static void main(String[] args) {
Connection connection = DBHelper.getConnection();
}
/********** end **********/
}
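DBHelper creates the MySQL connection lazily on first use and caches it in a static field, so repeated calls to getConnection() within a task reuse the same connection; that is enough here because the mapper only reads the small province table once in setup().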
(2) JsonMap class code:
package com;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
public class JsonMap extends Mapper<LongWritable, Text, NullWritable, Put> {
/********** begin **********/
Map<String, String> pro = new HashMap<String, String>();
Put put;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Connection connection = DBHelper.getConnection();
try {
Statement statement = connection.createStatement();
String sql = "select * from province";
ResultSet resultSetA = statement.executeQuery(sql);
while (resultSetA.next()) {
String city_code = resultSetA.getString(1);
String city_name = resultSetA.getString(2);
pro.put(city_code, city_name);
}
} catch (SQLException e) {
e.printStackTrace();
}
}
public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
String line = value.toString();
//parse one JSON record
JSONObject jsonObject = JSONObject.parseObject(line);
String[] data = new String[14];
data[0] = jsonObject.getString("id");
data[1] = jsonObject.getString("company_name");
data[2] = jsonObject.getString("eduLevel_name");
data[3] = jsonObject.getString("emplType");
data[4] = jsonObject.getString("jobName");
String salary=jsonObject.getString("salary");
if (salary.contains("K-")) {
Double a =Double.valueOf(salary.substring(0,salary.indexOf("K")));
Double b =Double.valueOf(salary.substring(salary.indexOf("-")+1,salary.lastIndexOf("K")));
data[5] = (a+b)/2+"";
}else {
data[5]="0";
}
data[6] = jsonObject.getString("createDate");
data[7] = jsonObject.getString("endDate");
String code = jsonObject.getString("city_code");
//convert the city code to its city name using the province mapping loaded in setup()
data[8] = pro.get(code);
data[9] = jsonObject.getString("companySize");
data[10] = jsonObject.getString("welfare");
data[11] = jsonObject.getString("responsibility");
data[12] = jsonObject.getString("place");
data[13] = jsonObject.getString("workingExp");
//drop the record if any field is null or empty
for(String i : data) {
if(i==null||i.equals("")) {
return;
}
}
String columnFamily = "info";
put= new Put(data[0].getBytes());
put.addColumn(columnFamily.getBytes(), "company_name".getBytes(), data[1].getBytes());
put.addColumn(columnFamily.getBytes(), "eduLevel_name".getBytes(), data[2].getBytes());
put.addColumn(columnFamily.getBytes(), "emplType".getBytes(), data[3].getBytes());
put.addColumn(columnFamily.getBytes(), "jobName".getBytes(), data[4].getBytes());
put.addColumn(columnFamily.getBytes(), "salary".getBytes(), data[5].getBytes());
put.addColumn(columnFamily.getBytes(), "createDate".getBytes(), data[6].getBytes());
put.addColumn(columnFamily.getBytes(), "endDate".getBytes(), data[7].getBytes());
put.addColumn(columnFamily.getBytes(), "city_name".getBytes(), data[8].getBytes());
put.addColumn(columnFamily.getBytes(), "companySize".getBytes(), data[9].getBytes());
put.addColumn(columnFamily.getBytes(), "welfare".getBytes(), data[10].getBytes());
put.addColumn(columnFamily.getBytes(), "responsibility".getBytes(), data[11].getBytes());
put.addColumn(columnFamily.getBytes(), "place".getBytes(), data[12].getBytes());
put.addColumn(columnFamily.getBytes(), "workingExp".getBytes(), data[13].getBytes());
context.write(NullWritable.get(), put);
}
/********** end **********/
}
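Two remarks on the mapper. First, the sample record shown at the top of this section has an empty welfare field, so the empty-field check above drops it; only fully populated records reach HBase. Second, the JDBC connection obtained from DBHelper is never closed; below is a minimal sketch of a cleanup() override that would release it when the task finishes (this override is an addition for illustration, not part of the original code):

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
    // release the shared MySQL connection once the map task ends
    try {
        java.sql.Connection connection = DBHelper.getConnection();
        if (connection != null) {
            connection.close();
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
}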
(3) JsonTest class code (the job driver):
package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
public class JsonTest {
public static void main(String[] args) throws Exception{
Configuration config = HBaseConfiguration.create();
//ZooKeeper quorum used by the HBase client
config.set("hbase.zookeeper.quorum", "127.0.0.1");
Connection connection = ConnectionFactory.createConnection(config);
Admin admin = connection.getAdmin();
TableName tableName = TableName.valueOf("job");
boolean isExists = admin.tableExists(tableName);
//if the table already exists, drop it first so every run starts from a clean table
if (isExists) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
//create the "job" table with a single column family "info"
ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info")).build();
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(family);
admin.createTable(tableDescriptor.build());
/********** begin **********/
Job job = Job.getInstance(config);
job.setJarByClass(JsonTest.class);
job.setMapperClass(JsonMap.class);
job.setMapOutputKeyClass(NullWritable.class);
//map-only job: there is no reducer, so set the number of reduce tasks to 0
job.setNumReduceTasks(0);
//the input path is hard-coded to the HDFS path rather than taken from command-line arguments
FileInputFormat.setInputPaths(job, new Path("/root/data/data.json"));
//bind the job's output to the HBase table "job" (no reducer class) and ship the HBase dependency jars
TableMapReduceUtil.initTableReducerJob("job",null, job);
TableMapReduceUtil.addDependencyJars(job);
job.waitForCompletion(true);
/********** end **********/
}
}
Start HBase before running the job: # start-hbase.sh
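Once HBase is up and the MapReduce job has finished, the contents of the job table can be spot-checked with the plain HBase client. A minimal sketch (the class name JobReadCheck is made up for illustration), assuming a record with row key "4" survived the cleaning:

package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
public class JobReadCheck {
    public static void main(String[] args) throws Exception {
        // same HBase client configuration as the driver above
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "127.0.0.1");
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf("job"))) {
            // fetch the row whose row key is the job id and print its cleaned salary
            Result result = table.get(new Get(Bytes.toBytes("4")));
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("salary"))));
        }
    }
}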