

Building a Big Data Real-Time Search Project with HBase and ElasticSearch


I. Project Overview

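  1. HBase stores the massive data set, solving the problems of storing huge volumes of data and updating and querying it in real time;
  2. ElasticSearch serves as a secondary index for HBase, speeding up real-time lookups over the large data set;
  3. Big data components used: Hadoop-2.7.3, HBase-1.3.1, zookeeper-3.4.5, ElasticSearch-7.8.0
  4. Lab environment: virtual machines (CentOS 7.6) + a personal PC (Windows) + Eclipse or IntelliJ IDEA
  5. Big data environment: a fully distributed cluster of 3 nodes
  6. The system architecture is shown below: [figure: system architecture diagram]
  7. Data is written and searched through the HBase and ElasticSearch Java APIs: each record is written in full to HBase while its searchable fields are indexed in ElasticSearch; a query first searches ElasticSearch for the matching record IDs and then fetches the full rows from HBase by row key.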
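The division of labour in items 1, 2 and 7 (HBase keeps the complete record under a row key, ElasticSearch indexes only the searchable fields plus that row key) can be illustrated with a toy in-memory model. This is a sketch under stated assumptions, not the project's code: the class SecondaryIndexSketch is hypothetical, two HashMaps stand in for HBase and ElasticSearch, and matching is exact rather than analyzed full-text search.

```java
import java.util.*;

// Hypothetical toy model: "rows" plays the HBase table, "index" plays the ElasticSearch index.
class SecondaryIndexSketch {
    // row key -> (column -> value), like an HBase table
    private final Map<String, Map<String, String>> rows = new HashMap<>();
    // indexed value -> row keys, like an inverted index in ElasticSearch
    private final Map<String, Set<String>> index = new HashMap<>();

    /** Write path: store the full row, then index only the chosen searchable columns. */
    void put(String rowKey, Map<String, String> columns, Collection<String> indexedColumns) {
        rows.put(rowKey, new HashMap<>(columns));
        for (String col : indexedColumns) {
            String value = columns.get(col);
            if (value != null) {
                index.computeIfAbsent(value, k -> new TreeSet<>()).add(rowKey);
            }
        }
    }

    /** Read path: look up row keys in the index first, then fetch each full row by key. */
    Map<String, Map<String, String>> search(String term) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        for (String rowKey : index.getOrDefault(term, Collections.emptySet())) {
            Map<String, String> row = rows.get(rowKey);
            if (row != null) {
                result.put(rowKey, row);
            }
        }
        return result;
    }
}
```

Searching for a value that was never indexed (age, in a setup that indexes only name and uid) returns nothing even though the row store holds it, which is why the loader has to decide which fields are sent to ElasticSearch.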

II. Environment Setup

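  1. Create 3 virtual machines (3 nodes): 4 GB of memory for the master node and 3 GB for each worker node; adjust these to your PC's resources;
  2. Install Hadoop in fully distributed mode; see: Hadoop 2.7.3 Fully Distributed Environment Setup
  3. Install ZooKeeper in cluster mode; see: ZooKeeper Cluster Installation
  4. Install HBase in fully distributed mode; see: Several Ways to Install HBase. Note: ZooKeeper must be installed and running before you install and start HBase
  5. Install the ElasticSearch cluster; see: Installing an ElasticSearch Cluster on Linux. Note: ElasticSearch refuses to run as root, so start it as a regular user (e.g. es); after installation, start it on every node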
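Before writing any code, it can help to confirm from the development PC that every node's ZooKeeper port (2181) and ElasticSearch transport port (9300, as configured later in conf.properties) accept TCP connections, and that hostnames such as hostname01 resolve on the PC. A minimal sketch, assuming Java 8 and a hypothetical class named QuorumCheck; it only tests TCP reachability, not whether the service behind the port is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: parse a "host:port,host:port" list and probe each address over TCP.
class QuorumCheck {
    /** Parses e.g. "hostname01:2181,hostname02:2181" into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String quorum) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String part : quorum.split(",")) {
            String trimmed = part.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("missing port: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** True if a TCP connection succeeds within timeoutMs; DNS failures also count as unreachable. */
    static boolean isReachable(InetSocketAddress addr, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Resolve the host name here so a missing hosts-file entry shows up as "false"
            socket.connect(new InetSocketAddress(addr.getHostString(), addr.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String quorum = args.length > 0 ? args[0] : "hostname01:2181,hostname02:2181,hostname03:2181";
        for (InetSocketAddress addr : parse(quorum)) {
            System.out.println(addr.getHostString() + ":" + addr.getPort()
                    + " reachable=" + isReachable(addr, 2000));
        }
    }
}
```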

III. Writing the Program

This project was developed in Eclipse.
  1. Create a Maven project and configure settings.xml (the Aliyun or Huawei Maven mirror can be used), as follows:
```xml
<?xml version="1.0" encoding="utf-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
  <mirrors>
    <!-- Maven 3.8.1+ blocks plain-HTTP repositories by default; switch to https:// with newer Maven -->
    <mirror>
      <id>nexus-aliyun</id>
      <mirrorOf>central</mirrorOf>
      <name>Nexus aliyun</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </mirror>
    <!-- Maven uses only the first mirror that matches central, so this entry is ignored unless the one above is removed -->
    <mirror>
      <id>net-cn</id>
      <mirrorOf>central</mirrorOf>
      <name>Nexus net</name>
      <url>http://maven.net.cn/content/groups/public/</url>
    </mirror>
  </mirrors>
  <profiles>
    <profile>
      <!-- The id must match <activeProfile> below, otherwise the profile is never activated -->
      <id>nexus</id>
      <repositories>
        <repository>
          <id>nexus</id>
          <name>local private nexus</name>
          <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
          <releases><enabled>true</enabled></releases>
          <snapshots><enabled>false</enabled></snapshots>
        </repository>
      </repositories>
      <pluginRepositories>
        <pluginRepository>
          <id>nexus</id>
          <name>local private nexus</name>
          <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
          <releases><enabled>true</enabled></releases>
          <snapshots><enabled>false</enabled></snapshots>
        </pluginRepository>
      </pluginRepositories>
    </profile>
  </profiles>
  <activeProfiles>
    <activeProfile>nexus</activeProfile>
  </activeProfiles>
</settings>
```
  2. Add the dependencies to pom.xml, as follows:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.bigdata</groupId>
  <artifactId>realtimesearch</artifactId>
  <version>1.0-SNAPSHOT</version>

  <!-- Spring Boot parent -->
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.0.RELEASE</version>
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <!-- Repositories -->
  <repositories>
    <repository>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <releases><enabled>true</enabled></releases>
      <snapshots><enabled>false</enabled></snapshots>
    </repository>
  </repositories>

  <dependencies>
    <!-- Spring Boot core web -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- nekohtml: required by Thymeleaf's LEGACYHTML5 mode, which relaxes its overly strict HTML5 checks -->
    <dependency>
      <groupId>net.sourceforge.nekohtml</groupId>
      <artifactId>nekohtml</artifactId>
      <version>1.9.22</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>
    <!-- HBase -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.3.1</version>
      <exclusions>
        <exclusion>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
          <groupId>org.apache.hadoop</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-protocol</artifactId>
      <version>1.3.1</version>
    </dependency>
    <!-- ElasticSearch -->
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch</artifactId>
      <version>7.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>transport</artifactId>
      <version>7.8.0</version>
    </dependency>
    <!-- Fixes the "no such method" error raised by ES at runtime -->
    <dependency>
      <groupId>org.locationtech.spatial4j</groupId>
      <artifactId>spatial4j</artifactId>
      <version>0.6</version>
    </dependency>
    <!-- ZooKeeper -->
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.9</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- Resolves the io.netty version conflict between ES and HBase -->
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.16.Final</version>
    </dependency>
    <!-- JSON -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.13</version>
    </dependency>
  </dependencies>

  <dependencyManagement>
    <dependencies>
      <!-- System-scoped tools.jar: change systemPath to your local JDK installation -->
      <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.8</version>
        <scope>system</scope>
        <systemPath>C:\Program Files\Java\jdk1.8.0_301\lib\tools.jar</systemPath>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <executions>
          <execution>
            <phase>compile</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
      </resource>
    </resources>
  </build>
</project>
```
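  3. Create a data directory and put the test data in it (see the test data download link), as shown below; inputPath=data/ in conf.properties is resolved relative to the working directory, which is normally the project root when run from Eclipse: [figure: data directory with the test data files]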
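  4. Add the configuration files conf.properties, application.properties, log4j.properties and log4j2.properties under src/main/resources, as shown below: [figure: configuration files in src/main/resources]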
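  5. Configure conf.properties as follows:
```properties
# Path of the raw input data
inputPath=data/
# HBase settings
# ZooKeeper quorum of the 3 nodes; these hostnames must also resolve on the development PC (e.g. via its hosts file).
# The actual internal addresses are printed in the log at runtime.
ZKServer=hostname01:2181,hostname02:2181,hostname03:2181
# HBase table name
tableName=PublicSecurity
# HBase column families
columnFamily1=Basic
columnFamily2=OtherInfo
# ElasticSearch settings: cluster name (must match cluster.name in elasticsearch.yml), VM IP, transport port (9300 by default)
clusterName=Es-cluster
hostName=192.168.1.109
tcpPort=9300
indexName=publicsecurity
typeName=info
```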
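  6. Configure application.properties as follows:
```properties
server.port=8084
# Spring Boot 1.x property name for the context path
server.contextPath=/bigdata
# Disable template caching so page changes show up without a restart (hot reload)
spring.thymeleaf.cache=false
# Relax Thymeleaf's overly strict HTML5 checking (requires nekohtml)
spring.thymeleaf.mode=LEGACYHTML5
```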
  7. Configure log4j.properties as follows:
```properties
log4j.rootLogger=INFO,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
```
  8. Configure log4j2.properties as follows:
```properties
name = PropertiesConfig
property.filename = target/logs
#appenders = console, file
# The values here are the identifiers used in the appender.<id>.* keys below, not the appender names
appenders = rolling
appender.rolling.type = RollingFile
appender.rolling.name = RollingLogFile
appender.rolling.fileName = ${filename}/automationlogs.log
appender.rolling.filePattern = ${filename}/automationlogs-%d{MM-dd-yy-HH-mm-ss}-%i.log
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 5
# The level takes a single value; no console appender is defined in this file
rootLogger.level = INFO
# ref must be the appender's name (RollingLogFile), not its identifier
rootLogger.appenderRef.rolling.ref = RollingLogFile
```
  9. Write the utility class ConstantUtil, which reads the configuration files:
```java
package com.bigdata.utils;

import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Reads the settings in conf.properties and exposes them as constants.
 *
 * @author suben
 */
public class ConstantUtil {
    public static final Properties PROPS = new Properties();
    public static final Logger LOG = LoggerFactory.getLogger(ConstantUtil.class);
    public static final String INPUT_PATH;
    public static final String ZK_SERVER;
    public static final String TABLE_NAME;
    public static final String COLUMN_FAMILY_1;
    public static final String COLUMN_FAMILY_2;
    public static final String INDEX_NAME;
    public static final String TYPE_NAME;
    // ES cluster name (the ES default is "elasticsearch")
    public static final String CLUSTER_NAME;
    // Address of one node in the ES cluster
    public static final String HOSTNAME;
    // ES transport port
    public static final int TCP_PORT;

    static {
        ClassLoader loader = ConstantUtil.class.getClassLoader();
        // Load the log4j configuration (configure(URL) also works when the file is packaged in a jar)
        PropertyConfigurator.configure(loader.getResource("log4j.properties"));
        // Load the connection settings as a classpath stream rather than a file path,
        // which breaks on paths containing spaces or inside a jar
        try (InputStream in = loader.getResourceAsStream("conf.properties")) {
            if (in == null) {
                throw new IOException("conf.properties not found on the classpath");
            }
            PROPS.load(in);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to load conf.properties", e);
        }
        INPUT_PATH = PROPS.getProperty("inputPath");
        ZK_SERVER = PROPS.getProperty("ZKServer");
        TABLE_NAME = PROPS.getProperty("tableName");
        // ES index names must be lowercase
        INDEX_NAME = PROPS.getProperty("indexName").toLowerCase();
        TYPE_NAME = PROPS.getProperty("typeName");
        COLUMN_FAMILY_1 = PROPS.getProperty("columnFamily1");
        COLUMN_FAMILY_2 = PROPS.getProperty("columnFamily2");
        CLUSTER_NAME = PROPS.getProperty("clusterName");
        HOSTNAME = PROPS.getProperty("hostName");
        TCP_PORT = Integer.parseInt(PROPS.getProperty("tcpPort").trim());
    }
}
```
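ConstantUtil reads each key with getProperty and does not check for missing keys, and java.util.Properties keeps trailing whitespace in values, so a typo in conf.properties tends to surface later as a NullPointerException or a connection error. A small fail-fast sketch; the names ConfigSketch, loadFromClasspath and require are hypothetical, and the file is assumed to be UTF-8 so the Chinese comments in it read correctly.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical helper: load properties from the classpath and validate required keys up front.
class ConfigSketch {
    /** Loads a properties file from the classpath (works inside a jar, unlike File paths). */
    static Properties loadFromClasspath(String resource) throws IOException {
        Properties props = new Properties();
        try (InputStream in = ConfigSketch.class.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                throw new IOException("resource not found on classpath: " + resource);
            }
            props.load(new InputStreamReader(in, StandardCharsets.UTF_8));
        }
        return props;
    }

    /** Parses properties from a string (handy for tests). */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the trimmed value, failing immediately with the key name if it is missing or blank. */
    static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("missing required property: " + key);
        }
        return value.trim();
    }
}
```

With this, a line such as "tcpPort=9300 " (trailing space) still yields a usable port, and a missing clusterName is reported by name at startup.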
  10. Write the HBase utility class:
```java
package com.bigdata.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class HBaseUtil {
    private static final Logger LOG = ConstantUtil.LOG;

    // Per-instance state (static fields would let every new HBaseUtil overwrite and leak the previous connection).
    // Connection is heavyweight and thread-safe; Table handles are lightweight and NOT thread-safe,
    // so each read/write below opens and closes its own Table.
    private Configuration conf;
    private Connection conn;
    private Admin admin;

    public HBaseUtil() {
        this(ConstantUtil.ZK_SERVER);
    }

    public HBaseUtil(String zkServer) {
        init(zkServer);
    }

    /**
     * Initialize the configuration and open the connection.
     *
     * @param zkServer ZooKeeper quorum, e.g. hostname01:2181,hostname02:2181,hostname03:2181
     */
    public void init(String zkServer) {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zkServer);
        try {
            conn = ConnectionFactory.createConnection(conf);
            admin = conn.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Write a list of Puts to the table; returns true on success. */
    public boolean put(String tableName, List<Put> putList) {
        try (Table table = conn.getTable(TableName.valueOf(tableName))) {
            table.put(putList);
            return true;
        } catch (IOException e) {
            LOG.error("Failed to put into table " + tableName, e);
            return false;
        }
    }

    /** Read one row by row key; an empty Result means the row does not exist. */
    public Result get(String tableName, String row) throws IOException {
        try (Table table = conn.getTable(TableName.valueOf(tableName))) {
            return table.get(new Get(Bytes.toBytes(row)));
        }
    }

    /** Create a table with the given column families; returns false if it already exists or creation fails. */
    public boolean createTable(String tableName, String... columnFamilies) {
        try {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                LOG.info("Table " + tableName + " already exists!");
                return false;
            }
            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
            for (String columnFamily : columnFamilies) {
                tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes(columnFamily)));
            }
            admin.createTable(tableDesc);
            LOG.info("Table " + tableName + " created successfully!");
            return true;
        } catch (IOException e) {
            LOG.error("Failed to create table " + tableName + "!", e);
            return false;
        }
    }

    public boolean tableExists(String tableName) throws IOException {
        return admin.tableExists(TableName.valueOf(tableName));
    }

    /** Disable a table if it exists and is still enabled (disabling twice throws an exception). */
    public void disableTable(String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name) && admin.isTableEnabled(name)) {
            admin.disableTable(name);
        }
    }

    /** Delete a table; HBase requires it to be disabled first. */
    public void deleteTable(String tableName) throws IOException {
        if (!tableExists(tableName)) {
            return;
        }
        disableTable(tableName);
        admin.deleteTable(TableName.valueOf(tableName));
    }

    /** List all table names. */
    public List<String> getALLTableName() throws IOException {
        List<String> tableNames = new ArrayList<>();
        if (admin != null) {
            for (HTableDescriptor tableDesc : admin.listTables()) {
                tableNames.add(tableDesc.getNameAsString());
            }
        }
        return tableNames;
    }

    /** Delete ALL tables. Use with care: for test environments only! */
    public void deleteAllTable() throws IOException {
        for (String s : getALLTableName()) {
            LOG.info("Start delete table : " + s + "......");
            deleteTable(s);
            LOG.info("done delete table : " + s);
        }
    }

    /** Close the Admin and the Connection. */
    public void clear() throws IOException {
        if (admin != null) {
            admin.close();
        }
        if (conn != null) {
            conn.close();
        }
    }

    /**
     * Checkpoint records (bayonet): name, ID number, age, gender, checkpoint number, date-time, travel mode
     * Hotel records (hotel): name, ID number, age, gender, check-in date, check-out date, companion
     * Internet café records (internet): name, ID number, age, gender, café name, date, length of stay
     */
    // Create the table and its column families in advance
    public static void preDeal() throws IOException {
        HBaseUtil hBaseUtils = new HBaseUtil();
        hBaseUtils.createTable(ConstantUtil.TABLE_NAME, ConstantUtil.COLUMN_FAMILY_1, ConstantUtil.COLUMN_FAMILY_2);
        hBaseUtils.clear();
    }

    // Smoke test: create a table, write one cell and read it back
    public static void test() throws IOException {
        HBaseUtil hBaseUtils = new HBaseUtil();
        long startTime = System.currentTimeMillis();
        String tb = "testTb";
        String colFamily = "info";
        String col = "name";
        String row = "100000";
        String value = "张三";
        hBaseUtils.createTable(tb, colFamily);
        List<Put> listPut = new ArrayList<>();
        Put put = new Put(Bytes.toBytes(row));
        put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes(value));
        listPut.add(put);
        hBaseUtils.put(tb, listPut);
        Result res = hBaseUtils.get(tb, row);
        List<Cell> list = res.getColumnCells(Bytes.toBytes(colFamily), Bytes.toBytes(col));
        for (Cell c : list) {
            LOG.info(Bytes.toString(CellUtil.cloneFamily(c)));
            LOG.info(Bytes.toString(CellUtil.cloneQualifier(c)));
            LOG.info(Bytes.toString(CellUtil.cloneValue(c)));
        }
        float seconds = (System.currentTimeMillis() - startTime) / 1000F;
        LOG.info("Elapsed " + seconds + " seconds.");
        hBaseUtils.clear();
    }

    public static void main(String[] args) throws Exception {
        test();
        preDeal();
    }
}
```
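HBaseUtil.put takes a List<Put>, but the loader in step 12 sends one Put per input line, i.e. one client round trip per record. Buffering records and flushing them in batches usually reduces that overhead. A generic sketch of the batching step; the helper BatchSketch.chunk is hypothetical and the batch size is an arbitrary assumption to tune. In this project T would be Put, and each batch would be passed to hBaseUtil.put(ConstantUtil.TABLE_NAME, batch).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: split a list into consecutive batches for bulk writes.
class BatchSketch {
    /** Splits items into consecutive batches of at most batchSize elements, preserving order. */
    static <T> List<List<T>> chunk(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the sub-list so each batch is independent of the source list
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }
}
```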
  11. Write the ElasticSearch utility class:
```java
package com.bigdata.utils;

import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class ElasticSearchUtil {
    // Client settings; cluster.name must match the cluster's elasticsearch.yml
    private static final Settings settings = Settings.builder()
            .put("cluster.name", ConstantUtil.CLUSTER_NAME)
            .put("client.transport.sniff", false)
            .build();
    // TransportClient used to connect to the ES cluster (created at most once per instance)
    private volatile TransportClient client;
    private static final Logger LOG = ConstantUtil.LOG;

    public ElasticSearchUtil() {
        getClient();
    }

    /**
     * Lazily create the client with double-checked locking. A synchronized(X.class) block locks the
     * same monitor as a synchronized static method: the class object itself. The null check is
     * repeated inside the lock so that only one thread ever creates the client.
     */
    public TransportClient getClient() {
        if (client == null) {
            synchronized (ElasticSearchUtil.class) {
                if (client == null) {
                    try {
                        client = new PreBuiltTransportClient(settings).addTransportAddress(
                                new TransportAddress(InetAddress.getByName(ConstantUtil.HOSTNAME), ConstantUtil.TCP_PORT));
                    } catch (UnknownHostException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        return client;
    }

    /** Get the IndicesAdminClient used for index management. */
    public IndicesAdminClient getAdminClient() {
        return getClient().admin().indices();
    }

    /** Check whether an index exists. */
    public boolean isExistsIndex(String indexName) {
        IndicesExistsResponse response = getAdminClient().prepareExists(indexName).get();
        return response.isExists();
    }

    /** Create an index (ES index names must be lowercase). */
    public boolean createIndex(String indexName) {
        CreateIndexResponse response = getAdminClient().prepareCreate(indexName.toLowerCase()).get();
        return response.isAcknowledged();
    }

    /** Delete an index. */
    public boolean deleteIndex(String indexName) {
        AcknowledgedResponse response = getAdminClient().prepareDelete(indexName.toLowerCase()).execute().actionGet();
        return response.isAcknowledged();
    }

    /** Set the mapping of type typeName in index indexName. */
    public void setMapping(String indexName, String typeName, String mapping) {
        getAdminClient().preparePutMapping(indexName).setType(typeName).setSource(mapping, XContentType.JSON).get();
    }

    /** Index a document (the equivalent of inserting a row into a table); returns the document version. */
    public long addDocument(String indexName, String typeName, String id, Map<String, Object> document) throws IOException {
        XContentBuilder source = jsonBuilder().startObject();
        for (Map.Entry<String, Object> e : document.entrySet()) {
            source.field(e.getKey(), e.getValue());
        }
        source.endObject();
        IndexResponse response = getClient().prepareIndex(indexName, typeName, id).setSource(source).get();
        return response.getVersion();
    }

    /** query_string search over the project index; returns the _source of each hit. */
    public List<Map<String, Object>> queryStringQuery(String text) {
        // Restrict the search to the project index (without an index name every index is searched).
        // Pagination is optional: with no size set, ES returns its default of 10 hits.
        SearchResponse response = getClient().prepareSearch(ConstantUtil.INDEX_NAME)
                .setQuery(QueryBuilders.queryStringQuery(text))
                .get();
        SearchHits hits = response.getHits();
        List<Map<String, Object>> resListMap = new ArrayList<>();
        for (SearchHit hit : hits.getHits()) {
            // The document's _source
            resListMap.add(hit.getSourceAsMap());
        }
        return resListMap;
    }

    /** Release the client's threads and connections. */
    public void close() {
        if (client != null) {
            client.close();
        }
    }

    // Create the index in advance (the equivalent of a database in an RDBMS)
    public static void preDealCreatIndex() {
        ElasticSearchUtil esUtils = new ElasticSearchUtil();
        LOG.info("start create index..............");
        esUtils.createIndex(ConstantUtil.INDEX_NAME);
        LOG.info("finished create index!");
        esUtils.close();
    }

    /**
     * Checkpoint records (bayonet): name, ID number, age, gender, checkpoint number, date-time, travel mode
     * Hotel records (hotel): name, ID number, age, gender, check-in date, check-out date, companion
     * Internet café records (internet): name, ID number, age, gender, café name, date, length of stay
     * name,id,age,gender,
     * hotelAddr,hotelInTime,hotelOutTime,acquaintancer,
     * barAddr,internetDate,timeSpent,
     * bayonetAddr,crossDate,tripType
     */
    public static void preDealSetMapping() {
        JSONObject propertiesJson = new JSONObject();

        JSONObject idJson = new JSONObject();
        idJson.put("type", "keyword");
        idJson.put("store", true);
        propertiesJson.put("id", idJson);

        JSONObject nameJson = new JSONObject();
        nameJson.put("type", "keyword");
        propertiesJson.put("name", nameJson);

        JSONObject uidJson = new JSONObject();
        uidJson.put("type", "keyword");
        uidJson.put("store", false);
        propertiesJson.put("uid", uidJson);

        JSONObject address = new JSONObject();
        address.put("type", "text");
        propertiesJson.put("address", address);

        JSONObject happenedDate = new JSONObject();
        happenedDate.put("type", "date");
        happenedDate.put("format", "yyyy-MM-dd");
        propertiesJson.put("happenedDate", happenedDate);

        JSONObject endDate = new JSONObject();
        endDate.put("type", "date");
        endDate.put("format", "yyyy-MM-dd");
        propertiesJson.put("endDate", endDate);

        JSONObject acquaintancer = new JSONObject();
        acquaintancer.put("type", "keyword");
        propertiesJson.put("acquaintancer", acquaintancer);

        JSONObject mappingTypeJson = new JSONObject();
        mappingTypeJson.put("properties", propertiesJson);
        LOG.info("start set mapping to " + ConstantUtil.INDEX_NAME + " " + ConstantUtil.TYPE_NAME + " .....");
        LOG.info(mappingTypeJson.toString());
        ElasticSearchUtil esUtils = new ElasticSearchUtil();
        esUtils.setMapping(ConstantUtil.INDEX_NAME, ConstantUtil.TYPE_NAME, mappingTypeJson.toString());
        esUtils.close();
        LOG.info("set mapping done!");
    }

    // Smoke test: create, check and delete a throwaway index
    public static void test() {
        String index = "esindex";
        ElasticSearchUtil esUtils = new ElasticSearchUtil();
        System.out.println("createIndex..............");
        esUtils.createIndex(index);
        System.out.println("createIndex done!");
        System.out.println("isExists = " + esUtils.isExistsIndex(index));
        System.out.println("deleteIndex...............");
        esUtils.deleteIndex(index);
        System.out.println("deleteIndex done!");
        esUtils.close();
    }

    public static void main(String[] args) {
        preDealCreatIndex();
        preDealSetMapping();
        test();
    }
}
```
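The getClient() comment is about lazily creating a single TransportClient under a class-level lock. For double-checked locking to be correct in Java, the field must be volatile and the null check must be repeated inside the synchronized block; otherwise two threads that both saw null can each create a client. A generic sketch of the pattern with a hypothetical class Lazy<T>, assuming the factory never returns null:

```java
import java.util.function.Supplier;

// Hypothetical helper: thread-safe lazy initialization via double-checked locking.
final class Lazy<T> {
    private final Supplier<T> factory;
    // volatile guarantees other threads see a fully constructed value once it is published
    private volatile T value;

    Lazy(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T result = value;              // fast path: a single volatile read
        if (result == null) {
            synchronized (this) {
                result = value;        // re-check: another thread may have initialized it meanwhile
                if (result == null) {
                    result = factory.get();
                    value = result;
                }
            }
        }
        return result;
    }
}
```

Because Supplier.get() cannot throw checked exceptions, using this for the ES client would require catching the UnknownHostException from InetAddress.getByName inside the lambda.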
  12. Write the class that loads the data into HBase and ES:
```java
package com.bigdata.insert;

import com.bigdata.utils.ConstantUtil;
import com.bigdata.utils.ElasticSearchUtil;
import com.bigdata.utils.HBaseUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Reads the local data files, parses each record and inserts it into HBase and ElasticSearch.
 */
public class LoadDataToHBaseAndES {
    // Column families from conf.properties (Basic / OtherInfo)
    private static final byte[] CF_BASIC = Bytes.toBytes(ConstantUtil.COLUMN_FAMILY_1);
    private static final byte[] CF_OTHER = Bytes.toBytes(ConstantUtil.COLUMN_FAMILY_2);

    private HBaseUtil hBaseUtil;
    private ElasticSearchUtil elasticSearchUtil;

    /**
     * Checkpoint records (bayonet): name, ID number, age, gender, checkpoint number, date-time, travel mode
     * Hotel records (hotel): name, ID number, age, gender, check-in date, check-out date, companion
     * Internet café records (internet): name, ID number, age, gender, café name, date, length of stay
     * name,uid,age,gender,
     * hotelAddr,happenedDate,endDate,acquaintancer,
     * barAddr,happenedDate,duration,
     * bayonetAddr,happenedDate,tripType
     */
    public void insert() throws Exception {
        hBaseUtil = new HBaseUtil();
        elasticSearchUtil = new ElasticSearchUtil();
        File[] files = new File(ConstantUtil.INPUT_PATH).listFiles();
        if (files != null) {
            for (File file : files) {
                String name = file.getName();
                if (file.isDirectory()) {
                    System.out.println(name + " is a directory, skipped");
                } else if (name.contains("hotel")) {
                    loadFile(file, "hotel");        // hotel check-in records
                } else if (name.contains("internet")) {
                    loadFile(file, "internetBar");  // internet café records
                } else if (name.contains("bayonet")) {
                    loadFile(file, "bayonet");      // checkpoint records
                }
                // Any other file (e.g. the data description file) is skipped
            }
        }
        ConstantUtil.LOG.info("load and insert done!");
    }

    /** Parse one data file and write every record to HBase (full row) and ES (searchable fields). */
    private void loadFile(File file, String event) throws Exception {
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // skip blank lines
                }
                // limit -1 keeps trailing empty fields, e.g. a hotel record with no companion
                String[] elements = line.split(",", -1);
                // Random UUID used as both the HBase row key and the ES document id
                String id = UUID.randomUUID().toString();

                // Full record for HBase
                Put put = new Put(Bytes.toBytes(id));
                put.addColumn(CF_BASIC, Bytes.toBytes("name"), Bytes.toBytes(elements[0]));
                put.addColumn(CF_BASIC, Bytes.toBytes("uid"), Bytes.toBytes(elements[1]));
                put.addColumn(CF_BASIC, Bytes.toBytes("age"), Bytes.toBytes(elements[2]));
                put.addColumn(CF_BASIC, Bytes.toBytes("gender"), Bytes.toBytes(elements[3]));
                put.addColumn(CF_OTHER, Bytes.toBytes("event"), Bytes.toBytes(event));
                put.addColumn(CF_OTHER, Bytes.toBytes("address"), Bytes.toBytes(elements[4]));
                put.addColumn(CF_OTHER, Bytes.toBytes("happenedDate"), Bytes.toBytes(elements[5]));

                // Searchable fields for ES
                Map<String, Object> esMap = new HashMap<>();
                esMap.put("id", id);
                esMap.put("name", elements[0]);
                esMap.put("uid", elements[1]);
                esMap.put("address", elements[4]);
                esMap.put("happenedDate", elements[5]);

                // Event-specific columns
                switch (event) {
                    case "hotel":
                        put.addColumn(CF_OTHER, Bytes.toBytes("endDate"), Bytes.toBytes(elements[6]));
                        put.addColumn(CF_OTHER, Bytes.toBytes("acquaintancer"), Bytes.toBytes(elements[7]));
                        esMap.put("endDate", elements[6]);
                        esMap.put("acquaintancer", elements[7]);
                        break;
                    case "internetBar":
                        put.addColumn(CF_OTHER, Bytes.toBytes("duration"), Bytes.toBytes(elements[6]));
                        break;
                    case "bayonet":
                        put.addColumn(CF_OTHER, Bytes.toBytes("tripType"), Bytes.toBytes(elements[6]));
                        break;
                    default:
                        break;
                }

                ConstantUtil.LOG.info(event + " start putting to HBase ...: " + id + " " + line);
                hBaseUtil.put(ConstantUtil.TABLE_NAME, Collections.singletonList(put));
                elasticSearchUtil.addDocument(ConstantUtil.INDEX_NAME, ConstantUtil.TYPE_NAME, id, esMap);
                ConstantUtil.LOG.info("add document to ES ... " + ConstantUtil.INDEX_NAME + " "
                        + ConstantUtil.TYPE_NAME + " " + id + " " + esMap);
            }
        }
    }

    public static void start() throws Exception {
        new LoadDataToHBaseAndES().insert();
    }

    public static void main(String[] args) throws Exception {
        start();
    }
}
```
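The loader chooses a record layout by a keyword in the file name and then reads fixed column positions, so a short or malformed line only fails deep inside the loop. One way to make that explicit is a small enum that pairs each file keyword with its event name and expected field count (8 for hotel, 7 for internet and bayonet, derived from the schema comment above). This is a hypothetical sketch, not the project's code. Note the limit -1 in split: plain split(",") drops trailing empty strings, so a hotel line ending in an empty companion field would yield only 7 fields and elements[7] would throw ArrayIndexOutOfBoundsException.

```java
// Hypothetical helper: one place that knows each data file's layout.
enum RecordType {
    // file-name keyword, value stored in OtherInfo:event, expected comma-separated field count
    HOTEL("hotel", "hotel", 8),
    INTERNET("internet", "internetBar", 7),
    BAYONET("bayonet", "bayonet", 7);

    final String fileKeyword;
    final String event;
    final int fieldCount;

    RecordType(String fileKeyword, String event, int fieldCount) {
        this.fileKeyword = fileKeyword;
        this.event = event;
        this.fieldCount = fieldCount;
    }

    /** Returns the type whose keyword appears in the file name, or null for files to skip. */
    static RecordType fromFileName(String fileName) {
        for (RecordType t : values()) {
            if (fileName.contains(t.fileKeyword)) {
                return t;
            }
        }
        return null;
    }

    /** Splits a line, keeping trailing empty fields, and rejects lines with the wrong field count. */
    String[] parse(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != fieldCount) {
            throw new IllegalArgumentException(name() + " expects " + fieldCount
                    + " fields but got " + fields.length + ": " + line);
        }
        return fields;
    }
}
```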
  13. Write the Query class:
```java
package com.bigdata.query;

import com.alibaba.fastjson.JSONObject;
import com.bigdata.utils.ConstantUtil;
import com.bigdata.utils.ElasticSearchUtil;
import com.bigdata.utils.HBaseUtil;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * Search logic: query ElasticSearch first to find the matching record ids,
 * then fetch the full records from HBase by row key.
 */
public class Query {
    private final HBaseUtil hBaseUtil = new HBaseUtil();
    private final ElasticSearchUtil elasticSearchUtil = new ElasticSearchUtil();

    public String query(String target) {
        // Local result: one Query instance is shared by all web requests, so no per-call state is kept in fields
        JSONObject result = new JSONObject();
        long startTime = System.currentTimeMillis();
        List<Map<String, Object>> listMap = elasticSearchUtil.queryStringQuery(target);
        float seconds = (System.currentTimeMillis() - startTime) / 1000F;
        ConstantUtil.LOG.info("ElasticSearch query took " + seconds + " seconds.");
        for (Map<String, Object> m : listMap) {
            Object idValue = m.get("id");
            if (idValue == null) {
                continue; // document without an id field cannot be looked up in HBase
            }
            String id = idValue.toString();
            JSONObject tmpJS = new JSONObject();
            tmpJS.put("id", id);
            try {
                long s1 = System.currentTimeMillis();
                Result res = hBaseUtil.get(ConstantUtil.TABLE_NAME, id);
                float se1 = (System.currentTimeMillis() - s1) / 1000F;
                ConstantUtil.LOG.info("HBase query took " + se1 + " seconds.");
                if (!res.isEmpty()) {
                    for (Cell cell : res.rawCells()) {
                        String col = Bytes.toString(CellUtil.cloneQualifier(cell));
                        String value = Bytes.toString(CellUtil.cloneValue(cell));
                        tmpJS.put(col, value);
                    }
                }
                result.put(id, tmpJS);
            } catch (IOException e) {
                e.printStackTrace();
                result.put(id, "Query failed!");
            }
        }
        return result.toString();
    }

    public static void main(String[] args) {
        Query query = new Query();
        long startTime = System.currentTimeMillis();
        System.out.println(query.query("100004"));
        float seconds = (System.currentTimeMillis() - startTime) / 1000F;
        ConstantUtil.LOG.info("Elapsed " + seconds + " seconds.");
    }
}
```
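Query times each phase with System.currentTimeMillis(), which follows the wall clock and can jump when the system time is adjusted (for example by NTP). System.nanoTime() is monotonic and is the JDK's recommended source for elapsed-time measurement. A small sketch of a reusable timing wrapper; the class Timed is hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper: run an action and record how long it took, using the monotonic clock.
final class Timed<T> {
    final T value;
    final long elapsedNanos;

    private Timed(T value, long elapsedNanos) {
        this.value = value;
        this.elapsedNanos = elapsedNanos;
    }

    /** Runs the action and measures its duration with System.nanoTime(). */
    static <T> Timed<T> of(Supplier<T> action) {
        long start = System.nanoTime();
        T value = action.get();
        return new Timed<>(value, System.nanoTime() - start);
    }

    double seconds() {
        return elapsedNanos / 1e9;
    }

    long millis() {
        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    }
}
```

Inside Query this could wrap the ES call as Timed.of(() -> elasticSearchUtil.queryStringQuery(target)); the HBase call throws IOException, so it would need to be caught inside the lambda.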
  14. Write the ManagerQuery class:
```java
package com.bigdata.manager;

import com.bigdata.query.Query;
import org.springframework.stereotype.Component;

@Component
public class ManagerQuery {
    // One shared Query instance, which holds the HBase and ES connections
    private static final Query query = new Query();

    public static String getQueryResult(String target) {
        try {
            String result = query.query(target);
            System.out.println(result);
            return result;
        } catch (Exception e) {
            e.printStackTrace();
            return "An error occurred during the query; please contact the developers!";
        }
    }

    public static void main(String[] args) {
        String target = "牧之桃";
        String result = ManagerQuery.getQueryResult(target);
        System.out.println(result);
    }
}
```
  15. Write the SearchService class (written in the style of Spring MVC):
```java
package com.bigdata.service;

import com.bigdata.manager.ManagerQuery;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@EnableAutoConfiguration
@ComponentScan(basePackages = {"com.bigdata"})
public class SearchService {
    // GET /bigdata/search?target=... (the /bigdata context path comes from application.properties)
    @RequestMapping("/search")
    public String search(@RequestParam("target") String target) {
        try {
            return ManagerQuery.getQueryResult(target);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "Oops, something went wrong!";
    }

    // main method: runs when you right-click the class and choose Run As > Java Application
    public static void main(String[] args) throws Exception {
        SpringApplication.run(SearchService.class, args);
    }
}
```
  16. Write the SearchController class (written in the style of Spring MVC):
```java
package com.bigdata.controller;

import org.springframework.stereotype.Controller;
import org.springframework.ui.ModelMap;
import org.springframework.web.bind.annotation.RequestMapping;

/**
 * Declares this class as a Spring MVC controller; it is picked up by component scanning.
 * Start the application from ApplicationBootSystem (or SearchService): this class has no
 * @EnableAutoConfiguration, so SpringApplication.run(SearchController.class) cannot start
 * an embedded web server on its own.
 */
@Controller
public class SearchController {
    @RequestMapping("/index")
    String testdo(ModelMap map) {
        // Return the HTML page: the Thymeleaf view templates/index_search.html
        return "index_search";
    }
}
```
  17. Write the ApplicationBootSystem startup class:

          package com.bigdata.boot;

          import org.springframework.boot.SpringApplication;
          import org.springframework.boot.autoconfigure.SpringBootApplication;
          import org.springframework.context.annotation.ComponentScan;

          /**
           * Root startup class.
           * It lives in com.bigdata.boot, so the scan is widened to com.bigdata
           * to find the controller, service and manager packages.
           */
          @SpringBootApplication
          @ComponentScan(basePackages = "com.bigdata")
          public class ApplicationBootSystem {
              public static void main(String[] args) {
                  SpringApplication.run(ApplicationBootSystem.class, args);
              }
          }
  18. Create a static directory (under src/main/resources) with a plugins directory inside it, and copy the bootstrap-3.3.7 and bootstrap-table packages into plugins.
  19. Create a templates directory (src/main/resources/templates, the default location Spring Boot uses for Thymeleaf templates) and create an index_search.html file in it with the following code:

          <!DOCTYPE html>
          <html lang="en">
          <head>
          <meta charset="utf-8">
          <meta http-equiv="X-UA-Compatible" content="IE=edge">
          <meta name="viewport" content="width=device-width, initial-scale=1">
          <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
          <title>Realtime Search</title>
          <!-- Bootstrap -->
          <link href="plugins/bootstrap-3.3.7/css/bootstrap.min.css" rel="stylesheet">
          <link href="plugins/bootstrap-table/bootstrap-table.min.css" rel="stylesheet">
          <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
          <!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
          <!--[if lt IE 9]>
            <script src="https://oss.maxcdn.com/html5shiv/3.7.3/html5shiv.min.js"></script>
            <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
          <![endif]-->
          </head>
          <body>
          <div class="container">
            <div class="row">
              <div class="col-md-8 col-md-offset-2 text-center">
                <!-- onsubmit returns false, so the form is never submitted directly; the search runs via Ajax -->
                <form class="form-inline" onsubmit="return false">
                  <div class="form-group">
                    <label for="target">Enter a condition</label>
                    <input type="text" class="form-control" id="target" name="target" placeholder="Enter a condition">
                  </div>
                  <button type="submit" id="submit" class="btn btn-primary">Search</button>
                </form>
              </div>
            </div>
            <!-- The next row holds a standard Bootstrap table -->
            <div class="row">
              <table id="table"></table>
            </div>
          </div>
          <!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
          <script src="http://code.jquery.com/jquery-1.12.1.min.js"></script>
          <!-- Include all compiled plugins (below), or include individual files as needed -->
          <script src="plugins/bootstrap-3.3.7/js/bootstrap.min.js"></script>
          <!-- bootstrap-table dependencies -->
          <script src="plugins/bootstrap-table/bootstrap-table.min.js"></script>
          <script src="plugins/bootstrap-table/bootstrap-table-locale-all.min.js"></script>
          <script type="text/javascript">
          $(function () {
            // Shared cell formatter: values equal to the search term are shown in blue
            function highlight(value, row, index) {
              var color = (value == $("#target").val()) ? "#5858FA" : "#190707";
              return '<span style="color:' + color + '">' + value + '</span>';
            }

            // Initialize the table columns
            $('#table').bootstrapTable({
              columns: [
                { field: 'id',            title: 'Record ID',   formatter: highlight },
                { field: 'name',          title: 'Name',        formatter: highlight },
                { field: 'uid',           title: 'User ID',     formatter: highlight },
                { field: 'age',           title: 'Age',         formatter: highlight },
                { field: 'gender',        title: 'Gender',      formatter: highlight },
                { field: 'event',         title: 'Event',       formatter: highlight },
                { field: 'address',       title: 'Address',     formatter: highlight },
                { field: 'happenedDate',  title: 'Occurred At', formatter: highlight },
                { field: 'acquaintancer', title: 'Companion',   formatter: highlight },
                { field: 'endDate',       title: 'End Time',    formatter: highlight }
              ]
            });

            // Bind a click handler to the submit button: run the query and fill the table
            $("#submit").click(function () {
              $.ajax({
                url: '/bigdata/search',
                data: { target: $("#target").val() },  // object form, so jQuery URL-encodes the value
                cache: false,   // false = do not cache, true = cache
                async: true,    // true = asynchronous, false = synchronous
                beforeSend: function () { /* before the request */ },
                success: function (result) {
                  try {
                    var resultArray = [];
                    var js = JSON.parse(result);
                    for (var p in js) {
                      resultArray.push(js[p]);
                      console.log(js[p]);
                    }
                    console.log(resultArray);
                    $("#table").bootstrapTable('load', resultArray);
                  } catch (e) {
                    window.alert(result);
                    $("#table").bootstrapTable('load', [{ "result": "Nothing found" }]);
                  }
                },
                complete: function () { /* when the request finishes */ },
                error: function () { /* when the request fails */ }
              });
            });
          });
          </script>
          </body>
          </html>
  20. After all the code is written, the project structure looks like this:
      [Figure: project structure]
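You can also call the /search endpoint from step 15 without the web page. The sketch below is a hypothetical helper and is not part of the original project. The class name SearchClient is made up, and the class uses only the JDK's HttpURLConnection. It assumes the application answers at http://localhost:8084/bigdata, the address used in the testing section. It URL-encodes the keyword so that Chinese names and characters such as & reach the server intact.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical command-line client for the /search endpoint (not part of the original project).
 * Assumes the response body is UTF-8 text.
 */
final class SearchClient {

    /** Builds "<baseUrl>/search?target=<URL-encoded keyword>". */
    static String buildSearchUrl(String baseUrl, String target) {
        try {
            // Form encoding: space -> '+', '&' -> "%26", non-ASCII -> UTF-8 percent-escapes
            return baseUrl + "/search?target=" + URLEncoder.encode(target, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // Cannot happen: every JVM must support UTF-8
            throw new IllegalStateException(e);
        }
    }

    /** Sends a GET request to the search endpoint and returns the response body. */
    static String search(String baseUrl, String target) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) URI.create(buildSearchUrl(baseUrl, target)).toURL().openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);   // milliseconds
        conn.setReadTimeout(10000);     // milliseconds
        StringBuilder body = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line).append('\n');
            }
        } finally {
            conn.disconnect();
        }
        return body.toString();
    }

    public static void main(String[] args) throws IOException {
        // Assumed address (from the testing section); adjust host, port and context path as needed
        String baseUrl = "http://localhost:8084/bigdata";
        String target = args.length > 0 ? args[0] : "3";
        System.out.println(search(baseUrl, target));
    }
}
```

Run it after starting ApplicationBootSystem, for example with `java SearchClient 3`. The printed body should be the same string that the page's Ajax call receives.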

IV. Testing Procedure

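  1. First run the main method of the HBaseUtil utility class. It creates the HBase test table and target table and verifies that the program can connect to HBase;
  2. Next run the main method of the ElasticSearch utility class. It creates the ElasticSearch test index and target index and verifies that the program can connect to ElasticSearch;
  3. Then run the LoadDataToHBaseAndES class to write the data into HBase and ElasticSearch;
  4. Then run the ApplicationBootSystem startup class to start the Spring Boot application;
  5. Finally, open http://localhost:8084/bigdata/index in a browser, type a keyword such as 3 into the search box, and click the Search (搜一下) button. If everything works, you should see results like this: ![Search results](https://img-blog.csdnimg.cn/406c492eb6a64c62bcd8fbd9062a1bc2.png)
  6. Try different conditions to see different results, and note how quickly they come back. You can also enlarge the test dataset and check the retrieval speed again.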
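Trying the retrieval speed on a larger dataset requires more records. One possible generator is sketched below. It is an assumption and not the author's tool. The ten fields follow the column order of the table in index_search.html (id, name, uid, age, gender, event, address, happenedDate, acquaintancer, endDate) and are written as comma-separated lines. The real file format read by LoadDataToHBaseAndES may differ, so adjust the separator, the field order and the output path before loading. The class name TestDataGenerator, the sample values and testdata.csv are all placeholders.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical generator of synthetic test records; the CSV layout is an assumption. */
final class TestDataGenerator {

    private static final String[] NAMES = {"Zhang San", "Li Si", "Wang Wu", "Zhao Liu"};
    private static final String[] EVENTS = {"meeting", "travel", "shopping", "dining"};
    private static final String[] CITIES = {"Beijing", "Shanghai", "Guangzhou", "Shenzhen"};

    /** Produces count lines: id,name,uid,age,gender,event,address,happenedDate,acquaintancer,endDate */
    static List<String> generate(int count, long seed) {
        Random random = new Random(seed);              // fixed seed -> reproducible dataset
        LocalDate base = LocalDate.of(2023, 1, 1);
        List<String> lines = new ArrayList<>(count);
        for (int i = 1; i <= count; i++) {
            LocalDate happened = base.plusDays(random.nextInt(365));
            LocalDate end = happened.plusDays(1 + random.nextInt(10));   // always after happened
            lines.add(String.join(",",
                    String.valueOf(i),                               // id
                    NAMES[random.nextInt(NAMES.length)],             // name
                    "u" + (100000 + random.nextInt(900000)),         // uid
                    String.valueOf(18 + random.nextInt(60)),         // age, 18..77
                    random.nextBoolean() ? "male" : "female",        // gender
                    EVENTS[random.nextInt(EVENTS.length)],           // event
                    CITIES[random.nextInt(CITIES.length)],           // address
                    happened.toString(),                             // happenedDate, yyyy-MM-dd
                    NAMES[random.nextInt(NAMES.length)],             // acquaintancer
                    end.toString()));                                // endDate, yyyy-MM-dd
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        int count = args.length > 0 ? Integer.parseInt(args[0]) : 100_000;
        String file = args.length > 1 ? args[1] : "testdata.csv";   // placeholder output path
        Files.write(Paths.get(file), generate(count, 42L), StandardCharsets.UTF_8);
        System.out.println("Wrote " + count + " records to " + file);
    }
}
```

The fixed random seed keeps the dataset reproducible. Timings taken before and after a change therefore compare like with like.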

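Judging speed by eye is imprecise. A minimal timing harness could look like the sketch below. It assumes you pass in a function that performs one search. QueryTimer and its parameters are hypothetical names, and the stand-in function in main exists only so the sketch runs on its own. Inside the project you might pass a method reference such as ManagerQuery::getQueryResult (step 14), which has the matching String-to-String shape.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical latency harness: times a search function over a list of keywords. */
final class QueryTimer {

    /** Summary of one measurement, in milliseconds. */
    static final class Stats {
        final int runs;
        final double minMs;
        final double avgMs;
        final double maxMs;

        Stats(int runs, double minMs, double avgMs, double maxMs) {
            this.runs = runs;
            this.minMs = minMs;
            this.avgMs = avgMs;
            this.maxMs = maxMs;
        }

        @Override
        public String toString() {
            return String.format("runs=%d min=%.2f ms avg=%.2f ms max=%.2f ms", runs, minMs, avgMs, maxMs);
        }
    }

    /**
     * Calls search once per keyword for warmupRounds unmeasured rounds,
     * then for `rounds` measured rounds, timing each call with System.nanoTime().
     */
    static Stats time(Function<String, String> search, List<String> keywords, int warmupRounds, int rounds) {
        if (keywords.isEmpty() || rounds <= 0) {
            throw new IllegalArgumentException("need at least one keyword and one measured round");
        }
        for (int w = 0; w < warmupRounds; w++) {          // warm-up: results and timings discarded
            for (String k : keywords) {
                search.apply(k);
            }
        }
        long min = Long.MAX_VALUE;
        long max = 0;
        long total = 0;
        int runs = 0;
        for (int r = 0; r < rounds; r++) {
            for (String k : keywords) {
                long start = System.nanoTime();
                search.apply(k);
                long elapsed = System.nanoTime() - start;
                min = Math.min(min, elapsed);
                max = Math.max(max, elapsed);
                total += elapsed;
                runs++;
            }
        }
        double avgNs = (double) total / runs;
        return new Stats(runs, min / 1e6, avgNs / 1e6, max / 1e6);
    }

    public static void main(String[] args) {
        // Stand-in search so the sketch runs by itself; swap in the real query function in the project
        Function<String, String> fakeSearch = k -> new StringBuilder(k).reverse().toString();
        System.out.println(time(fakeSearch, Arrays.asList("3", "牧之桃"), 5, 100));
    }
}
```

The warm-up rounds are excluded because the first calls usually pay for JIT compilation and connection setup. Reporting the minimum, average and maximum together shows whether a few slow calls are skewing the average.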
This article is reprinted from: https://blog.csdn.net/sujiangming/article/details/132073535
Copyright belongs to the original author, 若兰幽竹.
