0


45、Flink 的指标体系介绍及验证(1)-指标类型及指标实现示例

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。
  • 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
  • 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
  • 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
  • 5、Flink 监控系列 本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录


本文简单的介绍了Flink 的指标体系的第一部分,即指标类型以及四种类型的代码实现示例。
本专题分为三部分,即:
45、Flink 的指标体系介绍及验证(1)-指标类型及指标实现示例
45、Flink 的指标体系介绍及验证(2)-指标的scope、报告、系统指标以及追踪、api集成示例和dashboard集成
45、Flink 的指标体系介绍及验证(3)- 完整版

本文依赖nc能正常使用。
本文分为5个部分,即指标分类、计数器、gauge、histogram和meter四个指标的代码实现。
本文的示例是在Flink 1.17版本中运行。

一、Flink 指标体系

Flink暴露了一个度量系统,允许收集度量并将其公开给外部系统。
本文涉及的maven依赖

<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- flink连接器 --><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency></dependencies>

1、Registering metrics 注册指标

通过调用getRuntimeContext().getMetricGroup(),您可以从任何扩展RichFunction的用户函数访问度量系统。此方法返回一个MetricGroup对象,您可以在该对象上创建和注册新度量。

1)、指标类型

Flink支持计数器、仪表盘、柱状图和计量表。Counters, Gauges, Histograms and Meters.

2)、计数器

计数器是用来统计数量的。当前值可以是in-或使用 inc()/inc(long n)或dec()/dec(long n)增减。您可以通过调用MetricGroup上的 counter(String name)来创建和注册计数器。
本示例提供了多种实现方式,供参考。

importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.functions.RichFlatMapFunction;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.tuple.Tuple3;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.metrics.Counter;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;/**
 * @author alanchan
 *
 */publicclassTestMetricsDemo{//    public class LineMapper extends RichMapFunction<String, String> {//        private transient Counter counter;////        @Override//        public void open(Configuration config) {//            this.counter = getRuntimeContext().getMetricGroup().counter("result2LineCounter");//        }////        @Override//        public String map(String value) throws Exception {//            this.counter.inc();//            return value;//        }//    }publicstaticvoidtest1()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42",9999);// transformationDataStream<Tuple2<String,Integer>> result = lines.flatMap(newFlatMapFunction<String,String>(){@OverridepublicvoidflatMap(String value,Collector<String> out)throwsException{String[] arr = value.split(",");for(String word : arr){
                    out.collect(word);}}}).map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String value)throwsException{returnTuple2.of(value,1);}}).keyBy(t -> t.f0).sum(1);//        SingleOutputStreamOperator<Tuple2<Integer, Integer>> result1 = lines.map(new RichMapFunction<String, Tuple2<Integer, Integer>>() {////            @Override//            public Tuple2<Integer, Integer> map(String value) throws Exception {//                int subTaskId = getRuntimeContext().getIndexOfThisSubtask();// 子任务id/分区编号//                return new Tuple2(subTaskId, 1);//            }//            // 按照子任务id/分区编号分组,并统计每个子任务/分区中有几个元素//        }).keyBy(t -> t.f0).sum(1);// RichFlatMapFunction<IN, OUT>// Tuple3<String, Long, Integer> 输入的字符串,行数,统计单词的总数DataStream<Tuple3<String,Long,Integer>> result2 = lines.flatMap(newRichFlatMapFunction<String,Tuple2<String,Long>>(){//            private transient Counter counter;privatelong result2LineCounter =0;@Overridepublicvoidopen(Configuration config){//                this.counter = getRuntimeContext().getMetricGroup().counter("result2LineCounter:");
                result2LineCounter =getRuntimeContext().getMetricGroup().counter("result2LineCounter:").getCount();}@OverridepublicvoidflatMap(String value,Collector<Tuple2<String,Long>> out)throwsException{//                this.counter.inc();
                result2LineCounter++;System.out.println("计数器行数:"+ result2LineCounter);String[] arr = value.split(",");for(String word : arr){
                    out.collect(Tuple2.of(word, result2LineCounter));}}}).map(newMapFunction<Tuple2<String,Long>,Tuple3<String,Long,Integer>>(){@OverridepublicTuple3<String,Long,Integer>map(Tuple2<String,Long> value)throwsException{//                Tuple3<String, Long, Integer> t = Tuple3.of(value.f0, value.f1, 1);returnTuple3.of(value.f0, value.f1,1);}}).keyBy(t -> t.f0).sum(2);// sink
        result.print("result:");
        result2.print("result2:");

        env.execute();}publicstaticvoidmain(String[] args)throwsException{test1();//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//        env.setParallelism(1);//        DataStream<String> input = env.fromElements("a", "b", "c", "a", "b", "c");////        input.keyBy(value -> value).map(new RichMapFunction<String, String>() {//            private long count = 0;////            @Override//            public void open(Configuration parameters) throws Exception {                super.open(parameters);//                count = getRuntimeContext().getMetricGroup().counter("myCounter").getCount();//            }////            @Override//            public String map(String value) throws Exception {//                count++;//                return value + ": " + count;//            }//        }).print();////        env.execute("Flink Count Counter Example");}}///验证数据///// 输入数据[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

//控制台输出:
计数器行数:1
result:>(hello,1)
result2:>(hello,1,1)
result:>(123,1)
result2:>(123,1,1)
计数器行数:2
result2:>(alan,2,1)
result:>(alan,1)
result2:>(flink,2,1)
result:>(flink,1)
result2:>(good,2,1)
result:>(good,1)
计数器行数:3
result:>(alan_chan,1)
result2:>(alan_chan,3,1)
result:>(hi,1)
result2:>(hi,3,1)
result:>(flink,2)
result2:>(flink,2,2)

或者,您也可以使用自己的Counter实现:

importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.functions.RichFlatMapFunction;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.tuple.Tuple3;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.metrics.Counter;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;/**
 * @author alanchan
 *
 */publicclassTestMetricsDemo{publicstaticvoidtest2()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42",9999);// transformation// Tuple3<String, Long, Integer> 输入的字符串,行数,统计单词的总数DataStream<Tuple3<String,Long,Integer>> result = lines.flatMap(newRichFlatMapFunction<String,Tuple2<String,Long>>(){privatetransientCounter counter;@Overridepublicvoidopen(Configuration config){this.counter =getRuntimeContext().getMetricGroup().counter("result2LineCounter",newAlanCustomCounter());}@OverridepublicvoidflatMap(String value,Collector<Tuple2<String,Long>> out)throwsException{this.counter.inc();//                result2LineCounter++;System.out.println("计数器行数:"+this.counter.getCount());String[] arr = value.split(",");for(String word : arr){
                    out.collect(Tuple2.of(word,this.counter.getCount()));}}}).map(newMapFunction<Tuple2<String,Long>,Tuple3<String,Long,Integer>>(){@OverridepublicTuple3<String,Long,Integer>map(Tuple2<String,Long> value)throwsException{returnTuple3.of(value.f0, value.f1,1);}}).keyBy(t -> t.f0).sum(2);// sink
        result.print("result:");

        env.execute();}publicstaticclassAlanCustomCounterimplementsCounter{privatelong count;@Overridepublicvoidinc(){
            count +=2;}@Overridepublicvoidinc(long n){
            count += n;}@Overridepublicvoiddec(){
            count -=2;}@Overridepublicvoiddec(long n){
            count -= n;}@OverridepubliclonggetCount(){return count;}}publicstaticvoidmain(String[] args)throwsException{test2();}}///验证数据///// 输入数据[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

//控制台输出:
计数器行数:2
result:>(hello,2,1)
result:>(123,2,1)
计数器行数:4
result:>(alan,4,1)
result:>(flink,4,1)
result:>(good,4,1)
计数器行数:6
result:>(alan_chan,6,1)
result:>(hi,6,1)
result:>(flink,4,2)

3)、Gauge

