Thursday, April 22, 2021

How to call a method in the foreachBatch during writeStream in Spark

I am trying to call a method using foreachBatch while doing writeStream. Here is the sample.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame

def transformStream(inputDf: DataFrame): DataFrame = {
  inputDf.withColumn("S_TIMESTAMP", to_timestamp(col("timestamp") / 1000))
    .withColumn("S_DATE", to_date(col("S_TIMESTAMP"), "yyyy-MM-dd"))
    .withColumn("FILE_NAME", input_file_name())
    .withColumn("PROCESS_DATE", current_timestamp())
}

// Writing the stream
readstreamDf
  .writeStream
  .format("delta")
  .foreachBatch(transformStream _)
  .option("checkpointLocation", checkPoint)
  .option("path", datapath)
  .outputMode("append")
  .partitionBy("S_DATE")
  .start()

// Write stream throwing error
error: overloaded method value foreachBatch with alternatives:
  (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and>
  (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
 cannot be applied to (org.apache.spark.sql.DataFrame => org.apache.spark.sql.DataFrame)
    .foreachBatch(transformStream _)

Can someone suggest what I am doing wrong here?
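Reading the error message, foreachBatch accepts a function of shape (Dataset[Row], Long) => Unit, while transformStream is DataFrame => DataFrame: it takes no batch id and returns a DataFrame instead of Unit. One possible way to reconcile the two, as a minimal sketch and not a confirmed fix, is to wrap transformStream in a function of the expected shape that also writes each micro-batch. The names ForeachBatchSketch, writeBatch and startQuery are hypothetical; readstreamDf, checkPoint and datapath are the values from the question, and the sketch assumes the Delta Lake connector is on the classpath.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.StreamingQuery

object ForeachBatchSketch {

  // Same transformation as in the question, unchanged.
  def transformStream(inputDf: DataFrame): DataFrame =
    inputDf
      .withColumn("S_TIMESTAMP", to_timestamp(col("timestamp") / 1000))
      .withColumn("S_DATE", to_date(col("S_TIMESTAMP"), "yyyy-MM-dd"))
      .withColumn("FILE_NAME", input_file_name())
      .withColumn("PROCESS_DATE", current_timestamp())

  // Hypothetical helper: starts the query with a batch function of the
  // (DataFrame, Long) => Unit shape named in the error message.
  def startQuery(readstreamDf: DataFrame,
                 checkPoint: String,
                 datapath: String): StreamingQuery = {

    // The explicit function type makes the Scala overload of foreachBatch
    // the only applicable one.
    val writeBatch: (DataFrame, Long) => Unit = (batchDf, batchId) => {
      // batchId is unused here; it could be used for logging or deduplication.
      transformStream(batchDf)
        .write
        .format("delta")
        .mode("append")
        .partitionBy("S_DATE")
        .save(datapath)
    }

    readstreamDf.writeStream
      .foreachBatch(writeBatch)
      .option("checkpointLocation", checkPoint)
      .start()
  }
}
```

In this sketch the format, path and partitioning move from the stream writer to the batch writer inside writeBatch, since that is where each micro-batch is actually written. It is worth checking that input_file_name() still returns the source path when evaluated inside the batch function; if it does not, that column can be added to the streaming DataFrame before writeStream.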

https://stackoverflow.com/questions/67223053/how-to-call-a-method-in-the-foreachbatch-during-writestream-in-spark April 23, 2021 at 10:06AM
