0


Flink CDC 3.0 表结构变更的处理流程

Flink CDC 3.0 表结构变更的处理流程

​ 表结构变更主要涉及到三个类

SchemaOperator

DataSinkWriterOperator

Sink

端)和

SchemaRegistry

(协调器);

SchemaOperator

接收结构变更消息时会通知

sink

端和协调器,并等待结构变更操作在协调器执行完毕后在处理后续数据,具体流程参考如下。

前提条件

cdc版本:Flink-cdc 3.0

Flink版本:Flink 1.18

SchemaOperator

Source

抓表结构变更事件推送到

SchemaOperator

时,

SchemaOperator

会向协调器(也就是

SchemaRegistry

)发起变更请求;如果是表结构变更,则向

Sink

发送

flushEvent

,让其(

Sink

flush

内存中数据(

Sink

是经过

DataSinkWriterOperator

包装),最后阻塞数据流;

SchemaOperator

处理表结构变更事件

# SchemaOperator@OverridepublicvoidprocessElement(StreamRecord<Event> streamRecord){Event event = streamRecord.getValue();//如果是schame change事件if(event instanceofSchemaChangeEvent){TableId tableId =((SchemaChangeEvent) event).tableId();
        LOG.info("Table {} received SchemaChangeEvent and start to be blocked.",
                tableId.toString());//处理schame change事件handleSchemaChangeEvent(tableId,(SchemaChangeEvent) event);return;}
    output.collect(streamRecord);}privatevoidhandleSchemaChangeEvent(TableId tableId,SchemaChangeEvent schemaChangeEvent){// The request will need to send a FlushEvent or block until flushing finished//向协调节点(SchemaRegistry)发送表结构变更请求,是表结构变更会返回true 如果是建表则返回falseSchemaChangeResponse response =requestSchemaChange(tableId, schemaChangeEvent);if(response.isShouldSendFlushEvent()){
        LOG.info("Sending the FlushEvent for table {} in subtask {}.",
                tableId,getRuntimeContext().getIndexOfThisSubtask());//向sink发送 flush事件和schame信息
        output.collect(newStreamRecord<>(newFlushEvent(tableId)));
        output.collect(newStreamRecord<>(schemaChangeEvent));// The request will block until flushing finished in each sink writer// 这个请求查询协调器,当前schame是否执行完毕,如果没有则阻塞等待,直到协调器完成schame change操作requestReleaseUpstream();}}

Sink

Sink

flush

掉变更前的数据,并上报给协调器(

SchemaRegistry

)缓存刷新完成

Sink

端处理表结构变更事件,并上报给协调器

# DataSinkWriterOperator 创建sink时会使用DataSinkWriterOperator包装,用于处理FlushEvent和CreateTableEvent事件
@OverridepublicvoidprocessElement(StreamRecord<Event> element)throwsException{Event event = element.getValue();// 处理FlushEvent事件if(event instanceofFlushEvent){handleFlushEvent(((FlushEvent) event));return;}// CreateTableEvent marks the table as processed directlyif(event instanceofCreateTableEvent){
        processedTableIds.add(((CreateTableEvent) event).tableId());this.<OneInputStreamOperator<Event,CommittableMessage<CommT>>>getFlinkWriterOperator().processElement(element);return;}// Check if the table is processed before emitting all other events, because we have to make// sure that sink have a view of the full schema before processing any change events,// including schema changes.ChangeEvent changeEvent =(ChangeEvent) event;if(!processedTableIds.contains(changeEvent.tableId())){emitLatestSchema(changeEvent.tableId());
        processedTableIds.add(changeEvent.tableId());}
    processedTableIds.add(changeEvent.tableId());this.<OneInputStreamOperator<Event,CommittableMessage<CommT>>>getFlinkWriterOperator().processElement(element);}

# handleFlushEvent 向协调节点(SchemaRegistry)发送`FlushSuccess`请求
privatevoidhandleFlushEvent(FlushEvent event)throwsException{
    copySinkWriter.flush(false);
    schemaEvolutionClient.notifyFlushSuccess(getRuntimeContext().getIndexOfThisSubtask(), event.getTableId());}

协调器

​ 协调节点收到所有

Sink

flush

完成通知后,然后执行结构变更操作,最后通知完成给等待的

requestReleaseUpstream

请求。

协调节点处理

FlushSuccess

请求

publicvoidflushSuccess(TableId tableId,int sinkSubtask){
    flushedSinkWriters.add(sinkSubtask);//所有节点都处理完成if(flushedSinkWriters.equals(activeSinkWriters)){
        LOG.info("All sink subtask have flushed for table {}. Start to apply schema change.",
                tableId.toString());PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);//执行表结构变更操作applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent());//通知等待的SchemaOperator,结构变更完成!
        waitFlushSuccess.getResponseFuture().complete(wrap(newReleaseUpstreamResponse()));if(RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())){startNextSchemaChangeRequest();}}}

更多请参考github:参考地址

标签: 大数据 flink

本文转载自: https://blog.csdn.net/weixin_36340771/article/details/135904141
版权归原作者 过悟 所有, 如有侵权,请联系我们删除。

“Flink CDC 3.0 表结构变更的处理流程”的评论:

还没有评论