

Flink使用Log4j将日志发送到Kafka


Background

Flink version: 1.14.3

Custom KafkaAppender

You can define this class directly in your own project, or package it as a separate JAR and pull it in as a dependency. (If you ship it as a JAR, make sure Log4j 2 can still discover the plugin, e.g. by compiling with log4j-core's annotation processor so the plugin cache file is generated.)

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * <p>http://www.apache.org/licenses/LICENSE-2.0
 *
 * <p>Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.PatternLayout;

import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;

@SuppressWarnings("unused")
@Plugin(name = "KafkaAppender", category = "Core", elementType = "appender", printObject = true)
public final class KafkaAppender extends AbstractAppender {

    private KafkaProducer<String, String> producer = null;
    private String topic;
    /** Kafka broker address */
    private String kafkaBroker;
    private boolean append = true;
    /** Minimum level of events sent to Kafka */
    private Level level;
    private Layout<? extends Serializable> layout;
    /** Include rules (exact logger names and prefix matches) */
    private Set<String> includeSet = new HashSet<>();
    private Set<String> includeMatchSet = new HashSet<>();
    /** Exclude rules (exact logger names and prefix matches) */
    private Set<String> excludeSet = new HashSet<>();
    private Set<String> excludeMatchSet = new HashSet<>();

    /** Leftover scratch test for the prefix-matching logic; not used by the appender. */
    public static void main(String[] args) {
        String a = "x.x.*1";
        System.out.println(a.endsWith(".*"));
        System.out.println(a.replace(".*", ""));
    }

    public KafkaAppender(
            String name,
            String topic,
            String kafkaBroker,
            Filter filter,
            Layout<? extends Serializable> layout,
            boolean append,
            String level,
            String includes,
            String excludes) {
        super(name, filter, layout);
        System.out.println(
                "Initializing KafkaAppender, Kafka Broker: " + kafkaBroker
                        + " , Log Topic: " + topic
                        + " , Log Level: " + level);
        if (includes != null) {
            for (String include : includes.split(",")) {
                if (include.length() > 0) {
                    if (include.endsWith(".*")) {
                        includeMatchSet.add(include.replace(".*", ""));
                    } else {
                        includeSet.add(include);
                    }
                }
            }
        }
        if (excludes != null) {
            for (String exclude : excludes.split(",")) {
                if (exclude.length() > 0) {
                    if (exclude.endsWith(".*")) {
                        excludeMatchSet.add(exclude.replace(".*", ""));
                    } else {
                        excludeSet.add(exclude);
                    }
                }
            }
        }
        this.topic = topic;
        this.kafkaBroker = kafkaBroker;

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(props);
        this.append = append;
        if (level == null) {
            level = "INFO";
        }
        if (layout == null) {
            layout = PatternLayout.createDefaultLayout();
        }
        this.layout = layout;
        this.level = Level.toLevel(level);
    }

    @PluginFactory
    public static KafkaAppender createAppender(
            /** Target Kafka topic */
            @PluginAttribute("topic") String topic,
            /** Kafka broker address */
            @PluginAttribute("kafkaBroker") String kafkaBroker,
            /** Layout used to serialize events */
            @PluginElement("Layout") Layout<? extends Serializable> layout,
            @PluginAttribute("name") String name,
            @PluginAttribute("append") boolean append,
            /** Log level */
            @PluginAttribute("level") String level,
            /** Logger-name prefixes to include, comma separated */
            @PluginAttribute("includes") String includes,
            /** Logger-name prefixes to exclude (wins over includes), comma separated */
            @PluginAttribute("excludes") String excludes) {
        return new KafkaAppender(
                name, topic, kafkaBroker, null, layout, append, level, includes, excludes);
    }

    @Override
    public final void stop() {
        super.stop();
        if (producer != null) {
            producer.close();
        }
    }

    @Override
    public void append(LogEvent event) {
        if (event.getLevel().isMoreSpecificThan(this.level)) {
            if (filterPackageName(event)) {
                return;
            }
            try {
                if (producer != null) {
                    Future<RecordMetadata> result =
                            producer.send(
                                    new ProducerRecord<String, String>(
                                            topic, getLayout().toSerializable(event).toString()));
                    // result.get();
                }
            } catch (Exception e) {
                LOGGER.error("Unable to write to kafka for appender [{}].", this.getName(), e);
                throw new AppenderLoggingException(
                        "Unable to write to kafka in appender: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Filters by logger (package) name; returns true if the event should NOT be sent to Kafka.
     *
     * @name filterPackageName
     * @date 2023/2/28 4:07 PM
     * @return boolean
     * @param event the log event
     * @author Jast
     */
    private boolean filterPackageName(LogEvent event) {
        boolean flag = true;
        if (includeSet.size() == 0 && includeMatchSet.size() == 0
                && excludeSet.size() == 0 && excludeMatchSet.size() == 0) {
            return false;
        }
        if (includeSet.size() == 0 && includeMatchSet.size() == 0) {
            flag = false;
        }
        /** Logger name (usually the fully qualified class name) of the event */
        String loggerName = event.getLoggerName();
        for (String include : includeSet) {
            if (loggerName.equals(include)) {
                flag = false;
            }
        }
        for (String include : includeMatchSet) {
            if (loggerName.startsWith(include)) {
                flag = false;
            }
        }
        for (String exclude : excludeMatchSet) {
            if (loggerName.startsWith(exclude)) {
                flag = true;
            }
        }
        for (String exclude : excludeSet) {
            if (loggerName.equals(exclude)) {
                flag = true;
            }
        }
        return flag;
    }
}
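
The appender plugs into the normal Log4j 2 pipeline, so job code needs nothing Kafka-specific: you keep logging through the usual SLF4J/Log4j API. Below is a minimal sketch of the kind of user code whose logs end up in the topic; the class and package names are made up for illustration, and with the includes rule com.* from the configuration below, its INFO-and-above logs are forwarded by KafkaAppender.

package com.example.job; // illustrative package so the logger name matches the "com.*" include rule

import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical user function; nothing here references Kafka. Its logger name is the
// fully qualified class name, which is what KafkaAppender's include/exclude rules match on.
public class UppercaseFunction implements MapFunction<String, String> {

    private static final Logger LOG = LoggerFactory.getLogger(UppercaseFunction.class);

    @Override
    public String map(String value) {
        LOG.info("processing record: {}", value); // forwarded to Kafka by KafkaAppender
        return value.toUpperCase();
    }
}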

Modifying the log4j.properties configuration file

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30
# This affects logging for both user code and Flink
rootLogger.level=INFO
rootLogger.appenderRef.file.ref=MainAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name=akka
logger.akka.level=INFO
logger.kafka.name=org.apache.kafka
logger.kafka.level=INFO
logger.hadoop.name=org.apache.hadoop
logger.hadoop.level=INFO
logger.zookeeper.name=org.apache.zookeeper
logger.zookeeper.level=INFO
logger.shaded_zookeeper.name=org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level=INFO
# Log all infos in the given file
appender.main.name=MainAppender
appender.main.type=RollingFile
appender.main.append=true
appender.main.fileName=${sys:log.file}
appender.main.filePattern=${sys:log.file}.%i
appender.main.layout.type=PatternLayout
appender.main.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type=Policies
appender.main.policies.size.type=SizeBasedTriggeringPolicy
appender.main.policies.size.size=100MB
appender.main.policies.startup.type=OnStartupTriggeringPolicy
appender.main.strategy.type=DefaultRolloverStrategy
appender.main.strategy.max=${env:MAX_LOG_FILE_NUMBER:-10}
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name=org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level=OFF

# Everything above is the stock Flink configuration; the custom Kafka appender configuration starts here
# kafka appender config
rootLogger.appenderRef.kafka.ref=KafkaAppender
appender.kafka.type=KafkaAppender
appender.kafka.name=KafkaAppender
# Topic the logs are sent to
appender.kafka.topic=flink_job_logs
# Kafka Broker
appender.kafka.kafkaBroker=172.16.24.194:9092
# Minimum level of logs sent to Kafka
appender.kafka.level=info
# Only forward logs whose logger name matches these package prefixes
appender.kafka.includes=com.*,org.apache.hadoop.yarn.client.*,org.*
## Pattern for the logs written to Kafka; choose either JSONLayout or PatternLayout
#appender.kafka.layout.type=PatternLayout
#appender.kafka.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  ${sys:log.file}  ${sys:flink_per_job_name} %-5p %-60c %x - %m%n
## Emit logs as JSON
appender.kafka.layout.type=JSONLayout
appender.kafka.layout.compact=true
appender.kafka.layout.complete=false
# Log file path and name; this value lets you tell which machine/container produced the log
appender.kafka.layout.additionalField1.type=KeyValuePair
appender.kafka.layout.additionalField1.key=logdir
appender.kafka.layout.additionalField1.value=${sys:log.file}
# The Flink job name has to be set manually as a JVM system property in the launch command, e.g. -yD env.java.opts="-Dflink_per_job_name=profile-platform"
# Note: the property name must match the ${sys:...} lookup below, so either pass -Dflink_job_name=... or change the lookup to ${sys:flink_per_job_name}
appender.kafka.layout.additionalField2.type=KeyValuePair
appender.kafka.layout.additionalField2.key=flink_job_name
appender.kafka.layout.additionalField2.value=${sys:flink_job_name}

Specifying the configuration file in the launch command

The configuration file is specified by adding

-yD \$internal.yarn.log-config-file="/data/flink-job/profile-platform-1.0.0/log4j.properties" \

to the launch command.

/opt/flink/bin/flink run \
-m yarn-cluster \
-p $P \
-ys $YS \
-yjm $YJM \
-ytm $YTM \
-yt $JAR_PATH/lib \
-ynm $YNM \
-yD \$internal.yarn.log-config-file="/data/flink-job/profile-platform-1.0.0/log4j.properties" \
-yD env.java.opts="-Dfile.encoding=UTF-8" \
-yD env.java.opts="-Dflink_per_job_name=profile-platform" \
-c $START_CLASS \
$JAR_PATH/$JAR_NAME.jar $1

Note: when -yD env.java.opts is given twice, only one of the two values takes effect; if you need both JVM options, merge them into a single entry, e.g. -yD env.java.opts="-Dfile.encoding=UTF-8 -Dflink_per_job_name=profile-platform".

Format of the data consumed from Kafka

{"thread":"Sink Label Group Data Send Topic (4/5)#0","level":"INFO","loggerName":"org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler","message":"Committing the state for checkpoint 23","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","instant":{"epochSecond":1677741263,"nanoOfSecond":777000000},"threadId":104,"threadPriority":5,"logdir":"/hadoop/yarn/log/application_1675237371712_0142/container_e25_1675237371712_0142_01_000003/taskmanager.log","flink_per_job_name":"avris-profile-platform-0301"}

Field descriptions

Field          Description
epochSecond    Log timestamp, in seconds
logdir         Log file path and name
loggerName     Name of the class that emitted the log entry
level          Log level
thread         Thread name
message        The log message printed by the program
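
To check what actually arrives in the topic you can use any ordinary Kafka consumer (for example kafka-console-consumer.sh --bootstrap-server 172.16.24.194:9092 --topic flink_job_logs). Below is a minimal Java sketch; the broker address and topic come from the example configuration above, while the class name and group id are made up.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Prints one JSON log event per line from the flink_job_logs topic.
public class FlinkLogTopicTail {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.24.194:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-log-inspect");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("flink_job_logs"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value()); // each value is one JSON log event
                }
            }
        }
    }
}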

Quick setup

To make this easier to use, I have packaged the appender class and published it to the Maven central repository, so it can be referenced directly as a Maven dependency. The steps are as follows:

  1. Add the Maven dependency
<dependency>
    <groupId>com.gitee.jastee</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <version>1.0.4</version>
</dependency>
  2. Emit logs in your code through the normal logging API (see the usage sketch earlier in this post)
  3. Configure log4j.properties
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30
# This affects logging for both user code and Flink
rootLogger.level=INFO
rootLogger.appenderRef.file.ref=MainAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name=akka
logger.akka.level=INFO
logger.kafka.name=org.apache.kafka
logger.kafka.level=INFO
logger.hadoop.name=org.apache.hadoop
logger.hadoop.level=INFO
logger.zookeeper.name=org.apache.zookeeper
logger.zookeeper.level=INFO
logger.shaded_zookeeper.name=org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level=INFO
# Log all infos in the given file
appender.main.name=MainAppender
appender.main.type=RollingFile
appender.main.append=true
appender.main.fileName=${sys:log.file}
appender.main.filePattern=${sys:log.file}.%i
appender.main.layout.type=PatternLayout
appender.main.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type=Policies
appender.main.policies.size.type=SizeBasedTriggeringPolicy
appender.main.policies.size.size=100MB
appender.main.policies.startup.type=OnStartupTriggeringPolicy
appender.main.strategy.type=DefaultRolloverStrategy
appender.main.strategy.max=${env:MAX_LOG_FILE_NUMBER:-10}
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name=org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level=OFF

# Everything above is the stock Flink configuration; the custom Kafka appender configuration starts here
# kafka appender config
rootLogger.appenderRef.kafka.ref=KafkaAppender
appender.kafka.type=KafkaAppender
appender.kafka.name=KafkaAppender
# Topic the logs are sent to
appender.kafka.topic=flink_job_logs
# Kafka Broker
appender.kafka.kafkaBroker=172.16.24.194:9092
# Minimum level of logs sent to Kafka
appender.kafka.level=info
# Only forward logs whose logger name matches these package prefixes
appender.kafka.includes=com.*,org.apache.hadoop.yarn.client.*,org.*
## Pattern for the logs written to Kafka; choose either JSONLayout or PatternLayout
#appender.kafka.layout.type=PatternLayout
#appender.kafka.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  ${sys:log.file}  ${sys:flink_per_job_name} %-5p %-60c %x - %m%n
## Emit logs as JSON
appender.kafka.layout.type=JSONLayout
appender.kafka.layout.compact=true
appender.kafka.layout.complete=false
# Log file path and name; this value lets you tell which machine/container produced the log
appender.kafka.layout.additionalField1.type=KeyValuePair
appender.kafka.layout.additionalField1.key=logdir
appender.kafka.layout.additionalField1.value=${sys:log.file}
# The Flink job name has to be set manually as a JVM system property in the launch command, e.g. -yD env.java.opts="-Dflink_per_job_name=profile-platform"
# Note: the property name must match the ${sys:...} lookup below, so either pass -Dflink_job_name=... or change the lookup to ${sys:flink_per_job_name}
appender.kafka.layout.additionalField2.type=KeyValuePair
appender.kafka.layout.additionalField2.key=flink_job_name
appender.kafka.layout.additionalField2.value=${sys:flink_job_name}
  4. Specify the log configuration file in the launch command
-yD \$internal.yarn.log-config-file="/data/flink-job/profile-platform-1.0.0/log4j.properties" \
  5. Launch the job
/opt/flink/bin/flink run \
-m yarn-cluster \
-p $P \
-ys $YS \
-yjm $YJM \
-ytm $YTM \
-yt $JAR_PATH/lib \
-ynm $YNM \
-yD \$internal.yarn.log-config-file="/data/flink-job/profile-platform-1.0.0/log4j.properties" \
-yD env.java.opts="-Dfile.encoding=UTF-8" \
-yD env.java.opts="-Dflink_per_job_name=profile-platform" \
-c $START_CLASS \
$JAR_PATH/$JAR_NAME.jar $1

References

https://blog.csdn.net/weixin_44500374/article/details/117931457

https://blog.csdn.net/shi_xiansheng/article/details/119778656

https://blog.csdn.net/u010772882/article/details/125493323

https://blog.csdn.net/Andrew_2018/article/details/115033630

https://www.codenong.com/31757361/

Tags: kafka, log4j, flink

This article is reposted from: https://blog.csdn.net/zhangshenghang/article/details/129304263
Copyright belongs to the original author, jast_zsh. In case of infringement, please contact us for removal.
