Springboot整合HBase数据库
1、添加依赖
<!-- Spring Boot HBase 依赖 -->
<dependency>
    <groupId>com.spring4all</groupId>
    <artifactId>spring-boot-starter-hbase</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop-hbase</artifactId>
    <version>2.5.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>2.5.0.RELEASE</version>
</dependency>
2、添加配置
通过Yaml方式配置
spring:
  hbase:
    zookeeper:
      quorum: hbase1.xxx.org,hbase2.xxx.org,hbase3.xxx.org
      property:
        clientPort: 2181
  data:
    hbase:
      quorum: XXX
      rootDir: XXX
      nodeParent: XXX
zookeeper:
  znode:
    parent: /hbase
3、添加配置类
@ConfigurationpublicclassHBaseConfig{@BeanpublicHBaseServicegetHbaseService(){//设置临时的hadoop环境变量,之后程序会去这个目录下的\bin目录下找winutils.exe工具,windows连接hadoop时会用到//System.setProperty("hadoop.home.dir", "D:\\Program Files\\Hadoop");//执行此步时,会去resources目录下找相应的配置文件,例如hbase-site.xmlorg.apache.hadoop.conf.Configuration conf =HBaseConfiguration.create();returnnewHBaseService(conf);}}
4、工具类的方式实现HBASE操作
@ServicepublicclassHBaseService{privateAdmin admin =null;privateConnection connection =null;publicHBaseService(Configuration conf){
connection =ConnectionFactory.createConnection(conf);
admin = connection.getAdmin();}//创建表 create <table>, {NAME => <column family>, VERSIONS => <VERSIONS>}publicbooleancreatTable(String tableName,List<String> columnFamily){//列族column familyList<ColumnFamilyDescriptor> cfDesc =newArrayList<>(columnFamily.size());
columnFamily.forEach(cf ->{
cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build());});//表 tableTableDescriptor tableDesc =TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamilies(cfDesc).build();if(admin.tableExists(TableName.valueOf(tableName))){
log.debug("table Exists!");}else{
admin.createTable(tableDesc);
log.debug("create table Success!");}close(admin,null,null);returntrue;}publicList<String>getAllTableNames(){List<String> result =newArrayList<>();TableName[] tableNames = admin.listTableNames();for(TableName tableName : tableNames){
result.add(tableName.getNameAsString());}close(admin,null,null);return result;}publicMap<String,Map<String,String>>getResultScanner(String tableName){Scan scan =newScan();returnthis.queryData(tableName, scan);}privateMap<String,Map<String,String>>queryData(String tableName,Scan scan){// <rowKey,对应的行数据>Map<String,Map<String,String>> result =newHashMap<>();ResultScanner rs =null;//获取表Table table =null;
table =getTable(tableName);
rs = table.getScanner(scan);for(Result r : rs){// 每一行数据Map<String,String> columnMap =newHashMap<>();String rowKey =null;// 行键,列族和列限定符一起确定一个单元(Cell)for(Cell cell : r.listCells()){if(rowKey ==null){
rowKey =Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());}
columnMap.put(//列限定符Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),//列族Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));}if(rowKey !=null){
result.put(rowKey, columnMap);}}close(null, rs, table);return result;}publicvoidputData(String tableName,String rowKey,String familyName,String[] columns,String[] values){Table table =null;
table =getTable(tableName);putData(table, rowKey, tableName, familyName, columns, values);close(null,null, table);}privatevoidputData(Table table,String rowKey,String tableName,String familyName,String[] columns,String[] values){//设置rowkeyPut put =newPut(Bytes.toBytes(rowKey));if(columns !=null&& values !=null&& columns.length == values.length){for(int i =0; i < columns.length; i++){if(columns[i]!=null&& values[i]!=null){
put.addColumn(Bytes.toBytes(familyName),Bytes.toBytes(columns[i]),Bytes.toBytes(values[i]));}else{thrownewNullPointerException(MessageFormat.format("列名和列数据都不能为空,column:{0},value:{1}", columns[i], values[i]));}}}
table.put(put);
log.debug("putData add or update data Success,rowKey:"+ rowKey);
table.close();}privateTablegetTable(String tableName)throwsIOException{return connection.getTable(TableName.valueOf(tableName));}privatevoidclose(Admin admin,ResultScanner rs,Table table){if(admin !=null){try{
admin.close();}catch(IOException e){
log.error("关闭Admin失败", e);}if(rs !=null){
rs.close();}if(table !=null){
rs.close();}if(table !=null){try{
table.close();}catch(IOException e){
log.error("关闭Table失败", e);}}}}}
测试类
@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTestclassHBaseApplicationTests{@ResourceprivateHBaseService hbaseService;//测试创建表@TestpublicvoidtestCreateTable(){
hbaseService.creatTable("test_base",Arrays.asList("a","back"));}//测试加入数据@TestpublicvoidtestPutData(){
hbaseService.putData("test_base","000001","a",newString[]{"project_id","varName","coefs","pvalues","tvalues","create_time"},newString[]{"40866","mob_3","0.9416","0.0000","12.2293","null"});
hbaseService.putData("test_base","000002","a",newString[]{"project_id","varName","coefs","pvalues","tvalues","create_time"},newString[]{"40866","idno_prov","0.9317","0.0000","9.8679","null"});
hbaseService.putData("test_base","000003","a",newString[]{"project_id","varName","coefs","pvalues","tvalues","create_time"},newString[]{"40866","education","0.8984","0.0000","25.5649","null"});}//测试遍历全表@TestpublicvoidtestGetResultScanner(){Map<String,Map<String,String>> result2 = hbaseService.getResultScanner("test_base");System.out.println("-----遍历查询全表内容-----");
result2.forEach((k, value)->{System.out.println(k +"--->"+ value);});}}
三、使用spring-data-hadoop-hbase
3、配置类
@ConfigurationpublicclassHBaseConfiguration{@Value("${hbase.zookeeper.quorum}")privateString zookeeperQuorum;@Value("${hbase.zookeeper.property.clientPort}")privateString clientPort;@Value("${zookeeper.znode.parent}")privateString znodeParent;@BeanpublicHbaseTemplatehbaseTemplate(){org.apache.hadoop.conf.Configuration conf =neworg.apache.hadoop.conf.Configuration();
conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
conf.set("hbase.zookeeper.property.clientPort", clientPort);
conf.set("zookeeper.znode.parent", znodeParent);returnnewHbaseTemplate(conf);}}
4、业务类中使用HbaseTemplate
这个是作为工具类
@Service@Slf4jpublicclassHBaseService{@AutowiredprivateHbaseTemplate hbaseTemplate;//查询列簇publicList<Result>getRowKeyAndColumn(String tableName,String startRowkey,String stopRowkey,String column,String qualifier){FilterList filterList =newFilterList(FilterList.Operator.MUST_PASS_ALL);if(StringUtils.isNotBlank(column)){
log.debug("{}", column);
filterList.addFilter(newFamilyFilter(CompareFilter.CompareOp.EQUAL,newBinaryComparator(Bytes.toBytes(column))));}if(StringUtils.isNotBlank(qualifier)){
log.debug("{}", qualifier);
filterList.addFilter(newQualifierFilter(CompareFilter.CompareOp.EQUAL,newBinaryComparator(Bytes.toBytes(qualifier))));}Scan scan =newScan();if(filterList.getFilters().size()>0){
scan.setFilter(filterList);}
scan.setStartRow(Bytes.toBytes(startRowkey));
scan.setStopRow(Bytes.toBytes(stopRowkey));return hbaseTemplate.find(tableName, scan,(rowMapper, rowNum)-> rowMapper);}publicList<Result>getListRowkeyData(String tableName,List<String> rowKeys,String familyColumn,String column){return rowKeys.stream().map(rk ->{if(StringUtils.isNotBlank(familyColumn)){if(StringUtils.isNotBlank(column)){return hbaseTemplate.get(tableName, rk, familyColumn,
column,(rowMapper, rowNum)-> rowMapper);}else{return hbaseTemplate.get(tableName, rk, familyColumn,(rowMapper, rowNum)-> rowMapper);}}return hbaseTemplate.get(tableName, rk,(rowMapper, rowNum)-> rowMapper);}).collect(Collectors.toList());}}
四、使用spring-boot-starter-hbase
参考:https://blog.csdn.net/cpongo1/article/details/89550486
## 下载spring-boot-starter-hbase代码
git clone https://github.com/SpringForAll/spring-boot-starter-hbase.git
## 安装
cd spring-boot-starter-hbase
mvn clean install
2、添加配置项
- spring.data.hbase.quorum 指定 HBase 的 zk 地址
- spring.data.hbase.rootDir 指定 HBase 在 HDFS 上存储的路径
- spring.data.hbase.nodeParent 指定 ZK 中 HBase 的根 ZNode
3、定义好DTO
@DatapublicclassCity{privateLong id;privateInteger age;privateString cityName;}
4、创建对应rowMapper
publicclassCityRowMapperimplementsRowMapper<City>{privatestaticbyte[]COLUMN_FAMILY="f".getBytes();privatestaticbyte[]NAME="name".getBytes();privatestaticbyte[]AGE="age".getBytes();@OverridepublicCitymapRow(Result result,int rowNum)throwsException{String name =Bytes.toString(result.getValue(COLUMN_FAMILY,NAME));int age =Bytes.toInt(result.getValue(COLUMN_FAMILY,AGE));City dto =newCity();
dto.setCityName(name);
dto.setAge(age);return dto;}}
5、操作实现增改查
- HbaseTemplate.find 返回 HBase 映射的 City 列表
- HbaseTemplate.get 返回 row 对应的 City 信息
- HbaseTemplate.saveOrUpdates 保存或者更新

