[SOLVED] suggestion for tune up the code which contains explode and groupby

Issue

I wrote the code for below probelem but it has below problems. Please suggest me if some tuning can be done.

  1. It takes more time I think.
  2. there are 3 brands as of now. It is hardcoded. If more brands would be added, i need to add the code manually.

input dataframe schema :

root
 |-- id: string (nullable = true)
 |-- attrib: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- pref: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- pref_type: string (nullable = true)
 |    |    |-- brand: string (nullable = true)
 |    |    |-- tp_id: string (nullable = true)
 |    |    |-- aff: float (nullable = true)
 |    |    |-- pre_id: string (nullable = true)
 |    |    |-- cr_date: string (nullable = true)
 |    |    |-- up_date: string (nullable = true)
 |    |    |-- pref_attrib: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)

expected output schema:

root
 |-- id: string (nullable = true)
 |-- attrib: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- pref: struct (nullable = false)
 |    |-- brandA: array (nullable = true)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- pref_type: string (nullable = true)
 |    |    |    |-- tp_id: string (nullable = true)
 |    |    |    |-- aff: float (nullable = true)
 |    |    |    |-- pref_id: string (nullable = true)
 |    |    |    |-- cr_date: string (nullable = true)
 |    |    |    |-- up_date: string (nullable = true)
 |    |    |    |-- pref_attrib: map (nullable = true)
 |    |    |    |    |-- key: string
 |    |    |    |    |-- value: string (valueContainsNull = true)
 |    |-- brandB: array (nullable = true)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- pref_type: string (nullable = true)
 |    |    |    |-- tp_id: string (nullable = true)
 |    |    |    |-- aff: float (nullable = true)
 |    |    |    |-- pref_id: string (nullable = true)
 |    |    |    |-- cr_date: string (nullable = true)
 |    |    |    |-- up_date: string (nullable = true)
 |    |    |    |-- pref_attrib: map (nullable = true)
 |    |    |    |    |-- key: string
 |    |    |    |    |-- value: string (valueContainsNull = true)
 |    |-- brandC: array (nullable = true)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- pref_type: string (nullable = true)
 |    |    |    |-- tp_id: string (nullable = true)
 |    |    |    |-- aff: float (nullable = true)
 |    |    |    |-- pref_id: string (nullable = true)
 |    |    |    |-- cr_date: string (nullable = true)
 |    |    |    |-- up_date: string (nullable = true)
 |    |    |    |-- pref_attrib: map (nullable = true)
 |    |    |    |    |-- key: string
 |    |    |    |    |-- value: string (valueContainsNull = true)

The processing can be done based on the brand attribute under preferences(preferences.brand)

I have written the below code for that:

def modifyBrands(inputDf: DataFrame): DataFrame ={
    val PreferenceProps = Array("pref_type", "tp_id", "aff", "pref_id", "cr_date", "up_date", "pref_attrib")
    import org.apache.spark.sql.functions._
    val explodedDf = inputDf.select(col("id"), explode(col("pref")))
        .select(
            col("id"),
            col("col.pref_type"),
            col("col.brand"),
            col("col.tp_id"),
            col("col.aff"),
            col("col.pre_id"),
            col("col.cr_dt"),
            col("col.up_dt"),
            col("col.pref_attrib")
        ).cache()

    val brandAddedDf = explodedDf
        .withColumn("brandA", when(col("brand") === "brandA", struct(PreferenceProps.head, PreferenceProps.tail:_*)).as("brandA"))
        .withColumn("brandB", when(col("brand") === "brandB", struct(PreferenceProps.head, PreferenceProps.tail:_*)).as("brandB"))
        .withColumn("brandC", when(col("brand") === "brandC", struct(PreferenceProps.head, PreferenceProps.tail:_*)).as("brandC"))
        .cache()

    explodedDf.unpersist()

    val groupedDf = brandAddedDf.groupBy("id").agg(
        collect_list("brandA").alias("brandA"),
        collect_list("brandB").alias("brandB"),
        collect_list("brandC").alias("brandC")
    ).withColumn("preferences", struct(
        when(size(col("brandA")).notEqual(0), col("brandA")).alias("brandA"),
        when(size(col("brandB")).notEqual(0), col("brandB")).alias("brandB"),
        when(size(col("brandC")).notEqual(0), col("brandC")).alias("brandC"),
    )).drop("brandA", "brandB", "brandC")
      .cache()
    brandAddedDf.unpersist()

    val idAttributesDf = inputDf.select("id", "attrib").cache()

    val joinedDf = idAttributesDf.join(groupedDf, "id")
    groupedDf.unpersist()
    idAttributesDf.unpersist()
    joinedDf.printSchema()
    joinedDf // returning joined df which will be wrote as paquet file.
  }

