Flink group by

from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)

# write all the data to one file
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

# source table: each line of /tmp/input becomes one row with a single STRING column 'word'
t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

# sink table: tab-separated (word, count) rows written to /tmp/output
t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
print(dir(tab))  # debug: list the attributes and methods of the Table object
# group the rows by word, count the rows in each group with lit(1).count, and write the result to mySink
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
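To make explicit what group_by(tab.word).select(tab.word, lit(1).count) computes, here is a minimal plain-Python sketch of the same job. It is not PyFlink and does not use any Flink API. Each line of the input file is treated as one `word` row, rows are grouped by word, and the size of each group is written as `word<TAB>count`, mirroring the mySink format. The function name `word_count` and the skipping of blank lines are assumptions made for illustration only.

```python
import collections
import os
import tempfile


def word_count(input_path: str, output_path: str) -> None:
    """Hypothetical helper: plain-Python equivalent of
    group_by(word).select(word, count) over a one-column text file."""
    counts = collections.Counter()
    with open(input_path, encoding="utf-8") as src:
        for line in src:
            word = line.rstrip("\r\n")
            # Assumption: blank lines are skipped here; Flink's CSV source may treat them differently.
            if word:
                counts[word] += 1  # one more row in this word's group
    # A single output file, analogous to running the Flink job with parallelism 1.
    with open(output_path, "w", encoding="utf-8") as sink:
        for word, count in counts.items():
            sink.write(f"{word}\t{count}\n")  # tab-delimited, like mySink


if __name__ == "__main__":
    workdir = tempfile.mkdtemp()
    src_path = os.path.join(workdir, "input")
    dst_path = os.path.join(workdir, "output")
    with open(src_path, "w", encoding="utf-8") as f:
        f.write("flink\npyflink\nflink\n")
    word_count(src_path, dst_path)
    with open(dst_path, encoding="utf-8") as f:
        print(f.read(), end="")  # flink<TAB>2, then pyflink<TAB>1
```

This sketch emits words in first-seen order. Flink does not guarantee any particular order for the rows it writes to the sink.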

This article is reposted from: https://blog.csdn.net/zhaoyangjian724/article/details/130780843
Copyright belongs to the original author, scan724. In case of infringement, please contact us for removal.

“flink group by”的评论:

还没有评论