

Using the ClickHouse Kafka table engine to consume Kafka data and write it into ClickHouse

Series

1: seatunnel consumes Kafka data and writes it into ClickHouse



Preface

This article reuses the Kafka topic and message format from the article "seatunnel consumes Kafka data and writes it into ClickHouse", but writes the data into ClickHouse in a different way, as an exercise with the ClickHouse Kafka engine.

This article assumes Kafka and ClickHouse are already installed; installation guides for both are plentiful, so installation is not covered here.

Prerequisites: Kafka 2.7.0; topic: filebeat_**** (written to Kafka by Filebeat); ClickHouse 22.6.3.35.

1. Kafka data format

Topic: filebeat_****

Use the Kafka console consumer to inspect the data format:

/bin/kafka-console-consumer.sh --bootstrap-server kafka001:9092,kafka002:9092,kafka003:9092 --topic filebeat_****

{
    "@timestamp": "2022-12-15T09:04:54.870Z", 
    "@metadata": {
        "beat": "filebeat", 
        "type": "_doc", 
        "version": "7.13.1"
    }, 
    "input": {
        "type": "log"
    }, 
    "fields": {
        "fields_under_root": true, 
        "filetype": "heart"
    }, 
    "agent": {
        "type": "filebeat", 
        "version": "7.13.1", 
        "hostname": "hostname", 
        "ephemeral_id": "e2b48c2b-4459-4310-ae6d-396a9494c536", 
        "id": "72eafd44-b71d-452f-bdb5-b986d2a12c15", 
        "name": "hd.n12"
    }, 
    "ecs": {
        "version": "1.8.0"
    }, 
    "host": {
        "name": "hostname"
    }, 
    "log": {
        "offset": 62571676, 
        "file": {
            "path": "/opt/servers/tomcatServers/tomcat-*/t2/webapps/recordlogs/****.log"
        }
    }, 
    "message": "[2022-12-15 17:04:54] [DataLog(10)] [ConsumeMessageThread_3] [INFO]-20000    人名    桌面    四川省    德阳市    旌阳区    0000936861276195024    7380936861276195024    738    104.432322    31.157213    null    中国四川省某某市某某区某某路    110.189.206.*    1    1    1671095093998    1671095094013    \"B2Q15-301\"    00:a5:0a:00:3e:42    -49    中国标准时间    1671095095916    V2.0.2_202109021711"
}

The part that matters is the message field: [2022-12-15 17:04:54] is the log upload time, and everything after [INFO]- is tab-separated (\t), with the fields in the following fixed order (a quick parsing check is sketched after this list):

    `userid` String,
    `username` String,
    `app_name` String,
    `province_name` String,
    `city_name` String,
    `district_name` String,
    `code` String,
    `real_code` String,
    `product_name` String,
    `longitude` String,
    `latitude` String,
    `rd` String,
    `address` String,
    `ip` String,
    `screenlight` String,
    `screenlock` String,
    `hearttime` String,
    `time` String,
    `ssid` String,
    `mac_address` String,
    `rssi` String,
    `timezone` String,
    `current_time` String,
    `program_version` String
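
Before creating the materialized view, the splitting logic can be checked against a hand-written sample line. Below is a minimal sketch; the sample string and aliases are only illustrative, and tabs are written as \t escapes:

-- Minimal parsing check: split a sample line the same way the materialized view will.
-- ClickHouse arrays are 1-indexed, so [1] is the first element.
SELECT
    replaceAll(splitByString(']', splitByString('[INFO]-', msg)[1])[1], '[', '') AS upload_time,
    splitByString('\t', splitByString('[INFO]-', msg)[2])[1] AS userid,
    splitByString('\t', splitByString('[INFO]-', msg)[2])[2] AS username
FROM
(
    SELECT '[2022-12-15 17:04:54] [DataLog(10)] [Thread_3] [INFO]-20000\tname\tapp' AS msg
);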

2. ClickHouse

1. Create the Kafka engine table

CREATE TABLE default.kafka_filebeat_hearts
(
    message String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka001:9092,kafka002:9092,kafka003:9092',
kafka_topic_list = 'filebeat_****',
kafka_group_name = 'consumer_group_clickhouse_filebeat_***',
kafka_format = 'JSONEachRow',
kafka_skip_broken_messages = 1,
kafka_num_consumers = 2 ;
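
As an optional sanity check, a few raw messages can be read straight from the Kafka engine table before wiring up the materialized view. This is only a sketch, not a required step: a direct SELECT advances the consumer group's offsets, and on newer ClickHouse releases it may need to be enabled explicitly.

-- Optional sanity check: read a few raw messages directly from the Kafka engine table.
-- Note: this consumes offsets for the consumer group, so only do it while testing.
-- On newer ClickHouse versions direct SELECTs may first require:
-- SET stream_like_engine_allow_direct_select = 1;
SELECT message
FROM default.kafka_filebeat_hearts
LIMIT 5;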

2. Create the ClickHouse MergeTree result table

CREATE TABLE default.tab_mt_results
(
    `upload_time` DateTime64(3),
    `userid` String,
    `username` String,
    `app_name` String,
    `province_name` String,
    `city_name` String,
    `district_name` String,
    `machine_no` String,
    `real_machineno` String,
    `product_name` String,
    `longitude` String,
    `latitude` String,
    `rd` String,
    `address` String,
    `ip` String,
    `screenlight` String,
    `screenlock` String,
    `hearttime` DateTime64(3),
    `time` DateTime64(3),
    `ssid` String,
    `mac_address` String,
    `rssi` String,
    `timezone` String,
    `machine_current_time` String,
    `program_version` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(upload_time)
PRIMARY KEY (userid, machine_no)
ORDER BY (userid, machine_no)
TTL toDate(upload_time) + toIntervalMonth(1)
SETTINGS index_granularity = 8192 ;

A TTL of one month is set on upload_time, so data expires one month after upload.
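
To confirm the daily partitioning and the one-month TTL, the active parts of the result table can be inspected via system.parts; a minimal sketch:

-- Partitions older than one month should disappear as TTL merges drop expired data.
SELECT
    partition,
    sum(rows) AS rows,
    max(modification_time) AS last_modified
FROM system.parts
WHERE (database = 'default') AND (table = 'tab_mt_results') AND active
GROUP BY partition
ORDER BY partition;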

3. Create a materialized view to write from the Kafka table into the result table

CREATE MATERIALIZED VIEW default.view_consumer_kafka2ck TO default.tab_mt_results
(
    `upload_time` DateTime64(3),
    `userid` String,
    `username` String,
    `app_name` String,
    `province_name` String,
    `city_name` String,
    `district_name` String,
    `machine_no` String,
    `real_machineno` String,
    `product_name` String,
    `longitude` String,
    `latitude` String,
    `rd` String,
    `address` String,
    `ip` String,
    `screenlight` String,
    `screenlock` String,
    `hearttime` DateTime64(3),
    `time` DateTime64(3),
    `ssid` String,
    `mac_address` String,
    `rssi` String,
    `timezone` String,
    `machine_current_time` String,
    `program_version` String
) AS
SELECT
    replaceAll(splitByString(']', splitByString('[INFO]-', message)[1])[1], '[', '') AS upload_time,
    splitByString('\t', splitByString('[INFO]-', message)[2])[1] AS userid,
    splitByString('\t', splitByString('[INFO]-', message)[2])[2] AS username,
    splitByString('\t', splitByString('[INFO]-', message)[2])[3] AS app_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[4] AS province_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[5] AS city_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[6] AS district_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[7] AS machine_no,
    splitByString('\t', splitByString('[INFO]-', message)[2])[8] AS real_machineno,
    splitByString('\t', splitByString('[INFO]-', message)[2])[9] AS product_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[10] AS longitude,
    splitByString('\t', splitByString('[INFO]-', message)[2])[11] AS latitude,
    splitByString('\t', splitByString('[INFO]-', message)[2])[12] AS rd,
    splitByString('\t', splitByString('[INFO]-', message)[2])[13] AS address,
    splitByString('\t', splitByString('[INFO]-', message)[2])[14] AS ip,
    splitByString('\t', splitByString('[INFO]-', message)[2])[15] AS screenlight,
    splitByString('\t', splitByString('[INFO]-', message)[2])[16] AS screenlock,
    CAST(splitByString('\t', splitByString('[INFO]-', message)[2])[17], 'DateTime64') AS hearttime,
    CAST(splitByString('\t', splitByString('[INFO]-', message)[2])[18], 'DateTime64') AS time,
    splitByString('\t', splitByString('[INFO]-', message)[2])[19] AS ssid,
    splitByString('\t', splitByString('[INFO]-', message)[2])[20] AS mac_address,
    splitByString('\t', splitByString('[INFO]-', message)[2])[21] AS rssi,
    splitByString('\t', splitByString('[INFO]-', message)[2])[22] AS timezone,
    splitByString('\t', splitByString('[INFO]-', message)[2])[23] AS machine_current_time,
    splitByString('\t', splitByString('[INFO]-', message)[2])[24] AS program_version
FROM default.kafka_filebeat_hearts;

1. Filter out messages that do not match the expected format to avoid errors;

2. hearttime and time are converted to DateTime64.
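
In the sample message, hearttime and time look like epoch milliseconds (e.g. 1671095093998). If a plain CAST to DateTime64 does not parse such strings on your ClickHouse version, one alternative (a sketch, not what the view above uses) is to convert explicitly:

-- Alternative conversion of an epoch-millisecond string to DateTime64(3).
SELECT fromUnixTimestamp64Milli(toInt64('1671095093998')) AS hearttime;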

3. Issues

1. Modifying the materialized view

After the materialized view was first created, nothing was written to the result table. The ClickHouse logs showed that parsing failures were the cause. The fix was to modify the materialized view and filter out data that does not match the expected format:

 DETACH TABLE default.view_consumer_kafka2ck;
 
ATTACH MATERIALIZED VIEW default.view_consumer_kafka2ck TO default.tab_mt_results
(
    `upload_time` DateTime64(3),
    `userid` String,
    `username` String,
    `app_name` String,
    `province_name` String,
    `city_name` String,
    `district_name` String,
    `machine_no` String,
    `real_machineno` String,
    `product_name` String,
    `longitude` String,
    `latitude` String,
    `rd` String,
    `address` String,
    `ip` String,
    `screenlight` String,
    `screenlock` String,
    `hearttime` DateTime64(3),
    `time` DateTime64(3),
    `ssid` String,
    `mac_address` String,
    `rssi` String,
    `timezone` String,
    `machine_current_time` String,
    `program_version` String
) AS
SELECT
    replaceAll(splitByString(']', splitByString('[INFO]-', message)[1])[1], '[', '') AS upload_time,
    splitByString('\t', splitByString('[INFO]-', message)[2])[1] AS userid,
    splitByString('\t', splitByString('[INFO]-', message)[2])[2] AS username,
    splitByString('\t', splitByString('[INFO]-', message)[2])[3] AS app_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[4] AS province_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[5] AS city_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[6] AS district_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[7] AS machine_no,
    splitByString('\t', splitByString('[INFO]-', message)[2])[8] AS real_machineno,
    splitByString('\t', splitByString('[INFO]-', message)[2])[9] AS product_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[10] AS longitude,
    splitByString('\t', splitByString('[INFO]-', message)[2])[11] AS latitude,
    splitByString('\t', splitByString('[INFO]-', message)[2])[12] AS rd,
    splitByString('\t', splitByString('[INFO]-', message)[2])[13] AS address,
    splitByString('\t', splitByString('[INFO]-', message)[2])[14] AS ip,
    splitByString('\t', splitByString('[INFO]-', message)[2])[15] AS screenlight,
    splitByString('\t', splitByString('[INFO]-', message)[2])[16] AS screenlock,
    CAST(splitByString('\t', splitByString('[INFO]-', message)[2])[17], 'DateTime64') AS hearttime,
    CAST(splitByString('\t', splitByString('[INFO]-', message)[2])[18], 'DateTime64') AS time,
    splitByString('\t', splitByString('[INFO]-', message)[2])[19] AS ssid,
    splitByString('\t', splitByString('[INFO]-', message)[2])[20] AS mac_address,
    splitByString('\t', splitByString('[INFO]-', message)[2])[21] AS rssi,
    splitByString('\t', splitByString('[INFO]-', message)[2])[22] AS timezone,
    splitByString('\t', splitByString('[INFO]-', message)[2])[23] AS machine_current_time,
    splitByString('\t', splitByString('[INFO]-', message)[2])[24] AS program_version
FROM default.kafka_filebeat_hearts
WHERE message LIKE '%[INFO]-%' ;

Query the result table and you can see that data is now arriving.
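
For example, a quick check of today's rows and the latest upload time (a minimal sketch):

-- Verify that data keeps flowing into the result table.
SELECT count() AS rows_today, max(upload_time) AS latest_upload
FROM default.tab_mt_results
WHERE toDate(upload_time) = today();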


Summary

The Kafka table engine is quite convenient to use. Most of the work is in parsing the message field and splitting it into a fixed structure; commonly used functions such as splitByString and visitParamExtractRaw are described under Functions | ClickHouse Docs.

In short, ClickHouse's powerful third-party engines can be used proficiently as long as you keep reading the official documentation.

I have many years of big data experience and welcome discussion and exchange with fellow practitioners at any time.


Reposted from: https://blog.csdn.net/flye/article/details/128332189
Copyright belongs to the original author 冰帆; in case of infringement, please contact us for removal.