Solution

You can simplify your code using higher-order function filter on arrays. Just map through brand names and for-each one return a filtered array from pref. This way you avoid the exploding / grouping part.

Here’s a complete example:

val data = """{"id":1,"attrib":{"key":"k","value":"v"},"pref":[{"pref_type":"type1","brand":"brandA","tp_id":"id1","aff":"aff1","pre_id":"pre_id1","cr_date":"2021-01-06","up_date":"2021-01-06","pref_attrib":{"key":"k","value":"v"}},{"pref_type":"type1","brand":"brandB","tp_id":"id1","aff":"aff1","pre_id":"pre_id1","cr_date":"2021-01-06","up_date":"2021-01-06","pref_attrib":{"key":"k","value":"v"}},{"pref_type":"type1","brand":"brandC","tp_id":"id1","aff":"aff1","pre_id":"pre_id1","cr_date":"2021-01-06","up_date":"2021-01-06","pref_attrib":{"key":"k","value":"v"}}]}"""
val inputDf = spark.read.json(Seq(data).toDS)

val brands = Seq("brandA", "brandB", "brandC")

// or getting them from input dataframe
// val brands = inputDf.select("pref.brand").as[Seq[String]].collect.flatten


val brandAddedDf = inputDf.withColumn(
  "pref",
  struct(brands.map(b => expr(s"filter(pref, x -> x.brand = '$b')").as(b)): _*)
)

brandAddedDf.printSchema
//root
// |-- attrib: struct (nullable = true)
// |    |-- key: string (nullable = true)
// |    |-- value: string (nullable = true)
// |-- id: long (nullable = true)
// |-- pref: struct (nullable = false)
// |    |-- brandA: array (nullable = true)
// |    |    |-- element: struct (containsNull = true)
// |    |    |    |-- aff: string (nullable = true)
// |    |    |    |-- brand: string (nullable = true)
// |    |    |    |-- cr_date: string (nullable = true)
// |    |    |    |-- pre_id: string (nullable = true)
// |    |    |    |-- pref_attrib: struct (nullable = true)
// |    |    |    |    |-- key: string (nullable = true)
// |    |    |    |    |-- value: string (nullable = true)
// |    |    |    |-- pref_type: string (nullable = true)
// |    |    |    |-- tp_id: string (nullable = true)
// |    |    |    |-- up_date: string (nullable = true)
// |    |-- brandB: array (nullable = true)
// |    |    |-- element: struct (containsNull = true)
// |    |    |    |-- aff: string (nullable = true)
// |    |    |    |-- brand: string (nullable = true)
// |    |    |    |-- cr_date: string (nullable = true)
// |    |    |    |-- pre_id: string (nullable = true)
// |    |    |    |-- pref_attrib: struct (nullable = true)
// |    |    |    |    |-- key: string (nullable = true)
// |    |    |    |    |-- value: string (nullable = true)
// |    |    |    |-- pref_type: string (nullable = true)
// |    |    |    |-- tp_id: string (nullable = true)
// |    |    |    |-- up_date: string (nullable = true)
// |    |-- brandC: array (nullable = true)
// |    |    |-- element: struct (containsNull = true)
// |    |    |    |-- aff: string (nullable = true)
// |    |    |    |-- brand: string (nullable = true)
// |    |    |    |-- cr_date: string (nullable = true)
// |    |    |    |-- pre_id: string (nullable = true)
// |    |    |    |-- pref_attrib: struct (nullable = true)
// |    |    |    |    |-- key: string (nullable = true)
// |    |    |    |    |-- value: string (nullable = true)
// |    |    |    |-- pref_type: string (nullable = true)
// |    |    |    |-- tp_id: string (nullable = true)
// |    |    |    |-- up_date: string (nullable = true)

Answered By – blackbishop

Answer Checked By – David Goodson (BugsFixing Volunteer)

Leave a Reply

Your email address will not be published. Required fields are marked *