Background
Flink version: 1.14.3
This post adds a custom Log4j 2 appender so that a Flink job's logs are also sent to a Kafka topic.
Custom KafkaAppender
You can define this class directly in your own project, or package it as a separate jar and reference it from there.
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
 */

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.PatternLayout;

import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;

@SuppressWarnings("unused")
@Plugin(name = "KafkaAppender", category = "Core", elementType = "appender", printObject = true)
public final class KafkaAppender extends AbstractAppender {

    private KafkaProducer<String, String> producer = null;
    private String topic;
    /** Kafka broker address */
    private String kafkaBroker;
    private boolean append = true;
    /** Minimum level of logs to ship */
    private Level level;
    private Layout<? extends Serializable> layout;
    /** Include rules (exact logger names and prefix matches) */
    private Set<String> includeSet = new HashSet<>();
    private Set<String> includeMatchSet = new HashSet<>();
    /** Exclude rules (exact logger names and prefix matches) */
    private Set<String> excludeSet = new HashSet<>();
    private Set<String> excludeMatchSet = new HashSet<>();

    /** Quick manual check of the ".*" suffix handling used in the constructor. */
    public static void main(String[] args) {
        String a = "x.x.*1";
        System.out.println(a.endsWith(".*"));
        System.out.println(a.replace(".*", ""));
    }

    public KafkaAppender(
            String name,
            String topic,
            String kafkaBroker,
            Filter filter,
            Layout<? extends Serializable> layout,
            boolean append,
            String level,
            String includes,
            String excludes) {
        super(name, filter, layout);
        System.out.println("Initializing Kafka Appender, Kafka Broker: " + kafkaBroker
                + " , Log Topic: " + topic
                + " , Log Level: " + level);
        if (includes != null) {
            for (String include : includes.split(",")) {
                if (include.length() > 0) {
                    if (include.endsWith(".*")) {
                        includeMatchSet.add(include.replace(".*", ""));
                    } else {
                        includeSet.add(include);
                    }
                }
            }
        }
        if (excludes != null) {
            for (String exclude : excludes.split(",")) {
                if (exclude.length() > 0) {
                    if (exclude.endsWith(".*")) {
                        excludeMatchSet.add(exclude.replace(".*", ""));
                    } else {
                        excludeSet.add(exclude);
                    }
                }
            }
        }
        this.topic = topic;
        this.kafkaBroker = kafkaBroker;
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
        this.append = append;
        if (level == null) {
            level = "INFO";
        }
        if (layout == null) {
            layout = PatternLayout.createDefaultLayout();
        }
        this.layout = layout;
        this.level = Level.toLevel(level);
    }

    @PluginFactory
    public static KafkaAppender createAppender(
            /** Topic the logs are sent to */
            @PluginAttribute("topic") String topic,
            /** Kafka broker address */
            @PluginAttribute("kafkaBroker") String kafkaBroker,
            /** Layout describing the record format */
            @PluginElement("Layout") Layout<? extends Serializable> layout,
            @PluginAttribute("name") String name,
            @PluginAttribute("append") boolean append,
            /** Log level */
            @PluginAttribute("level") String level,
            /** Package-name prefixes to include, comma separated */
            @PluginAttribute("includes") String includes,
            /** Package-name prefixes to exclude (they win over includes), comma separated */
            @PluginAttribute("excludes") String excludes) {
        return new KafkaAppender(
                name, topic, kafkaBroker, null, layout, append, level, includes, excludes);
    }

    @Override
    public final void stop() {
        super.stop();
        if (producer != null) {
            producer.close();
        }
    }

    @Override
    public void append(LogEvent event) {
        if (event.getLevel().isMoreSpecificThan(this.level)) {
            if (filterPackageName(event)) {
                return;
            }
            try {
                if (producer != null) {
                    Future<RecordMetadata> result =
                            producer.send(new ProducerRecord<String, String>(
                                    topic, getLayout().toSerializable(event).toString()));
                    // result.get();
                }
            } catch (Exception e) {
                LOGGER.error("Unable to write to kafka for appender [{}].", this.getName(), e);
                throw new AppenderLoggingException(
                        "Unable to write to kafka in appender: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Filter by logger (package) name; if this returns true the event is not sent to Kafka.
     *
     * @param event the log event
     * @return boolean
     * @date 2023/2/28 16:07
     * @author Jast
     */
    private boolean filterPackageName(LogEvent event) {
        boolean flag = true;
        if (includeSet.size() == 0 && includeMatchSet.size() == 0
                && excludeSet.size() == 0 && excludeMatchSet.size() == 0) {
            return false;
        }
        if (includeSet.size() == 0 && includeMatchSet.size() == 0) {
            flag = false;
        }
        /* Fully qualified name of the class that produced the log */
        String loggerName = event.getLoggerName();
        for (String include : includeSet) {
            if (loggerName.equals(include)) {
                flag = false;
            }
        }
        for (String include : includeMatchSet) {
            if (loggerName.startsWith(include)) {
                flag = false;
            }
        }
        for (String exclude : excludeMatchSet) {
            if (loggerName.startsWith(exclude)) {
                flag = true;
            }
        }
        for (String exclude : excludeSet) {
            if (loggerName.equals(exclude)) {
                flag = true;
            }
        }
        return flag;
    }
}
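If the appender lives in your own project or jar, Log4j 2 also has to be able to discover the plugin at startup. The log4j-core annotation processor normally writes a plugin cache file into the jar during compilation; if your build does not produce it, one fallback is to name the package explicitly in log4j.properties. The package below is hypothetical, use whatever package KafkaAppender actually sits in:

# Hypothetical package name for plugin scanning; adjust to your own package
packages=com.example.logging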
Modifying the log4j.properties configuration file
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30
# This affects logging for both user code and Flink
rootLogger.level=INFO
rootLogger.appenderRef.file.ref=MainAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name=akka
logger.akka.level=INFO
logger.kafka.name=org.apache.kafka
logger.kafka.level=INFO
logger.hadoop.name=org.apache.hadoop
logger.hadoop.level=INFO
logger.zookeeper.name=org.apache.zookeeper
logger.zookeeper.level=INFO
logger.shaded_zookeeper.name=org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level=INFO
# Log all infos in the given file
appender.main.name=MainAppender
appender.main.type=RollingFile
appender.main.append=true
appender.main.fileName=${sys:log.file}
appender.main.filePattern=${sys:log.file}.%i
appender.main.layout.type=PatternLayout
appender.main.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type=Policies
appender.main.policies.size.type=SizeBasedTriggeringPolicy
appender.main.policies.size.size=100MB
appender.main.policies.startup.type=OnStartupTriggeringPolicy
appender.main.strategy.type=DefaultRolloverStrategy
appender.main.strategy.max=${env:MAX_LOG_FILE_NUMBER:-10}
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name=org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level=OFF
# Everything above is from the stock Flink log4j.properties; the custom Kafka appender configuration starts below
# kafka appender config
rootLogger.appenderRef.kafka.ref=KafkaAppender
appender.kafka.type=KafkaAppender
appender.kafka.name=KafkaAppender
# Topic the logs are sent to
appender.kafka.topic=flink_job_logs
# Kafka Broker
appender.kafka.kafkaBroker=172.16.24.194:9092
# Minimum log level sent to Kafka
appender.kafka.level=info
# Only ship logs whose logger (package) name matches these prefixes
appender.kafka.includes=com.*,org.apache.hadoop.yarn.client.*,org.*
## Pattern of the logs written to Kafka; pick either JSONLayout or PatternLayout, not both
#appender.kafka.layout.type=PatternLayout
#appender.kafka.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} ${sys:log.file} ${sys:flink_per_job_name} %-5p %-60c %x - %m%n
## Emit the logs as JSON
appender.kafka.layout.type=JSONLayout
appender.kafka.layout.compact=true
appender.kafka.layout.complete=false
# Log file path and name; this value identifies the machine/container the log came from
appender.kafka.layout.additionalField1.type=KeyValuePair
appender.kafka.layout.additionalField1.key=logdir
appender.kafka.layout.additionalField1.value=${sys:log.file}
# The Flink job name has to be passed in manually at launch time, e.g. -yD env.java.opts="-Dflink_per_job_name=profile-platform"
appender.kafka.layout.additionalField2.type=KeyValuePair
appender.kafka.layout.additionalField2.key=flink_per_job_name
appender.kafka.layout.additionalField2.value=${sys:flink_per_job_name}
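The appender also accepts an excludes attribute (see createAppender above), and exclude rules win over include rules, so a noisy sub-package can be silenced even though a broader prefix like org.* is included. The line below is only an illustration and the package name is arbitrary:

# Illustration only: suppress one sub-package although org.* is included
#appender.kafka.excludes=org.apache.zookeeper.*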
Specifying the configuration file in the launch command
The configuration file is specified via
-yD \$internal.yarn.log-config-file="/data/flink-job/profile-platform-1.0.0/log4j.properties" \
in the flink run command:
/opt/flink/bin/flink run \
  -m yarn-cluster \
  -p $P \
  -ys $YS \
  -yjm $YJM \
  -ytm $YTM \
  -yt $JAR_PATH/lib \
  -ynm $YNM \
  -yD \$internal.yarn.log-config-file="/data/flink-job/profile-platform-1.0.0/log4j.properties" \
  -yD env.java.opts="-Dfile.encoding=UTF-8 -Dflink_per_job_name=profile-platform" \
  -c $START_CLASS \
  $JAR_PATH/$JAR_NAME.jar $1
Data format consumed from Kafka
{"thread":"Sink Label Group Data Send Topic (4/5)#0","level":"INFO","loggerName":"org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler","message":"Committing the state for checkpoint 23","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","instant":{"epochSecond":1677741263,"nanoOfSecond":777000000},"threadId":104,"threadPriority":5,"logdir":"/hadoop/yarn/log/application_1675237371712_0142/container_e25_1675237371712_0142_01_000003/taskmanager.log","flink_per_job_name":"avris-profile-platform-0301"}
Field descriptions
- epochSecond: log timestamp, in seconds
- logdir: log file path and name
- loggerName: name of the class that produced the log
- level: log level
- thread: thread name
- message: the log message printed by the program
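To sanity-check the pipeline you can tail the topic with any Kafka consumer. A minimal sketch in Java, assuming the broker and topic from the configuration above and a made-up consumer group id:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LogTopicTail {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.24.194:9092"); // broker from the config above
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-log-tail");              // hypothetical consumer group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("flink_job_logs"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    System.out.println(record.value()); // one JSON log event per record
                }
            }
        }
    }
}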
One-step usage
To make this easier to adopt, I have packaged the class and published it to the central Maven repository, so it can be used directly as a dependency. The steps are as follows:
- Add the Maven dependency
<dependency>
    <groupId>com.gitee.jastee</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <version>1.0.4</version>
</dependency>
- Log from your code through the normal logging API (a minimal sketch follows)
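A minimal sketch of this step, logging through the standard Log4j 2 API; the class name is hypothetical, and any class whose package matches the include rules is shipped to the Kafka topic:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Hypothetical job class; any class under an included package prefix behaves the same way
public class ProfileJob {
    private static final Logger LOG = LogManager.getLogger(ProfileJob.class);

    public static void main(String[] args) {
        // With KafkaAppender wired into log4j.properties, this line also lands in the Kafka topic
        LOG.info("profile job started, args={}", (Object) args);
    }
}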
- Configuration file: log4j.properties
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30
# This affects logging for both user code and Flink
rootLogger.level=INFO
rootLogger.appenderRef.file.ref=MainAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name=akka
logger.akka.level=INFO
logger.kafka.name=org.apache.kafka
logger.kafka.level=INFO
logger.hadoop.name=org.apache.hadoop
logger.hadoop.level=INFO
logger.zookeeper.name=org.apache.zookeeper
logger.zookeeper.level=INFO
logger.shaded_zookeeper.name=org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level=INFO
# Log all infos in the given file
appender.main.name=MainAppender
appender.main.type=RollingFile
appender.main.append=true
appender.main.fileName=${sys:log.file}
appender.main.filePattern=${sys:log.file}.%i
appender.main.layout.type=PatternLayout
appender.main.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type=Policies
appender.main.policies.size.type=SizeBasedTriggeringPolicy
appender.main.policies.size.size=100MB
appender.main.policies.startup.type=OnStartupTriggeringPolicy
appender.main.strategy.type=DefaultRolloverStrategy
appender.main.strategy.max=${env:MAX_LOG_FILE_NUMBER:-10}
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name=org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level=OFF
# Everything above is from the stock Flink log4j.properties; the custom Kafka appender configuration starts below
# kafka appender config
rootLogger.appenderRef.kafka.ref=KafkaAppender
appender.kafka.type=KafkaAppender
appender.kafka.name=KafkaAppender
# Topic the logs are sent to
appender.kafka.topic=flink_job_logs
# Kafka Broker
appender.kafka.kafkaBroker=172.16.24.194:9092
# Minimum log level sent to Kafka
appender.kafka.level=info
# Only ship logs whose logger (package) name matches these prefixes
appender.kafka.includes=com.*,org.apache.hadoop.yarn.client.*,org.*
## Pattern of the logs written to Kafka; pick either JSONLayout or PatternLayout, not both
#appender.kafka.layout.type=PatternLayout
#appender.kafka.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} ${sys:log.file} ${sys:flink_per_job_name} %-5p %-60c %x - %m%n
## Emit the logs as JSON
appender.kafka.layout.type=JSONLayout
appender.kafka.layout.compact=true
appender.kafka.layout.complete=false
# Log file path and name; this value identifies the machine/container the log came from
appender.kafka.layout.additionalField1.type=KeyValuePair
appender.kafka.layout.additionalField1.key=logdir
appender.kafka.layout.additionalField1.value=${sys:log.file}
# The Flink job name has to be passed in manually at launch time, e.g. -yD env.java.opts="-Dflink_per_job_name=profile-platform"
appender.kafka.layout.additionalField2.type=KeyValuePair
appender.kafka.layout.additionalField2.key=flink_per_job_name
appender.kafka.layout.additionalField2.value=${sys:flink_per_job_name}
- Point the launch command at the log configuration file
-yD \$internal.yarn.log-config-file="/data/flink-job/profile-platform-1.0.0/log4j.properties" \
- Launch
/opt/flink/bin/flink run \
  -m yarn-cluster \
  -p $P \
  -ys $YS \
  -yjm $YJM \
  -ytm $YTM \
  -yt $JAR_PATH/lib \
  -ynm $YNM \
  -yD \$internal.yarn.log-config-file="/data/flink-job/profile-platform-1.0.0/log4j.properties" \
  -yD env.java.opts="-Dfile.encoding=UTF-8 -Dflink_per_job_name=profile-platform" \
  -c $START_CLASS \
  $JAR_PATH/$JAR_NAME.jar $1
References
https://blog.csdn.net/weixin_44500374/article/details/117931457
https://blog.csdn.net/shi_xiansheng/article/details/119778656