Series index
1: seatunnel consumes Kafka data and writes it to ClickHouse
Preface
This article reuses the Kafka topic and data format from the article "seatunnel consumes Kafka data and writes it to ClickHouse", but loads the data into ClickHouse in a different way, which is also a chance to practice ClickHouse's Kafka engine.
It assumes Kafka and ClickHouse are already installed; installation guides for both are plentiful, so they are not covered here.
Prerequisites: Kafka 2.7.0; topic: filebeat_**** (data written to Kafka via Filebeat); ClickHouse 22.6.3.35.
1. Kafka data format
Topic: filebeat_****
Use the Kafka console consumer to inspect the data format:
bin/kafka-console-consumer.sh --bootstrap-server kafka001:9092,kafka002:9092,kafka003:9092 --topic filebeat_****
{
  "@timestamp": "2022-12-15T09:04:54.870Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "7.13.1"
  },
  "input": {
    "type": "log"
  },
  "fields": {
    "fields_under_root": true,
    "filetype": "heart"
  },
  "agent": {
    "type": "filebeat",
    "version": "7.13.1",
    "hostname": "hostname",
    "ephemeral_id": "e2b48c2b-4459-4310-ae6d-396a9494c536",
    "id": "72eafd44-b71d-452f-bdb5-b986d2a12c15",
    "name": "hd.n12"
  },
  "ecs": {
    "version": "1.8.0"
  },
  "host": {
    "name": "hostname"
  },
  "log": {
    "offset": 62571676,
    "file": {
      "path": "/opt/servers/tomcatServers/tomcat-*/t2/webapps/recordlogs/****.log"
    }
  },
  "message": "[2022-12-15 17:04:54] [DataLog(10)] [ConsumeMessageThread_3] [INFO]-20000 人名 桌面 四川省 德阳市 旌阳区 0000936861276195024 7380936861276195024 738 104.432322 31.157213 null 中国四川省某某市某某区某某路 110.189.206.* 1 1 1671095093998 1671095094013 \"B2Q15-301\" 00:a5:0a:00:3e:42 -49 中国标准时间 1671095095916 V2.0.2_202109021711"
}
The part that matters is the message field: [2022-12-15 17:04:54] is the log upload time, and everything after [INFO]- is split by \t into the following fixed field order (a quick parsing check follows the list):
`userid` String,
`username` String,
`app_name` String,
`province_name` String,
`city_name` String,
`district_name` String,
`code` String,
`real_code` String,
`product_name` String,
`longitude` String,
`latitude` String,
`rd` String,
`address` String,
`ip` String,
`screenlight` String,
`screenlock` String,
`hearttime` String,
`time` String,
`ssid` String,
`mac_address` String,
`rssi` String,
`timezone` String,
`current_time` String,
`program_version` String
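Before creating anything in ClickHouse, the split logic can be tried out on a literal. A minimal sketch, using a shortened, hypothetical sample message ('\t' in a ClickHouse string literal is a real tab character):

SELECT
    -- strip the brackets around the leading timestamp
    replaceAll(splitByString(']', splitByString('[INFO]-', m)[1])[1], '[', '') AS upload_time,
    -- tab-split everything after the '[INFO]-' marker
    splitByString('\t', splitByString('[INFO]-', m)[2]) AS fields,
    fields[1] AS userid,
    length(fields) AS field_count
FROM
(
    SELECT '[2022-12-15 17:04:54] [DataLog(10)] [T1] [INFO]-20000\tname\tapp' AS m
);
-- upload_time = '2022-12-15 17:04:54', userid = '20000', field_count = 3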
2. ClickHouse
2.1 Create the Kafka engine table
CREATE TABLE default.kafka_filebeat_hearts
(
    `message` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka001:9092,kafka002:9092,kafka003:9092',
         kafka_topic_list = 'filebeat_****',
         kafka_group_name = 'consumer_group_clickhouse_filebeat_***',
         kafka_format = 'JSONEachRow',
         kafka_skip_broken_messages = 1,
         kafka_num_consumers = 2;
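To peek at what the engine table receives, direct SELECTs can be enabled for the current session. This is an optional sanity check and assumes the stream_like_engine_allow_direct_select setting is available in this ClickHouse version; note that messages read this way are consumed by the consumer group, so they will not reach the materialized view created later:

-- Allow reading the Kafka engine table directly (session-level setting).
SET stream_like_engine_allow_direct_select = 1;
-- Peek at a few raw messages; their offsets are committed, i.e. consumed.
SELECT message FROM default.kafka_filebeat_hearts LIMIT 5;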
2.2 Create the ClickHouse MergeTree table
CREATE TABLE default.tab_mt_results
(
    `upload_time` DateTime64(3),
    `userid` String,
    `username` String,
    `app_name` String,
    `province_name` String,
    `city_name` String,
    `district_name` String,
    `machine_no` String,
    `real_machineno` String,
    `product_name` String,
    `longitude` String,
    `latitude` String,
    `rd` String,
    `address` String,
    `ip` String,
    `screenlight` String,
    `screenlock` String,
    `hearttime` DateTime64(3),
    `time` DateTime64(3),
    `ssid` String,
    `mac_address` String,
    `rssi` String,
    `timezone` String,
    `machine_current_time` String,
    `program_version` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(upload_time)
PRIMARY KEY (userid, machine_no)
ORDER BY (userid, machine_no)
TTL toDate(upload_time) + toIntervalMonth(1)
SETTINGS index_granularity = 8192;
The TTL is set on upload_time, giving the data a one-month expiry.
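Once data is flowing, the daily partitions and row counts can be checked from system.parts (a quick sketch):

-- Inspect active partitions of the target table.
SELECT partition, sum(rows) AS rows
FROM system.parts
WHERE database = 'default' AND table = 'tab_mt_results' AND active
GROUP BY partition
ORDER BY partition;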
2.3 Create the materialized view that writes Kafka data into the target table
CREATE MATERIALIZED VIEW default.view_consumer_kafka2ck TO default.tab_mt_results
(
    `upload_time` DateTime64(3),
    `userid` String,
    `username` String,
    `app_name` String,
    `province_name` String,
    `city_name` String,
    `district_name` String,
    `machine_no` String,
    `real_machineno` String,
    `product_name` String,
    `longitude` String,
    `latitude` String,
    `rd` String,
    `address` String,
    `ip` String,
    `screenlight` String,
    `screenlock` String,
    `hearttime` DateTime64(3),
    `time` DateTime64(3),
    `ssid` String,
    `mac_address` String,
    `rssi` String,
    `timezone` String,
    `machine_current_time` String,
    `program_version` String
) AS
SELECT
replaceAll(splitByString(']', splitByString('[INFO]-', message)[1])[1], '[', '') AS upload_time,
splitByString('\t', splitByString('[INFO]-', message)[2])[1] AS userid,
splitByString('\t', splitByString('[INFO]-', message)[2])[2] AS username,
splitByString('\t', splitByString('[INFO]-', message)[2])[3] AS app_name,
splitByString('\t', splitByString('[INFO]-', message)[2])[4] AS province_name,
splitByString('\t', splitByString('[INFO]-', message)[2])[5] AS city_name,
splitByString('\t', splitByString('[INFO]-', message)[2])[6] AS district_name,
splitByString('\t', splitByString('[INFO]-', message)[2])[7] AS machine_no,
splitByString('\t', splitByString('[INFO]-', message)[2])[8] AS real_machineno,
splitByString('\t', splitByString('[INFO]-', message)[2])[9] AS product_name,
splitByString('\t', splitByString('[INFO]-', message)[2])[10] AS longitude,
splitByString('\t', splitByString('[INFO]-', message)[2])[11] AS latitude,
splitByString('\t', splitByString('[INFO]-', message)[2])[12] AS rd,
splitByString('\t', splitByString('[INFO]-', message)[2])[13] AS address,
splitByString('\t', splitByString('[INFO]-', message)[2])[14] AS ip,
splitByString('\t', splitByString('[INFO]-', message)[2])[15] AS screenlight,
splitByString('\t', splitByString('[INFO]-', message)[2])[16] AS screenlock,
CAST(splitByString('\t', splitByString('[INFO]-', message)[2])[17], 'DateTime64') AS hearttime,
CAST(splitByString('\t', splitByString('[INFO]-', message)[2])[18], 'DateTime64') AS time,
splitByString('\t', splitByString('[INFO]-', message)[2])[19] AS ssid,
splitByString('\t', splitByString('[INFO]-', message)[2])[20] AS mac_address,
splitByString('\t', splitByString('[INFO]-', message)[2])[21] AS rssi,
splitByString('\t', splitByString('[INFO]-', message)[2])[22] AS timezone,
splitByString('\t', splitByString('[INFO]-', message)[2])[23] AS machine_current_time,
splitByString('\t', splitByString('[INFO]-', message)[2])[24] AS program_version
FROM default.kafka_filebeat_hearts;
Two points worth noting:
1. The message field should be filtered so that records not matching the expected format are dropped, avoiding parse errors (the WHERE clause is added in Section 3 below);
2. hearttime and time are cast to DateTime64.
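A minimal check of the filter from point 1, run against two hypothetical sample strings:

-- Rows without '[INFO]-' would make splitByString(...)[2] return an empty
-- string, and the DateTime64 casts would then fail, so such rows must be dropped.
SELECT m, m LIKE '%[INFO]-%' AS keep
FROM
(
    SELECT arrayJoin(['[2022-12-15 17:04:54] [X] [INFO]-a\tb', 'malformed line']) AS m
);
-- keep = 1 for the first row, 0 for the second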
3. Issues
3.1 Modifying the materialized view
After the materialized view was first created, no data arrived in the result table. The ClickHouse server log showed that message parsing was failing. The fix is to modify the materialized view so that records not matching the expected format are filtered out:
DETACH TABLE default.view_consumer_kafka2ck;
ATTACH MATERIALIZED VIEW default.view_consumer_kafka2ck TO default.tab_mt_results
(
    `upload_time` DateTime64(3),
    `userid` String,
    `username` String,
    `app_name` String,
    `province_name` String,
    `city_name` String,
    `district_name` String,
    `machine_no` String,
    `real_machineno` String,
    `product_name` String,
    `longitude` String,
    `latitude` String,
    `rd` String,
    `address` String,
    `ip` String,
    `screenlight` String,
    `screenlock` String,
    `hearttime` DateTime64(3),
    `time` DateTime64(3),
    `ssid` String,
    `mac_address` String,
    `rssi` String,
    `timezone` String,
    `machine_current_time` String,
    `program_version` String
) AS
SELECT
replaceAll(splitByString(']', splitByString('[INFO]-', message)[1])[1], '[', '') AS upload_time,
splitByString('\t', splitByString('[INFO]-', message)[2])[1] AS userid,
splitByString('\t', splitByString('[INFO]-', message)[2])[2] AS username,
splitByString('\t', splitByString('[INFO]-', message)[2])[3] AS app_name,
splitByString('\t', splitByString('[INFO]-', message)[2])[4] AS province_name,
splitByString('\t', splitByString('[INFO]-', message)[2])[5] AS city_name,
splitByString('\t', splitByString('[INFO]-', message)[2])[6] AS district_name,
splitByString('\t', splitByString('[INFO]-', message)[2])[7] AS machine_no,
splitByString('\t', splitByString('[INFO]-', message)[2])[8] AS real_machineno,
splitByString('\t', splitByString('[INFO]-', message)[2])[9] AS product_name,
splitByString('\t', splitByString('[INFO]-', message)[2])[10] AS longitude,
splitByString('\t', splitByString('[INFO]-', message)[2])[11] AS latitude,
splitByString('\t', splitByString('[INFO]-', message)[2])[12] AS rd,
splitByString('\t', splitByString('[INFO]-', message)[2])[13] AS address,
splitByString('\t', splitByString('[INFO]-', message)[2])[14] AS ip,
splitByString('\t', splitByString('[INFO]-', message)[2])[15] AS screenlight,
splitByString('\t', splitByString('[INFO]-', message)[2])[16] AS screenlock,
CAST(splitByString('\t', splitByString('[INFO]-', message)[2])[17], 'DateTime64') AS hearttime,
CAST(splitByString('\t', splitByString('[INFO]-', message)[2])[18], 'DateTime64') AS time,
splitByString('\t', splitByString('[INFO]-', message)[2])[19] AS ssid,
splitByString('\t', splitByString('[INFO]-', message)[2])[20] AS mac_address,
splitByString('\t', splitByString('[INFO]-', message)[2])[21] AS rssi,
splitByString('\t', splitByString('[INFO]-', message)[2])[22] AS timezone,
splitByString('\t', splitByString('[INFO]-', message)[2])[23] AS machine_current_time,
splitByString('\t', splitByString('[INFO]-', message)[2])[24] AS program_version
FROM default.kafka_filebeat_hearts
WHERE message LIKE '%[INFO]-%' ;
Querying the result table now shows data arriving.
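A minimal verification query:

-- Row count and the time range covered so far.
SELECT count() AS rows, min(upload_time) AS earliest, max(upload_time) AS latest
FROM default.tab_mt_results;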
Summary
The Kafka table engine is quite convenient. Most of the work is in parsing the message field into a fixed structure; the functions used most often here are splitByString and visitParamExtractRaw, see
Functions | ClickHouse Docs
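For instance, visitParamExtractRaw pulls the raw value of a top-level key out of a JSON string (a quick illustration):

-- Extract the raw JSON value of the "log" key.
SELECT visitParamExtractRaw('{"log":{"offset":62571676}}', 'log') AS raw;
-- raw = '{"offset":62571676}'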
In short, ClickHouse's integration engines are powerful; with a careful read of the official documentation they are easy to put to work.
I have many years of experience with big data; feel free to reach out any time to exchange ideas and learn together.