0


Flink 客户端操作命令及可视化工具

Flink

提供了丰富的客户端操作来提交任务和与任务进行交互。下面主要从

Flink

命令行、

Scala Shell

SQL Client

Restful API

Web

五个方面进行整理。

Flink

安装目录的

bin

目录下可以看到

flink

start-scala-shell.sh

sql-client.sh

等文件,这些都是客户端操作的入口。
[点击并拖拽以移动] ​

flink 常见操作:可以通过 -help 查看帮助

run 运行任务

-d

:以分离模式运行作业

-c

:如果没有在

jar

包中指定入口类,则需要在这里通过这个参数指定;

-m

:指定需要连接的

jobmanager

(主节点)地址,使用这个参数可以指定一个不同于配置文件中的

jobmanager

,可以说是

yarn

集群名称;

-p

:指定程序的并行度。可以覆盖配置文件中的默认值;

-s

:保存点

savepoint

的路径以还原作业来自(例如

hdfs:///flink/savepoint-1537

);

[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID dce7b69ad15e8756766967c46122736f

就可以看到我们提交的

JobManager

,默认是一个并发。
[点击并拖拽以移动] ​

点进去就可以看到详细的信息
[点击并拖拽以移动] ​

点击左侧

TaskManager —Stdout

能看到具体输出的日志信息。
[点击并拖拽以移动] ​

或者查看

TaskManager

节点的

log

目录下的

*.out

文件,也能看到具体的输出信息。
[点击并拖拽以移动] ​

list 查看任务列表

-m

jobmanager<arg>

作业管理器(主)的地址连接。

[root@hadoop1 flink-1.10.1]# bin/flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
09.07.2020 16:44:09 : dce7b69ad15e8756766967c46122736f : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

Stop 停止任务

需要指定

jobmanager

ip:prot

jobId

。如下报错可知,一个

job

能够被

stop

要求所有的

source

都是可以

stoppable

的,即实现了

StoppableFunction

接口。

[root@hadoop1 flink-1.10.1]# bin/flink stop -m 127.0.0.1:8081 dce7b69ad15e8756766967c46122736f
Suspending job "dce7b69ad15e8756766967c46122736f" with a savepoint.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job "dce7b69ad15e8756766967c46122736f".
    at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:458)
StoppableFunction

接口如下,属于优雅停止任务。

/**
 * @Description 需要 stoppabel 的函数必须实现此接口,例如流式任务 source*
 *               stop() 方法在任务收到 stop信号的时候调用
 *               source 在接收到这个信号后,必须停止发送新的数据优雅的停止。
 * @Date 2020/7/9 17:26
 */@PublicEvolvingpublicinterfaceStoppableFunction{/**
     * 停止 source,与 cancel() 不同的是,这是一个让 source优雅停止的请求。
     * 等待中的数据可以继续发送出去,不需要立即停止
    */voidstop();}

Cancel 取消任务

如果在

conf/flink-conf.yaml

里面配置

state.savepoints.dir

,会保存

savepoint

,否则不会保存

savepoint

。(重启)

state.savepoints.dir: file:///tmp/savepoint

执行 **

Cancel

命令** 取消任务

