0


利用 Vector 将 Kafka 中的日志数据高效写入 GreptimeDB

Kafka 是一款具备高吞吐量、高可靠性和高可扩展性的分布式消息队列,而 GreptimeDB 是专门用于存储时间序列数据的开源时序数据库。两者在各自的领域都表现出色,但如何高效地连接它们以实现数据的无缝传输和处理?

Vector 作为一个高速且可扩展的数据管道工具发挥了作用。它能够从多个来源(如应用日志、系统指标)收集、转换并传输数据,并将这些数据发送到不同的目标(如数据库、监控系统)。

而随着 GreptimeDB 现已全面支持日志数据的存储与分析,日志接收功能 greptime log sink 已被集成到 Vector 中,使用户可以通过 **

greptime_logs

sink** 将来自 Vector 的各种数据源轻松写入 GreptimeDB。详情和示例代码可参考此文:《Vector 增加 GreptimeDB 日志写入支持,连接数十种数据源》。

接下来,本文将详细介绍如何使用 Vector 从 Kafka 读取日志数据并将其写入 GreptimeDB,包括具体的实现步骤与示例代码。

准备工作

假设我们已经有一个 Kafka 集群,其中有一个名为

test_topic

的 topic,里面存储了日志数据。Kafka 中的示例数据内容如下:

127.0.0.1--[04/Sep/2024:15:46:13-0700]"GET / HTTP/1.1"200615"-""Mozilla/5.0 (X11; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0"

接下来,我们需要安装 Vector 和 GreptimeDB。

安装 & 配置 Vector

Vector 是一个开源的数据收集工具,支持从多种数据源读取数据,并将数据写入多种数据目的地。我们可以使用 Vector 从 Kafka 读取数据,并将数据写入 GreptimeDB。

安装 Vector 非常简单,可通过二进制容器等进行安装,具体安装步骤请参考 Vector 官方文档。

安装完成后,我们需要配置 Vector,使其能够从 Kafka 读取数据并写入 GreptimeDB。下面是一个简单的 Vector 配置文件:

[sources.mq]
type = "kafka"
group_id = "vector0"
topics = ["test_topic"]
bootstrap_servers = "kafka:9092"

[sinks.console]
type = "console"
inputs = [ "mq" ]
encoding.codec = "text"

[sinks.sink_greptime_logs]
type = "greptimedb_logs"
table = "demo_logs"
pipeline_name = "demo_pipeline"
compression = "gzip"
inputs = [ "mq" ]
endpoint = "http://greptimedb:4000"

上面的配置文件中,我们定义了一个名为

mq

的 source,用于从 Kafka 读取数据。我们还定义了一个名为

sink_greptime_logs

的 sink,用于将数据写入 GreptimeDB。

安装 & 配置 GreptimeDB

GreptimeDB 是一个开源的时序数据库,专门用于存储时间序列数据。我们可以使用 GreptimeDB 存储从 Kafka 读取的日志数据。

安装 GreptimeDB 同样非常简单,可通过二进制、容器等进行安装。具体安装步骤请参考 GreptimeDB 官方文档。

安装完成后,我们使用默认配置即可。因为日志数据多种多样,我们提供了 Pipeline 功能来处理和过滤日志数据,只保留日志中我们关心的数据,我们将在后续技术博客中分享 Pipeline 引擎的实现原理和方案步骤,敬请期待。

如下例子对我们提供的 nginx 日志格式进行了解析,我们使用如下所示的 Pipeline 配置文件。

processors:-dissect:fields:- message
      patterns:-'%{ip} - - [%{datetime}] "%{method} %{path} %{protocol}" %{status} %{size} "-" "%{user_agent}"'-date:fields:- datetime
      formats:-"%d/%b/%Y:%H:%M:%S %z"-date:fields:- timestamp
      formats:-"%Y-%m-%dT%H:%M:%S%.3fZ"transform:-fields:- ip
      - path
    type: string
  -fields:- method
      - protocol
    type: string
    index: tag
  -fields:- user_agent
    type: string
    index: fulltext
  -fields:- status
    type: uint32
    index: tag
  -fields:- size
    type: uint32
  -fields:- datetime
    type: timestamp
    index: timestamp
  -fields:- timestamp
    type: timestamp

