这是最原始的日志数据:格式比较杂乱,需要从中取出一部分字段(时间、日志级别、类名),类似下面这些。
其中带毫秒的日期时间字符串要转换为毫秒级时间戳。
先用 Spark 过滤出需要的数据:
package sparkj;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
/**
 * Spark job that extracts the interesting fields from a raw log file.
 *
 * <p>Only lines that start with a digit are kept. Each such line is split on single spaces and
 * rewritten as {@code <epoch-millis> <field 2> <field 3 up to the first ':'>}; the result is
 * printed to standard error.
 */
public class sp {
    /**
     * Layout of the date and time at the start of a log line, e.g. {@code 2022-05-05 07:21:22,172}.
     * {@code HH} (hour of day, 0-23) is required here: {@code hh} is the 12-hour clock field and
     * makes every 12:xx timestamp parse as 00:xx.
     */
    private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss,SSS";

    /**
     * Runs the extraction on {@code file:///root/555.txt} with a local two-thread master.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("PeopleInfoCalculator").setMaster("local[2]");
        // try-with-resources stops the Spark context even when the job fails.
        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
            JavaRDD<String> filedata = sc.textFile("file:///root/555.txt");
            JavaRDD<String> data2 = filedata
                    .filter(line -> line.matches("[0-9].*"))
                    // Split once instead of once per field.
                    .map(line -> line.split(" "))
                    // Lines with fewer than four fields cannot be converted; drop them instead of
                    // failing the whole job with ArrayIndexOutOfBoundsException.
                    .filter(fields -> fields.length >= 4)
                    .map(fields -> String2Date(fields[0], fields[1]) + " " + fields[2] + " "
                            + fields[3].split(":")[0]);
            data2.foreach(out -> System.err.println(out));
            // Enable to persist the extracted records instead of only printing them:
            // data2.saveAsTextFile("file:///root/555copy.txt");

            // Alternative pipeline: count occurrences of the second comma-separated field.
            // JavaPairRDD<String,Integer> rdd6=filedata.mapToPair(f->new Tuple2<>(f.split(",")[1],1));
            // JavaPairRDD<String,Integer> rdd7=rdd6.reduceByKey((x,y)->x+y);
            // rdd7.foreach(f->System.err.println(f));
        }
    }

    /**
     * Converts a date and a time string into epoch milliseconds.
     *
     * <p>The two parts are joined with a single space and parsed with
     * {@code yyyy-MM-dd HH:mm:ss,SSS}. A new formatter is created per call, so the method can be
     * called from concurrently running Spark tasks. The formatter uses the JVM default time zone.
     *
     * @param date date part, e.g. {@code 2022-05-05}
     * @param time time part including milliseconds, e.g. {@code 07:21:22,172}
     * @return milliseconds since the epoch for the parsed instant
     * @throws Exception if the combined text does not match the expected layout
     */
    public static long String2Date(String date, String time) throws Exception {
        String newdate = date + " " + time;
        SimpleDateFormat s = new SimpleDateFormat(TIMESTAMP_PATTERN);
        return s.parse(newdate).getTime();
    }
}
结果如下:
接下来启动 HBase 并对其进行操作,先执行 start-hbase.sh,再进入 hbase shell:
root@master:/opt/hbase-1.3.3/bin# ./start-hbase.sh
starting master, logging to /opt/hbase-1.3.3/bin/../logs/hbase-root-master-master.out
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
root@master:/opt/hbase-1.3.3/bin# ./hbase shell
2022-05-05 07:21:22,172 WARN [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.3.3, ra75a458e0c8f3c60db57f30ec4b06a606c9027b4, Fri Dec 14 16:02:53 PST 2018
hbase(main):001:0> list
TABLE
hbase单词计数
单独的hbase写法
主函数:
package test1;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;
/**
 * Driver for the HBase log table {@code logData1}.
 *
 * <p>It first prints per-class counts of the rows whose {@code log:level} is {@code ERROR} and
 * {@code FATAL}, then reads {@code /root/555copy.txt} line by line and derives a row key from the
 * leading timestamp of each line. The actual {@code put} calls are commented out in this version.
 */
public class MainHB1 {
    /** Returns the result of {@code getClass().toString()} for the given object. */
    private static String getType(Object a) {
        return a.getClass().toString();
    }

    /**
     * Runs the level statistics and then walks the import file.
     *
     * @param args unused
     * @throws Exception if the import file cannot be opened or read, or the HBase helper cannot be
     *         created
     */
    public static void main(String[] args) throws Exception {
        // One-time table creation; enable on the first run.
        // try {
        // HBaseOp ho = new HBaseOp();
        // String tab = "logData1";
        // String[] name = new String[1];
        // name[0] = "log";
        // ho.createTable(tab, name);
        // System.out.println("ok");
        // } catch (Exception e) {
        // e.printStackTrace();
        // }
        try {
            HBaseOp ho = new HBaseOp();
            String tab = "logData1";
            //ho.filterByRowFilter(tab, "class","info");
            //ho.filterBySingleColumnValueFilter(tab, "log","level","");
            System.out.println("ERROR一共是");
            ho.filterBySingleColumnValueFilter(tab, "log", "level", "ERROR");
            System.out.println("FATAL一共是");
            ho.filterBySingleColumnValueFilter(tab, "log", "level", "FATAL");
            System.out.println("ok");
        } catch (Exception e) {
            e.printStackTrace();
        }

        File file = new File("/root/555copy.txt");
        // NOTE(review): this message is printed before any line has been read.
        System.out.println("插入数据成功");
        System.out.println(" \\n" +
                "\\\n" +
                "");

        String[] column = new String[2];
        column[0] = "level";
        column[1] = "class";
        int i = 0;
        // try-with-resources closes the reader; the original never closed it.
        // NOTE(review): FileReader decodes with the platform default charset.
        try (BufferedReader br = new BufferedReader(new FileReader(file))) {
            // Created once for the whole file instead of once per line.
            HBaseOp ho = new HBaseOp();
            String tab = "logData1";
            String name = "log";
            String s;
            while ((s = br.readLine()) != null) {
                String[] line = s.split(" ");
                try {
                    // Parsing happens inside the try so that a blank or malformed line is
                    // reported and skipped instead of aborting the import.
                    long t = Long.parseLong(line[0]);
                    // Row key is Long.MAX_VALUE minus the timestamp — presumably so that newer
                    // entries sort first; confirm against how the table is read.
                    String time = String.valueOf(Long.MAX_VALUE - t);
                    // ho.put(tab,time, name,column[0], line[1]);
                    // ho.put(tab,time, name,column[1], line[2]);
                    i++; // counts successfully processed lines; was never incremented before
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        System.out.println(i);
        System.out.println("ok");
    }
}
类的方法
package test1;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
//import org.apache.hadoop.hbase.filter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Small helper around the HBase client API: table creation and deletion, puts, gets, scans and
 * filtered counts.
 *
 * <p>Every method opens its own {@code HTable} or {@code HBaseAdmin} and closes it before
 * returning, including when an exception is thrown.
 */
public class HBaseOp {
    /** Holder for a row key and column values; not referenced by any method of this class. */
    class P {
        String row;
        String[] colum;
    }

    /** Client configuration obtained from {@code HBaseConfiguration.create()}. */
    Configuration conf = HBaseConfiguration.create();

    /**
     * Creates a table with the given column families unless it already exists.
     *
     * @param strTab table name
     * @param arrCf names of the column families to create
     * @throws Exception if the admin connection or the table creation fails
     */
    public void createTable(String strTab, String[] arrCf) throws Exception {
        try (HBaseAdmin admin = new HBaseAdmin(conf)) {
            if (admin.tableExists(strTab)) {
                System.out.println("Table"+strTab+"exists.");
                return;
            }
            HTableDescriptor fs = new HTableDescriptor(strTab);
            for (String cf : arrCf) {
                HColumnDescriptor ff = new HColumnDescriptor(cf);
                // NOTE(review): the HBase API documents TTL in seconds, so cells would expire
                // roughly 2.8 hours after being written — confirm this is intended.
                ff.setTimeToLive(10000);
                fs.addFamily(ff);
            }
            admin.createTable(fs);
        }
    }

    /**
     * Deletes a table, disabling it first when it is still enabled.
     *
     * @param strTab table name
     * @throws Exception if the admin connection or the deletion fails
     */
    public void deleteTable(String strTab) throws Exception {
        try (HBaseAdmin admin = new HBaseAdmin(conf)) {
            if (!admin.tableExists(strTab)) {
                System.out.println(strTab + "不存在");
                return;
            }
            // The table is disabled here only when it is currently enabled.
            if (admin.isTableEnabled(strTab)) {
                admin.disableTable(strTab);
            }
            admin.deleteTable(strTab);
            System.out.println(strTab + " deleted");
        }
    }

    /**
     * Writes three fixed sample rows ({@code Tom}, {@code John}, {@code gyy}) into the table
     * {@code scores} with a single batched put.
     *
     * @throws Exception if the table cannot be opened or the put fails
     */
    public void mulPut() throws Exception {
        List<Put> puts = new ArrayList<Put>();

        Put put1 = new Put(Bytes.toBytes("Tom"));
        put1.add(Bytes.toBytes("grade"), Bytes.toBytes(""), Bytes.toBytes("1"));
        put1.add(Bytes.toBytes("course"), Bytes.toBytes("math"), Bytes.toBytes("990"));
        puts.add(put1);

        Put put2 = new Put(Bytes.toBytes("John"));
        put2.add(Bytes.toBytes("grade"), Bytes.toBytes(""), Bytes.toBytes("2"));
        put2.add(Bytes.toBytes("course"), Bytes.toBytes("Chinese"), Bytes.toBytes("99"));
        puts.add(put2);

        Put put3 = new Put(Bytes.toBytes("gyy"));
        put3.add(Bytes.toBytes("grade"), Bytes.toBytes(""), Bytes.toBytes("12"));
        put3.add(Bytes.toBytes("course"), Bytes.toBytes("math"), Bytes.toBytes("1000"));
        put3.add(Bytes.toBytes("course"), Bytes.toBytes("geo"), Bytes.toBytes("1000"));
        put3.add(Bytes.toBytes("course"), Bytes.toBytes("语文"), Bytes.toBytes("1000"));
        puts.add(put3);

        // Store all rows and columns in one call.
        try (HTable table = new HTable(conf, "scores")) {
            table.put(puts);
        }
    }

    /**
     * Writes one cell.
     *
     * @param tablename table name
     * @param row row key
     * @param cf column family
     * @param column column qualifier
     * @param data cell value
     * @throws Exception if the table cannot be opened or the put fails
     */
    public void put(String tablename, String row, String cf, String column, String data) throws Exception {
        try (HTable table = new HTable(conf, tablename)) {
            Put p = new Put(Bytes.toBytes(row));
            p.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(data));
            table.put(p);
        }
    }

    /**
     * Counts the rows whose column {@code strF:strC} equals {@code strClass}, grouped by the value
     * of the {@code class} column in the same family.
     *
     * <p>Prints the per-class counts, the class with the highest count, and the total number of
     * matching rows labelled with {@code strClass}. The source contained two declarations with
     * this signature, which does not compile; this is the single merged version.
     *
     * @param tablename table name
     * @param strF column family
     * @param strC qualifier of the column that is compared
     * @param strClass value the column must equal
     * @throws Exception if the table cannot be opened or the scan fails
     */
    public void filterBySingleColumnValueFilter(String tablename, String strF, String strC, String strClass) throws Exception {
        byte[] family = Bytes.toBytes(strF);
        SingleColumnValueFilter sf =
                new SingleColumnValueFilter(family, Bytes.toBytes(strC), CompareOp.EQUAL, Bytes.toBytes(strClass));
        // Without this, rows that do not have the column at all pass the filter and get counted.
        sf.setFilterIfMissing(true);
        Scan s = new Scan();
        s.setFilter(sf);

        HashMap<String, Integer> hashMap = new HashMap<>();
        int i = 0;
        try (HTable table = new HTable(conf, tablename);
             ResultScanner rs = table.getScanner(s)) {
            for (Result r : rs) {
                i++;
                // The key is null when the row has no "class" column; HashMap accepts that.
                String cls = Bytes.toString(r.getValue(family, Bytes.toBytes("class")));
                hashMap.merge(cls, 1, Integer::sum);
            }
        }
        System.out.println(hashMap);
        System.out.println("出现最多的类是:" + getProcessCdByName(hashMap));
        // The label follows the requested value; it used to read "FATAL" for every level.
        System.out.println(" " + strClass + "一共是=" + i);
    }

    /**
     * Returns a key that has the largest count in the map.
     *
     * <p>The maximum starts at 0, so the result is {@code null} for an empty map and for a map
     * whose values are all negative. When several keys share the maximum, the first one in the
     * map's iteration order is returned.
     *
     * @param processMap counts per key
     * @return a key with the largest count, or {@code null} if there is none
     */
    public static String getProcessCdByName(HashMap<String, Integer> processMap) {
        int max = 0;
        for (int count : processMap.values()) {
            max = Math.max(max, count);
        }
        for (Map.Entry<String, Integer> entry : processMap.entrySet()) {
            if (entry.getValue() == max) {
                return entry.getKey();
            }
        }
        return null;
    }

    /**
     * Prints every row whose column {@code cf:C} equals {@code ERROR} or {@code FATAL}, followed by
     * the number of such rows.
     *
     * @param tablename table name
     * @param cf column family
     * @param C qualifier of the column that is compared
     * @throws Exception if the table cannot be opened or the scan fails
     */
    public void filterBySingleColumnValueFilter(String tablename, String cf, String C) throws Exception {
        byte[] family = Bytes.toBytes(cf);
        byte[] qualifier = Bytes.toBytes(C);
        SingleColumnValueFilter sf = new SingleColumnValueFilter(
                family, qualifier, CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("ERROR")));
        SingleColumnValueFilter sf2 = new SingleColumnValueFilter(
                family, qualifier, CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("FATAL")));
        // Rows without the column must not be reported as matches.
        sf.setFilterIfMissing(true);
        sf2.setFilterIfMissing(true);
        // A row is kept when either of the two filters accepts it.
        FilterList lst = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        lst.addFilter(sf);
        lst.addFilter(sf2);
        Scan s = new Scan();
        s.setFilter(lst);

        int i = 0;
        try (HTable table = new HTable(conf, tablename);
             ResultScanner rs = table.getScanner(s)) {
            for (Result r : rs) {
                byte[] row = r.getRow();
                byte[] value = r.getValue(family, qualifier);
                byte[] value2 = r.getValue(family, Bytes.toBytes("class"));
                System.out.println("--------------------------");
                System.out.println("Filter: " + Bytes.toString(row) + " is in " + C + " " + Bytes.toString(value)+", class " + Bytes.toString(value2));
                i++;
            }
        }
        System.out.println(i);
    }

    /**
     * Reads one cell and prints it.
     *
     * @param tablename table name
     * @param row row key
     * @param info column family
     * @param name column qualifier
     * @throws Exception if the table cannot be opened or the get fails
     */
    public void get(String tablename, String row, String info, String name) throws Exception {
        // The table is now closed; the original left it open.
        try (HTable table = new HTable(conf, tablename)) {
            Result result = table.get(new Get(Bytes.toBytes(row)));
            byte[] val = result.getValue(Bytes.toBytes(info), Bytes.toBytes(name));
            System.out.println(info + " " + name + " " + "Values =" + Bytes.toString(val));
        }
    }

    /**
     * Scans the rows from start key {@code "0"} up to stop key {@code "g"} and prints the value of
     * {@code cf:column} for each of them.
     *
     * @param tablename table name
     * @param cf column family
     * @param column column qualifier
     * @throws Exception if the table cannot be opened or the scan fails
     */
    public void scan(String tablename, String cf, String column) throws Exception {
        Scan s = new Scan();
        s.setStartRow(Bytes.toBytes("0"));
        s.setStopRow(Bytes.toBytes("g"));
        // s.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
        try (HTable table = new HTable(conf, tablename);
             ResultScanner res = table.getScanner(s)) {
            for (Result r : res) {
                byte[] row = r.getRow();
                byte[] val = r.getValue(Bytes.toBytes(cf), Bytes.toBytes(column));
                System.out.println("Scan:" + Bytes.toString(row)
                        + " values is " + Bytes.toString(val));
            }
        }
    }

    /**
     * Deletes a whole row. {@code cf}, {@code column} and {@code data} are accepted but not used.
     *
     * @param tablename table name
     * @param row key of the row to delete
     * @param cf unused
     * @param column unused
     * @param data unused
     * @throws Exception if the table cannot be opened or the delete fails
     */
    public void delete(String tablename, String row, String cf, String column, String data) throws Exception {
        try (HTable table = new HTable(conf, tablename)) {
            table.delete(new Delete(Bytes.toBytes(row)));
        }
    }
}
插进表里面这种数据貌似是对的
或者更简单粗暴一点,直接用 Spark 来统计;所需的数据由下面这一句调用直接产生。
先hbase主函数这个:
ho.filterBySingleColumnValueFilter(tab, "log","level","ERROR");
这一句会打印出一批数据,把这些输出拷贝到一个文件里,再用 Spark 做单词计数。
方法类中对应的方法如下(这里的方法名是 filterBySingleColumnValueFilter1,主函数中的调用要与之保持一致):
/**
 * Prints every row whose column {@code strF:strC} equals {@code strClass}, together with the
 * value of the {@code class} column in the same family.
 *
 * @param tablename table name
 * @param strF column family
 * @param strC qualifier of the column that is compared
 * @param strClass value the column must equal
 * @throws Exception if the table cannot be opened or the scan fails
 */
public void filterBySingleColumnValueFilter1(String tablename, String strF, String strC, String strClass) throws Exception
{
    byte[] family = Bytes.toBytes(strF);
    byte[] qualifier = Bytes.toBytes(strC);
    SingleColumnValueFilter sf =
            new SingleColumnValueFilter(family, qualifier, CompareOp.EQUAL, Bytes.toBytes(strClass));
    // Without this, rows that do not have the column at all pass the filter.
    sf.setFilterIfMissing(true);
    Scan s = new Scan();
    s.setFilter(sf);
    // try-with-resources closes scanner and table even when the iteration throws.
    try (HTable table = new HTable(conf, tablename);
         ResultScanner rs = table.getScanner(s)) {
        for (Result r : rs) {
            byte[] row = r.getRow();
            byte[] value = r.getValue(family, qualifier);
            byte[] value2 = r.getValue(family, Bytes.toBytes("class"));
            System.out.println("Filter: " + Bytes.toString(row) + " is in " + strC + " " + Bytes.toString(value)+", class " + Bytes.toString(value2));
        }
    }
}
package sparkj;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
/**
 * Spark job that counts how often each value of the second comma-separated field occurs in
 * {@code file:///root/555.txt} and prints the {@code (value, count)} pairs to standard error.
 */
public class sp {
    /**
     * Layout of the date and time at the start of a log line, e.g. {@code 2022-05-05 07:21:22,172}.
     * {@code HH} (hour of day, 0-23) is required here: {@code hh} is the 12-hour clock field and
     * makes every 12:xx timestamp parse as 00:xx.
     */
    private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss,SSS";

    /**
     * Runs the count with a local two-thread master.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("PeopleInfoCalculator").setMaster("local[2]");
        // try-with-resources stops the Spark context even when the job fails.
        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
            JavaRDD<String> filedata = sc.textFile("file:///root/555.txt");
            // Extraction step of the earlier version, kept for reference:
            // JavaRDD<String> data2=filedata.filter(f->f.matches("[0-9].*"))
            // .map(f->String2Date(f.split(" ")[0],f.split(" ")[1])+" "+f.split(" ")[2]+" "+f.split(" ")[3].split(":")[0]);
            //
            // data2.foreach(f->System.err.println(f));
            // data2.saveAsTextFile("file:///root/555copy.txt");
            JavaPairRDD<String, Integer> rdd6 = filedata
                    .map(line -> line.split(","))
                    // A line without a comma has no second field; skip it instead of failing
                    // the whole job with ArrayIndexOutOfBoundsException.
                    .filter(parts -> parts.length > 1)
                    .mapToPair(parts -> new Tuple2<>(parts[1], 1));
            JavaPairRDD<String, Integer> rdd7 = rdd6.reduceByKey((x, y) -> x + y);
            rdd7.foreach(pair -> System.err.println(pair));
        }
    }

    /**
     * Converts a date and a time string into epoch milliseconds.
     *
     * <p>The two parts are joined with a single space and parsed with
     * {@code yyyy-MM-dd HH:mm:ss,SSS}. A new formatter is created per call. The formatter uses the
     * JVM default time zone.
     *
     * @param date date part, e.g. {@code 2022-05-05}
     * @param time time part including milliseconds, e.g. {@code 07:21:22,172}
     * @return milliseconds since the epoch for the parsed instant
     * @throws Exception if the combined text does not match the expected layout
     */
    public static long String2Date(String date, String time) throws Exception {
        String newdate = date + " " + time;
        SimpleDateFormat s = new SimpleDateFormat(TIMESTAMP_PATTERN);
        return s.parse(newdate).getTime();
    }
}
最后结果
版权归原作者 好了,ap 所有, 如有侵权,请联系我们删除。