[root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s e8ce0d111262c52bf8228d5722742d47
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job e8ce0d111262c52bf8228d5722742d47 with savepoint to default savepoint directory.
Cancelled job e8ce0d111262c52bf8228d5722742d47. Savepoint stored in file:/tmp/savepoint/savepoint-e8ce0d-f7fa96a085d8.

也可以在**停止的时候显示指定

savepoint

目录**

1[root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081-s /tmp/savepoint f58bb4c49ee5580ab5f27fdb24083353
DEPRECATIONWARNING:Cancelling a job withsavepoint is deprecated. Use"stop"instead.
Cancelling job f58bb4c49ee5580ab5f27fdb24083353 withsavepointto/tmp/savepoint.
Cancelled job f58bb4c49ee5580ab5f27fdb24083353. Savepoint stored in file:/tmp/savepoint/savepoint-f58bb4-127b7e84910e.

取消和停止(流作业)的区别如下:
● **

cancel()

调用**, 立即调用作业算子的

cancel()

方法,以尽快取消它们。如果算子在接到

cancel()

调用后没有停止,

Flink

将开始定期中断算子线程的执行,直到所有算子停止为止。
● **

stop()

调用** ,是更优雅的停止正在运行流作业的方式。

stop()

仅适用于

source

实现了

StoppableFunction

接口的作业。当用户请求停止作业时,作业的所有

source

都将接收

stop()

方法调用。直到所有

source

正常关闭时,作业才会正常结束。这种方式,使 作业正常处理完所有作业。

触发 savepoint

当需要生成

savepoint

文件时,需要**手动触发

savepoint

**。如下,需要指定正在运行的 JobID 和生成文件的存放目录。同时,我们也可以看到它会返回给用户存放的

savepoint

的文件名称等信息。

[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
 Executing TopSpeedWindowing example with default input data set.
 Use --input to specify file input.
 Printing result to stdout. Use --output to specify output path.
 Job has been submitted with JobID 216c427d63e3754eb757d2cc268a448d
 [root@hadoop1 flink-1.10.1]# bin/flink savepoint -m 127.0.0.1:8081 216c427d63e3754eb757d2cc268a448d /tmp/savepoint/
 Triggering savepoint for job 216c427d63e3754eb757d2cc268a448d.
 Waiting for response...
 Savepoint completed. Path: file:/tmp/savepoint/savepoint-216c42-154a34cf6bfd
 You can resume your program from this savepoint with the run command.

**

savepoint

checkpoint

的区别:**

checkpoint

是增量做的,每次的时间较短,数据量较小,只要在程序里面启用后会自动触发,用户无须感知;

savepoint

是全量做的,每次的时间较长,数据量较大,需要用户主动去触发。

checkpoint

是作业

failover

的时候自动使用,不需要用户指定。

savepoint

一般用于程序的版本更新,

bug

修复,

A/B Test

等场景,需要用户指定。

从指定 savepoint 中启动

[root@hadoop1 flink-1.10.1]# bin/flink run -d -s /tmp/savepoint/savepoint-f58bb4-127b7e84910e/ examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 1a5c5ce279e0e4bd8609f541b37652e2

查看

JobManager

的日志能够看到

Reset the checkpoint ID

为我们指定的

savepoint

文件中的

ID

[点击并拖拽以移动] ​

modify 修改任务并行度

这里修改

master

conf/flink-conf.yaml

task slot

数修改为

4

。并通过

xsync

分发到 两个

slave

节点上。

taskmanager.numberOfTaskSlots:4

修改参数后需要重启集群生效:关闭/启动集群

[root@hadoop1 flink-1.10.1]# bin/stop-cluster.sh && bin/start-cluster.sh 
Stopping taskexecutor daemon (pid: 8236) on host hadoop2.
Stopping taskexecutor daemon (pid: 8141) on host hadoop3.
Stopping standalonesession daemon (pid: 22633) on host hadoop1.
Starting cluster.
Starting standalonesession daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop2.
Starting taskexecutor daemon on host hadoop3.

启动任务

[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 2e833a438da7d8052f14d5433910515a

从页面上能看到

Task Slots

总计变为了

8

,运行的

Slot

1

,剩余

Slot

数量为

7


[点击并拖拽以移动] ​

这时候默认的并行度是

1

[点击并拖拽以移动] ​

Flink1.0

版本命令行

flink modify

已经没有这个行为了,被移除了。。。

Flink1.7

上是可以运行的。

[root@hadoop1 flink-1.10.1]# bin/flink modify -p 4 cc22cc3d09f5d65651d637be6fb0a1c3"modify" is not a valid action.

Info 显示程序的执行计划

[root@hadoop1 flink-1.10.1]# bin/flink info examples/streaming/TopSpeedWindowing.jar 
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------

拷贝输出的

json

内容,粘贴到这个网站:

http://flink.apache.org/visualizer/

可以生成类似如下的执行图。

[点击并拖拽以移动] ​

可以与实际运行的物理执行计划进行对比。
[点击并拖拽以移动] ​

SQL Client Beta

进入 Flink SQL

[root@hadoop1 flink-1.10.1]# bin/sql-client.sh embedded
Select

查询,按

Q

退出如下界面;

Flink SQL>select'hello word';SQL Query Result (Table)Table program finished.                                                                                       Page: Lastof1                                                                                         Updated: 16:37:04.649

                    EXPR$0
                hello word

Q Quit                                         + Inc Refresh                                  G Goto Page                                    N Next Page                                    O OpenRow
R Refresh                                      -Dec Refresh                                  L Last Page                                    P Prev Page

打开

http://hadoop1:8081

能看到这条

select

语句产生的查询任务已经结束了。这个查询采用的是读取固定数据集的

Custom Source

,输出用的是

Stream Collect Sink

,且只输出一条结果。
[点击并拖拽以移动] ​

[点击并拖拽以移动] ​

explain 查看 SQL 的执行计划。

Flink SQL> explain SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;== Abstract Syntax Tree ==         //抽象语法树
LogicalAggregate(group=[{0}], cnt=[COUNT()])
+- LogicalValues(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob'}, { _UTF-16LE'Alice'}, { _UTF-16LE'Greg'}, { _UTF-16LE'Bob'}]])== Optimized Logical Plan ==      //优化后的逻辑执行计划
GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
+- Exchange(distribution=[hash[name]])
   +- Values(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob'}, { _UTF-16LE'Alice'}, { _UTF-16LE'Greg'}, { _UTF-16LE'Bob'}]])== Physical Execution Plan ==    //物理执行计划
Stage 13: Data Source
    content : Source: Values(tuples=[[{ _UTF-16LE'Bob'}, { _UTF-16LE'Alice'}, { _UTF-16LE'Greg'}, { _UTF-16LE'Bob'}]])

    Stage 15: Operator
        content : GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
        ship_strategy : HASH

结果展示

SQL Client

支持两种模式来维护并展示查询结果:

table mode

在内存中物化查询结果,并以分页

table

形式展示。用户可以通过以下命令启用

table mode

:例如如下案例;

Flink SQL>SET execution.result-mode=table;[INFO]Session property has been set.

Flink SQL>SELECT name,COUNT(*)AS cnt FROM(VALUES('Bob'),('Alice'),('Greg'),('Bob'))AS NameTable(name)GROUPBY name;SQL Query Result (Table)Table program finished.                                                                                       Page: Lastof1                                                                                         Updated: 16:55:08.589

                      name                       cnt
                     Alice                         1
                      Greg                         1
                       Bob                         2

Q Quit                                         + Inc Refresh                                  G Goto Page                                    N Next Page                                    O OpenRow
R Refresh                                      -Dec Refresh                                  L Last Page                                    P Prev Page

​ [点击并拖拽以移动] ​​

​ [点击并拖拽以移动] ​​

changelog mode

不会物化查询结果,而是直接对

continuous query

产生的添加和撤回

retractions

结果进行展示:如下案例中的-表示撤回消息

Flink SQL>SET execution.result-mode=changelog;[INFO]Session property has been set.

Flink SQL>SELECT name,COUNT(*)AS cnt FROM(VALUES('Bob'),('Alice'),('Greg'),('Bob'))AS NameTable(name)GROUPBY name;SQL Query Result (Changelog)Table program finished.                                                                                                                                                                                               Updated: 16:58:05.777+/-                      name                       cnt
   +                       Bob                         1+                     Alice                         1+                      Greg                         1-                       Bob                         1+                       Bob                         2

Q Quit                                                                        + Inc Refresh                                                                 O OpenRow
R Refresh                                                                     -Dec Refresh

​ [点击并拖拽以移动] ​​

​ [点击并拖拽以移动] ​​

Environment Files

**

CREATE TABLE

** 创建表

DDL

语句:

Flink SQL>CREATETABLE pvuv_sink (>     dt VARCHAR,>     pv BIGINT,>     uv BIGINT>);[INFO]Table has been created.

**

SHOW TABLES

** 查看所有表名

Flink SQL>showtables;
pvuv_sink

**

DESCRIBE 表名

** 查看表的详细信息;

Flink SQL>describe pvuv_sink;
root
 |-- dt: STRING|-- pv: BIGINT|-- uv: BIGINT

**插入等操作均与关系型数据库操作语句一样,省略

N

个操作**

Restful API

接下来我们演示如何通过

rest api

来提交

jar

包和执行任务。
[点击并拖拽以移动] ​

通过

Show Plan

可以看到执行图
[点击并拖拽以移动] ​

提交之后的操作,取消的话点击页面的

Cancel Job

​ [点击并拖拽以移动] ​​

标签: flink python 大数据

本文转载自: https://blog.csdn.net/zhengzhaoyang122/article/details/135187846
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。

“Flink 客户端操作命令及可视化工具”的评论:

还没有评论