如果 HbaseTemplate 提供的操作不满足需求,可以通过 hbaseTemplate 的 getConnection() 方法获取连接,再参照 HbaseTemplate 的实现逻辑,自行实现更复杂的查询等功能。
@ServicepublicclassCityServiceImplimplementsCityService{@AutowiredprivateHbaseTemplate hbaseTemplate;//查询publicList<City>query(String startRow,String stopRow){Scan scan =newScan(Bytes.toBytes(startRow),Bytes.toBytes(stopRow));
scan.setCaching(5000);List<City> dtos =this.hbaseTemplate.find("people_table", scan,newCityRowMapper());return dtos;}//查询publicCityquery(String row){City dto =this.hbaseTemplate.get("people_table", row,newCityRowMapper());return dto;}//新增或者更新publicvoidsaveOrUpdate(){List<Mutation> saveOrUpdates =newArrayList<Mutation>();Put put =newPut(Bytes.toBytes("135xxxxxx"));
put.addColumn(Bytes.toBytes("people"),Bytes.toBytes("name"),Bytes.toBytes("test"));
saveOrUpdates.add(put);this.hbaseTemplate.saveOrUpdates("people_table", saveOrUpdates);}}
Springboot整合Influxdb
中文文档:https://jasper-zhang1.gitbooks.io/influxdb/content/Introduction/installation.html
注意,项目建立在spring-boot-web基础上
1、添加依赖
<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.15</version>
</dependency>
2、添加配置
spring:
  influx:
    database: my_sensor1
    password: admin
    url: http://127.0.0.1:6086
    user: admin