仪表可根据需要提供任何类型的值。为了使用Gauge,您必须首先创建一个实现org.apache.flink.metrics.Guge接口的类。返回值的类型没有限制。您可以通过调用MetricGroup上的gauge(String name, Gauge gauge) 来注册gauge。

importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.functions.RichFlatMapFunction;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.tuple.Tuple3;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.metrics.Counter;importorg.apache.flink.metrics.Gauge;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;/**
 * @author alanchan
 *
 */publicclassTestMetricsGaugeDemo{//    public class MyMapper extends RichMapFunction<String, String> {//        private transient int valueToExpose = 0;////        @Override//        public void open(Configuration config) {//            getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>() {//                @Override//                public Integer getValue() {//                    return valueToExpose;//                }//            });//        }////        @Override//        public String map(String value) throws Exception {//            valueToExpose++;//            return value;//        }//    }publicstaticvoidtest1()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42",9999);// transformation// RichFlatMapFunction<IN, OUT>// Tuple3<String, String, Integer> 输入的字符串,alan lines[行数],统计单词的总数DataStream<Tuple3<String,String,Integer>> result = lines.flatMap(newRichFlatMapFunction<String,Tuple2<String,String>>(){privatelong result2LineCounter =0;privateGauge<String> gauge =null;@Overridepublicvoidopen(Configuration config){
                result2LineCounter =getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();

                gauge =getRuntimeContext().getMetricGroup().gauge("alanGauge",newGauge<String>(){@OverridepublicStringgetValue(){return"alan lines["+ result2LineCounter +"]";}});}@OverridepublicvoidflatMap(String value,Collector<Tuple2<String,String>> out)throwsException{
                result2LineCounter++;System.out.println("计数器行数:"+ result2LineCounter);String[] arr = value.split(",");for(String word : arr){
                    out.collect(Tuple2.of(word, gauge.getValue()));}}}).map(newMapFunction<Tuple2<String,String>,Tuple3<String,String,Integer>>(){@OverridepublicTuple3<String,String,Integer>map(Tuple2<String,String> value)throwsException{returnTuple3.of(value.f0, value.f1,1);}}).keyBy(t -> t.f0).sum(2);// sink
        result.print("result:");

        env.execute();}publicstaticvoidmain(String[] args)throwsException{test1();}}///验证数据///// 输入数据[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

//控制台输出:
计数器行数:1
result:>(hello,alan lines[1],1)
result:>(123,alan lines[1],1)
计数器行数:2
result:>(alan,alan lines[2],1)
result:>(flink,alan lines[2],1)
result:>(good,alan lines[2],1)
计数器行数:3
result:>(alan_chan,alan lines[3],1)
result:>(hi,alan lines[3],1)
result:>(flink,alan lines[2],2)

报告器会将暴露的对象转换为String,这意味着需要一个有意义的toString()实现。

4)、Histogram

直方图测量长值的分布。您可以通过调用MetricGroup上的histogram(String name, Histogram histogram) 来注册一个对象。
下面的示例是自己实现的Histogram接口,仅仅用于演示实现过程。

