Overview
Flink streaming jobs very often need to write their data into a database, which is usually done by extending RichSinkFunction. If nothing is optimized in front of the sink, every record is written as a separate statement. Single-record writes have several drawbacks:
1. Frequent writes put heavy pressure on the database.
2. Writes are slow and inefficient, which leads to backpressure.
Batch writing is therefore needed. This article uses a tumbling window to buffer each period's records into a batch that is sent on to the sink node. The approach has been validated in a production environment with large data volumes; beyond batch writing, it also adds optimizations for preventing data skew and supporting parallel execution, and it is the result of a good deal of hard-won experience.
Implementing the batch-write feature
Main function
KeyedStream<Row, String> keyedStream = sinkStream.keyBy(new HashModKeySelector(keyIndexList, paralleSize));

DataStream<Row[]> winStream = keyedStream
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(windowSize)))
        .process(new RowProcessWindowFunction(keyIndexList));

DataStreamSink<Row[]> sink = winStream.addSink(new DbSinkFunction<Row[]>(conf, driver, writeSql));
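The snippet above assumes that sinkStream, keyIndexList, paralleSize, windowSize, conf, driver and writeSql are prepared elsewhere. Below is a minimal, self-contained sketch of how such a job might be assembled; the sample data, target table, JDBC driver class and pool properties are illustrative assumptions, not taken from the original article.

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;

public class BatchSinkJob { // hypothetical job class, for illustration only
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample business stream of (id, name, amount) rows; in production this comes from a real source.
        DataStream<Row> sinkStream = env.fromCollection(
                Arrays.asList(Row.of(1, "a", 10.0), Row.of(2, "b", 20.0)),
                Types.ROW(Types.INT, Types.STRING, Types.DOUBLE));

        List<Integer> keyIndexList = Arrays.asList(0); // key on the first field of the Row
        Integer paralleSize = 4;                       // number of hash buckets / sink parallelism
        long windowSize = 5000L;                       // 5-second batching window

        // The INSERT statement, driver class and pool settings below are assumptions.
        String writeSql = "INSERT INTO t_order (id, name, amount) VALUES (?, ?, ?)";
        String driver = "com.mysql.cj.jdbc.Driver";
        Properties conf = new Properties();
        conf.setProperty("url", "jdbc:mysql://localhost:3306/test");
        conf.setProperty("user", "test");
        conf.setProperty("password", "test");

        KeyedStream<Row, String> keyedStream =
                sinkStream.keyBy(new HashModKeySelector(keyIndexList, paralleSize));

        DataStream<Row[]> winStream = keyedStream
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(windowSize)))
                .process(new RowProcessWindowFunction(keyIndexList));

        winStream.addSink(new DbSinkFunction<Row[]>(conf, driver, writeSql)).setParallelism(paralleSize);

        env.execute("batch-db-sink");
    }
}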
1. Partition the business data with HashModKeySelector
import java.security.MessageDigest;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HashModKeySelector implements KeySelector<Row, String> {

    private static final Logger logger = LoggerFactory.getLogger(HashModKeySelector.class);
    private static final long serialVersionUID = 1L;

    /**
     * Indexes of the key fields within the Row.
     */
    private List<Integer> keyIndexList = null;
    private Integer paralleSize;
    private Map<String, String> md5Map = new ConcurrentHashMap<>();

    public HashModKeySelector(List<Integer> keyIndexList, Integer paralleSize) {
        this.keyIndexList = keyIndexList;
        this.paralleSize = paralleSize;
    }

    @Override
    public String getKey(Row value) {
        int size = keyIndexList.size();
        Row keyRow = new Row(size);
        for (int i = 0; i < size; i++) {
            int index = keyIndexList.get(i);
            keyRow.setField(i, value.getField(index));
        }
        // Map the key fields to one of paralleSize buckets and cache the MD5 of the bucket id.
        int keyHash = keyRow.hashCode() % paralleSize;
        String strKey = String.valueOf(keyHash);
        String md5Value = md5Map.get(strKey);
        if (StringUtils.isBlank(md5Value)) {
            md5Value = md5(strKey);
            md5Map.put(strKey, md5Value);
        }
        return md5Value;
    }

    public static String md5(String key) {
        String result = "";
        try {
            // Create the MD5 message digest
            MessageDigest md = MessageDigest.getInstance("MD5");
            // Compute the digest
            byte[] digest = md.digest(key.getBytes());
            // Convert the digest to a hexadecimal string
            String hexString = bytesToHex(digest);
            result = hexString;
        } catch (Exception e) {
            logger.error("Failed to compute MD5 for {}:", key, e);
            return key;
        }
        return result;
    }

    public static String bytesToHex(byte[] bytes) {
        StringBuilder hexString = new StringBuilder();
        for (byte b : bytes) {
            String hex = Integer.toHexString(0xff & b);
            if (hex.length() == 1) {
                hexString.append('0');
            }
            hexString.append(hex);
        }
        return hexString.toString();
    }
}
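To make the bucketing concrete, here is a small, hypothetical check of getKey (the demo class name, the sample rows and the choice of 4 buckets are made up for illustration): rows that share the same key fields always receive the same MD5 bucket key, so they are buffered in the same window and written by the same sink subtask.

import java.util.Arrays;

import org.apache.flink.types.Row;

public class HashModKeySelectorDemo { // hypothetical demo class, not part of the article
    public static void main(String[] args) {
        // Key on field 0 and spread the keys over 4 buckets; the sample values are made up.
        HashModKeySelector selector = new HashModKeySelector(Arrays.asList(0), 4);

        Row r1 = Row.of("order-1001", 25.0);
        Row r2 = Row.of("order-1001", 99.0); // same key field, different payload
        Row r3 = Row.of("order-2002", 13.0);

        // Rows with identical key fields always map to the same bucket key.
        System.out.println(selector.getKey(r1).equals(selector.getKey(r2))); // true
        System.out.println(selector.getKey(r1)); // a 32-character hex string (MD5 of the bucket id)
        System.out.println(selector.getKey(r3)); // possibly a different bucket key
    }
}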
2. Buffer data with a tumbling window: pack the individual rows of each window into an array and send it downstream
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RowProcessWindowFunction extends ProcessWindowFunction<Row, Row[], String, TimeWindow> {

    private static final Logger LOG = LoggerFactory.getLogger(RowProcessWindowFunction.class);

    /**
     * Indexes of the key fields within the Row.
     */
    private List<Integer> keyIndexList;

    public RowProcessWindowFunction(List<Integer> keyIndexList) {
        if (keyIndexList == null || keyIndexList.size() == 0) {
            LOG.error("keyIndexList is empty");
            throw new RuntimeException("keyIndexList is empty");
        }
        this.keyIndexList = keyIndexList;
    }

    @Override
    public void process(String key, Context context, Iterable<Row> inRow, Collector<Row[]> out) throws Exception {
        // Collect all rows buffered in this window and emit them downstream as one batch.
        List<Row> rowList = new ArrayList<>();
        for (Row row : inRow) {
            rowList.add(row);
        }
        Row[] rows = rowList.toArray(new Row[0]);
        out.collect(rows);
    }
}
3. Batch write to the database
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DbSinkFunction<I> extends RichSinkFunction<I> {

    private static final Logger LOG = LoggerFactory.getLogger(DbSinkFunction.class);

    /** Connection settings for the pool (its concrete type is not shown in the article; Properties is assumed here). */
    private Properties conf = null;
    private String driver = null;
    private String sql = null;
    /** JDBC types of the target columns, used by fillPreparedStatement (implementation not shown in the article). */
    private int[] columnTypes;

    // DbConnectionPool is the author's own pooling helper; its implementation is not shown in the article.
    DbConnectionPool pool = null;

    public DbSinkFunction(Properties conf, String dbDriver, String dmlSql) {
        this.conf = conf;
        this.driver = dbDriver;
        this.sql = dmlSql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Create the connection pool
        pool = new DbConnectionPool(conf, driver);
    }

    @Override
    public void close() throws Exception {
        // Release resources
        super.close();
        // Close the connection pool
        pool.close();
    }

    /**
     * Write to the database.
     */
    @Override
    public void invoke(I record, Context context) throws Exception {
        PreparedStatement ps = null;
        Boolean isBatch = false;
        String logSql = sql.substring(0, sql.toUpperCase().indexOf("VALUES"));
        int length = 1;
        Connection connection = null;
        try {
            connection = pool.getConnection();
            ps = connection.prepareStatement(sql);
            if (record instanceof Row[]) {
                // Batch of rows
                isBatch = true;
                connection.setAutoCommit(false);
                Row[] rowArray = (Row[]) record;
                length = rowArray.length;
                LOG.info("Row array: {}", length);
                int no = 0; // rows added since the last flush
                for (int i = 1; i <= length; i++) {
                    Row row = rowArray[i - 1];
                    // fillPreparedStatement binds the Row's fields to the '?' placeholders (not shown in the article).
                    fillPreparedStatement(ps, row);
                    ps.addBatch();
                    no = no + 1;
                    // Flush every 3000 rows to keep the JDBC batch small
                    if (i % 3000 == 0) {
                        ps.executeBatch();
                        connection.commit();
                        ps.clearBatch();
                        no = 0;
                    }
                }
                // Flush the remaining rows
                if (no > 0) {
                    ps.executeBatch();
                    connection.commit();
                    ps.clearBatch();
                }
            } else if (record instanceof Row) {
                // Single row
                isBatch = false;
                Row row = (Row) record;
                fillPreparedStatement(ps, row);
                ps.execute();
            } else {
                throw new RuntimeException("Unsupported record type " + record.getClass());
            }
        } catch (SQLException e) {
            LOG.warn("Batch write failed, falling back to single-row inserts: {}", logSql, e);
            if (connection != null) {
                connection.rollback();
            }
            if (isBatch) {
                doOneInsert(sql, connection, (Row[]) record);
            }
        } catch (Exception e) {
            LOG.error("Write failed", e);
        } finally {
            closeDBResources(ps, connection);
        }
    }

    /**
     * Fall back to single-row inserts when the batch write fails.
     *
     * @param sql        the INSERT statement
     * @param connection the JDBC connection
     * @param rowArray   the rows of the failed batch
     */
    protected void doOneInsert(String sql, Connection connection, Row[] rowArray) {
        PreparedStatement ps = null;
        String logSql = sql.substring(0, sql.toUpperCase().indexOf("VALUES"));
        try {
            Integer allSize = rowArray.length;
            Integer errCount = 0;
            connection.setAutoCommit(true);
            ps = connection.prepareStatement(sql);
            for (Row row : rowArray) {
                try {
                    fillPreparedStatement(ps, row);
                    ps.execute();
                } catch (SQLException e) {
                    errCount++;
                } finally {
                    ps.clearParameters();
                }
            }
            if (errCount > 0) {
                LOG.warn("Single-row fallback for {}: {} of {} rows failed", logSql, errCount, allSize);
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            closeDBResources(ps, null);
        }
    }

    private void closeDBResources(Statement stmt, Connection conn) {
        try {
            if (!(null == stmt || stmt.isClosed())) {
                stmt.close();
            }
            if (!(null == conn || conn.isClosed())) {
                conn.close();
            }
        } catch (SQLException e) {
            // ignore close failures
        }
    }
}
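The sink calls fillPreparedStatement(ps, row), whose implementation is not shown in the article. Below is a minimal sketch of what it might look like, assuming the '?' placeholders of the INSERT statement line up one-to-one with the Row's fields; the real version may well use the columnTypes metadata to call type-specific setters instead.

// Assumed helper, not shown in the original article: binds every Row field to the
// matching '?' placeholder by position.
private void fillPreparedStatement(PreparedStatement ps, Row row) throws SQLException {
    for (int i = 0; i < row.getArity(); i++) {
        ps.setObject(i + 1, row.getField(i)); // JDBC parameter indexes are 1-based
    }
}

Using setObject leaves SQL type conversion to the JDBC driver, which is usually sufficient for common column types.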