0


Streampark集成Cloudera Flink、ldap、告警,以及部署常见问题

集成背景

我们当前集群使用的是Cloudera CDP,Flink版本为Cloudera Version 1.14,整体Flink安装目录以及配置文件结构与社区版本有较大出入。直接根据Streampark官方文档进行部署,将无法配置Flink Home,以及后续整体Flink任务提交到集群中,因此需要进行针对化适配集成,在满足使用需求上,尽量提供完整的Streampark使用体验。

集成步骤

版本匹配问题解决

首先解决无法识别Cloudera中的Flink Home问题,根据报错主要明确到的事情是无法读取到Flink版本、lib下面的jar包名称无法匹配。

修改对象:

修改源码:(解决无法匹配cloudera jar包)

class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logger {

  private[this] lazy val FLINK_VER_PATTERN = Pattern.compile("^(\\d+\\.\\d+)(\\.)?.*$")

  private[this] lazy val FLINK_VERSION_PATTERN = Pattern.compile("^Version: (\\d+\\.\\d+\\.\\d)(-csa)?(\\d+\\.\\d+\\.\\d+\\.\\d)?, Commit ID: (.*)$")

  private[this] lazy val FLINK_SCALA_VERSION_PATTERN = Pattern.compile("^flink-dist_(\\d+\\.\\d*)-(\\d+\\.\\d+\\.\\d)(-csa.*)?.jar$")

  lazy val scalaVersion: String = {
    val matcher = FLINK_SCALA_VERSION_PATTERN.matcher(flinkDistJar.getName)
    if (matcher.matches()) {
      matcher.group(1);
    } else {
      // flink 1.15 + on support scala 2.12
      "2.12"
    }
  }

  lazy val fullVersion: String = s"${version}_$scalaVersion"

  lazy val flinkLib: File = {
    require(flinkHome != null, "[StreamPark] flinkHome must not be null.")
    require(new File(flinkHome).exists(), "[StreamPark] flinkHome must be exists.")
    val lib = new File(s"$flinkHome/lib")
    require(lib.exists() && lib.isDirectory, s"[StreamPark] $flinkHome/lib must be exists and must be directory.")
    lib
  }

  lazy val flinkLibs: List[NetURL] = flinkLib.listFiles().map(_.toURI.toURL).toList

  lazy val version: String = {
    val flinkVersion = new AtomicReference[String]
    val cmd = List(s"java -classpath ${flinkDistJar.getAbsolutePath} org.apache.flink.client.cli.CliFrontend --version")
    val success = new AtomicBoolean(false)
    val buffer = new mutable.StringBuilder
    CommandUtils.execute(
      flinkLib.getAbsolutePath,
      cmd,
      new Consumer[String]() {
        override def accept(out: String): Unit = {
          buffer.append(out).append("\n")
          val matcher = FLINK_VERSION_PATTERN.matcher(out)
          if (matcher.find) {
            success.set(true)
            flinkVersion.set(matcher.group(1))
          }
        }
      })
    logInfo(buffer.toString())
    if (!success.get()) {
      throw new IllegalStateException(s"[StreamPark] parse flink version failed. $buffer")
    }
    buffer.clear()
    flinkVersion.get
  }

  // flink major version, like "1.13", "1.14"
  lazy val majorVersion: String = {
    if (version == null) {
      null
    } else {
      val matcher = FLINK_VER_PATTERN.matcher(version)
      matcher.matches()
      matcher.group(1)
    }
  }

  lazy val flinkDistJar: File = {
    val distJar = flinkLib.listFiles().filter(_.getName.matches("flink-dist.*\\.jar"))
    distJar match {
      case x if x.isEmpty =>
        throw new IllegalArgumentException(s"[StreamPark] can no found flink-dist jar in $flinkLib")
      case x if x.length > 1 =>
        throw new IllegalArgumentException(s"[StreamPark] found multiple flink-dist jar in $flinkLib")
      case _ =>
    }
    distJar.head
  }

  // StreamPark flink shims version, like "streampark-flink-shims_flink-1.13"
  lazy val shimsVersion: String = s"streampark-flink-shims_flink-$majorVersion"

  override def toString: String =
    s"""
       |----------------------------------------- flink version -----------------------------------
       |     flinkHome    : $flinkHome
       |     distJarName  : ${flinkDistJar.getName}
       |     flinkVersion : $version
       |     majorVersion : $majorVersion
       |     scalaVersion : $scalaVersion
       |     shimsVersion : $shimsVersion
       |-------------------------------------------------------------------------------------------
       |""".stripMargin

}

Flink Home指定

由于Cloudera Flink的默认安装路径为/opt/cloudera/parcels/Flink-${version},而执行/opt/cloudera/parcels/Flink-${version}/bin/flink 为整体环境配置,vi flink可查看到具体过程

实际的flink提交路径在/opt/cloudera/parcels/Flink-${version}/lib/flink/bin/flink,因此/opt/cloudera/parcels/Flink-${version}/lib/flink可以理解为真正的Flink Home,具体查看该目录下内容

发现缺少conf目录,倘若配置该目录在Streampark为Flink Home将无法访问到集群,因此可软连接Flink配置或者在该路径下编辑集群中的Flink配置文件。

综上,前置配置和打包好代码(代码中可能会涉及到自己使用上的优化修改)之后,可以进行部署。

注意2.0的版本打包的话直接执行源码中的build.sh即可,选择混合部署,生成的包在dist目录下。

部署流程

前置部署流程建议参考官方步骤安装部署 | Apache StreamPark (incubating)

特别注意需要对元数据库进行初始化以及初始数据插入,执行sql在${streamarkHome}/script/data&schema

根据官方的意思需要将mysql的connector添加到lib目录下,不然无法连接数据库。

在conf/application.yml中修改数据源为mysql,配置好集群中使用到的用户(默认hdfs),默认在hdfs创建streampark的工作目录hdfs:///streampark。

部署结果验证

部署完成之后,执行bin下的startup.sh 可以启动集群,在web上进入部署地址ip:port(默认10000)

使用默认账号 admin streampark可以进去

登录进去之后点击设置中心可以进行Flink home的配置

LDAP集成

主要是需要配置conf下application.yml中的ldap配置信息即可,然后重启streampark。

使用与踩坑点:

登录选择LDAP登录

利用公司LDAP登录之后,提示

但是刚才使用ladp登录的用户,在streampark上已经创建对应的用户,须在成员管理里面将刚才创建的用户添加到对应的团队中,刚才那个用户才可以登录。

告警配置

主要配置的是企业微信告警,在设置中心配置企业微信机器人的token(注意公司环境为内网的话,需要在代码中修改对应的url,拼接为内网发送地址)
告警模板在代码中的修改路径为:

中间修改了的告警模板,重新打包一下即可。

一些问题及解决办法

一、Hadoop环境

解决办法:在部署Streampark的节点上添加一下hadoop环境即可

vi /etc/profile

source一下,重启streampark即可

二、依赖jar的初始化

解决办法:在于部署后的streampark在hdfs上的工作目录上lib目录没有正常上传,找到hdfs上初始化的strempark work路径,观察一下hdfs:///streampark/flink/.../下的lib目录是否完整,不完整的话手动将本地Flink Home目录下的lib put上去即可。


本文转载自: https://blog.csdn.net/Zhanghao149/article/details/129363333
版权归原作者 酥酥饼一号 所有, 如有侵权,请联系我们删除。

“Streampark集成Cloudera Flink、ldap、告警,以及部署常见问题”的评论:

还没有评论