Flink Learning: Common Flink Errors


1. org.apache.flink.table.api.TableException: Only the first field can reference an atomic type

The program is as follows:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.environment._
import org.apache.flink.api.java.tuple._
import org.apache.flink.table.api.scala.table2TableConversions
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

object sqlTest {
  def main(args: Array[String]): Unit = {
    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(sEnv)
    val stream = sEnv.fromElements((192, "nie"), (200, "hu"))
    tEnv.registerDataStream("testTable", stream, "id,name")
    val result = tEnv.sqlQuery("select * from testTable where id = 192")
  }
}

According to explanations found online, the tuple should come from the Java package (import org.apache.flink.api.java.tuple._) rather than Scala's built-in tuple; otherwise the tuple is treated as a generic type, which triggers this error. The corrected code is as follows:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.environment._
import org.apache.flink.api.java.tuple._
import org.apache.flink.table.api.scala.table2TableConversions
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

object sqlTest {
  def main(args: Array[String]): Unit = {
    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(sEnv)
    val stream = sEnv.fromElements(new Tuple2(192, "nie"), new Tuple2(200, "zhu"))
    tEnv.registerDataStream("testTable", stream, "id,name")
    val result = tEnv.sqlQuery("select * from testTable where id = 192")
    // result.toRetractStream[Row].print()
    // val csvSource = new CsvTableSource("path",)
    // tEnv.registerTableSource("CsvTable", csvSource)
  }
}

2. Only tables that originate from Scala DataStreams can be converted to Scala DataStreams

The program is as follows:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.types.Row

object SqlTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)
    // val InmemCatalog = new InMemoryExternalCatalog()
    // tEnv.registerExternalCatalog("externalCatalog", InmemCatalog)
    val stream = streamEnv.fromElements(new Tuple2(192, "nie"), new Tuple2(200, "hu"))

    tEnv.registerDataStream("testTable", stream, "id,name")
    // val result = tEnv.scan("testTable").select("id")
    // print(result)
    val result = tEnv.sqlQuery("select * from testTable where id = 192")
    result.toRetractStream[(Long, String)].print()

    streamEnv.execute("test")
    // tEnv.sqlUpdate("insert into csv_output_table select product,amount from sensors where type = 'temperature")
  }
}

It turned out that the wrong package had been imported: toRetractStream is a Scala Table API conversion and, as the message says, only works on tables that originate from Scala DataStreams, so the stream has to be built with the Scala StreamExecutionEnvironment.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

should be changed to

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

3. Field reference expression or alias on field expression expected.

Still the same program as above: after applying the previous fix, it now reports the error above.

The following line is the problem:

tEnv.registerDataStream("testTable", stream, "id,name")

It should be changed to

tEnv.registerDataStream("testTable", stream, 'id, 'name)

that is, the field names are written as Scala symbols prefixed with a single quote, not as one comma-separated string. Note that if import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment is used instead, the single-quote form itself also reports an error.

4. Result field does not match requested type. Requested: Long; Actual: Integer

Still the same program; one more error remains, on this line:

result.toRetractStream[(Long, String)].print()

The id column was created from an Int, so the actual result type is Integer and does not match the requested Long. The requested type should be changed to Row:

result.toRetractStream[Row].print()
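
Putting fixes 2 through 4 together, the program from section 2 ends up roughly as follows. This is a sketch assembled from the snippets above (unused imports dropped, comments added), still assuming an older Flink 1.x release (old planner) where TableEnvironment.getTableEnvironment and registerDataStream exist:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment // fix 2: the scala package, not environment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object SqlTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)

    val stream = streamEnv.fromElements(new Tuple2(192, "nie"), new Tuple2(200, "hu"))

    // fix 3: field names as symbol expressions, not one "id,name" string
    tEnv.registerDataStream("testTable", stream, 'id, 'name)

    val result = tEnv.sqlQuery("select * from testTable where id = 192")

    // fix 4: request Row, because the id column is an Integer, not a Long
    result.toRetractStream[Row].print()

    streamEnv.execute("test")
  }
}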