在上面的 Pipeline 配置文件中,我们使用

dissect

processor 对日志数据进行解析。本来非结构化的日志数据,被拆分并进行格式转化后,获得了一个结构化的数据,包含

ip

datatime

method

path

protocol

status

size

user_agent

。然后使用

date

processor 对时间两个不同格式的时间字段进行解析。最后使用

transform

对字段进行转换,并设置 index。

关于

index

,我们指定了

method

protocol

status

为 tag 字段,主要用于高效的查询,一些不确定值的数量的字段,或者值的数量特别多的,不建议设置为 tag,这会导致高基问题。所以

ip

size

均没有被设置为 tag 字段。

path

user_agent

字段,我们增加了全文索引。以便可以使用模糊搜索来快速找到的关心的内容。详细的查询语法可参考此处。

上述配置文件可通过 HTTP 接口上传到 GreptimeDB 中,以创建一个名为

demo_pipeline

的 Pipeline 用于日志的解析与修剪,然后存入 GreptimeDB 中。

curl -X 'POST''http://greptimedb:4000/v1/events/pipelines/demo_pipeline' -F 'file=@/config_data/pipeline.yaml' -v

运行 Vector & GreptimeDB

现在,我们已经准备好了 Vector 和 GreptimeDB,现在就可以运行它们了。成功后,Vector 将从 Kafka 读取数据,并将数据写入 GreptimeDB。

我们可以通过 MySQL 协议连接 GreptimeDB,查看数据。

mysql> show tables;
+-------------+
| Tables      |
+-------------+
| demo_logs   || numbers     |
+-------------+
3 rows inset(0.00 sec)

mysql>select * from demo_logs order by timestamp desc limit 10;
+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
|ip| method | protocol | path     | user_agent                                                                                                                                                                                    | status | size | datetime            | timestamp                  |
+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
|37.254.223.207  | DELETE | HTTP/2.0 | /about   | Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World)|201|495|2024-10-28 03:39:29 |2024-10-28 03:39:29.982000 ||113.26.47.170   | PUT    | HTTP/2.0 | /contact | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER)|404|183|2024-10-28 03:39:26 |2024-10-28 03:39:26.977000 ||33.80.49.13     | PUT    | HTTP/2.0 | /about   | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11                                                                        |500|150|2024-10-28 03:39:23 |2024-10-28 03:39:23.973000 ||240.14.156.37   | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 3.0; en-us; Xoom Build/HRI39) AppleWebKit/534.13 (KHTML, like Gecko) Version/4.0 Safari/534.13                                                                 |200|155|2024-10-28 03:39:20 |2024-10-28 03:39:20.969000 ||210.90.39.41    | POST   | HTTP/2.0 | /about   | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER)|201|188|2024-10-28 03:39:17 |2024-10-28 03:39:17.964000 ||219.88.194.150  | DELETE | HTTP/1.1 | /contact | Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0)|404|704|2024-10-28 03:39:14 |2024-10-28 03:39:14.963000 ||130.255.0.241   | DELETE | HTTP/1.1 | /contact | Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)|500|816|2024-10-28 03:39:11 |2024-10-28 03:39:11.959000 ||168.144.155.215 | POST   | HTTP/1.1 | /        | Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5                                          |500|511|2024-10-28 03:39:08 |2024-10-28 03:39:08.954000 ||28.112.30.158   | GET    | HTTP/1.1 | /about   | Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; en) Opera 9.50|200|842|2024-10-28 03:39:05 |2024-10-28 03:39:05.950000 ||166.9.187.104   | GET    | HTTP/2.0 | /blog    | Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.71 Safari/537.36                                                                                  |201|970|2024-10-28 03:39:02 |2024-10-28 03:39:02.946000 |
+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
10 rows inset(0.00 sec)

