

【Flink Sink: Batch Writing Stream Data to a Database】

Overview

A Flink streaming job often needs to write its data to a database, usually by extending RichSinkFunction. Without any preprocessing before the sink, every record is written individually (roughly the pattern sketched below). Single-record writes have several drawbacks:
1. Frequent writes put heavy load on the database.
2. Writes are slow and inefficient, which leads to backpressure.
The answer is batch writing. This article uses a window to buffer each cycle's records into a batch and hand it down to the sink operator. The approach has been validated in a high-volume production environment; beyond plain batching, it also guards against data skew and supports parallel execution, and it is the distillation of hard-won experience.
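For contrast, the unoptimized pattern criticized above looks roughly like this; a minimal sketch, assuming a DataStream<Row> named sinkStream and plain JDBC (jdbcUrl, user, password, and writeSql are hypothetical names, not from the post):

// Naive single-record sink: one statement execution (one network round trip) per record.
sinkStream.addSink(new RichSinkFunction<Row>() {
    @Override
    public void invoke(Row row, Context ctx) throws Exception {
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
             PreparedStatement ps = conn.prepareStatement(writeSql)) {
            for (int i = 0; i < row.getArity(); i++) {
                ps.setObject(i + 1, row.getField(i)); // bind Row fields positionally
            }
            ps.execute(); // executed once per incoming record
        }
    }
});

Even with a connection pool instead of DriverManager, each record still costs a full round trip; that per-record cost is exactly what the batching below removes.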

Implementing the batch write

Main program

KeyedStream<Row, String> keyedStream = sinkStream
        .keyBy(new HashModKeySelector(keyIndexList, paralleSize));

SingleOutputStreamOperator<Row[]> winStream = keyedStream
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(windowSize)))
        .process(new RowProcessWindowFunction(keyIndexList));

DataStreamSink<Row[]> sink = winStream.addSink(new DbSinkFunction<>(conf, writeSql));
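For orientation, here is one way the variables around this snippet might be populated; every concrete value below is an illustrative assumption, not something given in the original post:

// Illustrative setup for the snippet above; sinkStream is an existing DataStream<Row>.
List<Integer> keyIndexList = Arrays.asList(0, 1); // Row fields 0 and 1 form the grouping key
Integer paralleSize = 4;                          // at most 4 distinct routing buckets
long windowSize = 1000L;                          // emit one batch per key every second
String writeSql = "INSERT INTO t_metrics (k1, k2, val) VALUES (?, ?, ?)";
Properties conf = new Properties();               // JDBC settings consumed by DbSinkFunction

windowSize is the main tuning knob: a larger window yields bigger batches and fewer commits, at the cost of end-to-end latency.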

1. Grouping the business data with HashModKeySelector

import java.security.MessageDigest;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HashModKeySelector implements KeySelector<Row, String> {

    private static final Logger logger = LoggerFactory.getLogger(HashModKeySelector.class);
    private static final long serialVersionUID = 1L;

    /** Indexes of the key fields within the Row. */
    private List<Integer> keyIndexList;
    private Integer paralleSize;
    /** Cache of bucket id -> MD5 digest, so each digest is computed only once. */
    private Map<String, String> md5Map = new ConcurrentHashMap<>();

    public HashModKeySelector(List<Integer> keyIndexList, Integer paralleSize) {
        this.keyIndexList = keyIndexList;
        this.paralleSize = paralleSize;
    }

    @Override
    public String getKey(Row value) {
        int size = keyIndexList.size();
        Row keyRow = new Row(size);
        for (int i = 0; i < size; i++) {
            int index = keyIndexList.get(i);
            keyRow.setField(i, value.getField(index));
        }
        // floorMod keeps the bucket id in [0, paralleSize) even when hashCode() is negative
        int keyHash = Math.floorMod(keyRow.hashCode(), paralleSize);
        String strKey = String.valueOf(keyHash);
        String md5Value = md5Map.get(strKey);
        if (StringUtils.isBlank(md5Value)) {
            md5Value = md5(strKey);
            md5Map.put(strKey, md5Value);
        }
        return md5Value;
    }

    public static String md5(String key) {
        try {
            // compute the MD5 digest of the key
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(key.getBytes());
            // convert the digest to a hex string
            return bytesToHex(digest);
        } catch (Exception e) {
            logger.error("failed to compute md5 of {}:", key, e);
            return key;
        }
    }

    public static String bytesToHex(byte[] bytes) {
        StringBuilder hexString = new StringBuilder();
        for (byte b : bytes) {
            String hex = Integer.toHexString(0xff & b);
            if (hex.length() == 1) {
                hexString.append('0');
            }
            hexString.append(hex);
        }
        return hexString.toString();
    }
}
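A quick smoke test of the selector; the values are hypothetical and only meant to show the contract:

// Rows with equal key fields map to the same routing key, and at most
// paralleSize distinct keys are ever produced.
HashModKeySelector selector = new HashModKeySelector(Arrays.asList(0), 4);
Row r1 = Row.of("user-1", 10);
Row r2 = Row.of("user-1", 99);
System.out.println(selector.getKey(r1).equals(selector.getKey(r2))); // true

The detour through MD5 presumably spreads the handful of bucket ids more evenly across Flink's internal key groups, which is how the post's data-skew claim is addressed; the ConcurrentHashMap merely caches each digest so it is computed once per bucket.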

2. Buffering data with a tumbling window: collect the individual records into an array and send it downstream

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RowProcessWindowFunction extends ProcessWindowFunction<Row, Row[], String, TimeWindow> {

    private static final Logger LOG = LoggerFactory.getLogger(RowProcessWindowFunction.class);

    /** Indexes of the key fields within the Row. */
    private List<Integer> keyIndexList;

    public RowProcessWindowFunction(List<Integer> keyIndexList) {
        if (keyIndexList == null || keyIndexList.isEmpty()) {
            LOG.error("keyIndexList is empty");
            throw new RuntimeException("keyIndexList is empty");
        }
        this.keyIndexList = keyIndexList;
    }

    @Override
    public void process(String key, Context context, Iterable<Row> inRow, Collector<Row[]> out) throws Exception {
        // copy the window's buffered rows into an array and emit them as one batch
        List<Row> rowList = new ArrayList<>();
        for (Row row : inRow) {
            rowList.add(row);
        }
        out.collect(rowList.toArray(new Row[0]));
    }
}
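One sizing caveat: a pure processing-time window bounds latency but not batch size, so a traffic spike can make a single Row[] arbitrarily large. Flink's built-in triggers offer a count-based alternative; the sketch below is an assumption, not part of the post, and the 3000 threshold merely mirrors the JDBC flush size used in the sink that follows. Note that .trigger() replaces the window's default processing-time trigger, so this variant fires on count only; firing on "count or time, whichever comes first" needs a composite trigger.

// Count-only variant: emit a batch as soon as 3000 rows accumulate for a key.
winStream = keyedStream
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(windowSize)))
        .trigger(PurgingTrigger.of(CountTrigger.of(3000)))
        .process(new RowProcessWindowFunction(keyIndexList));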

3. Batch writing to the database

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DbSinkFunction<I> extends RichSinkFunction<I> {

    private static final Logger LOG = LoggerFactory.getLogger(DbSinkFunction.class);

    private final Properties conf;  // connection settings (driver, url, user, password, ...)
    private final String sql;
    // DbConnectionPool is the author's custom pool; its implementation is not shown
    // in the post, so its constructor shape here is assumed
    private transient DbConnectionPool pool;

    public DbSinkFunction(Properties conf, String dmlSql) {
        this.conf = conf;
        this.sql = dmlSql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // create the connection pool
        pool = new DbConnectionPool(conf);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // release the pool and its connections
        pool.close();
    }

    /**
     * Write to the database.
     */
    @Override
    public void invoke(I record, Context context) throws Exception {
        PreparedStatement ps = null;
        boolean isBatch = false;
        // strip the VALUES clause so logs show only the statement head
        String logSql = sql.substring(0, sql.toUpperCase().indexOf("VALUES"));
        Connection connection = null;
        try {
            connection = pool.getConnection();
            ps = connection.prepareStatement(sql);
            if (record instanceof Row[]) {
                // batched data
                isBatch = true;
                connection.setAutoCommit(false);
                Row[] rowArray = (Row[]) record;
                LOG.info("writing a batch of {} rows", rowArray.length);
                int pending = 0;
                for (Row row : rowArray) {
                    // binds the Row's fields to the statement; not shown in the post (see sketch below)
                    fillPreparedStatement(ps, row);
                    ps.addBatch();
                    pending++;
                    // flush every 3000 rows to bound memory and transaction size
                    if (pending == 3000) {
                        ps.executeBatch();
                        connection.commit();
                        ps.clearBatch();
                        pending = 0;
                    }
                }
                if (pending > 0) {
                    ps.executeBatch();
                    connection.commit();
                    ps.clearBatch();
                }
            } else if (record instanceof Row) {
                // single record
                isBatch = false;
                fillPreparedStatement(ps, (Row) record);
                ps.execute();
            } else {
                throw new RuntimeException("unsupported record type " + record.getClass());
            }
        } catch (SQLException e) {
            LOG.warn("write failed: {}", logSql, e);
            if (isBatch && connection != null) {
                connection.rollback();
                // fall back to row-by-row writes so one bad row does not lose the whole batch
                doOneInsert(sql, connection, (Row[]) record);
            }
        } catch (Exception e) {
            LOG.error("write failed", e);
        } finally {
            closeDBResources(ps, connection);
        }
    }

    /**
     * Fallback: write row by row when the batch fails.
     *
     * @param sql        the INSERT statement
     * @param connection the connection the batch ran on
     * @param rowArray   the rows of the failed batch
     */
    protected void doOneInsert(String sql, Connection connection, Row[] rowArray) {
        PreparedStatement ps = null;
        String logSql = sql.substring(0, sql.toUpperCase().indexOf("VALUES"));
        try {
            int errCount = 0;
            connection.setAutoCommit(true);
            ps = connection.prepareStatement(sql);
            for (Row row : rowArray) {
                try {
                    fillPreparedStatement(ps, row);
                    ps.execute();
                } catch (SQLException e) {
                    errCount++;
                } finally {
                    ps.clearParameters();
                }
            }
            if (errCount > 0) {
                LOG.warn("{} of {} rows failed: {}", errCount, rowArray.length, logSql);
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            closeDBResources(ps, null);
        }
    }

    private void closeDBResources(Statement stmt, Connection conn) {
        try {
            if (!(null == stmt || stmt.isClosed())) {
                stmt.close();
            }
            if (!(null == conn || conn.isClosed())) {
                conn.close();
            }
        } catch (SQLException e) {
            // ignore close failures
        }
    }
}
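The sink calls fillPreparedStatement, but the post never shows it. A minimal sketch, assuming the Row's fields map positionally onto the ? placeholders of the INSERT statement; only the method name and the call shape come from the code above, the body is an assumption:

// Hypothetical helper: binds Row field i to JDBC parameter i + 1 (JDBC parameters are 1-based).
private void fillPreparedStatement(PreparedStatement ps, Row row) throws SQLException {
    for (int i = 0; i < row.getArity(); i++) {
        ps.setObject(i + 1, row.getField(i));
    }
}

setObject leaves type mapping to the JDBC driver; a production version would typically set values according to the target column types instead.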
Tags: database, flink, windows

Reposted from: https://blog.csdn.net/gwc791224/article/details/135109118
Copyright belongs to the original author, shandongwill. In case of infringement, please contact us for removal.
