

Flink Troubleshooting and Performance Tuning: sink2es Errors Caused by Mismatched Operator Parallelism

A recent requirement called for raising only the QPS of sink2es, so I adjusted only the parallelism of the sink2es operator. Running different operators at different parallelisms, however, surfaced a problem. This post tracks down the root cause, fixes it, and writes up the analysis.

Example code

```sql
-- SET table.exec.state.ttl=86400s;  -- 24 hours, default: 0 ms
SET table.exec.state.ttl=2592000s;   -- 30 days, default: 0 ms
CREATE TABLE kafka_table (
    mid bigint,
    db string,
    sch string,
    tab string,
    opt string,
    ts bigint,
    ddl string,
    err string,
    src map<string,string>,
    cur map<string,string>,
    cus map<string,string>,
    account_id   AS IF(cur['account_id']   IS NOT NULL, cur['account_id'],   src['account_id']),
    publish_time AS IF(cur['publish_time'] IS NOT NULL, cur['publish_time'], src['publish_time']),
    msg_status   AS IF(cur['msg_status']   IS NOT NULL, cur['msg_status'],   src['msg_status']),
    send_type    AS IF(cur['send_type']    IS NOT NULL, cur['send_type'],    src['send_type']),
    event_time   AS CAST(IF(cur['update_time'] IS NOT NULL, cur['update_time'], src['update_time']) AS TIMESTAMP(3))  -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)
    -- ,WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE  -- or SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 't1',
    'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
    'properties.group.id' = 'g1',
    'scan.startup.mode' = 'earliest-offset',  -- group-offsets/earliest-offset/latest-offset
    -- 'properties.enable.auto.commit' = 'true',  -- default: false; when false, offsets are committed when a checkpoint completes
    'format' = 'json'
);
CREATE TABLE es_sink (
     send_type      STRING
    ,account_id     STRING
    ,publish_time   STRING
    ,grouping_id    INTEGER
    ,init           INTEGER
    ,init_cancel    INTEGER
    ,push           INTEGER
    ,succ           INTEGER
    ,fail           INTEGER
    ,init_delete    INTEGER
    ,update_time    STRING
    ,PRIMARY KEY (grouping_id, send_type, account_id, publish_time) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-6',
    'index' = 'es_sink',
    'document-type' = 'es_sink',
    'hosts' = 'http://xxx:9200',
    'format' = 'json',
    'filter.null-value' = 'true',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '10mb'
);
CREATE VIEW tmp AS
SELECT
    send_type,
    account_id,
    publish_time,
    msg_status,
    CASE WHEN UPPER(opt) = 'INSERT' AND msg_status = '0'                       THEN 1 ELSE 0 END AS init,
    CASE WHEN UPPER(opt) = 'UPDATE' AND send_type = '1' AND msg_status = '4'   THEN 1 ELSE 0 END AS init_cancel,
    CASE WHEN UPPER(opt) = 'UPDATE' AND msg_status = '3'                       THEN 1 ELSE 0 END AS push,
    CASE WHEN UPPER(opt) = 'UPDATE' AND (msg_status = '1' OR msg_status = '5') THEN 1 ELSE 0 END AS succ,
    CASE WHEN UPPER(opt) = 'UPDATE' AND (msg_status = '2' OR msg_status = '6') THEN 1 ELSE 0 END AS fail,
    CASE WHEN UPPER(opt) = 'DELETE' AND send_type = '1' AND msg_status = '0'   THEN 1 ELSE 0 END AS init_delete,
    event_time,
    opt,
    ts
FROM kafka_table
WHERE (UPPER(opt) = 'INSERT' AND msg_status = '0')
   OR (UPPER(opt) = 'UPDATE' AND msg_status IN ('1','2','3','4','5','6'))
   OR (UPPER(opt) = 'DELETE' AND send_type = '1' AND msg_status = '0');
-- msg_status meanings:  init->0 (both send types); cancel->4 (send_type=1 only);
--                       push->3 (both); success->1 (send_type=1) / 5 (send_type=0);
--                       failure->2 (send_type=1) / 6 (send_type=0)
CREATE VIEW tmp_groupby AS
SELECT
     COALESCE(send_type, 'N')    AS send_type
    ,COALESCE(account_id, 'N')   AS account_id
    ,COALESCE(publish_time, 'N') AS publish_time
    ,CASE
        WHEN send_type IS NULL     AND account_id IS NULL     AND publish_time IS NULL     THEN 1
        WHEN send_type IS NOT NULL AND account_id IS NULL     AND publish_time IS NULL     THEN 2
        WHEN send_type IS NOT NULL AND account_id IS NOT NULL AND publish_time IS NULL     THEN 3
        WHEN send_type IS NOT NULL AND account_id IS NOT NULL AND publish_time IS NOT NULL THEN 4
     END AS grouping_id
    ,SUM(init)        AS init
    ,SUM(init_cancel) AS init_cancel
    ,SUM(push)        AS push
    ,SUM(succ)        AS succ
    ,SUM(fail)        AS fail
    ,SUM(init_delete) AS init_delete
FROM tmp
-- GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id), (send_type), ())
GROUP BY ROLLUP (send_type, account_id, publish_time);  -- equivalent to the GROUPING SETS above
INSERT INTO es_sink
SELECT
     send_type
    ,account_id
    ,publish_time
    ,grouping_id
    ,init
    ,init_cancel
    ,push
    ,succ
    ,fail
    ,init_delete
    ,CAST(LOCALTIMESTAMP AS STRING) AS update_time
FROM tmp_groupby;
```
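For context, here is a minimal sketch of the parallelism knobs such a SQL job typically exposes, written against the Flink 1.13+ Table API. The option values are illustrative, and whether the elasticsearch-6 connector in use supports a per-sink `sink.parallelism` option is an assumption to verify for your version:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: the two places a SQL job's parallelism is usually set.
public class ParallelismConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Default parallelism applied to every operator the planner generates:
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.resource.default-parallelism", "2");

        // Raising only sink2es's QPS would instead mean a per-sink override in the
        // es_sink DDL, e.g. 'sink.parallelism' = '4' (assumption: supported by the
        // connector version in use). That gives the sink a parallelism different
        // from the aggregation -- which is what triggers the problem described next.
    }
}
```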

The problem

Because the aggregation operators (group-by, joins, and the like) run with a different parallelism than the sink2es operator, records with the same key can be sent from one upstream operator to several different downstream subtasks. Multiple sink2es subtasks then update the same key (which serves here as the primary-key document id) concurrently, and the job fails with:

```
...Caused by: [test1/cfPTBYhcRIaYTIh3oavCvg][[test1][2]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[test1][4_1_92_2024-01-15 16:30:00]: version conflict, required seqNo [963], primary term [1]. current document has seqNo [964] and primary term [1]]]
        at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
        at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
        at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
        at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
        at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
        at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
        at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
        at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
        at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
        at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
        at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
        at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
        at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
        at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
        at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
        at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
        at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
        at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
        ... 1 more
    [CIRCULAR REFERENCE: [test1/cfPTBYhcRIaYTIh3oavCvg][[test1][2]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[test1][4_1_92_2024-01-15 16:30:00]: version conflict, required seqNo [963], primary term [1]. current document has seqNo [964] and primary term [1]]]]
```

Root cause

Flink has eight partitioning strategies in total; the three partitioners most commonly seen on operator chain edges are (see the sketch after this list):

  • forward: connects upstream and downstream directly when they have the same parallelism and no shuffle occurs
  • hash: sends each record to the downstream subtask chosen by the hash of its key
  • rebalance: sends records round-robin (or randomly) to downstream subtasks; when the parallelism changes and there is no keyBy, Flink defaults to the RebalancePartitioner
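A minimal DataStream sketch of how the three edges arise (element values, names, and parallelism numbers are illustrative assumptions, not taken from the original job):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> source = env.fromElements(
                Tuple2.of("key-a", 1), Tuple2.of("key-a", 2), Tuple2.of("key-b", 3));

        // hash: keyBy routes every record with the same key to the same subtask
        DataStream<Tuple2<String, Integer>> aggregated = source
                .keyBy(t -> t.f0)
                .sum(1)
                .setParallelism(4);       // "aggregation" runs with 4 subtasks

        aggregated
                // forward: same parallelism as upstream, no shuffle, records stay put
                .map(t -> t.f0 + "=" + t.f1).setParallelism(4)
                // rebalance: parallelism changes with no keyBy in between, so Flink
                // falls back to RebalancePartitioner and the same key may land on
                // either of the two sink subtasks -- the failure mode in this post
                .print().setParallelism(2);

        env.execute("partitioner-sketch");
    }
}
```

The last edge is the interesting one: changing the parallelism without a keyBy silently downgrades the key routing to rebalance.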

A rebalance edge can therefore deliver records carrying the same key from one upstream subtask to arbitrary downstream subtasks, which is exactly what produces the error above. The model:

(figure: rebalance fanning records with the same key out to multiple sink subtasks)

Solutions

  • Configure the grouping/aggregation operator and the sink2es operator with the same parallelism, so the edge between them uses the forward partitioner. Upstream of that forward edge, the operator chain has already used the hash partitioner to route each key to exactly one subtask, so no key can reach two sink subtasks.

The model:
![model](https://img-blog.csdnimg.cn/direct/215b7ba208b142da803a78d85b0f474e.png)

  • Configure the parallelism of the sink2es operator as 1:

Every record then funnels into the single sink subtask, so no two writers can race on one key regardless of the partitioner on the incoming edge (and if the upstream also runs at parallelism 1, the edge is a plain forward partitioner). The model:

(figure: all sink traffic converging on a single subtask)
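A hedged DataStream sketch of both options, with print() standing in for the real Elasticsearch sink and illustrative parallelism values:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkParallelismFixSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = env
                .fromElements(Tuple2.of("key-a", 1), Tuple2.of("key-a", 1), Tuple2.of("key-b", 1))
                .keyBy(t -> t.f0)
                .sum(1)
                .setParallelism(4);   // aggregation parallelism

        // Fix 1: sink parallelism == aggregation parallelism -> FORWARD edge, so the
        // key routing established by keyBy is preserved all the way into the sink.
        counts.print().setParallelism(4);

        // Fix 2: a single sink subtask -> no two writers can ever race on one
        // document id, at the cost of sink throughput.
        counts.print().setParallelism(1);

        env.execute("sink-parallelism-fix-sketch");
    }
}
```

Fix 1 keeps the sink's throughput scalable; fix 2 trades throughput for simplicity.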

Summary

In the end it comes down to one guarantee: records with the same key in an upstream subtask must be delivered to exactly one downstream subtask.

Tags: flink, big data

Reposted from: https://blog.csdn.net/weixin_38251332/article/details/135854618
Copyright belongs to the original author, PONY LEE. If there is any infringement, please contact us for removal.