mysql> desc demo_logs;
+------------+---------------------+------+------+---------+---------------+
| Column     | Type                | Key  | Null | Default | Semantic Type |
+------------+---------------------+------+------+---------+---------------+
|ip| String              || YES  || FIELD         || method     | String              | PRI  | YES  || TAG           || protocol   | String              | PRI  | YES  || TAG           || path       | String              || YES  || FIELD         || user_agent | String              || YES  || FIELD         || status     | UInt32              | PRI  | YES  || TAG           || size       | UInt32              || YES  || FIELD         || datetime   | TimestampNanosecond | PRI  | NO   || TIMESTAMP     || timestamp  | TimestampNanosecond || YES  || FIELD         |
+------------+---------------------+------+------+---------+---------------+
9 rows inset(0.00 sec)

mysql> show create table demo_logs;
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table     | Create Table                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| demo_logs | CREATE TABLE IF NOT EXISTS `demo_logs`(`ip` STRING NULL,
  `method` STRING NULL,
  `protocol` STRING NULL,
  `path` STRING NULL FULLTEXT WITH(analyzer ='English', case_sensitive ='false'),
  `user_agent` STRING NULL FULLTEXT WITH(analyzer ='English', case_sensitive ='false'),
  `status` INT UNSIGNED NULL,
  `size` INT UNSIGNED NULL,
  `datetime` TIMESTAMP(9) NOT NULL,
  `timestamp` TIMESTAMP(9) NULL,
  TIME INDEX (`datetime`),
  PRIMARY KEY (`method`, `protocol`, `status`))ENGINE=mito
WITH(
  append_mode ='true')|
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row inset(0.00 sec)

现在我们的数据已经入库,可以利用 GreptimeDB 提供的一些功能来快速过滤我们关心的数据,比如通过全文搜索我们可以对

UA

进行模糊匹配,快速找到

UA

包含

Android

的数据。