importjava.io.Serializable;importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.functions.RichFlatMapFunction;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.tuple.Tuple3;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.metrics.Gauge;//import com.codahale.metrics.Histogram;importorg.apache.flink.metrics.Histogram;importorg.apache.flink.metrics.HistogramStatistics;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;/**
 * @author alanchan
 *
 */publicclassTestMetricsHistogramDemo{//    public class MyMapper extends RichMapFunction<Long, Long> {//        private transient Histogram histogram;////        @Override//        public void open(Configuration config) {//            this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new AlanHistogram());//        }////        @Override//        public Long map(Long value) throws Exception {//            this.histogram.update(value);//            return value;//        }//    }publicstaticclassAlanHistogramimplementsHistogram{privateCircularDoubleArray descriptiveStatistics =newCircularDoubleArray(10);;publicAlanHistogram(){}publicAlanHistogram(int windowSize){this.descriptiveStatistics =newCircularDoubleArray(windowSize);}@Overridepublicvoidupdate(long value){this.descriptiveStatistics.addValue(value);}@OverridepubliclonggetCount(){returnthis.descriptiveStatistics.getElementsSeen();}@OverridepublicHistogramStatisticsgetStatistics(){//            return new DescriptiveStatisticsHistogramStatistics(this.descriptiveStatistics);returnnull;}classCircularDoubleArrayimplementsSerializable{privatestaticfinallong serialVersionUID =1L;privatefinaldouble[] backingArray;privateint nextPos =0;privateboolean fullSize =false;privatelong elementsSeen =0;CircularDoubleArray(int windowSize){this.backingArray =newdouble[windowSize];}synchronizedvoidaddValue(double value){
                backingArray[nextPos]= value;++elementsSeen;++nextPos;if(nextPos == backingArray.length){
                    nextPos =0;
                    fullSize =true;}}synchronizeddouble[]toUnsortedArray(){finalint size =getSize();double[] result =newdouble[size];System.arraycopy(backingArray,0, result,0, result.length);return result;}privatesynchronizedintgetSize(){return fullSize ? backingArray.length : nextPos;}privatesynchronizedlonggetElementsSeen(){return elementsSeen;}}}publicstaticvoidtest1()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42",9999);// transformation// RichFlatMapFunction<IN, OUT>// Tuple3<String, String, Integer> 输入的字符串,alan lines[行数],统计单词的总数DataStream<Tuple3<String,String,Integer>> result = lines.flatMap(newRichFlatMapFunction<String,Tuple2<String,String>>(){privatelong result2LineCounter =0;privateGauge<String> gauge =null;privateHistogram histogram =null;;@Overridepublicvoidopen(Configuration config){
                result2LineCounter =getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();

                gauge =getRuntimeContext().getMetricGroup().gauge("alanGauge",newGauge<String>(){@OverridepublicStringgetValue(){return"alan lines["+ result2LineCounter +"]";}});this.histogram =getRuntimeContext().getMetricGroup().histogram("alanHistogram",newAlanHistogram());}@OverridepublicvoidflatMap(String value,Collector<Tuple2<String,String>> out)throwsException{
                result2LineCounter++;this.histogram.update(result2LineCounter *3);// 此处仅仅示例this.histogram.getCount()的值,没有实际的意义System.out.println("计数器行数:"+ result2LineCounter +"  histogram:"+this.histogram.getCount());String[] arr = value.split(",");for(String word : arr){
                    out.collect(Tuple2.of(word, gauge.getValue()));}}}).map(newMapFunction<Tuple2<String,String>,Tuple3<String,String,Integer>>(){@OverridepublicTuple3<String,String,Integer>map(Tuple2<String,String> value)throwsException{returnTuple3.of(value.f0, value.f1,1);}}).keyBy(t -> t.f0).sum(2);// sink
        result.print("result:");

        env.execute();}publicstaticvoidmain(String[] args)throwsException{test1();}}///验证数据///// 输入数据[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

//控制台输出:
计数器行数:1  histogram:1
result:>(hello,alan lines[1],1)
result:>(123,alan lines[1],1)
计数器行数:2  histogram:2
result:>(alan,alan lines[2],1)
result:>(flink,alan lines[2],1)
result:>(good,alan lines[2],1)
计数器行数:3  histogram:3
result:>(alan_chan,alan lines[3],1)
result:>(hi,alan lines[3],1)
result:>(flink,alan lines[2],2)

Flink没有提供直方图的默认实现,但提供了一个允许使用Codahale/DropWizard直方图的包装器。要使用此包装器,
在pom.xml中添加以下依赖项:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-metrics-dropwizard</artifactId><version>1.17.1</version></dependency>

下面的示例是使用 Codahale/DropWizard直方图,如下所示:

