0


Flink Rest API接口使用

官方Flink Rest API文档

REST API | Apache Flink

Flink接口调用地址

   用户可以通过flink提供的Rest API管理应用。Rest API可供用户或脚本直接访问,它可以对外公开有关Flink集群和应用的信息。flink使用Web服务器来同时支持Rest API和Web UI,该服务器会作为Dispatcher进程的一部分(Dispatcher启动JobManager)来运行。默认情况下二者都会使用8081端口。我们可以通过./conf/flink-conf.yaml来设置web服务器的ip和端口:rest.address,rest.port,同时为了避免我们提交的项目在每次集群启动时被删掉,我们需要配置项目的固定存储地址web.upload.dir,这样我们通过web提交的项目就会被保存到这个位置,如果不配置该地址,提交的项目会被放到临时文件中去,集群重启就会被删掉,很不方便。

如下配置:

#==============================================================================
# Rest & web frontend
#==============================================================================

# The port to which the REST client connects to. If rest.bind-port has
# not been specified, then the server will bind to this port as well.
#
rest.port: 8081

# The address to which the REST client will connect to
#
rest.address: 0.0.0.0

# Port range for the REST and web server to bind to.
#
#rest.bind-port: 8080-8090

# The address that the REST & web server binds to
#
#rest.bind-address: 0.0.0.0

# Flag to specify whether job submission is enabled from the web-based
# runtime monitor. Uncomment to disable.

web.submit.enable: true

上传jar包

{
    "filename": "/tmp/flink-web-c3ba80f2-0cd4-47ab-93b3-936dae41a704/flink-web-upload/e366a315-6bd4-43d4-ad29-baf7d6114dd3_ConnectedComponents.jar",
    "status": "success"
}
  • 请求实例:
http://192.168.131.129:8081/v1/jars/upload

或者可以使用Flink的web ui页面自己提交项目程序:

获取提交的项目id

{
    "address": "http://0.0.0.0:8081",
    "files": [
        {
            "id": "e366a315-6bd4-43d4-ad29-baf7d6114dd3_ConnectedComponents.jar",
            "name": "ConnectedComponents.jar",
            "uploaded": 1663050489000,
            "entry": [
                {
                    "name": "org.apache.flink.examples.java.graph.ConnectedComponents",
                    "description": null
                }
            ]
        }
    ]
}

运行程序接口

  1. entry-class 程序入口类
  2. programArgsList:Array
  3. programArg:String
  4. parallelism:Integer

返回结果:jobid

{
    "jobid": "e366a315-6bd4-43d4-ad29-baf7d6114dd3"
}

请求示例1:

1、http://192.168.131.129:8081/jars/6814473b-20fb-49e0-9e2b-08b1c9967014_WordCount.jar/run?entry-class=io.github.streamingwithflink.demo.BatchCountFromJdbc&programArgsList=--bootstrap.servers hadoop2.test.yunwei.puppet.dh:6667,hadoop3.test.yunwei.puppet.dh:6667,hadoop4.test.yunwei.puppet.dh:6667 --group.id m3gcn_tab_map_race_3 --client.id m3gcn_tab_map_race_3 --config.file hdfs://DHTestCluster/user/gezhihui/flink/configuration/m3gcn_tab_map_race2.txt --job.name m3gcn_tab_map_race
2、http://192.168.131.129:8081/jars/6814473b-20fb-49e0-9e2b-08b1c9967014_WordCount.jar/run
{
    "entryClass":"io.github.streamingwithflink.demo.BatchCountFromJdbc"
    "programArgsList": [
        "--name",
        "zhansan"]
}
 
请求体中的programArgsList可以传值到main函数的string[]args,取值可以通过ParameterTool parameters = 
ParameterTool.fromArgs(args);String name = parameters.get("name");所以他的key-value的格式是key为"--key"或者"-key",
value为key的下一行,根据上述代码,value不要以"-"作为开头,这里也可以自己去解析String[],这里args就对应我们programArgsList
的传参,如3所示
 
3、http://192.168.131.129:8081/jars/6814473b-20fb-49e0-9e2b-08b1c9967014_WordCount.jar/run
{
    "entryClass":"io.github.streamingwithflink.demo.BatchCountFromJdbc"
    "programArgsList": [
        "lisi",
        "zhansan"]
}

删除某个jar包

http://192.168.131.129:8081/jars/6814473b-20fb-49e0-9e2b-08b1c9967014_WordCount.jar

取消某个job

http://192.168.196.3:8081/jobs/e366a315-6bd4-43d4-ad29-baf7d6114dd3/cancel

查看所有的job

http://192.168.131.129:8081/jobs/overview

查看某个job的具体信息

http://192.168.131.129:8081/jobs/f5d30b564a1f90f8fd9f5d4a44e26437

查看某个job的checkpoints

http://192.168.131.129:8081/jobs/f5d30b564a1f90f8fd9f5d4a44e26437/checkpoints
标签: flink 大数据

本文转载自: https://blog.csdn.net/Angel_asp/article/details/126834061
版权归原作者 angelasp 所有, 如有侵权,请联系我们删除。

“Flink Rest API接口使用”的评论:

还没有评论