1.你如何优化JDBC连接池配置以提高性能?
首先,了解ETL项目的具体需求,包括数据量大小、处理频率、并发用户数等
根据状况确定合适的连接池大小。
根据这些情况 使用HikariCP
我们设置初始化连接数initialSize为源数据库连接数的20%
设置最大连接数maxTotal为源数据库连接数的50%
设置最小空闲连接数minIdle为源数据库连接数的10%,确保在低峰时段也有足够空闲连接
设置连接超时的时间connectionTimeout为30秒
设置连接回收策略softMinEvictableIdleTimeMillis为10分钟,允许连接在空闲10分钟后被回收。
通过这些我们在ETL项目中可以保证数据访问的高效性和稳定性。
2.你选择使用哪种集成数据源管理工具?为什么选择这个工具?
在负责ETL项目的数据源管理时,我们选择了DataX作为基础工具,并且基于DataX开发了DBSwitch
因为datax是这样的,datax其实只是跑任务的,基于job的Json去跑一个同步任务,但是它并不会对我们整个的数据源做一套管理。所以说我们这块就开发了一个数据源管理模块,也就是我们需要知道类似于跟never cat的一个PC版一样,可以输入IP,输入数据库的类型,输入这些信息,我能测试连接、库表预览、数据预览等要能做这些东西。实际上这一块是难点之一,比如说对数据源管理的话,我们是以插件式的形式去做的,比如说我如果现在想要集成一个 Es我需要写好es的读执行器,es的写执行器,es转换执行器,以插件式的形式来去简化我们的工作,这一块我们也是参考电子插的设计架构来的。
3.你如何处理数据清洗过程中遇到的常见问题,例如不完整的值、重复记录和异常值
A. 不完整的数据,其特征是是一些应该有的信息缺失,如供应商的名称,分公司的名称,客户的区域信息缺失、业务系统中主表与明细表不能匹配等。需要将这一类数据过滤出来,按缺失的内容分别写入不同Excel文件向客户提交,要求在规定的时间内补全。补全后才写入数据仓库。
解决措施
根据缺失的内容,将不同类型的缺失数据分别写入不同的Excel文件中。每个文件应标明缺失的内容类别,并准备好模板提醒客户填补缺失信息。
定期检查补全后的数据,确保完整后再将数据写入数据仓库,以保证数据质量。
B. 错误的数据,产生原因是业务系统不够健全,在接收输入后没有进行判断直接写入后台数据库造成的,比如数值数据输成全角数字字符、字符串数据后面有一个回车、日期格式不正确、日期越界等。这一类数据也要分类,对于类似于全角字符、数据前后有不面见字符的问题只能写SQL的方式找出来,然后要求客户在业务系统修正之后抽取;日期格式不正确的或者是日期越界的这一类错误会导致ETL运行失败,这一类错误需要去业务系统数据库用SQL的方式挑出来,交给业务主管部门要求限期修正,修正之后再抽取。
解决措施
使用SQL查询或脚本检查数据库中的错误数据,比如全角字符、不可见字符、日期格式错误、日期越界等。
将检测到的错误数据分为不同的类别:可自动修正的(如全角字符、不可见字符)、需要业务系统修正的(如日期格式错误、日期越界)
C. 重复的数据,特别是维表中比较常见,将重复的数据的记录所有字段导出来,让客户确认并整理。
解决措施
编写SQL查询或脚本从维表或其他数据表中识别重复的记录,包括所有字段。
将识别到的重复数据导出,并提交给客户确认和整理。确保客户确认后的数据是最新和准确的版本
4.你使用哪些技术或工具来进行数据转换?请举例说明你如何将数据从一种格式转换为另一种格式
1.方法SQL查询:
对于关系型数据库中的数据,我经常使用SQL查询来进行数据转换。例如,我可以使用SELECT语句结合CASE或COALESCE函数来转换列的数据类型或填补缺失值
2.使用工具
主要使用了Apache Flink,这是一个功能强大的流处理框架,同时也支持批处理。Flink提供了丰富的API和算子,可以轻松地实现复杂的数据转换逻辑。以下是我使用Flink进行数据转换的一些关键点
像flink的话这一块,我们用它一个 cdh做了一个和在现场实现的功能,就是能对卡夫卡的数据,然后做一个布控比对的操作产生告警,主要做了这一块
5.如何设计数据加载过程以确保高效性和准确性?请谈谈你使用的批量加载和增量加载策略?
使用了Apache Flink的Change Data Capture (CDC) Connector来实现数据的高效和准确加载
流程
根据目标数据库(如MySQL、PostgreSQL等),我选择了相应的Flink CDC Connector,这些连接器能够捕获数据库变更并实时地将变更数据输出
对于初始数据加载,我使用Flink的批量加载策略。这涉及到从数据库中导出全量数据,并通过Flink进行一次性的数据加载。为了提高效率,我会利用Flink的并行执行和状态管理能力来加速全量数据的处理
在完成初始批量加载后,我使用Flink CDC Connector进行增量数据加载。CDC Connector能够订阅数据库的变更日志(如MySQL的binlog),并实时捕获插入、更新和删除操作
6.如何确保ETL过程中的数据质量?请谈谈你使用的数据质量检查和验证方法?
确保数据质量
1.首先明确业务需求和目标
在开始ETL项目之前,首先要明确业务需求和目标。这包括了解数据将如何被使用,业务用户需要哪些数据,以及数据对业务决策的影响。这有助于确定ETL项目的范围,优先级和关键性能指标。
2.选择合适的ETL工具
使用DataX数据源离线同步工具
项目集成了DBSwitch
3.设计健壮的数据抽取策略
数据抽取是ETL的第一步,需要设计一个健壮的策略来确保数据的完整性和一致性。这可能包括使用增量抽取、批量抽取或实时抽取,具体取决于数据源的特性和业务需求 (使用Flink 增量同步)
7.你选择了哪种动态语言来与Java代码集成?为什么选择这种语言?
选择使用了Groovy脚本来处理数据脱敏。
具体实现流程
创建一个方法来实现数据脱敏的逻辑,例如将敏感数据替换为特定字符或进行其他处理。
将需要脱敏的数据传递给这个方法,并获取返回的脱敏后的数据。
将脱敏后的数据用于需要保护敏感信息的场景,如日志输出、展示等
为什么使用Groovy
易于集成:
Groovy可以无缝集成到Java应用程序中,利用现有的Java库和框架,使得与Java生态系统中的其他组件(如数据库访问、文件操作等)集成非常方便。
动态性和灵活性:
Groovy是一种动态语言,允许在运行时修改和扩展代码,这在处理不同格式和结构的数据时非常有用。它还支持运行时类型转换,可以根据需要更改变量类型,简化复杂数据处理逻辑。
8.你如何实现对增量数据的捕获?请描述你使用的具体技术和工具?
- 选择Flink CDC Connector:
- 选择了适合我们源数据库的Flink CDC Connector。例如,如果我们的源数据库是MySQL,我就会使用Flink的MySQL CDC Connector。
- 配置Flink CDC Connector:
- 配置了Flink CDC Connector来连接到源数据库,并订阅数据库的变更日志(如MySQL的binlog)。这允许我实时捕获数据库中的增量变更。
- 使用Flink的流处理能力:
- 利用Flink的流处理能力,我对捕获的变更数据进行了必要的清洗和转换,以确保数据符合后续处理和存储的需求。
- 数据格式化和转换:
- 在Flink流中,我可能对数据进行了一些格式化和转换操作,比如将数据从一种格式转换为另一种格式,或者添加了一些业务逻辑字段。
- Doris作为目标数据存储:
- 使用Doris作为目标数据存储系统。Doris是一个易于使用的、高性能的分析型数据库,它支持高并发的数据写入和实时的数据分析。
- Flink与Doris的集成:
- 利用Flink提供的Doris Connector(如果可用),或者自定义的Sink Function,将处理后的数据写入到Doris中。这确保了数据可以高效地从Flink传输到Doris。
- 确保数据一致性和准确性:
- 在整个数据捕获和加载过程中,我实施了数据一致性保障措施,比如使用Flink的检查点和保存点机制,以及Doris的事务支持,来确保数据的准确性和一致性。
9.数据一致性如何保证?
Flink CDC通过Flink CheckPoint 机制 结合 Doris的两段提交可以实现端端的Exactly Once 语义
具体流程
事务开启 (Flink job 启动及Doris事务开启):当Flink 任务启动后,Doris Sink会发起Precommit请求,随后开启写入事务
数据传输(Flink job的运行和数据传输): 在Flink job 运行过程中,Doris Sink 不断从上游算子获取数据,并通过Http Chunked 的方式持续将数据传输到Doris。
事务的预提交:当Flink 开始进行Checkpoint时,Flink 会发起CheckPoint请求,此时Flink 各个算子会进行Barrier对齐和快照保存,Doris Sink发出停止 Stream Load写入的请求,并发起一个事务提交请求到Doris。这步完成之后,这批数据已经完全写入BE中,但是BE没有进行数据发布前对用户是不可见的
事务提交: 当Flink的Checkpoint完成之后,将通知各个算子,Doris发起一次事务提交到Doris BE,BE对此次数据进行发布,最终完成数据流的写入
版权归原作者 风吹麦浪Κ 所有, 如有侵权,请联系我们删除。