importjava.io.Serializable;importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.functions.RichFlatMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.tuple.Tuple3;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;importorg.apache.flink.metrics.Gauge;//import com.codahale.metrics.Histogram;importorg.apache.flink.metrics.Histogram;importorg.apache.flink.metrics.HistogramStatistics;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;importcom.codahale.metrics.SlidingWindowReservoir;/**
 * @author alanchan
 *
 */publicclassTestMetricsHistogramDemo{publicstaticvoidtest2()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42",9999);// transformation// RichFlatMapFunction<IN, OUT>// Tuple3<String, String, Integer> 输入的字符串,alan lines[行数],统计单词的总数DataStream<Tuple3<String,String,Integer>> result = lines.flatMap(newRichFlatMapFunction<String,Tuple2<String,String>>(){privatelong result2LineCounter =0;privateGauge<String> gauge =null;privateHistogram histogram =null;;@Overridepublicvoidopen(Configuration config){
                result2LineCounter =getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();

                gauge =getRuntimeContext().getMetricGroup().gauge("alanGauge",newGauge<String>(){@OverridepublicStringgetValue(){return"alan lines["+ result2LineCounter +"]";}});com.codahale.metrics.Histogram dropwizardHistogram =newcom.codahale.metrics.Histogram(newSlidingWindowReservoir(500));//                this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new AlanHistogram());this.histogram =getRuntimeContext().getMetricGroup().histogram("alanHistogram",newDropwizardHistogramWrapper(dropwizardHistogram));}@OverridepublicvoidflatMap(String value,Collector<Tuple2<String,String>> out)throwsException{
                result2LineCounter++;this.histogram.update(result2LineCounter *3);// 此处仅仅示例this.histogram.getCount()的值,没有实际的意义System.out.println("计数器行数:"+ result2LineCounter +"  histogram:"+this.histogram.getCount());String[] arr = value.split(",");for(String word : arr){
                    out.collect(Tuple2.of(word, gauge.getValue()));}}}).map(newMapFunction<Tuple2<String,String>,Tuple3<String,String,Integer>>(){@OverridepublicTuple3<String,String,Integer>map(Tuple2<String,String> value)throwsException{returnTuple3.of(value.f0, value.f1,1);}}).keyBy(t -> t.f0).sum(2);// sink
        result.print("result:");

        env.execute();}publicstaticvoidmain(String[] args)throwsException{test2();}}///验证数据///// 输入数据[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

//控制台输出://控制台输出:
计数器行数:1  histogram:1
result:>(hello,alan lines[1],1)
result:>(123,alan lines[1],1)
计数器行数:2  histogram:2
result:>(alan,alan lines[2],1)
result:>(flink,alan lines[2],1)
result:>(good,alan lines[2],1)
计数器行数:3  histogram:3
result:>(alan_chan,alan lines[3],1)
result:>(hi,alan lines[3],1)
result:>(flink,alan lines[2],2)

5)、Meter

仪表测量平均吞吐量。可以使用markEvent()方法注册事件的发生。可以使用markEvent(long n)方法注册同时发生多个事件。您可以通过在MetricGroup上调用meter(String name, Meter meter)来注册meter。

下面的示例展示了自定义的Meter实现,可能很不严谨,实际上应用更多的是本部分的第二个示例。

importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.functions.RichFlatMapFunction;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.tuple.Tuple3;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;importorg.apache.flink.metrics.Counter;importorg.apache.flink.metrics.Gauge;importorg.apache.flink.metrics.Histogram;importorg.apache.flink.metrics.Meter;importorg.apache.flink.metrics.SimpleCounter;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;//import com.codahale.metrics.Meter;importcom.codahale.metrics.SlidingWindowReservoir;/**
 * @author alanchan
 *
 */publicclassTestMetricsMeterDemo{publicclassMyMapperextendsRichMapFunction<Long,Long>{privatetransientMeter meter;@Overridepublicvoidopen(Configuration config){this.meter =getRuntimeContext().getMetricGroup().meter("myMeter",newAlanMeter());}@OverridepublicLongmap(Long value)throwsException{this.meter.markEvent();return value;}}publicstaticclassAlanMeterimplementsMeter{/** The underlying counter maintaining the count. */privatefinalCounter counter =newSimpleCounter();;/** The time-span over which the average is calculated. */privatefinalint timeSpanInSeconds =0;/** Circular array containing the history of values. */privatefinallong[] values =null;;/** The index in the array for the current time. */privateint time =0;/** The last rate we computed. */privatedouble currentRate =0;@OverridepublicvoidmarkEvent(){this.counter.inc();}@OverridepublicvoidmarkEvent(long n){this.counter.inc(n);}@OverridepubliclonggetCount(){return counter.getCount();}@OverridepublicdoublegetRate(){return currentRate;}publicvoidupdate(){
            time =(time +1)% values.length;
            values[time]= counter.getCount();
            currentRate =((double)(values[time]- values[(time +1)% values.length])/ timeSpanInSeconds);}}publicstaticvoidtest1()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42",9999);// transformation// RichFlatMapFunction<IN, OUT>// Tuple3<String, String, Integer> 输入的字符串,alan lines[行数],统计单词的总数DataStream<Tuple3<String,String,Integer>> result = lines.flatMap(newRichFlatMapFunction<String,Tuple2<String,String>>(){privatelong result2LineCounter =0;privateGauge<String> gauge =null;privateHistogram histogram =null;privateMeter meter;@Overridepublicvoidopen(Configuration config){
                result2LineCounter =getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();

                gauge =getRuntimeContext().getMetricGroup().gauge("alanGauge",newGauge<String>(){@OverridepublicStringgetValue(){return"alan lines["+ result2LineCounter +"]";}});com.codahale.metrics.Histogram dropwizardHistogram =newcom.codahale.metrics.Histogram(newSlidingWindowReservoir(500));this.histogram =getRuntimeContext().getMetricGroup().histogram("alanHistogram",newDropwizardHistogramWrapper(dropwizardHistogram));this.meter =getRuntimeContext().getMetricGroup().meter("alanMeter",newAlanMeter());}@OverridepublicvoidflatMap(String value,Collector<Tuple2<String,String>> out)throwsException{
                result2LineCounter++;this.histogram.update(result2LineCounter *3);this.meter.markEvent();// 此处仅仅示例this.histogram.getCount()、this.meter.getRate()的值,没有实际的意义,具体使用以实际使用场景为准System.out.println("计数器行数:"+ result2LineCounter +",  histogram:"+this.histogram.getCount()+",   meter.getRate:"+this.meter.getRate());String[] arr = value.split(",");for(String word : arr){
                    out.collect(Tuple2.of(word, gauge.getValue()));}}}).map(newMapFunction<Tuple2<String,String>,Tuple3<String,String,Integer>>(){@OverridepublicTuple3<String,String,Integer>map(Tuple2<String,String> value)throwsException{returnTuple3.of(value.f0, value.f1,1);}}).keyBy(t -> t.f0).sum(2);// sink
        result.print("result:");

        env.execute();}publicstaticvoidmain(String[] args)throwsException{test1();}}///验证数据///// 输入数据[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

//控制台输出:
计数器行数:1,  histogram:1,   meter.getRate:0.0
result:>(hello,alan lines[1],1)
result:>(123,alan lines[1],1)
计数器行数:2,  histogram:2,   meter.getRate:0.0
result:>(alan,alan lines[2],1)
result:>(flink,alan lines[2],1)
result:>(good,alan lines[2],1)
计数器行数:3,  histogram:3,   meter.getRate:0.0
result:>(alan_chan,alan lines[3],1)
result:>(hi,alan lines[3],1)
result:>(flink,alan lines[2],2)

Flink提供了一个允许使用Codahale/DropWizard仪表的包装器。要使用此包装器,
在pom.xml中添加以下依赖项:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-metrics-dropwizard</artifactId><version>1.17.1</version></dependency>

下面使用Codahale/DropWizard注册的示例,如下所示:

importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.functions.RichFlatMapFunction;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.tuple.Tuple3;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;importorg.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;importorg.apache.flink.metrics.Counter;importorg.apache.flink.metrics.Gauge;importorg.apache.flink.metrics.Histogram;importorg.apache.flink.metrics.Meter;importorg.apache.flink.metrics.SimpleCounter;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;//import com.codahale.metrics.Meter;importcom.codahale.metrics.SlidingWindowReservoir;/**
 * @author alanchan
 *
 */publicclassTestMetricsMeterDemo{publicstaticvoidtest2()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42",9999);// transformation// RichFlatMapFunction<IN, OUT>// Tuple3<String, String, Integer> 输入的字符串,alan lines[行数],统计单词的总数DataStream<Tuple3<String,String,Integer>> result = lines.flatMap(newRichFlatMapFunction<String,Tuple2<String,String>>(){privatelong result2LineCounter =0;privateGauge<String> gauge =null;privateHistogram histogram =null;privateMeter meter;@Overridepublicvoidopen(Configuration config){
                result2LineCounter =getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();

                gauge =getRuntimeContext().getMetricGroup().gauge("alanGauge",newGauge<String>(){@OverridepublicStringgetValue(){return"alan lines["+ result2LineCounter +"]";}});com.codahale.metrics.Histogram dropwizardHistogram =newcom.codahale.metrics.Histogram(newSlidingWindowReservoir(500));this.histogram =getRuntimeContext().getMetricGroup().histogram("alanHistogram",newDropwizardHistogramWrapper(dropwizardHistogram));//                this.meter = getRuntimeContext().getMetricGroup().meter("alanMeter", new AlanMeter());com.codahale.metrics.Meter dropwizardMeter =newcom.codahale.metrics.Meter();this.meter =getRuntimeContext().getMetricGroup().meter("alanMeter",newDropwizardMeterWrapper(dropwizardMeter));}@OverridepublicvoidflatMap(String value,Collector<Tuple2<String,String>> out)throwsException{
                result2LineCounter++;this.histogram.update(result2LineCounter *3);this.meter.markEvent();// 此处仅仅示例this.histogram.getCount()、this.meter.getRate()的值,没有实际的意义,具体使用以实际使用场景为准System.out.println("计数器行数:"+ result2LineCounter +",  histogram:"+this.histogram.getCount()+",   meter.getRate:"+this.meter.getRate());String[] arr = value.split(",");for(String word : arr){
                    out.collect(Tuple2.of(word, gauge.getValue()));}}}).map(newMapFunction<Tuple2<String,String>,Tuple3<String,String,Integer>>(){@OverridepublicTuple3<String,String,Integer>map(Tuple2<String,String> value)throwsException{returnTuple3.of(value.f0, value.f1,1);}}).keyBy(t -> t.f0).sum(2);// sink
        result.print("result:");

        env.execute();}publicstaticvoidmain(String[] args)throwsException{test2();}}//控制台输出:
计数器行数:1,  histogram:1,   meter.getRate:0.0
result:>(hello,alan lines[1],1)
result:>(123,alan lines[1],1)
计数器行数:2,  histogram:2,   meter.getRate:0.0
result:>(alan,alan lines[2],1)
result:>(flink,alan lines[2],1)
result:>(good,alan lines[2],1)
计数器行数:3,  histogram:3,   meter.getRate:0.0
result:>(alan_chan,alan lines[3],1)
result:>(hi,alan lines[3],1)
result:>(flink,alan lines[2],2)

以上,本文简单的介绍了Flink 的指标体系的第一部分,即指标类型以及四种类型的代码实现示例。
本专题分为三部分,即:
45、Flink 的指标体系介绍及验证(1)-指标类型及指标实现示例
45、Flink 的指标体系介绍及验证(2)-指标的scope、报告、系统指标以及追踪、api集成示例和dashboard集成
45、Flink 的指标体系介绍及验证(3)- 完整版


本文转载自: https://blog.csdn.net/chenwewi520feng/article/details/134537064
版权归原作者 一瓢一瓢的饮 alanchan 所有, 如有侵权,请联系我们删除。

“45、Flink 的指标体系介绍及验证(1)-指标类型及指标实现示例”的评论:

还没有评论