I am trying to call a method via foreachBatch while doing a writeStream. Here is a sample:
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame

def transformStream(inputDf: DataFrame): DataFrame = {
  inputDf.withColumn("S_TIMESTAMP", to_timestamp(col("timestamp") / 1000))
    .withColumn("S_DATE", to_date(col("S_TIMESTAMP"), "yyyy-MM-dd"))
    .withColumn("FILE_NAME", input_file_name())
    .withColumn("PROCESS_DATE", current_timestamp())
}

// Writing the stream
readstreamDf
  .writeStream
  .format("delta")
  .foreachBatch(transformStream _)
  .option("checkpointLocation", checkPoint)
  .option("path", datapath)
  .outputMode("append")
  .partitionBy("S_DATE")
  .start()
```

The writeStream call throws this error:

```
error: overloaded method value foreachBatch with alternatives:
  (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and>
  (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
 cannot be applied to (org.apache.spark.sql.DataFrame => org.apache.spark.sql.DataFrame)
       .foreachBatch(transformStream _)
```

Can someone suggest what I am doing wrong here?
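For context, the error message itself points at the mismatch: `foreachBatch` expects a function of type `(Dataset[Row], Long) => Unit` that receives each micro-batch plus its batch id and returns nothing, whereas `transformStream` has type `DataFrame => DataFrame`. A minimal sketch of one way to satisfy that signature, reusing `transformStream` and the variable names from the question (assuming `readstreamDf`, `checkPoint`, and `datapath` are defined as in the original code; with `foreachBatch`, the write happens inside the batch function, so `path` and `partitionBy` move there):

```scala
// foreachBatch hands us each micro-batch as a DataFrame together with its
// batch id; the function must return Unit, so we perform the Delta write
// ourselves inside it instead of configuring it on the DataStreamWriter.
readstreamDf
  .writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    transformStream(batchDf)
      .write
      .format("delta")
      .mode("append")
      .partitionBy("S_DATE")
      .save(datapath)
  }
  .option("checkpointLocation", checkPoint)
  .outputMode("append")
  .start()
```

Alternatively, since `transformStream` only adds columns, it could be applied before the sink with `readstreamDf.transform(transformStream).writeStream...`, leaving the original writer (including `foreachBatch` removed) unchanged. This sketch requires a live Spark session with the Delta Lake connector, so it is illustrative rather than standalone-runnable.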
Source: https://stackoverflow.com/questions/67223053/how-to-call-a-method-in-the-foreachbatch-during-writestream-in-spark (April 23, 2021, 10:06 AM)