A Brief Introduction to Canal-adapter and a Partial Source Code Walkthrough

Overview

canal-adapter is an open-source tool from Alibaba that subscribes to MySQL binlog events through a canal server and synchronizes the incremental data to downstream stores. It consists of two modules: a launcher and the adapters. The launcher is a Spring Boot application that dynamically loads the different adapters (plugins) through an SPI-style extension mechanism; RdbAdapter, ESAdapter, HbaseAdapter and others are currently supported. Because external adapters (plugins) are loaded dynamically, the project is highly extensible, and users can implement adapters that fit their own needs. In addition, a FileAlterationMonitor watches the configuration files for changes and reloads them dynamically, so configuration can be updated without stopping the process.
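As a rough idea of how this kind of hot reload can be wired up, here is a minimal sketch based on commons-io's FileAlterationMonitor. The watched directory, the polling interval, and the reload action are illustrative assumptions, not canal-adapter's actual implementation:

import java.io.File;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;

public class ConfigWatchDemo {
    public static void main(String[] args) throws Exception {
        // watch a config directory (path is hypothetical)
        FileAlterationObserver observer = new FileAlterationObserver(new File("conf/rdb"));
        observer.addListener(new FileAlterationListenerAdaptor() {
            @Override
            public void onFileChange(File file) {
                // a real adapter would re-parse the yml and refresh its mapping config here
                System.out.println("config changed, reloading: " + file.getName());
            }
        });
        // poll for changes every 3 seconds without restarting the process
        FileAlterationMonitor monitor = new FileAlterationMonitor(3000, observer);
        monitor.start();
    }
}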

Source Code Walkthrough

1. The main entry point of the system is:

com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java

@SpringBootApplication
public class CanalAdapterApplication {

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(CanalAdapterApplication.class);
        application.setBannerMode(Banner.Mode.OFF);
        application.run(args);
    }
}

2. At startup, the init() method in com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java is invoked via the @PostConstruct annotation; it creates and initializes the CanalAdapterLoader.

@PostConstruct
public synchronized void init() {
    if (running) {
        return;
    }
    try {
        logger.info("## start the canal client adapters.");
        adapterLoader = new CanalAdapterLoader(adapterCanalConfig);
        adapterLoader.init();
        running = true;
        logger.info("## the canal client adapters are running now ......");
    } catch (Exception e) {
        logger.error("## something goes wrong when starting up the canal client adapters:", e);
    }
}
3. com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

The init() method of CanalAdapterLoader determines the adapter-worker mode from the canal.conf.mode setting in canal-adapter's application.yml. This walkthrough uses tcp mode, so listening is started through worker.start().
canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 3
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
public void init() {
    loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class);
    String canalServerHost = this.canalClientConfig.getCanalServerHost();
    SocketAddress sa = null;
    if (canalServerHost != null) {
        String[] ipPort = canalServerHost.split(":");
        sa = new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
    }
    String zkHosts = this.canalClientConfig.getZookeeperHosts();
    if ("tcp".equalsIgnoreCase(canalClientConfig.getMode())) {
        // initialize the canal-client (tcp) adapters
        // ..... some code omitted
        worker.start();
        logger.info("Start adapter for canal instance: {} succeed", canalAdapter.getInstance());
    } else if ("kafka".equalsIgnoreCase(canalClientConfig.getMode())) {
        // initialize the canal-client-kafka adapters
        // ..... some code omitted
        canalKafkaWorker.start();
        logger.info("Start adapter for canal-client mq topic: {} succeed",
            canalAdapter.getInstance() + "-" + group.getGroupId());
    } else if ("rocketMQ".equalsIgnoreCase(canalClientConfig.getMode())) {
        // initialize the canal-client-rocketMQ adapters
        // ..... some code omitted
        rocketMQWorker.start();
        logger.info("Start adapter for canal-client mq topic: {} succeed",
            canalAdapter.getInstance() + "-" + group.getGroupId());
    }
}
4. com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

worker.start() is a method of this abstract class; it starts a dedicated thread to process messages. process() is an abstract method with several implementations, and the one used depends on the client mode; the concrete implementations include CanalAdapterWorker, CanalAdapterKafkaWorker, CanalAdapterRocketMQWorker, and so on.
public void start() {
    if (!running) {
        thread = new Thread(this::process);
        thread.setUncaughtExceptionHandler(handler);
        thread.start();
        running = true;
    }
}

protected abstract void process();
5. com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

This class is the implementation that handles messages in tcp mode. CanalAdapterWorker.process() is a concrete implementation of AbstractCanalAdapterWorker.process(). In this method:

(1) It first loads some basic configuration of the canal-adapter client, including retries, the maximum number of retries for failed synchronization (retries = -1 means retrying indefinitely), and batchSize, the number of messages the adapter pulls from the canal server instance per batch.

(2) It contains two while loops. The first loop mainly (2.1) reads the sync switch of the canal adapter instance, (2.2) initializes the canal server connector, and (2.3) establishes connections with the outerAdapters configured for the canal adapter. The second loop pulls a given number of incremental records from the connector with Message message = connector.getWithoutAck(batchSize) and writes the data out to the configured outerAdapters via writeOut(message). One thing to note here: if an exception occurs while syncing, the batch is retried up to the maximum retry count; but once the retries are exhausted, the adapter still sends an ack signal, connector.ack(batchId), back to the connector. Acknowledging this batchId tells the connector that every message up to and including this batch has been synchronized successfully, so as I understand it data may be lost in this case (my personal understanding; corrections welcome).

@Override
protected void process() {
    while (!running) { // waiting until running == true
        while (!running) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }
    }

    int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
    if (retry == -1) {
        // a retry count of -1 means block and keep retrying on errors
        retry = Integer.MAX_VALUE;
    }
    // long timeout = canalClientConfig.getTimeout() == null ? 300000 :
    // canalClientConfig.getTimeout(); // default timeout: 5 minutes
    Integer batchSize = canalClientConfig.getBatchSize();
    if (batchSize == null) {
        batchSize = BATCH_SIZE;
    }

    while (running) {
        try {
            syncSwitch.get(canalDestination);

            logger.info("=============> Start to connect destination: {} <=============", this.canalDestination);
            connector.connect();
            logger.info("=============> Start to subscribe destination: {} <=============", this.canalDestination);
            connector.subscribe();
            logger.info("=============> Subscribe destination: {} succeed <=============", this.canalDestination);

            while (running) {
                try {
                    syncSwitch.get(canalDestination, 1L, TimeUnit.MINUTES);
                } catch (TimeoutException e) {
                    break;
                }
                if (!running) {
                    break;
                }
                for (int i = 0; i < retry; i++) {
                    if (!running) {
                        break;
                    }
                    Message message = connector.getWithoutAck(batchSize); // fetch the given number of records
                    long batchId = message.getId();
                    try {
                        int size = message.getEntries().size();
                        if (batchId == -1 || size == 0) {
                            Thread.sleep(500);
                        } else {
                            if (logger.isDebugEnabled()) {
                                logger.debug("destination: {} batchId: {} batchSize: {} ",
                                    canalDestination,
                                    batchId,
                                    size);
                            }
                            long begin = System.currentTimeMillis();
                            writeOut(message);
                            if (logger.isDebugEnabled()) {
                                logger.debug("destination: {} batchId: {} elapsed time: {} ms",
                                    canalDestination,
                                    batchId,
                                    System.currentTimeMillis() - begin);
                            }
                        }
                        connector.ack(batchId); // commit the ack
                        break;
                    } catch (Exception e) {
                        if (i != retry - 1) {
                            connector.rollback(batchId); // processing failed, roll back the batch
                            logger.error(e.getMessage() + " Error sync and rollback, execute times: " + (i + 1));
                        } else {
                            connector.ack(batchId);
                            logger.error(e.getMessage() + " Error sync but ACK!");
                        }
                        Thread.sleep(500);
                    }
                }
            }
        } catch (Throwable e) {
            logger.error("process error!", e);
        } finally {
            connector.disconnect();
            logger.info("=============> Disconnect destination: {} <=============", this.canalDestination);
        }
        if (running) { // is reconnect
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }
}
6. writeOut(message) is also a method of the AbstractCanalAdapterWorker abstract class. Through a fixed-size thread pool created by this class (sized to canalOuterAdapters.size()), messages are submitted asynchronously in batches via batchSync(dmls, adapter).
    
protected void writeOut(final Message message) {
    List<Future<Boolean>> futures = new ArrayList<>();
    // adapter groups run in parallel with each other
    canalOuterAdapters.forEach(outerAdapters -> {
        final List<OuterAdapter> adapters = outerAdapters;
        futures.add(groupInnerExecutorService.submit(() -> {
            try {
                // adapters within one group run serially; avoid configuring several adapters in a group if possible
                adapters.forEach(adapter -> {
                    long begin = System.currentTimeMillis();
                    List<Dml> dmls = MessageUtil.parse4Dml(canalDestination, groupId, message);
                    if (dmls != null) {
                        batchSync(dmls, adapter);
                        if (logger.isDebugEnabled()) {
                            logger.debug("{} elapsed time: {}",
                                adapter.getClass().getName(), (System.currentTimeMillis() - begin));
                        }
                    }
                });
                return true;
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return false;
            }
        }));
    });
    // some code omitted
}

7. The batchSync() method also belongs to the AbstractCanalAdapterWorker abstract class. When the number of dmls is no larger than syncBatchSize the data is synced in one go, otherwise it is synced in batches, but in either case adapter.sync() is ultimately called. (I think the one-shot-versus-batched decision here is slightly flawed: dml.getData() is itself a list that may contain many rows, so dmls.size() <= canalClientConfig.getSyncBatchSize() is not a reliable way to check whether the payload is below syncBatchSize; the sum of dml.getData().size() over the list should be used instead — see the sketch after the code below.)

private void batchSync(List<Dml> dmls, OuterAdapter adapter) {
    // sync in batches
    if (dmls.size() <= canalClientConfig.getSyncBatchSize()) {
        adapter.sync(dmls);
    } else {
        int len = 0;
        List<Dml> dmlsBatch = new ArrayList<>();
        for (Dml dml : dmls) {
            dmlsBatch.add(dml);
            if (dml.getData() == null || dml.getData().isEmpty()) {
                len += 1;
            } else {
                len += dml.getData().size();
            }
            if (len >= canalClientConfig.getSyncBatchSize()) {
                adapter.sync(dmlsBatch);
                dmlsBatch.clear();
                len = 0;
            }
        }
        if (!dmlsBatch.isEmpty()) {
            adapter.sync(dmlsBatch);
        }
    }
}
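Following the observation above, here is a minimal sketch of the alternative threshold check, treating syncBatchSize as a row budget rather than a count of Dml objects. The fitsInSingleBatch helper is hypothetical; only Dml#getData() is taken from the snippet above:

// Hypothetical variant of the threshold check in batchSync(): count the total number
// of rows across all Dml entries instead of the number of Dml objects.
private boolean fitsInSingleBatch(List<Dml> dmls, int syncBatchSize) {
    int totalRows = 0;
    for (Dml dml : dmls) {
        // a DDL or empty Dml still counts as one unit, mirroring the original loop
        totalRows += (dml.getData() == null || dml.getData().isEmpty()) ? 1 : dml.getData().size();
        if (totalRows > syncBatchSize) {
            return false; // early exit once the budget is exceeded
        }
    }
    return true;
}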

8. adapter.sync() is declared by the com/alibaba/otter/canal/client/adapter/OuterAdapter.java interface. This interface also has several implementations, including ESAdapter, HbaseAdapter, and RdbAdapter; these implementations are the concrete adapters (plugins). This article focuses on the RdbAdapter.

9. com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

A concrete implementation of OuterAdapter. Its RdbAdapter.sync() method delegates the core RDB synchronization logic to rdbSyncService.sync(mappingConfigCache, dmls, envProperties).
@Override
public void sync(List<Dml> dmls) {
    if (dmls == null || dmls.isEmpty()) {
        return;
    }
    try {
        rdbSyncService.sync(mappingConfigCache, dmls, envProperties);
        rdbMirrorDbSyncService.sync(dmls);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
10. com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

The RdbSyncService.sync() method first checks whether the statement is a DDL statement; if it is, it simply returns false, in other words schema-change operations are not replayed. For DML statements, it takes every Dml in the List<Dml> and, via SingleDml.dml2SingleDmls(), expands all the rows in dml.getData() into a List<SingleDml>. It then decides whether to sync concurrently based on the concurrent=true/concurrent=false setting in the configuration file. If concurrent processing is enabled, the pkHash() method in this class computes a hash from the primary key values and returns a concrete index, and each SyncItem is placed into the corresponding slot of List<SyncItem>[] dmlsPartition according to that index (an illustration of the partitioning follows the code below). In practice this step is a performance bottleneck.
public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls, Properties envProperties) {
    sync(dmls, dml -> {
        if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
            // DDL
            columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
            return false;
        } else {
            // DML
            String destination = StringUtils.trimToEmpty(dml.getDestination());
            String groupId = StringUtils.trimToEmpty(dml.getGroupId());
            String database = dml.getDatabase();
            String table = dml.getTable();
            Map<String, MappingConfig> configMap;
            if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
                configMap = mappingConfig.get(destination + "-" + groupId + "_" + database + "-" + table);
            } else {
                configMap = mappingConfig.get(destination + "_" + database + "-" + table);
            }
            if (configMap == null) {
                return false;
            }
            if (configMap.values().isEmpty()) {
                return false;
            }
            for (MappingConfig config : configMap.values()) {
                if (config.getConcurrent()) {
                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
                    singleDmls.forEach(singleDml -> {
                        int hash = pkHash(config.getDbMapping(), singleDml.getData());
                        SyncItem syncItem = new SyncItem(config, singleDml);
                        dmlsPartition[hash].add(syncItem);
                    });
                } else {
                    int hash = 0;
                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
                    singleDmls.forEach(singleDml -> {
                        SyncItem syncItem = new SyncItem(config, singleDml);
                        dmlsPartition[hash].add(syncItem);
                    });
                }
            }
            return true;
        }
    });
}
public int pkHash(DbMapping dbMapping, Map<String, Object> d) {
    return pkHash(dbMapping, d, null);
}

public int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o) {
    int hash = 0;
    // take the primary key columns
    for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
        String targetColumnName = entry.getKey();
        String srcColumnName = entry.getValue();
        if (srcColumnName == null) {
            srcColumnName = Util.cleanColumn(targetColumnName);
        }
        Object value = null;
        if (o != null && o.containsKey(srcColumnName)) {
            value = o.get(srcColumnName);
        } else if (d != null) {
            value = d.get(srcColumnName);
        }
        if (value != null) {
            hash += value.hashCode();
        }
    }
    hash = Math.abs(hash) % threads;
    return Math.abs(hash);
}
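To see why the partitioning hashes on the primary key: rows with the same key always land in the same partition, so changes to any one row stay in order even though the partitions are flushed by separate threads. A standalone toy calculation (the value 42 and the threads = 3 constant mirror the code above; nothing here is the actual canal-adapter API):

// Toy illustration of the pkHash() idea: the same primary key always maps to the
// same partition index, so per-row ordering is preserved under concurrent sync.
public class PkHashDemo {
    public static void main(String[] args) {
        int threads = 3;                 // matches the hard-coded thread count
        Object pk = Long.valueOf(42L);   // primary key of a changed row
        int partition = Math.abs(pk.hashCode()) % threads;
        // two successive updates to row 42 both go to this partition and are applied serially
        System.out.println("row with pk=42 -> partition " + partition);
    }
}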

11. Also in com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java, another overload of RdbSyncService.sync() processes the data in dmlsPartition asynchronously. By default it uses three worker threads (the count is hard-coded as threads = 3), iterates over dmlsPartition, and handles each partition's data through sync(batchExecutors[j], ...).

private int threads = 3;

public void sync(List<Dml> dmls, Function<Dml, Boolean> function) {
    try {
        boolean toExecute = false;
        for (Dml dml : dmls) {
            if (!toExecute) {
                toExecute = function.apply(dml);
            } else {
                function.apply(dml);
            }
        }
        if (toExecute) {
            List<Future<Boolean>> futures = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                int j = i;
                if (dmlsPartition[j].isEmpty()) {
                    // bypass
                    continue;
                }

                futures.add(executorThreads[i].submit(() -> {
                    try {
                        dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j],
                            syncItem.config,
                            syncItem.singleDml));
                        dmlsPartition[j].clear();
                        batchExecutors[j].commit();
                        return true;
                    } catch (Throwable e) {
                        batchExecutors[j].rollback();
                        throw new RuntimeException(e);
                    }
                }));
            }

            futures.forEach(future -> {
                try {
                    future.get();
                } catch (ExecutionException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        }
    } finally {
        for (BatchExecutor batchExecutor : batchExecutors) {
            if (batchExecutor != null) {
                batchExecutor.close();
            }
        }
    }
}

12. Also in com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java, yet another RdbSyncService.sync() overload first applies a streaming ETL filter, streamEtlHitProcess(etlCondition, dml), which filters the incoming data according to the etlCondition configured in the mapping file. It then dispatches on the DML type and executes the corresponding insert, update, delete, or truncate operation against the batchExecutor.
public void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
    if (config != null) {
        try {
            String etlCondition = config.getDbMapping().getEtlCondition();
            if (!streamEtlHitProcess(etlCondition, dml)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("etl filter {} success: {}", etlCondition, JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
                }
                return;
            }
            String type = dml.getType();
            if (type != null && type.equalsIgnoreCase("INSERT")) {
                insert(batchExecutor, config, dml);
            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
                update(batchExecutor, config, dml);
            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
                delete(batchExecutor, config, dml);
            } else if (type != null && type.equalsIgnoreCase("TRUNCATE")) {
                truncate(batchExecutor, config);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

13. Taking RdbSyncService.insert() in com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java as an example, the method builds an INSERT statement by string concatenation and executes it to complete the data synchronization. The other operations work the same way: they assemble SQL and write the data into the target database.

private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
    Map<String, Object> data = dml.getData();
    if (data == null || data.isEmpty()) {
        return;
    }
    DbMapping dbMapping = config.getDbMapping();
    Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);

    StringBuilder insertSql = new StringBuilder();
    insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");

    columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
    int len = insertSql.length();
    insertSql.delete(len - 1, len).append(") VALUES (");
    int mapLen = columnsMap.size();
    for (int i = 0; i < mapLen; i++) {
        insertSql.append("?,");
    }
    len = insertSql.length();
    insertSql.delete(len - 1, len).append(")");

    Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);

    List<Map<String, ?>> values = new ArrayList<>();
    for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
        String targetColumnName = entry.getKey();
        String srcColumnName = entry.getValue();
        if (srcColumnName == null) {
            srcColumnName = Util.cleanColumn(targetColumnName);
        }

        Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
        if (type == null) {
            // throw new RuntimeException("Target Database : " + database + "Table" + table + "Target column: " + targetColumnName + " not matched");
            throw new RuntimeException(String.format("Target database:{%s} table:{%s} target column:{%s} not matched", dml.getDatabase(), dml.getTable(), targetColumnName));
        }
        Object value = data.get(srcColumnName);
        BatchExecutor.setValue(values, type, value);
    }
    try {
        batchExecutor.execute(insertSql.toString(), values);
    } catch (SQLException e) {
        if (skipDupException
            && (e.getMessage().contains("Duplicate entry") || e.getMessage().startsWith("ORA-00001:"))) {
            // ignore
            // TODO add duplicate-key error codes for more relational databases
        } else {
            throw e;
        }
    }
    if (logger.isTraceEnabled()) {
        logger.trace("Insert into target table, sql: {}", insertSql);
    }
}
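For intuition, with a hypothetical mapping of two target columns, id and name, the builder above ends up executing a parameterized statement of the following shape; the table and column names are made up for illustration, and the row values are bound as JDBC parameters by BatchExecutor.setValue rather than spliced into the string:

// Shape of the statement assembled by insert() for a hypothetical two-column mapping.
// "target_db.target_table", "id" and "name" are illustrative names, not real config.
String insertSql = "INSERT INTO target_db.target_table (id,name) VALUES (?,?)";
// values for the two placeholders are collected in order and passed to
// batchExecutor.execute(insertSql, values) as shown above.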
Closing remarks

Corrections and feedback are welcome.

Tags: java, big data

Reposted from: https://blog.csdn.net/weixin_43914798/article/details/129768159
Copyright belongs to the original author 码猿小站. In case of infringement, please contact us for removal.
