关于Apache SeaTunnel
Apache SeaTunnel 原名 Waterdrop,在 2021 年 10 月更名为 SeaTunnel 并申请加入 Apache孵化器。目前 Apache SeaTunnel 已发布 40+个版本,并在大量企业生产实践中使用,包括 J.P.Morgan、字节跳动、Stey、中国移动、富士康、腾讯云、国双、中科大数据研究院、360、Shoppe、Bilibili、新浪、搜狗、唯品会等企业,广泛应用于海量异构数据集成、CDC 数据同步,SaaS 数据集成以及多源数据处理等场景中。
2021 年 12 月 9 日, Apache SeaTunnel 以全票通过的优秀表现正式成为 Apache 孵化器项目。
2023 年 5 月 17 日,Apache 董事会通过 Apache SeaTunnel 毕业决议,结束了为期 18 个月的孵化,正式确定 Apache SeaTunnel 成为 Apache 顶级项目
Apache SeaTunnel 是新一代高性能、分布式、海量数据集成工具,支持上百种数据源 ( Database/Cloud/SaaS ) 支持海量数据的实时 CDC 和批量同步,可以稳定高效地同步万亿级数据。
作为一款简单一易用、超高性能、支持实时流式和离线批量处理的数据集成平台,Apache SeaTunnel 整体的特征和优势包括:
- 丰富且可扩展的连接器:SeaTunnel提供了一个不依赖于特定执行引擎的连接器API。基于此API开发的连接器(Source, Transform, Sink)可以在许多不同的引擎上运行,例如当前支持的SeaTunnel Engine, Flink和Spark。
- 连接器插件:插件设计允许用户轻松开发自己的连接器并将其集成到SeaTunnel项目中。目前,SeaTunnel支持100多个连接器,而且这个数字还在飙升。下面是当前支持的连接器列表
- 批处理流集成:基于SeaTunnel Connector API开发的连接器完美兼容离线同步、实时同步、全同步、增量同步等场景。它们大大降低了管理数据集成任务的难度。
- 支持分布式快照算法,保证数据一致性。
- 多引擎支持:SeaTunnel默认使用SeaTunnel引擎进行数据同步。SeaTunnel还支持使用Flink或Spark作为连接器的执行引擎,以适应企业现有的技术组件。SeaTunnel支持多个版本的Spark和Flink。
- JDBC多路复用,数据库日志多表解析:SeaTunnel支持多表或整个数据库同步,解决了JDBC过度连接的问题;支持多表或全数据库的日志读取和解析,解决了CDC多表同步场景需要处理日志重复读取和解析的问题。
- 高吞吐量和低延迟:SeaTunnel支持并行读写,提供稳定可靠的高吞吐量和低延迟的数据同步能力。
- 完善的实时监控:SeaTunnel支持对数据同步过程中每一步的详细监控信息,让用户轻松了解同步任务读写的数据数量、数据大小、QPS等信息。
- 支持两种作业开发方法:编码和画布设计。SeaTunnel web项目https://github.com/apache/seatunnel-web提供了作业、调度、运行和监控功能的可视化管理
SeaTunnel系统架构设计
SeaTunnel 安装部署
SeaTunnel 引擎是 SeaTunnel 的默认引擎。SeaTunnel的安装包中已经包含了SeaTunnel Engine的全部内容
SeaTunnel 安装包获取
- 可以通过直接下载编译的包进行安装
https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz
下载完毕之后上传到服务器上面并解压
# 解压到了/opt/module目录下
tar -zxvf apache-seatunnel-2.3.3-bin.tar.gz -C /opt/module
- 通过源代码方式进行编译获取安装包
- 从https://seatunnel.apache.org/download或https://github.com/apache/seatunnel.git获取源码包
- 使用maven命令构建安装包
./mvnw -U -T 1C clean install -DskipTests -D"maven.test.skip"=true -D"maven.javadoc.skip"=true -D"checkstyle.skip"=true -D"license.skipAddThirdParty"
- 然后就可以在 中获取安装包
${Your_code_dir}/seatunnel-dist/target
,例如:apache-seatunnel-2.3.3-SNAPSHOT-bin.tar.gz
配置 SEATUNNEL_HOME
安装Connectors插件
从2.2.0-beta开始,二进制包默认不提供connectors的依赖,因此在第一次使用它时,需要执行以下命令来安装连接器:(当然,您也可以从Apache Maven Repository[https://repo.maven.apache.org/maven2/org/apache/seatunnel/]手动下载连接器,然后手动移动到connectors/seatunnel目录)
sh bin/install-plugin.sh 2.3.3
如果需要指定connector的版本,以2.3.3版本为例,需要执行
sh bin/install-plugin.sh 2.3.3
一般情况下我们不需要所有的连接器插件,所以你可以通过配置config/plugin_config来指定你需要的插件,例如,你只需要connector-console插件,然后你可以修改plugin.properties,比如
--seatunnel-connectors--
connector-console
--end--
如果希望示例应用程序正常工作,则需要添加以下插件
--seatunnel-connectors--
connector-fake
connector-console
--end--
你可以在${SEATUNNEL_HOME}/connectors/plugins-mapping.properties下找到所有支持的连接器和相应的plugin_config配置名称
如果想手动安装V2 connector插件,只需要下载自己需要的V2 connector插件,放到${SEATUNNEL_HOME}/connectors/seatunnel目录下即可
启动示例作业
定义一个作业配置文件
我们可以直接使用官方的模版config/v2.batch.config.template, 该模版分别定义了env、source、sink
env:表示运行的参数设置,比如并发数,作业运行模式:BATCH/STRAM、checkpoint等等
source:表示数据源的定义
sink:表示目标端的数据源定义
执行命令:
./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local
我是用的2.3.3,在运行后会报错,提示缺少:java.lang.NoClassDefFoundError: com/sun/jersey/client/impl/CopyOnWriteHashMap
报错信息:
2023-11-08 17:47:32,243 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Fatal Error,
2023-11-08 17:47:32,243 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Please submit bug report in https://github.com/apache/seatunnel/issues
2023-11-08 17:47:32,243 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Reason:SeaTunnel job executed failed
2023-11-08 17:47:32,244 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: com/sun/jersey/client/impl/CopyOnWriteHashMap
at com.hazelcast.spi.impl.AbstractInvocationFuture.wrapInCompletionException(AbstractInvocationFuture.java:1347)
at com.hazelcast.spi.impl.AbstractInvocationFuture.cascadeException(AbstractInvocationFuture.java:1340)
at com.hazelcast.spi.impl.AbstractInvocationFuture.access$200(AbstractInvocationFuture.java:65)
at com.hazelcast.spi.impl.AbstractInvocationFuture$ApplyNode.execute(AbstractInvocationFuture.java:1478)
at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockOtherNode(AbstractInvocationFuture.java:797)
at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockAll(AbstractInvocationFuture.java:759)
at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1235)
at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionally(AbstractInvocationFuture.java:709)
at com.hazelcast.client.impl.spi.impl.ClientInvocation.completeExceptionally(ClientInvocation.java:294)
at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyExceptionWithOwnedPermission(ClientInvocation.java:321)
at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyException(ClientInvocation.java:304)
at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.handleResponse(ClientResponseHandlerSupplier.java:164)
at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.process(ClientResponseHandlerSupplier.java:141)
at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.access$300(ClientResponseHandlerSupplier.java:60)
at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:251)
at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:243)
at com.hazelcast.client.impl.connection.tcp.TcpClientConnection.handleClientMessage(TcpClientConnection.java:245)
at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.handleMessage(ClientMessageDecoder.java:135)
at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.onRead(ClientMessageDecoder.java:89)
at com.hazelcast.internal.networking.nio.NioInboundPipeline.process(NioInboundPipeline.java:136)
at com.hazelcast.internal.networking.nio.NioThread.processSelectionKey(NioThread.java:383)
at com.hazelcast.internal.networking.nio.NioThread.processSelectionKeys(NioThread.java:368)
at com.hazelcast.internal.networking.nio.NioThread.selectLoop(NioThread.java:294)
at com.hazelcast.internal.networking.nio.NioThread.executeRun(NioThread.java:249)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: java.lang.NoClassDefFoundError: com/sun/jersey/client/impl/CopyOnWriteHashMap
at org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan$Builder.<init>(CheckpointPlan.java:66)
下载安装包
将安装包放入
$SEATUNNEL_HOME/lib
下面
再次运行:
./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local
运行结果
2023-11-08 17:55:32,575 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 9223372036854775807
2023-11-08 17:55:32,621 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(9223372036854775807/1@774572734674894849) notify finished!
2023-11-08 17:55:32,621 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, checkpoint:org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint@7e4b2ce6
2023-11-08 17:55:32,627 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start clean pending checkpoint cause CheckpointCoordinator completed.
2023-11-08 17:55:32,628 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - Turn checkpoint_state_774572734674894849_1 state from null to FINISHED
2023-11-08 17:55:32,672 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 20000, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=1}
2023-11-08 17:55:32,672 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] Task TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=1} complete with state FINISHED
2023-11-08 17:55:32,672 INFO org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-610439] [5.1] Received task end from execution TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=1}, state FINISHED
2023-11-08 17:55:32,674 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SplitEnumerator (1/1)] turn to end state FINISHED.
2023-11-08 17:55:32,674 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SplitEnumerator (1/1)] end with state FINISHED
2023-11-08 17:55:32,684 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 50000, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000}
2023-11-08 17:55:32,684 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 50001, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001}
2023-11-08 17:55:33,480 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 40001, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001}
2023-11-08 17:55:33,480 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 40000, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000}
2023-11-08 17:55:33,480 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] Task TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001} complete with state FINISHED
2023-11-08 17:55:33,480 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] Task TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000} complete with state FINISHED
2023-11-08 17:55:33,480 INFO org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-610439] [5.1] Received task end from execution TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001}, state FINISHED
2023-11-08 17:55:33,480 INFO org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-610439] [5.1] Received task end from execution TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000}, state FINISHED
2023-11-08 17:55:33,482 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (1/2)] turn to end state FINISHED.
2023-11-08 17:55:33,482 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (2/2)] turn to end state FINISHED.
2023-11-08 17:55:33,482 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (1/2)] end with state FINISHED
2023-11-08 17:55:33,482 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (2/2)] end with state FINISHED
2023-11-08 17:55:33,482 INFO org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)] end with state FINISHED
2023-11-08 17:55:33,506 INFO org.apache.seatunnel.engine.server.master.JobMaster - release the pipeline Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)] resource
2023-11-08 17:55:33,507 INFO org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774572734674894849, slot: SlotProfile{worker=[localhost]:5801, slotID=1, ownerJobID=774572734674894849, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='82bd6da9-17b1-41ba-b745-9f6d4fea5378'}
2023-11-08 17:55:33,507 INFO org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774572734674894849, slot: SlotProfile{worker=[localhost]:5801, slotID=2, ownerJobID=774572734674894849, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='82bd6da9-17b1-41ba-b745-9f6d4fea5378'}
2023-11-08 17:55:33,507 INFO org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774572734674894849, slot: SlotProfile{worker=[localhost]:5801, slotID=3, ownerJobID=774572734674894849, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='82bd6da9-17b1-41ba-b745-9f6d4fea5378'}
2023-11-08 17:55:33,510 INFO org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)] turn to end state FINISHED.
2023-11-08 17:55:33,511 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job SeaTunnel_Job (774572734674894849) end with state FINISHED
2023-11-08 17:55:33,523 INFO org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (774572734674894849) end with state FINISHED
2023-11-08 17:55:33,546 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand -
***********************************************
Job Statistic Information
***********************************************
Start Time : 2023-11-08 17:55:30
End Time : 2023-11-08 17:55:33
Total Time(s) : 2
Total Read Count : 32
Total Write Count : 32
Total Failed Count : 0
***********************************************
2023-11-08 17:55:33,546 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-610439] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2023-11-08 17:55:33,548 INFO com.hazelcast.internal.server.tcp.TcpServerConnection - [localhost]:5801 [seatunnel-610439] [5.1] Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33602, qualifier=null, endpoint=[127.0.0.1]:33602, remoteUuid=38fbe3e1-876c-48e9-b145-1989888393ab, alive=false, connectionType=JVM, planeIndex=-1] closed. Reason: Connection closed by the other side
2023-11-08 17:55:33,548 INFO com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [seatunnel-610439] [5.1] Removed connection to endpoint: [localhost]:5801:6f7f4921-7d71-42af-b26a-6f11a7960118, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:33602->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2023-11-08 17:55:33.543, lastWriteTime=2023-11-08 17:55:33.523, closedTime=2023-11-08 17:55:33.547, connected server version=5.1}
2023-11-08 17:55:33,548 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-610439] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2023-11-08 17:55:33,549 INFO com.hazelcast.client.impl.ClientEndpointManager - [localhost]:5801 [seatunnel-610439] [5.1] Destroying ClientEndpoint{connection=Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33602, qualifier=null, endpoint=[127.0.0.1]:33602, remoteUuid=38fbe3e1-876c-48e9-b145-1989888393ab, alive=false, connectionType=JVM, planeIndex=-1], clientUuid=38fbe3e1-876c-48e9-b145-1989888393ab, clientName=hz.client_1, authenticated=true, clientVersion=5.1, creationTime=1699437330660, latest clientAttributes=lastStatisticsCollectionTime=1699437330713,enterprise=false,clientType=JVM,clientVersion=5.1,clusterConnectionTimestamp=1699437330653,clientAddress=127.0.0.1,clientName=hz.client_1,credentials.principal=null,os.committedVirtualMemorySize=7048228864,os.freePhysicalMemorySize=15730012160,os.freeSwapSpaceSize=0,os.maxFileDescriptorCount=65535,os.openFileDescriptorCount=51,os.processCpuTime=5290000000,os.systemLoadAverage=0.86,os.totalPhysicalMemorySize=33566306304,os.totalSwapSpaceSize=0,runtime.availableProcessors=8,runtime.freeMemory=994659352,runtime.maxMemory=1029177344,runtime.totalMemory=1029177344,runtime.uptime=2424,runtime.usedMemory=34517992, labels=[]}
2023-11-08 17:55:33,550 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-610439] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2023-11-08 17:55:33,550 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client......
2023-11-08 17:55:33,551 INFO com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-610439] [5.1] [localhost]:5801 is SHUTTING_DOWN
2023-11-08 17:55:33,553 INFO com.hazelcast.internal.partition.impl.MigrationManager - [localhost]:5801 [seatunnel-610439] [5.1] Shutdown request of Member [localhost]:5801 - 6f7f4921-7d71-42af-b26a-6f11a7960118 this is handled
2023-11-08 17:55:33,557 INFO com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-610439] [5.1] Shutting down connection manager...
2023-11-08 17:55:33,558 INFO com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-610439] [5.1] Shutting down node engine...
2023-11-08 17:55:35,980 INFO com.hazelcast.instance.impl.NodeExtension - [localhost]:5801 [seatunnel-610439] [5.1] Destroying node NodeExtension.
2023-11-08 17:55:35,980 INFO com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-610439] [5.1] Hazelcast Shutdown is completed in 2427 ms.
2023-11-08 17:55:35,980 INFO com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-610439] [5.1] [localhost]:5801 is SHUTDOWN
2023-11-08 17:55:35,980 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed HazelcastInstance ......
2023-11-08 17:55:35,981 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed metrics executor service ......
2023-11-08 17:55:35,981 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal
日志输入如上内容,说明配置成功
Seatunnel示例:mysql-to-mysql
接下来,我们在测试一下使用seatunnel实现从mysql同步到mysql的配置
首先,需要下载mysql jdbc的的依赖,这里我们可以选择在plugin-mapping.properties文件中配置connector-jdbc ,也可以直接将connector-jdbc的jar包放入到 $SEATUNNEL_HOME//connectors/seatunnel 下面
创建mysql库和表
create database test_01;
create table test_01.user(userid int(4) primary key not null auto_increment,username varchar(16) not null);
create database test_02;
create table test_02.user(userid int(4) primary key not null auto_increment,username varchar(16) not null);
插入数据
insert into test_01.user (username) values ("zhangsan");
insert into test_01.user (username) values ("lisi");
创建同步配置文件
env {
job.mode = "BATCH"
}
# 配置数据源
source {
jdbc {
url = "jdbc:mysql://172.1.1.54:3306/test_01"
driver = "com.mysql.cj.jdbc.Driver"
user = "admin"
password = "xxxxxxx"
generate_sink_sql = true
database = "test_01"
table = "user"
query = "select * from test_01.user"
}
}
transform {
}
# 配置目标库
sink {
jdbc {
url = "jdbc:mysql://172.1.1.54:3306/test_02"
driver = "com.mysql.cj.jdbc.Driver"
user = "admin"
password = "xxxxxx"
generate_sink_sql = true
database = "test_02"
table = "user"
}
}
运行命令:
./bin/seatunnel.sh -e LOCAL-c ./config/mysql-to-mysql.conf
输出如下信息表示同步成功:
023-11-08 20:55:00,468 INFO org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774617897816293377, slot: SlotProfile{worker=[localhost]:5801, slotID=1, ownerJobID=774617897816293377, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='36c05043-4938-47e0-927c-94e7cac4f749'}
2023-11-08 20:55:00,468 INFO org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774617897816293377, slot: SlotProfile{worker=[localhost]:5801, slotID=2, ownerJobID=774617897816293377, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='36c05043-4938-47e0-927c-94e7cac4f749'}
2023-11-08 20:55:00,470 INFO org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (774617897816293377), Pipeline: [(1/1)] turn to end state FINISHED.
2023-11-08 20:55:00,471 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job SeaTunnel_Job (774617897816293377) end with state FINISHED
2023-11-08 20:55:00,483 INFO org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (774617897816293377) end with state FINISHED
2023-11-08 20:55:00,511 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand -
***********************************************
Job Statistic Information
***********************************************
Start Time : 2023-11-08 20:54:58
End Time : 2023-11-08 20:55:00
Total Time(s) : 1
Total Read Count : 2
Total Write Count : 2
Total Failed Count : 0
***********************************************
2023-11-08 20:55:00,511 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-250845] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2023-11-08 20:55:00,514 INFO com.hazelcast.internal.server.tcp.TcpServerConnection - [localhost]:5801 [seatunnel-250845] [5.1] Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33254, qualifier=null, endpoint=[127.0.0.1]:33254, remoteUuid=332c6983-64e1-4d2a-8d4b-7b40d9677a39, alive=false, connectionType=JVM, planeIndex=-1] closed. Reason: Connection closed by the other side
2023-11-08 20:55:00,514 INFO com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [seatunnel-250845] [5.1] Removed connection to endpoint: [localhost]:5801:1373b501-f19f-47a2-9ef2-66d14e5a31c0, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:33254->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2023-11-08 20:55:00.508, lastWriteTime=2023-11-08 20:55:00.483, closedTime=2023-11-08 20:55:00.512, connected server version=5.1}
2023-11-08 20:55:00,515 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-250845] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2023-11-08 20:55:00,516 INFO com.hazelcast.client.impl.ClientEndpointManager - [localhost]:5801 [seatunnel-250845] [5.1] Destroying ClientEndpoint{connection=Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33254, qualifier=null, endpoint=[127.0.0.1]:33254, remoteUuid=332c6983-64e1-4d2a-8d4b-7b40d9677a39, alive=false, connectionType=JVM, planeIndex=-1], clientUuid=332c6983-64e1-4d2a-8d4b-7b40d9677a39, clientName=hz.client_1, authenticated=true, clientVersion=5.1, creationTime=1699448098390, latest clientAttributes=lastStatisticsCollectionTime=1699448098443,enterprise=false,clientType=JVM,clientVersion=5.1,clusterConnectionTimestamp=1699448098383,clientAddress=127.0.0.1,clientName=hz.client_1,credentials.principal=null,os.committedVirtualMemorySize=7048228864,os.freePhysicalMemorySize=12655243264,os.freeSwapSpaceSize=0,os.maxFileDescriptorCount=65535,os.openFileDescriptorCount=51,os.processCpuTime=5340000000,os.systemLoadAverage=0.24,os.totalPhysicalMemorySize=33566306304,os.totalSwapSpaceSize=0,runtime.availableProcessors=8,runtime.freeMemory=994543624,runtime.maxMemory=1029177344,runtime.totalMemory=1029177344,runtime.uptime=2526,runtime.usedMemory=34633720, labels=[]}
2023-11-08 20:55:00,516 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-250845] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2023-11-08 20:55:00,516 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client......
2023-11-08 20:55:00,517 INFO com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-250845] [5.1] [localhost]:5801 is SHUTTING_DOWN
2023-11-08 20:55:00,520 INFO com.hazelcast.internal.partition.impl.MigrationManager - [localhost]:5801 [seatunnel-250845] [5.1] Shutdown request of Member [localhost]:5801 - 1373b501-f19f-47a2-9ef2-66d14e5a31c0 this is handled
2023-11-08 20:55:00,523 INFO com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-250845] [5.1] Shutting down connection manager...
2023-11-08 20:55:00,525 INFO com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-250845] [5.1] Shutting down node engine...
2023-11-08 20:55:03,377 INFO com.hazelcast.instance.impl.NodeExtension - [localhost]:5801 [seatunnel-250845] [5.1] Destroying node NodeExtension.
2023-11-08 20:55:03,377 INFO com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-250845] [5.1] Hazelcast Shutdown is completed in 2858 ms.
2023-11-08 20:55:03,378 INFO com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-250845] [5.1] [localhost]:5801 is SHUTDOWN
2023-11-08 20:55:03,378 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed HazelcastInstance ......
2023-11-08 20:55:03,378 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed metrics executor service ......
2023-11-08 20:55:03,379 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal
查看同步结果
MySQL [(none)]>
MySQL [(none)]> select * from test_02.user;
+--------+----------+
| userid | username |
+--------+----------+
| 1 | zhangsan |
| 2 | lisi |
+--------+----------+
2 rows in set (0.00 sec)
Seatunnel 集成Flink&Spark引擎
编辑seatunnel-env.sh文件
修改FLINK_HOME为flink部署目录
修改Spark_HOME为spark部署目录
FLINK_HOME = /data/flink-1.14.5/
SPRK_HOME = /data/spark-2.4.6/
启动Flink集群
./bin/start_cluster.sh
执行命令,还是拿mysql-to-msyq.conf 为例
注意:如果是同步mysql的话,需要将jdbc的jar包放在flink/lib目录下,这次其实和使用flink做一些数据同步一样,相关的依赖包都给到flink。
./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/mysql-to-mysql.conf
执行结果如下:
Execute SeaTunnel Flink Job: ${FLINK_HOME}/bin/flink run -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink /mnt/apache-seatunnel-2.3.3/starter/seatunnel-flink-13-starter.jar --config ./config/mysql-to-mysql.conf --name SeaTunnel
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/kmr/flink1/1/flink-1.14.5/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/mnt/kmr/hadoop/1/hadoop-3.1.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID a643a1990705f817479b1d2880c9f038
Program execution finished
Job with JobID a643a1990705f817479b1d2880c9f038 has finished.
Job Runtime: 2356 ms
清空test_02.user表,使用spark导入
MySQL [(none)]> truncate table test_02.user;
Query OK, 0 rows affected (0.01 sec)
执行导入命令
./bin/start-seatunnel-spark-2-connector-v2.sh \
--master local[2] \
--deploy-mode client \
--config ./config/mysql-to-mysql.conf
执行结果如下:
23/11/08 21:19:57 INFO executor.FieldNamedPreparedStatement: PrepareStatement sql is:
INSERT INTO `test_02`.`user` (`userid`, `username`) VALUES (?, ?)
23/11/08 21:19:57 INFO v2.DataWritingSparkTask: Commit authorized for partition 0 (task 0, attempt 0, stage 0.0)
23/11/08 21:19:57 INFO v2.DataWritingSparkTask: Committed partition 0 (task 0, attempt 0, stage 0.0)
23/11/08 21:19:57 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1148 bytes result sent to driver
23/11/08 21:19:57 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1486 ms on localhost (executor driver) (1/1)
23/11/08 21:19:57 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
23/11/08 21:19:57 INFO scheduler.DAGScheduler: ResultStage 0 (save at SinkExecuteProcessor.java:117) finished in 1.568 s
23/11/08 21:19:57 INFO scheduler.DAGScheduler: Job 0 finished: save at SinkExecuteProcessor.java:117, took 1.610054 s
23/11/08 21:19:57 INFO v2.WriteToDataSourceV2Exec: Data source writer org.apache.seatunnel.translation.spark.sink.writer.SparkDataSourceWriter@25ea068e is committing.
23/11/08 21:19:57 INFO v2.WriteToDataSourceV2Exec: Data source writer org.apache.seatunnel.translation.spark.sink.writer.SparkDataSourceWriter@25ea068e committed.
23/11/08 21:19:57 INFO execution.SparkExecution: Spark Execution started
23/11/08 21:19:57 INFO spark.SparkContext: Invoking stop() from shutdown hook
23/11/08 21:19:57 INFO server.AbstractConnector: Stopped Spark@6e8a9c30{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
23/11/08 21:19:57 INFO ui.SparkUI: Stopped Spark web UI at http://kmr-b55b8d33-gn-0a6e9139-az1-master-1-2.ksc.com:4040
23/11/08 21:19:57 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/11/08 21:19:57 INFO memory.MemoryStore: MemoryStore cleared
23/11/08 21:19:57 INFO storage.BlockManager: BlockManager stopped
23/11/08 21:19:57 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
23/11/08 21:19:57 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/11/08 21:19:57 INFO spark.SparkContext: Successfully stopped SparkContext
23/11/08 21:19:57 INFO util.ShutdownHookManager: Shutdown hook called
23/11/08 21:19:57 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-6dfc2825-0da5-4e4c-9623-bddcccab0849
查询数据表,已导入成功
MySQL [(none)]> truncate table test_02.user;
Query OK, 0 rows affected (0.01 sec)
MySQL [(none)]> select * from test_02.user;
+--------+----------+
| userid | username |
+--------+----------+
| 1 | zhangsan |
| 2 | lisi |
+--------+----------+
版权归原作者 云原生大数据 所有, 如有侵权,请联系我们删除。