参考文档:时间序列数据库 TSDB_时间序列数据库 TSDB-阿里云帮助中心
什么是时序数据库
时序数据是随时间不断产生的一系列数据,简单来说,就是带时间戳的数据。数据可能来自服务器和应用程序的指标、物联网传感器的读数、网站或应用程序上的用户交互或金融市场上的交易活动等。
时序数据的主要数据属性如下:
- 每个数据点都包含用于索引、聚合和采样的时间戳;
- 写多读少,高频写入,写入操作几乎是顺序添加,大多数时候数据到达后都以时间排序;
- 多用于数据的汇总视图,数据汇总入库,时间段查询;
时序数据库相关名词解释
- 度量 Metric:Metric 类似关系型数据库里的表(Table),代表一系列同类数据集合。
- 标签 Tag:Tag 描述数据源的特征,通常不随时间变化,数据库内部会自动为 Tag 建立索引,支持根据 Tag 来进行多维检索查询,Tag 由 Key、Value 组成,两者均为 String 类型;
- 时间戳 Timestamp:Timestamp 代表数据产生的时间点,可以写入时指定,也可由系统自动生成;
- 量测值 Field:Field 描述数据源的量测指标,通常随着时间不断变化,且通常为可计算的数值;
- 数据点 Data Point: 数据源在某个时间产生的某个量测指标值(Field Value)称为一个数据点,数据库查询、写入时按数据点数来作为统计指标;
- 时间线 Time Series :数据源的某一个指标随时间变化,形成时间线,Metric + Tags 组合确定一条时间线;针对时序数据的计算包括降采样、聚合(sum、count、max、min等)、插值等都基于时间线维度进行;
- 聚合( Aggregation):当同一个度量(Metric)的查询有多条时间线产生(多个指标采集设备),那么为了将空间的多维数据展现为成同一条时间线,需要进行合并计算。
- 降采样(Downsampling):当查询的时间区间跨度较长而原始数据时间精度较细时,为了满足业务需求的场景、提升查询效率,就会降低数据的查询展现精度,这就叫做降采样,比如按秒采集一年的数据,按照天级别查询展现。
TSDB 技术支持与语法
参考文档:调用SDK写入数据_时间序列数据库 TSDB-阿里云帮助中心
客户端创建
TSDBConfig config = TSDBConfig
// 配置地址,第一个参数可以是 TSDB 的域名或 IP。第二个参数表示 TSDB 端口。
.address(host, port)
// user和password 表示用于用户认证的用户名和密码。TSDB用户可在实例管理控制台创建。如果实例未启用用户鉴权功能,则创建Config对象时无需调用basicAuth()方法。
.basicAuth(tsdbUser, basicPwd)
// 只读开关,默认为 false。当 readonly 设置为 true 时,异步写开关会被关闭。
.readonly(false)
// 网络连接池大小,默认为64。
.httpConnectionPool(connectPool)
// HTTP 等待时间,单位为秒,默认为90秒。
.httpConnectTimeout(timeOut)
// HTTP连接存在时间长度。单位为秒。默认为0,即不生效,为长连接。建议设置为一个合理值,服务端故障恢复后,客户端通过重新建立连接可达到服务端负载均衡。
.httpConnectionLiveTime(liveTime)
// IO 线程数,默认为1。
.ioThreadCount(1)
// 异步写开关。默认为 true。推荐异步写。
.asyncPut(true)
// 异步写相关,客户端缓冲队列长度,默认为10000。
.batchPutBufferSize(batchPutBufferSize)
// 异步写相关,缓冲队列消费线程数,默认为 1。
.batchPutConsumerThreadCount(batchPutConsumerThreadCount)
// 异步写相关,每次批次提交给客户端点的个数,默认为 500。
.batchPutSize(batchPutSize)
// 异步写相关,每次等待最大时间限制,单位为 ms,默认为 300。
.batchPutTimeLimit(batchPutTimeLimit)
// 异步写相关,写请求队列数,默认等于连接池数。可根据读写次数的比例进行配置。
.putRequestLimit(putRequestLimit)
//多值写入相关配置
.multiFieldBatchPutBufferSize(multiFieldBatchPutBufferSize)
.multiFieldBatchPutConsumerThreadCount(multiFieldBatchPutConsumerThreadCount)
// 流量限制,设置每秒最大提交 Point 的个数。
.maxTPS(maxTps)
// 多值写入回调函数
.listenMultiFieldBatchPut(multiFieldCallback)
// 单值写入回调函数
.listenBatchPut(callback)
.config();
// 创建客户端
TSDB tsdb = TSDBClientFactory.connect(config):
写入数据
// 构造 Point
Point point = Point.metric("test1")
.tag("tagk1", "tagv1")
.tag("tagk2", "tagv2")
.tag("tagk3", "tagv3")
.timestamp(timestamp)
.value(123.456)
.build();
带回调可以获取:成功数、返回数和失败原因,不要在回调方法中做耗时操作;
同步写入
出于写入性能的考虑,同步写建议手动将数据点打包成一批数据,并且建议这批数据包含500~1000个数据点。此外,也建议多个线程并发进行提交;
空返回:tsdb.putSync(points)
摘要返回:tsdb.putSync(points, SummaryResult.class);
详情返回:tsdb.putSync(points, DetailsResult.class);
异步非阻塞写入
空返回:tsdb.put(points)
摘要返回:tsdb.put(points, BatchPutSummaryCallback.class);
详情返回:tsdb.put(points, BatchPutDetailsCallback.class)
查询数据:
同步查询
List<QueryResult> result = tsdb.query(query);
System.out.println("返回结果:" + result);
异步查询
QueryCallback callback = new QueryCallback() {
@Override
public void response(Query input, List<QueryResult> result) {
System.out.println("查询参数:" + input);
System.out.println("返回结果:" + result);
}
};
tsdb.query(query, callback);
Query 查询语法
语法如下
Query query = Query
.timeRange(startTime, endTime) // 设置查询时间条件
// 设置子查询1 简单查询
.sub(SubQuery.metric("hello").aggregator(Aggregator.AVG).tag("tagk1", "tagv1").build())
// 设置子查询2 简单查询
.sub(
SubQuery subQuery = SubQuery
.metric("test-metric")
.aggregator(Aggregator.AVG)
.downsample("60m-avg")
.tag("tagk1", "tagv1")
.tag("tagk2", "tagv2")
.build();
)
// 设置子查询3 复杂查询
.sub(
SubQuery subQuery = SubQuery
.metric("test-metric")
.aggregator(Aggregator.AVG)
.downsample("60m-avg")
.filter(Filter
.filter(FilterType.LiteralOr, "", "")
.build()
)
.build();
)
.build();
timeRange
左闭右闭,即包含开始事件,也包含结束事件;
FilterType 类别
LiteralOr:等于查询,或查询,类似 SQL 里的 IN 查询;
NotLiteralOr:等于查询,或查询,类似 SQL 里的 NOT IN 查询;
Wildcard:模糊匹配,类似 SQL 里的 like 查询;
Regexp:正则匹配;
aggregator 与 downsample
假设有以下数据,共四条时间线;
时间线计算方式 = Metric + Tags, 即 Metric + orgCode + portId + inOut;
time
数据组
Timestamp
orgCode
portId
inOut
value
19:10
1
1675077000000
200019
16604
8
20:10
2
1675080600000
200019
16604
12
21:10
3
1675084200000
200019
16601
IN
2
21:10
4
1675084200000
200019
16601
OUT
8
22:10
5
1675087800000
200019
16605
4
23:10
6
1675091400000
200019
16606
OUT
6
aggregator 聚合
**用于控制返回时间线的多少,即 **QueryResult 的条数;
例如查询:
{
"start": 1675077000000,
"end": 1675091400000,
"queries": [
{
"metric": "xx",
"aggregator": "none",
"tags": {
"orgCode": "200019"
}
}
]
}
会返回 5 组时间线,即 1、2 一组,3、4、5、6 各一组,时间线计算方式如上图所示;
{"tags": {"orgCode": "200019", "portId": "16604"},"dps": {1675077000000: 8.0, 1675080600000: 12.0}},
{"tags": {"orgCode": "200019","inOut": "IN", "portId": "16601"},"dps": {1675084200000: 2.0}},
{"tags": {"orgCode": "200019", "portId": "16605"},"dps": {1675087800000: 4.0}},
{"tags": {"orgCode": "200019","inOut": "OUT", "portId": "16606"},"dps": {1675091400000: 6.0}}
{"tags": {"orgCode": "200019","portId": "16601", "inOut": "OUT"},"dps": {1675084200000: 8.0}},
aggregator 的作用则是:将除查询 tag 外的其他时间线合并,当 Timestamp 相同时,则会用 aggregator 指定的算法计算,例如:
{
"start": 1675077000000,
"end": 1675091400000,
"queries": [
{
"metric": "xx",
"aggregator": "sum",
"tags": {
"orgCode": "200019"
}
}
]
}
返回 1 组时间线,因为 1、2、3、4、5 的 orgCode 都相同,会将 4 条时间合并,但 1675084200000 变成了 10,因为该时间点有两条数据,只是 tag 不同罢了,两条数据用了 sum:
{
"aggregateTags": [
"inOut",
"portId"
],
"dps": {
1675077000000: 8.0,
1675080600000: 12.0,
1675084200000: 10.0,
1675087800000: 4.0,
1675091400000: 6.0
},
"tags": {
"orgCode": "200019"
}
}
downsample 降采样
**用于控制返回时间线中数据点的多少,即 **dps 的数;
格式:
**[0all | (1)[ *s |m | h | d ]]* - **[*avg | sum | last | first | count | min | max .... ]*[**-none | null | zero ]
采样区间 区间操作 补全策略
采样区间计算方式:Timestamp - (Timestamp % Interval)
补全策略包括:
- none - 默认行为,在序列化期间不输出缺失值。- nan - 当序列中缺少所有值时,在序列化输出中输出NaN ,计算方式与 none 有类似。- null - 与NaN的行为相同,只是在序列化期间它会发出一个null,而不是一个NaN。- zero - 缺少时间戳时替换为零。零值将合并到汇总结果中。
例如:
{
"start": 1675077000000, // 19:10
"end": 1675091400000, // 23:10
"queries": [
{
"metric": "xx",
"aggregator": "sum",
"downsample": "2h-sum-none",
"tags": {
"orgCode": "200019"
}
}
]
}
返回 2 小时一个汇总,即 18:00、20:00、22:00,关于区间的计算上面已有解释:
{
"aggregateTags": [
"inOut",
"portId"
],
"dps": {
1675072800000: 8.0,
1675080000000: 14.0,
1675087200000: 10.0
},
"tags": {
"orgCode": "200019"
}
}
注:此处的 dps 开始时间 1675072800000 是 2023-01-30 18:00:00,为什么不是查询的开始时间,因为 tsdb 的采样区间算法是 Timestamp - (Timestamp % Interval),即** 2023-01-30 19:10:00 - (2023-01-30 19:10:00 % 2h)= **2023-01-30 18:00:00,所以当使用采样时,往往计算的结果不准确,原因在这里;
其他接口支持:
以下参考文档:TSDB如何调用其他接口_时间序列数据库 TSDB-阿里云帮助中心
设置数据时效
tsdb.ttl(time);
查询 Metric,TagKey,TagValue
参数:(类型,模糊值,查询条数)
tsdb.suggest(Suggest.Metrics,"hel",10);
tsdb.suggest(Suggest.Tagk,"hel",10);
tsdb.suggest(Suggest.TagV,"hel",10);
TagKey 查 TagValue
参数:(类型,模糊值,查询条数)
tsdb.dumpMeta("tagk1","tagv1",10);
删除一段时间的数据
tsdb.deleteData("hello", startTime, nowTime);
删除指定时间线
Timeline timeline =Timeline.metric("hello").tag("tagk1","tagv1").build();
tsdb.deleteMeta(timeline);
清空所有表
tsdb.truncate()
版权归原作者 An_Illuion 所有, 如有侵权,请联系我们删除。