3、编写配置类
@ConfigurationpublicclassInfluxdbConfig{@Value("${spring.influx.url}")privateString influxDBUrl;@Value("${spring.influx.user}")privateString userName;@Value("${spring.influx.password}")privateString password;@Value("${spring.influx.database}")privateString database;@Bean("influxDB")publicInfluxDBinfluxdb(){InfluxDB influxDB =InfluxDBFactory.connect(influxDBUrl, userName, password);try{/**
* 异步插入:
* enableBatch这里第一个是point的个数,第二个是时间,单位毫秒
* point的个数和时间是联合使用的,如果满100条或者60 * 1000毫秒
* 满足任何一个条件就会发送一次写的请求。
*/
influxDB.setDatabase(database).enableBatch(100,1000*60,TimeUnit.MILLISECONDS);}catch(Exception e){
e.printStackTrace();}finally{//设置默认策略
influxDB.setRetentionPolicy("sensor_retention");}//设置日志输出级别
influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);return influxDB;}}
4、InfluxDB原生API实现
@SpringBootTest(classes ={MainApplication.class})@RunWith(SpringJUnit4ClassRunner.class)publicclassInfluxdbDBTest{@AutowiredprivateInfluxDB influxDB;//measurementprivatefinalString measurement ="sensor";@Value("${spring.influx.database}")privateString database;/**
* 批量插入第一种方式
*/@Testpublicvoidinsert(){List<String> lines =newArrayList<String>();Point point =null;for(int i=0;i<50;i++){
point =Point.measurement(measurement).tag("deviceId","sensor"+ i).addField("temp",3).addField("voltage",145+i).addField("A1","4i").addField("A2","4i").build();
lines.add(point.lineProtocol());}//写入
influxDB.write(lines);}/**
* 批量插入第二种方式
*/@TestpublicvoidbatchInsert(){BatchPoints batchPoints =BatchPoints.database(database).consistency(InfluxDB.ConsistencyLevel.ALL).build();//遍历sqlserver获取数据for(int i=0;i<50;i++){//创建单条数据对象——表名Point point =Point.measurement(measurement)//tag属性——只能存储String类型.tag("deviceId","sensor"+ i).addField("temp",3).addField("voltage",145+i).addField("A1","4i").addField("A2","4i").build();//将单条数据存储到集合中
batchPoints.point(point);}//批量插入
influxDB.write(batchPoints);}/**
* 获取数据
*/@Testpublicvoiddatas(@RequestParamInteger page){int pageSize =10;// InfluxDB支持分页查询,因此可以设置分页查询条件String pageQuery =" LIMIT "+ pageSize +" OFFSET "+(page -1)* pageSize;String queryCondition ="";//查询条件暂且为空// 此处查询所有内容,如果String queryCmd ="SELECT * FROM "// 查询指定设备下的日志信息// 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加;// + 策略name + "." + measurement+ measurement
// 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)+ queryCondition
// 查询结果需要按照时间排序+" ORDER BY time DESC"// 添加分页查询条件+ pageQuery;QueryResult queryResult = influxDB.query(newQuery(queryCmd, database));System.out.println("query result => "+queryResult);}}
5、采用封装工具类
1、创建实体类
@Data@Measurement(name ="sensor")publicclassSensor{@Column(name="deviceId",tag=true)privateString deviceId;@Column(name="temp")privatefloat temp;@Column(name="voltage")privatefloat voltage;@Column(name="A1")privatefloatA1;@Column(name="A2")privatefloatA2;@Column(name="time")privateString time;}
2、创建工具类
@ComponentpublicclassInfluxdbUtils{@AutowiredprivateInfluxDB influxDB;@Value("${spring.influx.database}")privateString database;/**
* 新增单条记录,利用java的反射机制进行新增操作
*/@SneakyThrowspublicvoidinsertOne(Object obj){//获取度量Class<?> clasz = obj.getClass();Measurement measurement = clasz.getAnnotation(Measurement.class);//构建Point.Builder builder =Point.measurement(measurement.name());// 获取对象属性Field[] fieldArray = clasz.getDeclaredFields();Column column =null;for(Field field : fieldArray){
column = field.getAnnotation(Column.class);//设置属性可操作
field.setAccessible(true);if(column.tag()){//tag属性只能存储String类型
builder.tag(column.name(), field.get(obj).toString());}else{//设置fieldif(field.get(obj)!=null){
builder.addField(column.name(), field.get(obj).toString());}}}
influxDB.write(builder.build());}/**
* 批量新增,方法一
*/@SneakyThrowspublicvoidinsertBatchByRecords(List<?> records){List<String> lines =newArrayList<String>();
records.forEach(record->{Class<?> clasz = record.getClass();//获取度量Measurement measurement = clasz.getAnnotation(Measurement.class);//构建Point.Builder builder =Point.measurement(measurement.name());Field[] fieldArray = clasz.getDeclaredFields();Column column =null;for(Field field : fieldArray){
column = field.getAnnotation(Column.class);//设置属性可操作
field.setAccessible(true);if(column.tag()){//tag属性只能存储String类型
builder.tag(column.name(), field.get(record).toString());}else{//设置fieldif(field.get(record)!=null){
builder.addField(column.name(), field.get(record).toString());}}}
lines.add(builder.build().lineProtocol());});
influxDB.write(lines);}/**
* 批量新增,方法二
*/@SneakyThrowspublicvoidinsertBatchByPoints(List<?> records){BatchPoints batchPoints =BatchPoints.database(database).consistency(InfluxDB.ConsistencyLevel.ALL).build();
records.forEach(record->{Class<?> clasz = record.getClass();//获取度量Measurement measurement = clasz.getAnnotation(Measurement.class);//构建Point.Builder builder =Point.measurement(measurement.name());Field[] fieldArray = clasz.getDeclaredFields();Column column =null;for(Field field : fieldArray){
column = field.getAnnotation(Column.class);//设置属性可操作
field.setAccessible(true);if(column.tag()){//tag属性只能存储String类型
builder.tag(column.name(), field.get(record).toString());}else{//设置fieldif(field.get(record)!=null){
builder.addField(column.name(), field.get(record).toString());}}}
batchPoints.point(builder.build());});
influxDB.write(batchPoints);}/**
* 查询,返回Map集合
* @param query 完整的查询语句
*/publicList<Object>fetchRecords(String query){List<Object> results =newArrayList<Object>();QueryResult queryResult = influxDB.query(newQuery(query, database));
queryResult.getResults().forEach(result->{
result.getSeries().forEach(serial->{List<String> columns = serial.getColumns();int fieldSize = columns.size();
serial.getValues().forEach(value->{Map<String,Object> obj =newHashMap<String,Object>();for(int i=0;i<fieldSize;i++){
obj.put(columns.get(i), value.get(i));}
results.add(obj);});});});return results;}/**
* 查询,返回map集合
* @param fieldKeys 查询的字段,不可为空;不可为单独的tag
* @param measurement 度量,不可为空;
*/publicList<Object>fetchRecords(String fieldKeys,String measurement){StringBuilder query =newStringBuilder();
query.append("select ").append(fieldKeys).append(" from ").append(measurement);returnthis.fetchRecords(query.toString());}/**
* 查询,返回map集合
* @param fieldKeys 查询的字段,不可为空;不可为单独的tag
* @param measurement 度量,不可为空;
*/publicList<Object>fetchRecords(String fieldKeys,String measurement,String order){StringBuilder query =newStringBuilder();
query.append("select ").append(fieldKeys).append(" from ").append(measurement);
query.append(" order by ").append(order);returnthis.fetchRecords(query.toString());}/**
* 查询,返回map集合
* @param fieldKeys 查询的字段,不可为空;不可为单独的tag
* @param measurement 度量,不可为空;
*/publicList<Object>fetchRecords(String fieldKeys,String measurement,String order,String limit){StringBuilder query =newStringBuilder();
query.append("select ").append(fieldKeys).append(" from ").append(measurement);
query.append(" order by ").append(order);
query.append(limit);returnthis.fetchRecords(query.toString());}/**
* 查询,返回对象的list集合
*/@SneakyThrowspublic<T>List<T>fetchResults(String query,Class<?> clasz){List results =newArrayList<>();QueryResult queryResult = influxDB.query(newQuery(query, database));
queryResult.getResults().forEach(result->{
result.getSeries().forEach(serial->{List<String> columns = serial.getColumns();int fieldSize = columns.size();
serial.getValues().forEach(value->{Object obj =null;
obj = clasz.newInstance();for(int i=0;i<fieldSize;i++){String fieldName = columns.get(i);Field field = clasz.getDeclaredField(fieldName);
field.setAccessible(true);Class<?> type = field.getType();if(type ==float.class){
field.set(obj,Float.valueOf(value.get(i).toString()));}else{
field.set(obj, value.get(i));}}
results.add(obj);});});});return results;}/**
* 查询,返回对象的list集合
*/public<T>List<T>fetchResults(String fieldKeys,String measurement,Class<?> clasz){StringBuilder query =newStringBuilder();
query.append("select ").append(fieldKeys).append(" from ").append(measurement);returnthis.fetchResults(query.toString(), clasz);}/**
* 查询,返回对象的list集合
*/public<T>List<T>fetchResults(String fieldKeys,String measurement,String order,Class<?> clasz){StringBuilder query =newStringBuilder();
query.append("select ").append(fieldKeys).append(" from ").append(measurement);
query.append(" order by ").append(order);returnthis.fetchResults(query.toString(), clasz);}/**
* 查询,返回对象的list集合
*/public<T>List<T>fetchResults(String fieldKeys,String measurement,String order,String limit,Class<?> clasz){StringBuilder query =newStringBuilder();
query.append("select ").append(fieldKeys).append(" from ").append(measurement);
query.append(" order by ").append(order);
query.append(limit);returnthis.fetchResults(query.toString(), clasz);}}
3、使用工具类的测试代码
@SpringBootTest(classes ={MainApplication.class})@RunWith(SpringJUnit4ClassRunner.class)publicclassInfluxdbUtilTest{@AutowiredprivateInfluxdbUtils influxdbUtils;/**
* 插入单条记录
*/@Testpublicvoidinsert(){Sensor sensor =newSensor();
sensor.setA1(10);
sensor.setA2(10);
sensor.setDeviceId("0002");
sensor.setTemp(10L);
sensor.setTime("2021-01-19");
sensor.setVoltage(10);
influxdbUtils.insertOne(sensor);}/**
* 批量插入第一种方式
*/@GetMapping("/index22")publicvoidbatchInsert(){List<Sensor> sensorList =newArrayList<Sensor>();for(int i=0; i<50; i++){Sensor sensor =newSensor();
sensor.setA1(2);
sensor.setA2(12);
sensor.setTemp(9);
sensor.setVoltage(12);
sensor.setDeviceId("sensor4545-"+i);
sensorList.add(sensor);}
influxdbUtils.insertBatchByRecords(sensorList);}/**
* 批量插入第二种方式
*/@GetMapping("/index23")publicvoidbatchInsert1(){List<Sensor> sensorList =newArrayList<Sensor>();Sensor sensor =null;for(int i=0; i<50; i++){
sensor =newSensor();
sensor.setA1(2);
sensor.setA2(12);
sensor.setTemp(9);
sensor.setVoltage(12);
sensor.setDeviceId("sensor4545-"+i);
sensorList.add(sensor);}
influxdbUtils.insertBatchByPoints(sensorList);}/**
* 查询数据
*/@GetMapping("/datas2")publicvoiddatas(@RequestParamInteger page){int pageSize =10;// InfluxDB支持分页查询,因此可以设置分页查询条件String pageQuery =" LIMIT "+ pageSize +" OFFSET "+(page -1)* pageSize;String queryCondition ="";//查询条件暂且为空// 此处查询所有内容,如果String queryCmd ="SELECT * FROM sensor"// 查询指定设备下的日志信息// 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加;// + 策略name + "." + measurement// 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)+ queryCondition
// 查询结果需要按照时间排序+" ORDER BY time DESC"// 添加分页查询条件+ pageQuery;List<Object> sensorList = influxdbUtils.fetchRecords(queryCmd);System.out.println("query result => {}"+sensorList );}/**
* 获取数据
*/@GetMapping("/datas21")publicvoiddatas1(@RequestParamInteger page){int pageSize =10;// InfluxDB支持分页查询,因此可以设置分页查询条件String pageQuery =" LIMIT "+ pageSize +" OFFSET "+(page -1)* pageSize;String queryCondition ="";//查询条件暂且为空// 此处查询所有内容,如果String queryCmd ="SELECT * FROM sensor"// 查询指定设备下的日志信息// 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加;// + 策略name + "." + measurement// 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)+ queryCondition
// 查询结果需要按照时间排序+" ORDER BY time DESC"// 添加分页查询条件+ pageQuery;List<Sensor> sensorList = influxdbUtils.fetchResults(queryCmd,Sensor.class);//List<Sensor> sensorList = influxdbUtils.fetchResults("*", "sensor", Sensor.class);
sensorList.forEach(sensor->{System.out.println("query result => {}"+sensorList );});}}
6、采用封装数据模型的方式
1、在Influxdb库中创建存储策略
CREATE RETENTION POLICY "rp_order_payment" ON "db_order" DURATION 30d REPLICATION 1 DEFAULT
2、创建数据模型
@Data@Measurement(name ="m_order_payment",
database ="db_order",
retentionPolicy ="rp_order_payment")publicclassOrderPaymentimplementsSerializable{// 统计批次@Column(name ="batch_id", tag =true)privateString batchId;// 哪个BU@Column(name ="bu_id", tag =true)privateString buId;// BU 名称@Column(name ="bu_name")privateString buName;// 总数@Column(name ="total_count", tag =true)privateString totalCount;// 支付量@Column(name ="pay_count", tag =true)privateString payCount;// 金额@Column(name ="total_money", tag =true)privateString totalMoney;}
3、创建Mapper
publicclassInfluxMapperextendsInfluxDBMapper{publicInfluxMapper(InfluxDB influxDB){super(influxDB);}}
4、配置Mapper
@Log4j2@ConfigurationpublicclassInfluxAutoConfiguration{@BeanpublicInfluxMapperinfluxMapper(InfluxDB influxDB){InfluxMapper influxMapper =newInfluxMapper(influxDB);return influxMapper;}}
5、测试CRUD
@SpringBootTest(classes ={MainApplication.class})@RunWith(SpringJUnit4ClassRunner.class)publicclassInfluxdbMapperTest{@AutowiredprivateInfluxMapper influxMapper;@Testpublicvoidsave(OrderPayment product){
influxMapper.save(product);}@TestpublicvoidqueryAll(){List<OrderPayment> products = influxMapper.query(OrderPayment.class);System.out.println(products);}@TestpublicvoidqueryByBu(String bu){String sql =String.format("%s'%s'","select * from m_order_payment where bu_id = ", bu);Query query =newQuery(sql,"db_order");List<OrderPayment> products = influxMapper.query(query,OrderPayment.class);System.out.println(products);}}
参考:https://blog.csdn.net/cpongo1/article/details/89550486
版权归原作者 天道酬勤的博客 所有, 如有侵权,请联系我们删除。