Saturday, March 6, 2021

Join data-frame based on value in list of WrappedArray

I have to join two Spark data-frames in Scala based on a custom function. Both data-frames have the same schema.

Sample Row of data in DF1:

    {
      "F1" : "A",
      "F2" : "B",
      "F3" : "C",
      "F4" : [
        {
          "name" : "N1",
          "unit" : "none",
          "count" : 50.0,
          "sf1" : "val_1",
          "sf2" : "val_2"
        },
        {
          "name" : "N2",
          "unit" : "none",
          "count" : 100.0,
          "sf1" : "val_3",
          "sf2" : "val_4"
        }
      ]
    }

Sample Row of data in DF2:

    {
      "F1" : "A",
      "F2" : "B",
      "F3" : "C",
      "F4" : [
        {
          "name" : "N1",
          "unit" : "none",
          "count" : 80.0,
          "sf1" : "val_5",
          "sf2" : "val_6"
        },
        {
          "name" : "N2",
          "unit" : "none",
          "count" : 90.0,
          "sf1" : "val_7",
          "sf2" : "val_8"
        },
        {
          "name" : "N3",
          "unit" : "none",
          "count" : 99.0,
          "sf1" : "val_9",
          "sf2" : "val_10"
        }
      ]
    }

Result of joining these sample rows:

    {
      "F1" : "A",
      "F2" : "B",
      "F3" : "C",
      "F4" : [
        {
          "name" : "N1",
          "unit" : "none",
          "count" : 80.0,
          "sf1" : "val_5",
          "sf2" : "val_6"
        },
        {
          "name" : "N2",
          "unit" : "none",
          "count" : 100.0,
          "sf1" : "val_3",
          "sf2" : "val_4"
        },
        {
          "name" : "N3",
          "unit" : "none",
          "count" : 99.0,
          "sf1" : "val_9",
          "sf2" : "val_10"
        }
      ]
    }

The result is:

  • a full outer join based on the values of "F1", "F2" and "F3", plus
  • a merge of the "F4" arrays, keeping one node per unique "name" (used as the id) with the maximum value of "count" (see the sketch after this list)
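
For concreteness, here is a minimal sketch of that merge rule in Spark/Scala: concatenate both arrays, group by "name", and keep the entry with the larger "count". mergeF4 is a hypothetical UDF, with field names and types taken from the sample rows above; nothing here is tested against the real schema.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.udf

    // Shape of one F4 entry, per the sample rows above.
    case class F4Entry(name: String, unit: String, count: Double, sf1: String, sf2: String)

    // Merge two F4 arrays into one entry per "name", keeping the larger "count".
    // Either side may be null when a row exists in only one data-frame.
    val mergeF4 = udf { (a: Seq[Row], b: Seq[Row]) =>
      val entries = (Option(a).getOrElse(Seq.empty) ++ Option(b).getOrElse(Seq.empty)).map { e =>
        F4Entry(
          e.getAs[String]("name"),
          e.getAs[String]("unit"),
          e.getAs[Double]("count"),
          e.getAs[String]("sf1"),
          e.getAs[String]("sf2"))
      }
      entries.groupBy(_.name).values.map(_.maxBy(_.count)).toSeq
    }

Applied to a joined frame with suffixed columns, this would look like joined.withColumn("F4", mergeF4(col("F4_A"), col("F4_B"))), where joined is whatever the join step produces.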

I am not very familiar with Scala and have been struggling with this for more than a day now. Here is what I have so far:

    import org.apache.spark.sql.Row

    val df1 = sqlContext.read.parquet("stack_a.parquet")
    val df2 = sqlContext.read.parquet("stack_b.parquet")

    // Suffix the column names so the two sides can be told apart after the join.
    val df4 = df1.toDF(df1.columns.map(_ + "_A"): _*)
    val df5 = df2.toDF(df2.columns.map(_ + "_B"): _*)

    val df6 = df4.join(
      df5,
      df4("F1_A") === df5("F1_B") && df4("F2_A") === df5("F2_B") && df4("F3_A") === df5("F3_B"),
      "outer")

    def joinFunction(r: Row) = {
      // Need the real deal here!
      // print(r(3)) // --> Any = WrappedArray([..])

      // Also considering parsing the row as JSON to do the processing,
      // but not sure about the performance impact:
      // val parsed = JSON.parseFull(r.json) // then play with parsed

      r.toSeq
    }

    val finalResult = df6.rdd.map(joinFunction)
    finalResult.collect

I was planning to add the custom merge logic in joinFunction, but I am struggling to convert the WrappedArray/Any value into something I can work with. Any input on how to do the conversion, or on a better way to do the join, would be very helpful.
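
On the WrappedArray/Any part specifically: an array-of-struct column arrives inside the Row as a WrappedArray of Rows, so getAs[Seq[Row]] gives a typed handle without any JSON round-trip. A sketch, where extractF4 is a hypothetical helper and the column names match the renamed frame above:

    import org.apache.spark.sql.Row

    // Read the "F4_A" column as a typed Seq[Row] instead of Any, then pull
    // struct fields out of each element by name.
    def extractF4(r: Row): Seq[(String, Double)] = {
      val f4a = Option(r.getAs[Seq[Row]]("F4_A")).getOrElse(Seq.empty[Row])
      f4a.map(e => (e.getAs[String]("name"), e.getAs[Double]("count")))
    }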

Thanks!

Edit (7 Mar, 2021)

The full outer join actually has to be performed only on "F1". Hence, using @werner's answer, I am doing:

    val df1_a = df1.toDF(df1.columns.map(_ + "_A"): _*)
    val df2_b = df2.toDF(df2.columns.map(_ + "_B"): _*)

    val finalResult = df1_a
      .join(df2_b, df1_a("F1_A") === df2_b("F1_B"), "full_outer")
      .drop("F1_B")
      .withColumn("F4", joinFunction(col("F4_A"), col("F4_B")))
      .drop("F4_A", "F4_B")
      .withColumn("F2", when(col("F2_A").isNull, col("F2_B")).otherwise(col("F2_A")))
      .drop("F2_A", "F2_B")
      .withColumn("F3", when(col("F3_A").isNull, col("F3_B")).otherwise(col("F3_A")))
      .drop("F3_A", "F3_B")
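
As an aside, each when(col(...).isNull, ...).otherwise(...) pair above is exactly what coalesce does (it returns the first non-null argument), so the F2 and F3 columns can be written more compactly. A sketch against the same frame, where merged is a hypothetical name for the result of the join and F4 steps above:

    import org.apache.spark.sql.functions.{coalesce, col}

    // coalesce(x, y) returns x unless x is null, otherwise y, which is the
    // same rule as when(x.isNull, y).otherwise(x) above.
    val compacted = merged
      .withColumn("F2", coalesce(col("F2_A"), col("F2_B")))
      .drop("F2_A", "F2_B")
      .withColumn("F3", coalesce(col("F3_A"), col("F3_B")))
      .drop("F3_A", "F3_B")

One thing to watch with a full outer join on "F1" alone: rows that exist only in df2_b come back with a null F1_A, so F1 itself probably needs the same null handling before F1_B is dropped.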

But I am getting an error with this. What am I missing?

Source: https://stackoverflow.com/questions/66462993/join-data-frame-based-on-value-in-list-of-wrappedarray (March 04, 2021 at 02:41AM)
