Is it possible to use a Flask API with PySpark? I was using Flask in a plain Python script earlier, but due to a change in requirements I now need Spark for the processing, so I tried PySpark, and it isn't working.
Requirement: I'll pass arguments to the script via the Flask API, read data from Hadoop, and filter that data based on the arguments.
CODE:
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark import SQLContext, StorageLevel
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
from ipaddress import ip_address, IPv4Address, IPv6Address
from flask import Flask, jsonify, request
import pandas as pd
import datetime
from pmdarima.arima import auto_arima

spark = SparkSession.builder.appName("IFT").getOrCreate()
sc = spark.sparkContext

app = Flask(__name__)

loc_map = pd.read_csv("/data/IFT/LocationFile.csv")
loc_map.drop("LatLong", axis=1, inplace=True)
loc_map["Pincode"] = loc_map["Pincode"].astype('float64')

data = spark.read.option("header", True).csv("IFT/clogs_crm_devices/par*")

# some processing on data here based on api inputs

@app.route('/ift/', methods=['GET'])
def ift_api():
    return "completed"

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7036, debug=True)
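To make the "#some processing on data here based on api inputs" placeholder concrete, the kind of processing I want inside the route is roughly the sketch below. It reuses the app, data, F, request and jsonify objects from the code above; the pincode query parameter and the Pincode column are only placeholders for my real inputs, not actual names from my data.

@app.route('/ift/', methods=['GET'])
def ift_api():
    # Read an argument from the request, e.g. /ift/?pincode=411001
    pincode = request.args.get("pincode")
    # Filter the Spark DataFrame loaded from HDFS on that argument
    # (assuming the column exists in the CSV schema)
    filtered = data.filter(F.col("Pincode") == pincode)
    # Return something small, not the whole DataFrame
    return jsonify(count=filtered.count())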
ERROR TRACE:
21/01/17 20:33:14 INFO YarnScheduler: Adding task set 2.0 with 1 tasks
21/01/17 20:33:14 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 400, NGPRDJADBS87, executor 1, partition 0, RACK_LOCAL, 8383 bytes)
21/01/17 20:33:14 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on NGPRDJADBS87:33000 (size: 4.4 KB, free: 366.2 MB)
21/01/17 20:33:14 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on NGPRDJADBS87:33000 (size: 33.2 KB, free: 366.2 MB)
21/01/17 20:33:14 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 400) in 505 ms on NGPRDJADBS87 (executor 1) (1/1)
21/01/17 20:33:14 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
21/01/17 20:33:14 INFO DAGScheduler: ResultStage 2 (csv at NativeMethodAccessorImpl.java:0) finished in 0.583 s
21/01/17 20:33:14 INFO DAGScheduler: Job 2 finished: csv at NativeMethodAccessorImpl.java:0, took 0.590400 s
21/01/17 20:33:14 INFO FileSourceStrategy: Pruning directories with:
21/01/17 20:33:14 INFO FileSourceStrategy: Post-Scan Filters:
21/01/17 20:33:14 INFO FileSourceStrategy: Output Data Schema: struct<value: string>
21/01/17 20:33:14 INFO FileSourceScanExec: Pushed Filters:
21/01/17 20:33:14 INFO CodeGenerator: Code generated in 7.065522 ms
21/01/17 20:33:14 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 387.1 KB, free 365.2 MB)
21/01/17 20:33:14 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 33.2 KB, free 365.2 MB)
21/01/17 20:33:14 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on NGPRDJADBS98:35000 (size: 33.2 KB, free: 366.2 MB)
21/01/17 20:33:14 INFO SparkContext: Created broadcast 4 from csv at NativeMethodAccessorImpl.java:0
21/01/17 20:33:14 INFO FileSourceScanExec: Planning scan with bin packing, max size: 134217728 bytes, open cost is considered as scanning 4194304 bytes.
 * Serving Flask app "test" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Running on http://0.0.0.0:7036/ (Press CTRL+C to quit)
 * Restarting with stat
Traceback (most recent call last):
  File "/data/IFT/test.py", line 17, in <module>
    spark = SparkSession.builder.appName("IFT").getOrCreate()
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/context.py", line 358, in getOrCreate
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/context.py", line 119, in __init__
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/context.py", line 181, in _do_init
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/context.py", line 297, in _initialize_context
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true.
The currently running SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Thread.java:748)
        at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2461)
        at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2457)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2457)
        at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2546)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:84)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:238)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
21/01/17 20:33:17 INFO SparkContext: Invoking stop() from shutdown hook
21/01/17 20:33:17 INFO AbstractConnector: Stopped Spark@269dbaed{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
21/01/17 20:33:17 INFO SparkUI: Stopped Spark web UI at http://NGPRDJADBS98:4040
21/01/17 20:33:17 INFO YarnClientSchedulerBackend: Interrupting monitor thread
21/01/17 20:33:17 INFO YarnClientSchedulerBackend: Shutting down all executors
21/01/17 20:33:17 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
21/01/17 20:33:17 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices (serviceOption=None, services=List(), started=false)
21/01/17 20:33:17 INFO YarnClientSchedulerBackend: Stopped
21/01/17 20:33:17 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/01/17 20:33:17 INFO MemoryStore: MemoryStore cleared
21/01/17 20:33:17 INFO BlockManager: BlockManager stopped
21/01/17 20:33:17 INFO BlockManagerMaster: BlockManagerMaster stopped
21/01/17 20:33:17 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/01/17 20:33:17 INFO SparkContext: Successfully stopped SparkContext
21/01/17 20:33:17 INFO ShutdownHookManager: Shutdown hook called
21/01/17 20:33:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-92109e8e-6395-4e6a-9ae9-fc49e358d945
21/01/17 20:33:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8fbd717-6341-43a9-aaa7-fd2cfa16510e
21/01/17 20:33:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-92109e8e-6395-4e6a-9ae9-fc49e358d945/pyspark-eec83e7a-9f89-45a6-8c67-56a883edec1b
What could be the possible solution to this? Is there an issue with the spark-env.sh file?
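One thing I notice in the trace is that the error appears right after "* Restarting with stat": the Flask debug reloader seems to re-execute the script in the same JVM, and the second SparkSession.builder...getOrCreate() call then trips the "Only one SparkContext may be running in this JVM" check. As an experiment I was thinking of starting the app without the reloader, roughly as below, though I'm not sure whether that is the correct fix, hence the question.

if __name__ == '__main__':
    # Keep debug output but skip the auto-reloader, so the module (and the
    # SparkContext creation at the top) is only executed once per process.
    app.run(host="0.0.0.0", port=7036, debug=True, use_reloader=False)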