将数据从 Kafka 通过 Flink 处理并最终落到 Doris 的流程可以概括为以下几个步骤:
1. 数据生产与消费:
- Kafka 作为消息队列,接收和存储数据。这些数据可以由各种生产者(如应用程序、日志系统等)写入 Kafka 的主题(topic)。
- Flink 作为流处理框架,从 Kafka 的主题中消费数据。Flink 支持实时流处理,因此可以立即处理 Kafka 中的消息。
2. 数据处理:
- 在 Flink 中,Kafka 消费者(Kafka Source)会将数据流输入 Flink 的数据流引擎。Flink 可以对数据进行各种处理,如过滤、转换、聚合、窗口计算等。
- Flink 作业(Job)会定义如何处理这些数据流。例如,将原始数据解析为结构化的格式,对字段进行转换,或者应用复杂的计算逻辑。
3. 数据写入 Doris:
- Doris Sink: Flink 提供了将处理后的数据写入 Doris 的功能,这通常通过 Flink 的
Sink
机制实现。 - Batch vs. Streaming: 根据需要,Flink 可以选择以批处理或流式处理的方式将数据写入 Doris。通常,流式处理用于实时写入,而批处理适合定时批量写入。
- Data Mapping: Flink 作业会根据 Doris 表的 schema 将数据转换成 Doris 能够接受的格式。Doris 支持各种数据导入方式,如
INSERT
语句、流式导入(Stream Load)、Broker Load 等。Flink 会调用相应的 API 或 SQL 语句将数据写入 Doris。
4. 数据存储与查询:
- Doris 作为 OLAP 数据库,接收来自 Flink 的数据并存储在相应的表中。Doris 优化了对大规模数据的分析查询,支持快速的 SQL 查询。
- 数据一旦进入 Doris,用户可以通过 SQL 查询对数据进行分析、报表生成等操作。
脚本样例:
Flink 作业,将数据从 Kafka 读取并处理后,写入 Doris 数据库。下面是对脚本的详细解析:
- Flink 配置设置
set execution.target=yarn-per-job;
set yarn.application.name=kafka_to_doris;
set yarn.application.queue=root_default;
set execution.checkpointing.interval=3min;
- execution.target: 设置 Flink 作业的执行模式为
yarn-per-job
,即每个作业在 YARN 上独立运行。 - yarn.application.name: 设置 YARN 上应用的名称为
kafka_to_doris
。 - yarn.application.queue: 设置 YARN 作业的队列为
root_default
。 - execution.checkpointing.interval: 设置 Flink 的 checkpoint 间隔为 3 分钟,以确保数据的一致性和容错性。
- 创建 Catalog
CREATE CATALOG yarn_doris WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '' --hadoop中hive数据库目标地址
);
USE CATALOG yarn_doris;
- CREATE CATALOG: 创建一个名为 yarn_doris的 Catalog,并指定其类型为
hive
。这个 Catalog 可能是为了访问 Hive 元数据。 - USE CATALOG: 切换到 Catalog 下,以便之后的操作在此 Catalog 中进行。
- 创建 Kafka Source 表
CREATE TABLE IF NOT EXISTS fdb.table_doris (
`gs_gd_json_string` string
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_topic', -- kafka的topic 主题
'properties.bootstrap.servers' = '', -- 集群地址:端口(三个节点都需要)
'properties.group.id' = '',
'scan.startup.mode' = 'latest-offset',
'format' = 'raw',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'SCRAM-SHA-512',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="your_username" password="your_password";'
);
- CREATE TABLE: 创建一个 Flink 表
fdb.table_doris
,它从 Kafka 主题中读取数据。 - gs_gd_json_string: 定义表的一个字段,类型为
string
,存储 Kafka 消息的内容。 - Kafka 配置: - properties.bootstrap.servers: Kafka 集群的地址(需要填入实际的服务器地址)。- properties.group.id: 消费组的 ID。- scan.startup.mode: 设置从 Kafka 消费的起始位置为
latest-offset
,即从最新的数据开始消费。- format: 设置消息格式为raw
,表示直接读取消息内容。- SASL 配置: 配置 Kafka 的安全性参数,使用SASL_PLAINTEXT
和SCRAM-SHA-512
机制进行认证。- username 和 password 是连接 Kafka 时所需的凭据。
- 创建 Doris Sink 表
CREATE TABLE IF NOT EXISTS fdb._doris_sink (
acct_date date,
kafka_id varchar(40),
data_infos string,
insert_date varchar(20)
)
WITH (
'connector' = 'doris',
'fenodes' = '',
'table.identifier' = 'database.table', -- 目标数据库.表
'username' = '', -- 数据库用户名
'password' = '', -- 密码
'sink.label-prefix' = 'doris_label' -- 标签具有唯一性
);
字段定义:
- acct_date: 存储当前日期。
- kafka_id: 存储每条数据的唯一 ID(通过
uuid()
生成)。 - data_infos: 存储处理后的 JSON 数据。
- insert_date: 存储数据插入时间。
Doris 配置:
- connector: 指定连接器为
doris
。 - fenodes: Doris 的 FE 节点地址(需要填入实际的地址)。
- table.identifier: 指定目标表在 Doris 中的标识符。
- username/password: 用于连接 Doris 的凭证。
- sink.label-prefix: 为每个导入任务指定一个标签前缀,以标识导入作业。
- 数据插入操作
insert into fdb.table_doris
select
CURRENT_DATE,
uuid(),
replace(gs_gd_json_string,chr(10),';') as gs_gd_json_string,
DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')
fromfdb._doris_sink;
select 语句:
- CURRENT_DATE: 获取当前日期并插入
acct_date
字段。 - uuid(): 生成唯一标识符并插入
kafka_id
字段。 - replace: 对从 Kafka 读取的 JSON 字符串中的换行符(
chr(10)
)进行替换,将其替换为;
,并插入data_infos
字段。 - DATE_FORMAT: 获取当前时间并格式化为
yyyy-MM-dd HH:mm:ss
,插入insert_date
字段。
总结
整个流程的关键在于利用 Flink 的流处理能力,将 Kafka 中实时生成的数据高效处理后,直接导入 Doris 中,以便支持后续的分析和查询工作。通过这种集成,可以实现高效的实时数据分析平台,支持海量数据的处理和快速响应的业务需求。
版权归原作者 WnHj 所有, 如有侵权,请联系我们删除。