Kafka 是一款具备高吞吐量、高可靠性和高可扩展性的分布式消息队列,而 GreptimeDB 是专门用于存储时间序列数据的开源时序数据库。两者在各自的领域都表现出色,但如何高效地连接它们以实现数据的无缝传输和处理?
Vector 作为一个高速且可扩展的数据管道工具发挥了作用。它能够从多个来源(如应用日志、系统指标)收集、转换并传输数据,并将这些数据发送到不同的目标(如数据库、监控系统)。
而随着 GreptimeDB 现已全面支持日志数据的存储与分析,日志接收功能 greptime log sink 已被集成到 Vector 中,使用户可以通过 **
greptime_logs
sink** 将来自 Vector 的各种数据源轻松写入 GreptimeDB。详情和示例代码可参考此文:《Vector 增加 GreptimeDB 日志写入支持,连接数十种数据源》。
接下来,本文将详细介绍如何使用 Vector 从 Kafka 读取日志数据并将其写入 GreptimeDB,包括具体的实现步骤与示例代码。
准备工作
假设我们已经有一个 Kafka 集群,其中有一个名为
test_topic
的 topic,里面存储了日志数据。Kafka 中的示例数据内容如下:
127.0.0.1--[04/Sep/2024:15:46:13-0700]"GET / HTTP/1.1"200615"-""Mozilla/5.0 (X11; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0"
接下来,我们需要安装 Vector 和 GreptimeDB。
安装 & 配置 Vector
Vector 是一个开源的数据收集工具,支持从多种数据源读取数据,并将数据写入多种数据目的地。我们可以使用 Vector 从 Kafka 读取数据,并将数据写入 GreptimeDB。
安装 Vector 非常简单,可通过二进制容器等进行安装,具体安装步骤请参考 Vector 官方文档。
安装完成后,我们需要配置 Vector,使其能够从 Kafka 读取数据并写入 GreptimeDB。下面是一个简单的 Vector 配置文件:
[sources.mq]
type = "kafka"
group_id = "vector0"
topics = ["test_topic"]
bootstrap_servers = "kafka:9092"
[sinks.console]
type = "console"
inputs = [ "mq" ]
encoding.codec = "text"
[sinks.sink_greptime_logs]
type = "greptimedb_logs"
table = "demo_logs"
pipeline_name = "demo_pipeline"
compression = "gzip"
inputs = [ "mq" ]
endpoint = "http://greptimedb:4000"
上面的配置文件中,我们定义了一个名为
mq
的 source,用于从 Kafka 读取数据。我们还定义了一个名为
sink_greptime_logs
的 sink,用于将数据写入 GreptimeDB。
安装 & 配置 GreptimeDB
GreptimeDB 是一个开源的时序数据库,专门用于存储时间序列数据。我们可以使用 GreptimeDB 存储从 Kafka 读取的日志数据。
安装 GreptimeDB 同样非常简单,可通过二进制、容器等进行安装。具体安装步骤请参考 GreptimeDB 官方文档。
安装完成后,我们使用默认配置即可。因为日志数据多种多样,我们提供了 Pipeline 功能来处理和过滤日志数据,只保留日志中我们关心的数据,我们将在后续技术博客中分享 Pipeline 引擎的实现原理和方案步骤,敬请期待。
如下例子对我们提供的 nginx 日志格式进行了解析,我们使用如下所示的 Pipeline 配置文件。
processors:-dissect:fields:- message
patterns:-'%{ip} - - [%{datetime}] "%{method} %{path} %{protocol}" %{status} %{size} "-" "%{user_agent}"'-date:fields:- datetime
formats:-"%d/%b/%Y:%H:%M:%S %z"-date:fields:- timestamp
formats:-"%Y-%m-%dT%H:%M:%S%.3fZ"transform:-fields:- ip
- path
type: string
-fields:- method
- protocol
type: string
index: tag
-fields:- user_agent
type: string
index: fulltext
-fields:- status
type: uint32
index: tag
-fields:- size
type: uint32
-fields:- datetime
type: timestamp
index: timestamp
-fields:- timestamp
type: timestamp
在上面的 Pipeline 配置文件中,我们使用
dissect
processor 对日志数据进行解析。本来非结构化的日志数据,被拆分并进行格式转化后,获得了一个结构化的数据,包含
ip
、
datatime
、
method
、
path
、
protocol
、
status
、
size
和
user_agent
。然后使用
date
processor 对时间两个不同格式的时间字段进行解析。最后使用
transform
对字段进行转换,并设置 index。
关于
index
,我们指定了
method
、
protocol
、
status
为 tag 字段,主要用于高效的查询,一些不确定值的数量的字段,或者值的数量特别多的,不建议设置为 tag,这会导致高基问题。所以
ip
和
size
均没有被设置为 tag 字段。
在
path
和
user_agent
字段,我们增加了全文索引。以便可以使用模糊搜索来快速找到的关心的内容。详细的查询语法可参考此处。
上述配置文件可通过 HTTP 接口上传到 GreptimeDB 中,以创建一个名为
demo_pipeline
的 Pipeline 用于日志的解析与修剪,然后存入 GreptimeDB 中。
curl -X 'POST''http://greptimedb:4000/v1/events/pipelines/demo_pipeline' -F 'file=@/config_data/pipeline.yaml' -v
运行 Vector & GreptimeDB
现在,我们已经准备好了 Vector 和 GreptimeDB,现在就可以运行它们了。成功后,Vector 将从 Kafka 读取数据,并将数据写入 GreptimeDB。
我们可以通过 MySQL 协议连接 GreptimeDB,查看数据。
mysql> show tables;
+-------------+
| Tables |
+-------------+
| demo_logs || numbers |
+-------------+
3 rows inset(0.00 sec)
mysql>select * from demo_logs order by timestamp desc limit 10;
+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
|ip| method | protocol | path | user_agent | status | size | datetime | timestamp |
+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
|37.254.223.207 | DELETE | HTTP/2.0 | /about | Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World)|201|495|2024-10-28 03:39:29 |2024-10-28 03:39:29.982000 ||113.26.47.170 | PUT | HTTP/2.0 | /contact | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER)|404|183|2024-10-28 03:39:26 |2024-10-28 03:39:26.977000 ||33.80.49.13 | PUT | HTTP/2.0 | /about | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11 |500|150|2024-10-28 03:39:23 |2024-10-28 03:39:23.973000 ||240.14.156.37 | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 3.0; en-us; Xoom Build/HRI39) AppleWebKit/534.13 (KHTML, like Gecko) Version/4.0 Safari/534.13 |200|155|2024-10-28 03:39:20 |2024-10-28 03:39:20.969000 ||210.90.39.41 | POST | HTTP/2.0 | /about | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER)|201|188|2024-10-28 03:39:17 |2024-10-28 03:39:17.964000 ||219.88.194.150 | DELETE | HTTP/1.1 | /contact | Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0)|404|704|2024-10-28 03:39:14 |2024-10-28 03:39:14.963000 ||130.255.0.241 | DELETE | HTTP/1.1 | /contact | Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)|500|816|2024-10-28 03:39:11 |2024-10-28 03:39:11.959000 ||168.144.155.215 | POST | HTTP/1.1 | / | Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5 |500|511|2024-10-28 03:39:08 |2024-10-28 03:39:08.954000 ||28.112.30.158 | GET | HTTP/1.1 | /about | Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; en) Opera 9.50|200|842|2024-10-28 03:39:05 |2024-10-28 03:39:05.950000 ||166.9.187.104 | GET | HTTP/2.0 | /blog | Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.71 Safari/537.36 |201|970|2024-10-28 03:39:02 |2024-10-28 03:39:02.946000 |
+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
10 rows inset(0.00 sec)
mysql> desc demo_logs;
+------------+---------------------+------+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+------------+---------------------+------+------+---------+---------------+
|ip| String || YES || FIELD || method | String | PRI | YES || TAG || protocol | String | PRI | YES || TAG || path | String || YES || FIELD || user_agent | String || YES || FIELD || status | UInt32 | PRI | YES || TAG || size | UInt32 || YES || FIELD || datetime | TimestampNanosecond | PRI | NO || TIMESTAMP || timestamp | TimestampNanosecond || YES || FIELD |
+------------+---------------------+------+------+---------+---------------+
9 rows inset(0.00 sec)
mysql> show create table demo_logs;
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| demo_logs | CREATE TABLE IF NOT EXISTS `demo_logs`(`ip` STRING NULL,
`method` STRING NULL,
`protocol` STRING NULL,
`path` STRING NULL FULLTEXT WITH(analyzer ='English', case_sensitive ='false'),
`user_agent` STRING NULL FULLTEXT WITH(analyzer ='English', case_sensitive ='false'),
`status` INT UNSIGNED NULL,
`size` INT UNSIGNED NULL,
`datetime` TIMESTAMP(9) NOT NULL,
`timestamp` TIMESTAMP(9) NULL,
TIME INDEX (`datetime`),
PRIMARY KEY (`method`, `protocol`, `status`))ENGINE=mito
WITH(
append_mode ='true')|
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row inset(0.00 sec)
现在我们的数据已经入库,可以利用 GreptimeDB 提供的一些功能来快速过滤我们关心的数据,比如通过全文搜索我们可以对
UA
进行模糊匹配,快速找到
UA
包含
Android
的数据。
mysql>SELECT*FROM demo_logs WHERE MATCHES(user_agent,'Android')limit10;+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+| ip | method | protocol | path | user_agent |status| size |datetime|timestamp|+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+|240.14.156.37|DELETE| HTTP/1.1|/contact | Mozilla/5.0(Linux; U; Android 3.0; en-us; Xoom Build/HRI39) AppleWebKit/534.13(KHTML,like Gecko) Version/4.0 Safari/534.13|200|155|2024-10-2803:39:20|2024-10-2803:39:20.969000||186.44.204.29|DELETE| HTTP/1.1|/| Opera/9.80(Android 2.3.4; Linux; Opera Mobi/build-1107180945; U; en-GB) Presto/2.8.149 Version/11.10|201|343|2024-10-2803:45:33|2024-10-2803:45:33.459000||75.246.111.167|DELETE| HTTP/1.1|/contact | Mozilla/5.0(Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|404|869|2024-10-2803:38:59|2024-10-2803:38:59.942000||236.239.192.109|DELETE| HTTP/1.1|/contact | Mozilla/5.0(Linux; U; Android 2.2.1; zh-cn; HTC_Wildfire_A3333 Build/FRG83D) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|500|892|2024-10-2803:38:53|2024-10-2803:38:53.934000||232.232.14.176|DELETE| HTTP/1.1|/contact | Mozilla/5.0(Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|500|644|2024-10-2803:46:42|2024-10-2803:46:42.550000||135.16.130.172|DELETE| HTTP/2.0|/| MQQBrowser/26 Mozilla/5.0(Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|404|177|2024-10-2803:47:27|2024-10-2803:47:27.613000||69.23.7.123| GET | HTTP/1.1|/blog | Mozilla/5.0(Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|201|770|2024-10-2803:45:09|2024-10-2803:45:09.425000||37.61.6.211| GET | HTTP/1.1|/blog | MQQBrowser/26 Mozilla/5.0(Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|404|298|2024-10-2803:45:21|2024-10-2803:45:21.442000||244.166.255.46| GET | HTTP/2.0|/blog | Mozilla/5.0(Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|201|963|2024-10-2803:45:48|2024-10-2803:45:48.478000||35.169.107.238| GET | HTTP/2.0|/blog | Mozilla/5.0(Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|404|249|2024-10-2803:46:48|2024-10-2803:46:48.558000|+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+10rowsinset(0.01 sec)
在进行一些统计工作或者问题排查的时候,经常性的需要区分用户的渠道和 url。例如,我们可能需要筛选出 Android 渠道的用户,并查看访问 blog 页面时的 HTTP 状态码分布。通过以下 SQL 查询,可以快速获取所需结果,显著减少数据处理时间。
mysql>SELECT method,status,count(*)FROM demo_logs WHERE MATCHES(user_agent,'Android')and MATCHES(path,'blog')groupby method,status;+--------+--------+----------+| method |status|COUNT(*)|+--------+--------+----------+| GET |404|2|| GET |201|3|| PUT |500|2|| POST |404|1|+--------+--------+----------+4rowsinset(0.01 sec)
我们已经将此过程打包成一个 docker compose 文件,欢迎前往 GitHub demo-scene repo 获取相关源码和指南:
https://github.com/GreptimeTeam/demo-scene/tree/main/kafka-ingestion
总结
本文介绍了如何利用 Vector 从 Kafka 读取日志数据并写入 GreptimeDB。Vector 是一个开源的数据收集工具,支持从多种数据源读取数据,并将数据写入多种数据目的地。目前已支持 GreptimeDB 的 sink,可以很方便的将原先系统中的监控数据导入 GreptimeDB 中。
本文介绍了如何利用 Vector 工具将 Kafka 中的日志数据无缝传输至 GreptimeDB 中,充分利用 GreptimeDB 在存储和分析时序数据上的优势,以及 Vector 的灵活性让数据处理更加高效。
GreptimeDB 强大的日志存储和查询功能为日志分析提供了可靠保障,无论是构建日志管理系统,还是进行实时监控与分析,Kafka + Vector + GreptimeDB 的组合能够帮助用户实现高效的数据流转与处理。
未来我们将进一步介绍如何通过 GreptimeDB 的 Pipeline 引擎实现更加复杂的日志处理和数据过滤,敬请期待!
11 月 9 日我们将在深圳举办「云平台及 AI 时代下的可观测性技术演进」线下沙龙,欢迎报名: https://1965290734055.huodongxing.com/event/6777706662000
关于 Greptime
Greptime 格睿科技专注于为可观测、物联网及车联网等领域提供实时、高效的数据存储和分析服务,帮助客户挖掘数据的深层价值。目前基于云原生的时序数据库 GreptimeDB 已经衍生出多款适合不同用户的解决方案,更多信息或 demo 展示请联系下方小助手(微信号:greptime)。
欢迎对开源感兴趣的朋友们参与贡献和讨论,从带有 good first issue 标签的 issue 开始你的开源之旅吧~期待在开源社群里遇见你!添加小助手微信即可加入“技术交流群”与志同道合的朋友们面对面交流哦~
Star us on GitHub Now: https://github.com/GreptimeTeam/greptimedb
官网:https://greptime.cn/
文档:https://docs.greptime.cn/
Twitter: https://twitter.com/Greptime
Slack: https://greptime.com/slack
LinkedIn: https://www.linkedin.com/company/greptime/
版权归原作者 Greptime 所有, 如有侵权,请联系我们删除。