一、基本语法的介绍:
1. 数据插入(INSERT)
INSERT cpu_usage,host=serverA,region=us_west value=0.65 1434055562000000000
其中,
cpu_usage
是测量名称,
host=serverA
和
region=us_west
是标签,
value=0.65
是字段,
1434055562000000000
是时间戳。
2. 数据查询(SELECT)
SELECT * FROM cpu_usage WHERE host ='serverA'
3. 时间范围查询
SELECT * FROM cpu_usage WHERE time >= '2024-07-28T00:00:00Z' AND time <= '2024-07-28T23:59:59Z'
4. 聚合函数
例如,计算平均值:
SELECT MEAN(value) FROM cpu_usage
常见的聚合函数还有
SUM
(求和)、
MIN
(最小值)、
MAX
(最大值)等。
5. GROUP BY 分组
SELECT mean(value) FROM cpu_usage GROUP BY host
按照
host
标签进行分组计算平均值。
6. 删除数据(DELETE)
DELETE FROM cpu_usage WHERE time < '2024-07-27T00:00:00Z'
整合sping条件查询语句
Flux查询代码
from(bucket: "桶名")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)//例如时间查询从六小时前开始查range(start: -6h)
|> filter(fn: (r) => r["_measurement"] == "相当于表名" and r["device_id"] == "123")
filter过滤条件
例如
from(bucket: "position1"):指定要从名为 position1 的桶中获取数据。
|> range(start: -6h):表示设置时间范围,获取从当前时间往前 6 小时的数据。
|> filter(fn: (r) => r["_measurement"] == "device_history_location"):使用 filter 函数对数据进行筛选,只保留 _measurement 字段值为 device_history_location 的记录。
|> pivot(rowKey:["_time"],columnKey: ["_field"],valueColumn: "_value"):使用 pivot 操作将数据进行行列转换,以 _time 作为行键,_field 作为列键,_value 作为值列。
如果原始数据中有 _time、_measurement、_field 和 _value 等列,通过这个查询,可以将不同的 _field 值转换为列,方便以特定的格式查看和处理数据。
假设原始数据如下:
_time _measurement _field _value
2024-07-28T10:00:00Z device_history_location latitude 30.0
2024-07-28T10:00:00Z device_history_location longitude 120.0
经过上述查询处理后,可能会转换为类似以下的格式:
_time latitude longitude
2024-07-28T10:00:00Z 30.0 120.0
二 SpringBoot整合
1.引入pom
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>6.7.0</version>
</dependency>
2.配置文件
influxdb.yml
influxdb:
url: "http://locahost:8086"
token: "your_token"
org: "your_orgorganizationName"
bukcet: "your_bucketName"
config
@ConfigurationProperties(prefix = "influxdb")
public class InfluxDBConfig {
private String url;
private char[] token;
private String org;
private String bucket;
public InfluxDBClient createClient(){
return InfluxDBClientFactory.create(this.url,
this.token, this.org,
this.bucket);
}
}
JavaBean示例
实体类
@Data
@Accessors(chain = true)
@Measurement(name = "device_history_location")
public class InfluxPosition {
/**
* 设备id
*/
@Column(name = "device_id",tag = true)
private String deviceId;
/**
* 车辆id
*/
@Column(name = "vehicle_id")
private String vehicleId;
/**
* 位置时间
*/
@Column(timestamp = true)
private Instant locationTime;
/**
* 经度
*/
@Column(name = "longitude")
private BigDecimal longitude;
/**
* 纬度
*/
@Column(name = "latitude")
private BigDecimal latitude;
}
写入
private static void writeData(InfluxDBClient client) {
WriteApiBlocking writeApiBlocking = client.getWriteApiBlocking();
List<InfluxPosition> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
InfluxPosition position = new InfluxPosition()
.setDeviceId("123")
.setVehicleId("321")
.setLocationTime(Instant.now())
.setLongitude(new BigDecimal("113.12313"))
.setLatitude( new BigDecimal("23.8524"))
list.add(position);
}
writeApiBlocking.writeMeasurements("position1","test",WritePrecision.MS,list);
}
查询
private static List<FluxTable> queryData(InfluxDBClient client) {
String flux = "from(bucket: \"position1\")\n" +
" |> range(start: -6h)\n" +
" |> filter(fn: (r) => r[\"_measurement\"] == \"device_history_location\")" +
" |> pivot(rowKey:[\"_time\"],columnKey: [\"_field\"],valueColumn: \"_value\")";
List<FluxTable> query = client.getQueryApi().query(flux);
for (FluxTable table : query) {
List<FluxRecord> records = table.getRecords();
for (FluxRecord record : records) {
log.info("{}---{}---{}---{}", record.getMeasurement(),record.getField(),record.getValue(),record.getTime());
}
}
return query;
}
测试
//插入数据
public static void main(String[] args) {
InfluxDBClient client = InfluxDB.creatClient;
writeData(client);//
}
查询
public static void main(String[] args) {
InfluxDBClient client = getClient();
List<FluxTable> query = queryData(client);
FluxResultMapper resultMapper = new FluxResultMapper();
for (FluxTable table : query) {
List<FluxRecord> records = table.getRecords();
for (FluxRecord record : records) {
log.info("values:{}---time:{}", record.getValues(),record.getTime());
//进行转换或者其他操作...
InfluxPosition influxPosition = resultMapper.toPOJO(record,
InfluxPosition.class);
}
}
}
time在转换时时区可能会发生变化,可以参考如下方法进行时区转换
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
public class TimeZoneConversionExample {
public static void convertToBeijingTime(String timeString) {
// 假设原始的时间字符串格式为 "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
// 解析原始时间字符串为 ZonedDateTime 对象
ZonedDateTime originalZonedDateTime = ZonedDateTime.parse(timeString, formatter);
// 将原始时区的时间转换为东八区(北京时间)
ZonedDateTime beijingZonedDateTime = originalZonedDateTime.withZoneSameInstant(ZoneId.of("Asia/Shanghai"));
// 输出转换后的时间
System.out.println("转换后的北京时间: " + beijingZonedDateTime);
}
public static void main(String[] args) {
// 示例输入的时间字符串
String time = "2024-07-28T12:30:00.000Z";
convertToBeijingTime(time);
}
}
版权归原作者 DCDDDDcccc 所有, 如有侵权,请联系我们删除。