I have following code that wants to write the data generated with datagen to a file, but when I run the application, no target directory is created, and no data is written.
When I add env.execute()
at the end of the code, it complains that No operators defined in streaming topology. Cannot execute.
I would ask how to make the application work, thanks.
test("insert into table") { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tenv = StreamTableEnvironment.create(env) val ddl = """ create temporary table abc( name STRING, age INT ) with ( 'connector' = 'datagen' ) """.stripMargin(' ') tenv.executeSql(ddl) val sql = """ select * from abc """.stripMargin(' ') val sinkDDL = s""" create temporary table xyz( name STRING, age INT ) with ( 'connector' = 'filesystem', 'path' = 'D:\\${System.currentTimeMillis()}-csv' , 'format' = 'csv' ) """.stripMargin(' ') tenv.executeSql(sinkDDL) val insertInSQL = """ insert into xyz select name, age from abc """.stripMargin(' ') tenv.executeSql(insertInSQL) // env.execute() }
https://stackoverflow.com/questions/65527490/no-directory-is-create-no-data-is-written January 01, 2021 at 01:36PM
没有评论:
发表评论