
Not able to use FLASK API with pyspark "SparkException: Only one SparkContext may be running in this JVM" [closed]

Is it possible to use a Flask API with PySpark? I was using Flask in a plain Python script earlier, but due to a change in requirements I now need Spark for the processing, so I tried PySpark — and it isn't working.

Requirement: I'll pass arguments via the Flask API to the script, where I'll read data from Hadoop and filter it based on those arguments.

CODE :

from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import *

from pyspark.sql import SparkSession
from pyspark import SQLContext, StorageLevel
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
from ipaddress import ip_address, IPv4Address, IPv6Address

from flask import Flask, jsonify, request
import pandas as pd
import datetime
from pmdarima.arima import auto_arima

# Spark session is created once, at module import time
spark = SparkSession.builder.appName("IFT").getOrCreate()
sc = spark.sparkContext

app = Flask(__name__)

# location lookup file loaded with pandas on the driver
loc_map = pd.read_csv("/data/IFT/LocationFile.csv")
loc_map.drop("LatLong", axis=1, inplace=True)
loc_map["Pincode"] = loc_map["Pincode"].astype('float64')

# source data read from HDFS
data = spark.read.option("header", True).csv("IFT/clogs_crm_devices/par*")
# some processing on data here based on api inputs

@app.route('/ift/', methods=['GET'])
def ift_api():
    return "completed"

if __name__ == '__main__':
    # Flask development server, started with the auto-reloader enabled (debug=True)
    app.run(host="0.0.0.0", port=7036, debug=True)
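For context, the route is eventually meant to consume the API arguments and filter the Spark DataFrame. A minimal sketch of that intent is below, using the `app`, `data`, `request`, `F`, and `jsonify` objects already defined above; the query parameter name (`pincode`), the column name (`Pincode`), the route path, and the function name are assumptions for illustration, not the actual processing logic:

# Hypothetical sketch only: parameter name, column name, and route are
# illustrative assumptions, not the real processing from this question.
@app.route('/ift_example/', methods=['GET'])
def ift_example():
    pincode = request.args.get('pincode')                  # e.g. /ift_example/?pincode=110001
    filtered = data.filter(F.col("Pincode") == pincode)    # filter the Spark DataFrame by the API input
    return jsonify(rows=filtered.count())                  # return a small JSON summary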

ERROR TRACE :

21/01/17 20:33:14 INFO YarnScheduler: Adding task set 2.0 with 1 tasks
21/01/17 20:33:14 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 400, NGPRDJADBS87, executor 1, partition 0, RACK_LOCAL, 8383 bytes)
21/01/17 20:33:14 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on NGPRDJADBS87:33000 (size: 4.4 KB, free: 366.2 MB)
21/01/17 20:33:14 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on NGPRDJADBS87:33000 (size: 33.2 KB, free: 366.2 MB)
21/01/17 20:33:14 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 400) in 505 ms on NGPRDJADBS87 (executor 1) (1/1)
21/01/17 20:33:14 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
21/01/17 20:33:14 INFO DAGScheduler: ResultStage 2 (csv at NativeMethodAccessorImpl.java:0) finished in 0.583 s
21/01/17 20:33:14 INFO DAGScheduler: Job 2 finished: csv at NativeMethodAccessorImpl.java:0, took 0.590400 s
21/01/17 20:33:14 INFO FileSourceStrategy: Pruning directories with:
21/01/17 20:33:14 INFO FileSourceStrategy: Post-Scan Filters:
21/01/17 20:33:14 INFO FileSourceStrategy: Output Data Schema: struct<value: string>
21/01/17 20:33:14 INFO FileSourceScanExec: Pushed Filters:
21/01/17 20:33:14 INFO CodeGenerator: Code generated in 7.065522 ms
21/01/17 20:33:14 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 387.1 KB, free 365.2 MB)
21/01/17 20:33:14 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 33.2 KB, free 365.2 MB)
21/01/17 20:33:14 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on NGPRDJADBS98:35000 (size: 33.2 KB, free: 366.2 MB)
21/01/17 20:33:14 INFO SparkContext: Created broadcast 4 from csv at NativeMethodAccessorImpl.java:0
21/01/17 20:33:14 INFO FileSourceScanExec: Planning scan with bin packing, max size: 134217728 bytes, open cost is considered as scanning 4194304 bytes.
 * Serving Flask app "test" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Running on http://0.0.0.0:7036/ (Press CTRL+C to quit)
 * Restarting with stat
Traceback (most recent call last):
  File "/data/IFT/test.py", line 17, in <module>
    spark = SparkSession.builder.appName("IFT").getOrCreate()
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/context.py", line 358, in getOrCreate
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/context.py", line 119, in __init__
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/context.py", line 181, in _do_init
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/context.py", line 297, in _initialize_context
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true.
The currently running SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Thread.java:748)
        at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2461)
        at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2457)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2457)
        at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2546)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:84)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:238)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
21/01/17 20:33:17 INFO SparkContext: Invoking stop() from shutdown hook
21/01/17 20:33:17 INFO AbstractConnector: Stopped Spark@269dbaed{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
21/01/17 20:33:17 INFO SparkUI: Stopped Spark web UI at http://NGPRDJADBS98:4040
21/01/17 20:33:17 INFO YarnClientSchedulerBackend: Interrupting monitor thread
21/01/17 20:33:17 INFO YarnClientSchedulerBackend: Shutting down all executors
21/01/17 20:33:17 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
21/01/17 20:33:17 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices (serviceOption=None, services=List(), started=false)
21/01/17 20:33:17 INFO YarnClientSchedulerBackend: Stopped
21/01/17 20:33:17 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/01/17 20:33:17 INFO MemoryStore: MemoryStore cleared
21/01/17 20:33:17 INFO BlockManager: BlockManager stopped
21/01/17 20:33:17 INFO BlockManagerMaster: BlockManagerMaster stopped
21/01/17 20:33:17 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/01/17 20:33:17 INFO SparkContext: Successfully stopped SparkContext
21/01/17 20:33:17 INFO ShutdownHookManager: Shutdown hook called
21/01/17 20:33:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-92109e8e-6395-4e6a-9ae9-fc49e358d945
21/01/17 20:33:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8fbd717-6341-43a9-aaa7-fd2cfa16510e
21/01/17 20:33:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-92109e8e-6395-4e6a-9ae9-fc49e358d945/pyspark-eec83e7a-9f89-45a6-8c67-56a883edec1b

What could be the possible solution to this? Is there any issue with the spark-env.sh file?

Source: https://stackoverflow.com/questions/65762054/not-able-to-use-flask-api-with-pyspark-sparkexception-only-one-sparkcontext-ma (January 17, 2021, 10:48 PM)
