Thursday, April 8, 2021

How to force-execute operations on a PySpark DataFrame?

I am performing certain transformations on a PySpark DataFrame, and after the transformations I convert the DataFrame to a pandas DataFrame.

The transformation by itself takes 4 hours when I trigger it with df.show(), but when I instead convert the DataFrame to pandas (df.toPandas()) after the transformation, the job crashes with a memory error.

What I need is a way to execute the transformation and save the result as an already-computed PySpark DataFrame, so that calling toPandas afterwards works without crashing memory.

So I was wondering: can we force-evaluate and save the results of a PySpark DataFrame before converting it to a pandas DataFrame?
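A minimal sketch of what such forced evaluation can look like, assuming df is the transformed DataFrame from the snippet below (cache only marks the DataFrame for caching; the count action is what actually runs the UDF and materializes the rows):

    # Mark the DataFrame for caching, then trigger full evaluation with an
    # action. After count() returns, the computed rows sit in executor
    # memory, and later actions such as toPandas() reuse them instead of
    # re-running the UDF.
    df = df.cache()
    df.count()

df.localCheckpoint(), which also truncates the lineage, is another option that materializes the result eagerly.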

So what I am doing is:

@udf("float")  def some_operation(col1):      .....      .....      return some_float_value    df = df.withColumn("col2", some_operation(F.col("col1")))  df.show()  <Takes 4 hours>  Outputs:  >      +-----+-----+      |col1 |col2|       +-----++-----      | 1   | 0.1 |       | 2   | 0.2 |       | 3   | 0.4 |       | 4   | 0.7 |       | ..  | ... |       |10000| 0.4 |       +-----+-----+  

But when I run:

@udf("float")  def some_operation(channel):      .....      .....      return some_float_value    df = df.withColumn("col2", some_operation(F.col("col1")))  df_pandas = df.select("*").toPandas()    <Crashes with error>    /databricks/spark/python/pyspark/sql/pandas/conversion.py:134: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an effect on failures in the middle of computation.    An error occurred while calling o337.getResult.  : org.apache.spark.SparkException: Exception thrown in awaitResult:       at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:431)      at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:98)      at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:94)      at sun.reflect.GeneratedMethodAccessor697.invoke(Unknown Source)      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)      at java.lang.reflect.Method.invoke(Method.java:498)      at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)      at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)      at py4j.Gateway.invoke(Gateway.java:295)      at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)      at py4j.commands.CallCommand.execute(CallCommand.java:79)      at py4j.GatewayConnection.run(GatewayConnection.java:251)      at java.lang.Thread.run(Thread.java:748)  Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 386 in stage 1226.0 failed 4 times, most recent failure: Lost task 386.6 in stage 1226.0 (TID 162556, 10.0.1.13, executor 603): java.net.SocketException: Socket is closed      at java.net.Socket.getInputStream(Socket.java:921)      at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:195)      at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:76)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)      at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)      at org.apache.spark.scheduler.Task.run(Task.scala:117)      at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)      at java.lang.Thread.run(Thread.java:748)    Driver stacktrace:      at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2478)      at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2427)      at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2426)      at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)      at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2426)      at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1131)      at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1131)      at scala.Option.foreach(Option.scala:407)      at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1131)      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2678)      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2625)      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2613)      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)      at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:917)      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2307)      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2402)      at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3631)      at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)      at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3(Dataset.scala:3635)      at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3$adapted(Dataset.scala:3612)      at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3689)      at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:115)      at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:247)      at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:100)      at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:828)      at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:76)      at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:197)      at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3687)      at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3612)      at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3611)      at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:144)      at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)      at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:146)      at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:141)      at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:115)      at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:108)      at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$1(SocketAuthServer.scala:62)      at scala.util.Try$.apply(Try.scala:213)      at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:62)  Caused by: java.net.SocketException: Socket is closed      at java.net.Socket.getInputStream(Socket.java:921)      at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:195)      at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:76)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)      at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)      at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)      at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)      at org.apache.spark.scheduler.Task.run(Task.scala:117)      at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)      at java.lang.Thread.run(Thread.java:748)      warnings.warn(msg)  
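If caching the result is not enough (for example, the computed rows do not fit in executor memory), a common alternative, shown here only as a sketch with a hypothetical output path and an active SparkSession named spark, is to write the computed DataFrame to storage and read it back, so the expensive UDF runs exactly once and toPandas only serializes already-materialized data:

    # Write the fully computed result out once; the write action forces
    # the UDF to execute. The path below is a placeholder, not from the
    # original post -- any durable location (DBFS, S3, HDFS) works.
    df.write.mode("overwrite").parquet("/tmp/computed_df")
    df_computed = spark.read.parquet("/tmp/computed_df")

    # Optional: smaller Arrow batches can lower peak driver memory during
    # collection (the default is 10000 records per batch).
    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "2048")

    df_pandas = df_computed.toPandas()

Even then, toPandas collects the entire dataset to the driver, so the driver needs enough memory to hold the full result regardless of how it was materialized.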
https://stackoverflow.com/questions/67014475/how-to-force-execute-the-operations-on-pyspark-dataframe April 09, 2021 at 10:57AM
