I have to join two Spark DataFrames in Scala based on a custom function. Both DataFrames have the same schema.
Sample row of data in DF1:

```json
{
  "F1": "A", "F2": "B", "F3": "C",
  "F4": [
    { "name": "N1", "unit": "none", "count": 50.0,  "sf1": "val_1", "sf2": "val_2" },
    { "name": "N2", "unit": "none", "count": 100.0, "sf1": "val_3", "sf2": "val_4" }
  ]
}
```

Sample row of data in DF2:

```json
{
  "F1": "A", "F2": "B", "F3": "C",
  "F4": [
    { "name": "N1", "unit": "none", "count": 80.0, "sf1": "val_5", "sf2": "val_6" },
    { "name": "N2", "unit": "none", "count": 90.0, "sf1": "val_7", "sf2": "val_8" },
    { "name": "N3", "unit": "none", "count": 99.0, "sf1": "val_9", "sf2": "val_10" }
  ]
}
```

Result of joining these sample rows:

```json
{
  "F1": "A", "F2": "B", "F3": "C",
  "F4": [
    { "name": "N1", "unit": "none", "count": 80.0,  "sf1": "val_5", "sf2": "val_6" },
    { "name": "N2", "unit": "none", "count": 100.0, "sf1": "val_3", "sf2": "val_4" },
    { "name": "N3", "unit": "none", "count": 99.0,  "sf1": "val_9", "sf2": "val_10" }
  ]
}
```

The result is:
- a full outer join based on the values of "F1", "F2" and "F3", plus
- a merge of "F4" that keeps unique entries (using "name" as the id) with the maximum value of "count" (a sketch of this merge is shown just below)
I am not very familiar with Scala and have been struggling with this for more than a day now. Here is what I have gotten to so far:
```scala
import org.apache.spark.sql.Row

val df1 = sqlContext.read.parquet("stack_a.parquet")
val df2 = sqlContext.read.parquet("stack_b.parquet")

// Suffix the column names so the two sides can be told apart after the join.
val df4 = df1.toDF(df1.columns.map(_ + "_A"): _*)
val df5 = df2.toDF(df2.columns.map(_ + "_B"): _*)

val df6 = df4.join(df5,
  df4("F1_A") === df5("F1_B") && df4("F2_A") === df5("F2_B") && df4("F3_A") === df5("F3_B"),
  "outer")

def joinFunction(r: Row) = {
  // Need the real merge logic here!
  // print(r(3))  // --> Any = WrappedArray([..])
  // Also considering parsing the row as JSON to do the processing,
  // but not sure about the performance impact:
  // val parsed = JSON.parseFull(r.json) // then work with parsed
  r.toSeq
}

val finalResult = df6.rdd.map(joinFunction)
finalResult.collect
```

I was planning to add the custom merge logic in `joinFunction`, but I am struggling to convert the WrappedArray/Any values into something I can work with. Any input on how to do the conversion, or on a better way to do the join, would be very helpful.
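On the conversion itself: as far as I understand, an `array<struct<...>>` column comes back from a `Row` as a WrappedArray of `Row`s, so it can be read with `getAs[Seq[Row]]` and the struct fields accessed by name. A sketch, assuming the joined row still carries the "_A"/"_B" suffixed columns (the column and helper names here are just illustrative):

```scala
import org.apache.spark.sql.Row

// Sketch: read an "F4"-style column out of a joined Row as typed values
// instead of Any. Each array element is itself a Row holding the struct fields.
def extractF4(r: Row, columnName: String): Seq[(String, Double)] = {
  if (r.isNullAt(r.fieldIndex(columnName))) Seq.empty // side missing after the outer join
  else
    r.getAs[Seq[Row]](columnName).map { e =>
      (e.getAs[String]("name"), e.getAs[Double]("count"))
    }
}

// e.g. inside joinFunction: extractF4(r, "F4_A") and extractF4(r, "F4_B")
```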
Thanks!
Edit (7 Mar, 2021)
The full-outer join actually has to be performed only on "F1". Hence, using @werner's answer, I am doing:
```scala
import org.apache.spark.sql.functions.{col, when}

val df1_a = df1.toDF(df1.columns.map(_ + "_A"): _*)
val df2_b = df2.toDF(df2.columns.map(_ + "_B"): _*)

val finalResult = df1_a.join(df2_b, df1_a("F1_A") === df2_b("F1_B"), "full_outer")
  .drop("F1_B")
  .withColumn("F4", joinFunction(col("F4_A"), col("F4_B")))
  .drop("F4_A", "F4_B")
  .withColumn("F2", when(col("F2_A").isNull, col("F2_B")).otherwise(col("F2_A")))
  .drop("F2_A", "F2_B")
  .withColumn("F3", when(col("F3_A").isNull, col("F3_B")).otherwise(col("F3_A")))
  .drop("F3_A", "F3_B")
```

But I am getting this error. What am I missing?
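For reference, my understanding of what the `joinFunction` UDF from that answer needs to do (this is only a sketch, not necessarily the exact implementation): after a full outer join either side's "F4" can be null, so the UDF has to treat a missing side as empty before merging by "name" with the maximum "count".

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Same hypothetical case class as in the earlier sketch.
case class Entry(name: String, unit: String, count: Double, sf1: String, sf2: String)

// Null-safe merge UDF: rows that exist on only one side of the full outer join
// arrive with a null array, so each input is defaulted to empty first.
val joinFunction = udf { (a: Seq[Row], b: Seq[Row]) =>
  def toEntries(xs: Seq[Row]): Seq[Entry] =
    Option(xs).getOrElse(Seq.empty).map { r =>
      Entry(r.getAs[String]("name"), r.getAs[String]("unit"),
            r.getAs[Double]("count"), r.getAs[String]("sf1"), r.getAs[String]("sf2"))
    }
  (toEntries(a) ++ toEntries(b))
    .groupBy(_.name)
    .values
    .map(_.maxBy(_.count))
    .toSeq
}
```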