mysql>SELECT*FROM demo_logs WHERE MATCHES(user_agent,'Android')limit10;+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+| ip              | method | protocol | path     | user_agent                                                                                                                                                         |status| size |datetime|timestamp|+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+|240.14.156.37|DELETE| HTTP/1.1|/contact | Mozilla/5.0(Linux; U; Android 3.0; en-us; Xoom Build/HRI39) AppleWebKit/534.13(KHTML,like Gecko) Version/4.0 Safari/534.13|200|155|2024-10-2803:39:20|2024-10-2803:39:20.969000||186.44.204.29|DELETE| HTTP/1.1|/| Opera/9.80(Android 2.3.4; Linux; Opera Mobi/build-1107180945; U; en-GB) Presto/2.8.149 Version/11.10|201|343|2024-10-2803:45:33|2024-10-2803:45:33.459000||75.246.111.167|DELETE| HTTP/1.1|/contact | Mozilla/5.0(Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|404|869|2024-10-2803:38:59|2024-10-2803:38:59.942000||236.239.192.109|DELETE| HTTP/1.1|/contact | Mozilla/5.0(Linux; U; Android 2.2.1; zh-cn; HTC_Wildfire_A3333 Build/FRG83D) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|500|892|2024-10-2803:38:53|2024-10-2803:38:53.934000||232.232.14.176|DELETE| HTTP/1.1|/contact | Mozilla/5.0(Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|500|644|2024-10-2803:46:42|2024-10-2803:46:42.550000||135.16.130.172|DELETE| HTTP/2.0|/| MQQBrowser/26 Mozilla/5.0(Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|404|177|2024-10-2803:47:27|2024-10-2803:47:27.613000||69.23.7.123| GET    | HTTP/1.1|/blog    | Mozilla/5.0(Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|201|770|2024-10-2803:45:09|2024-10-2803:45:09.425000||37.61.6.211| GET    | HTTP/1.1|/blog    | MQQBrowser/26 Mozilla/5.0(Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|404|298|2024-10-2803:45:21|2024-10-2803:45:21.442000||244.166.255.46| GET    | HTTP/2.0|/blog    | Mozilla/5.0(Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|201|963|2024-10-2803:45:48|2024-10-2803:45:48.478000||35.169.107.238| GET    | HTTP/2.0|/blog    | Mozilla/5.0(Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1(KHTML,like Gecko) Version/4.0 Mobile Safari/533.1|404|249|2024-10-2803:46:48|2024-10-2803:46:48.558000|+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+10rowsinset(0.01 sec)

在进行一些统计工作或者问题排查的时候,经常性的需要区分用户的渠道和 url。例如,我们可能需要筛选出 Android 渠道的用户,并查看访问 blog 页面时的 HTTP 状态码分布。通过以下 SQL 查询,可以快速获取所需结果,显著减少数据处理时间。

mysql>SELECT method,status,count(*)FROM demo_logs WHERE MATCHES(user_agent,'Android')and MATCHES(path,'blog')groupby method,status;+--------+--------+----------+| method |status|COUNT(*)|+--------+--------+----------+| GET    |404|2|| GET    |201|3|| PUT    |500|2|| POST   |404|1|+--------+--------+----------+4rowsinset(0.01 sec)

我们已经将此过程打包成一个 docker compose 文件,欢迎前往 GitHub demo-scene repo 获取相关源码和指南:

https://github.com/GreptimeTeam/demo-scene/tree/main/kafka-ingestion

总结

本文介绍了如何利用 Vector 从 Kafka 读取日志数据并写入 GreptimeDB。Vector 是一个开源的数据收集工具,支持从多种数据源读取数据,并将数据写入多种数据目的地。目前已支持 GreptimeDB 的 sink,可以很方便的将原先系统中的监控数据导入 GreptimeDB 中。

本文介绍了如何利用 Vector 工具将 Kafka 中的日志数据无缝传输至 GreptimeDB 中,充分利用 GreptimeDB 在存储和分析时序数据上的优势,以及 Vector 的灵活性让数据处理更加高效。

GreptimeDB 强大的日志存储和查询功能为日志分析提供了可靠保障,无论是构建日志管理系统,还是进行实时监控与分析,Kafka + Vector + GreptimeDB 的组合能够帮助用户实现高效的数据流转与处理

未来我们将进一步介绍如何通过 GreptimeDB 的 Pipeline 引擎实现更加复杂的日志处理和数据过滤,敬请期待!

11 月 9 日我们将在深圳举办「云平台及 AI 时代下的可观测性技术演进」线下沙龙,欢迎报名: https://1965290734055.huodongxing.com/event/6777706662000

关于 Greptime

Greptime 格睿科技专注于为可观测、物联网及车联网等领域提供实时、高效的数据存储和分析服务,帮助客户挖掘数据的深层价值。目前基于云原生的时序数据库 GreptimeDB 已经衍生出多款适合不同用户的解决方案,更多信息或 demo 展示请联系下方小助手(微信号:greptime)。

欢迎对开源感兴趣的朋友们参与贡献和讨论,从带有 good first issue 标签的 issue 开始你的开源之旅吧~期待在开源社群里遇见你!添加小助手微信即可加入“技术交流群”与志同道合的朋友们面对面交流哦~

Star us on GitHub Now: https://github.com/GreptimeTeam/greptimedb

官网:https://greptime.cn/
文档:https://docs.greptime.cn/
Twitter: https://twitter.com/Greptime
Slack: https://greptime.com/slack
LinkedIn: https://www.linkedin.com/company/greptime/

标签: 数据 Kafka Vector

本文转载自: https://blog.csdn.net/beary_tang/article/details/143333754
版权归原作者 Greptime 所有, 如有侵权,请联系我们删除。

“利用 Vector 将 Kafka 中的日志数据高效写入 GreptimeDB”的评论:

还没有评论