I am applying some transformations to a PySpark DataFrame and then converting the result to a pandas DataFrame.
The transformation itself takes 4 hours to run when I call df.show(),
but when I convert the DataFrame to pandas with df.toPandas() after the operation, it runs out of memory and crashes.
What I need is a way to run the operation once and save the result as an already-computed PySpark DataFrame, so that calling toPandas() afterwards works without crashing.
So I was wondering: can we force evaluation and save the results of a PySpark DataFrame before converting it to a pandas DataFrame?
So what I am doing is:
    from pyspark.sql import functions as F
    from pyspark.sql.functions import udf

    @udf("float")
    def some_operation(col1):
        .....
        .....
        return some_float_value

    df = df.withColumn("col2", some_operation(F.col("col1")))
    df.show()  # <Takes 4 hours>

Outputs:

    +-----+-----+
    | col1| col2|
    +-----+-----+
    |    1|  0.1|
    |    2|  0.2|
    |    3|  0.4|
    |    4|  0.7|
    |   ..|  ...|
    |10000|  0.4|
    +-----+-----+
But when I run:
    @udf("float")
    def some_operation(channel):
        .....
        .....
        return some_float_value

    df = df.withColumn("col2", some_operation(F.col("col1")))
    df_pandas = df.select("*").toPandas()  # <Crashes with error>

    /databricks/spark/python/pyspark/sql/pandas/conversion.py:134: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an effect on failures in the middle of computation.
      An error occurred while calling o337.getResult.
    : org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:431)
        at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:98)
        at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:94)
        at sun.reflect.GeneratedMethodAccessor697.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
        at py4j.Gateway.invoke(Gateway.java:295)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:251)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 386 in stage 1226.0 failed 4 times, most recent failure: Lost task 386.6 in stage 1226.0 (TID 162556, 10.0.1.13, executor 603): java.net.SocketException: Socket is closed
        at java.net.Socket.getInputStream(Socket.java:921)
        at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:195)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:76)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
        at org.apache.spark.scheduler.Task.run(Task.scala:117)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2478)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2427)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2426)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2426)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1131)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1131)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1131)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2678)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2625)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2613)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:917)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2307)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2402)
        at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3631)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
        at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3(Dataset.scala:3635)
        at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3$adapted(Dataset.scala:3612)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3689)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:115)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:247)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:828)
        at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:76)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:197)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3687)
        at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3612)
        at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3611)
        at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:144)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
        at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:146)
        at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:141)
        at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:115)
        at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:108)
        at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$1(SocketAuthServer.scala:62)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:62)
    Caused by: java.net.SocketException: Socket is closed
        at java.net.Socket.getInputStream(Socket.java:921)
        at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:195)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:76)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
        at org.apache.spark.scheduler.Task.run(Task.scala:117)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

      warnings.warn(msg)
https://stackoverflow.com/questions/67014475/how-to-force-execute-the-operations-on-pyspark-dataframe April 09, 2021 at 10:57AM