Issue
I wrote the code for below probelem but it has below problems. Please suggest me if some tuning can be done.
- It takes more time I think.
- there are 3 brands as of now. It is hardcoded. If more brands would be added, i need to add the code manually.
input dataframe schema :
root
|-- id: string (nullable = true)
|-- attrib: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- pref: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- pref_type: string (nullable = true)
| | |-- brand: string (nullable = true)
| | |-- tp_id: string (nullable = true)
| | |-- aff: float (nullable = true)
| | |-- pre_id: string (nullable = true)
| | |-- cr_date: string (nullable = true)
| | |-- up_date: string (nullable = true)
| | |-- pref_attrib: map (nullable = true)
| | | |-- key: string
| | | |-- value: string (valueContainsNull = true)
expected output schema:
root
|-- id: string (nullable = true)
|-- attrib: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- pref: struct (nullable = false)
| |-- brandA: array (nullable = true)
| | |-- element: struct (containsNull = false)
| | | |-- pref_type: string (nullable = true)
| | | |-- tp_id: string (nullable = true)
| | | |-- aff: float (nullable = true)
| | | |-- pref_id: string (nullable = true)
| | | |-- cr_date: string (nullable = true)
| | | |-- up_date: string (nullable = true)
| | | |-- pref_attrib: map (nullable = true)
| | | | |-- key: string
| | | | |-- value: string (valueContainsNull = true)
| |-- brandB: array (nullable = true)
| | |-- element: struct (containsNull = false)
| | | |-- pref_type: string (nullable = true)
| | | |-- tp_id: string (nullable = true)
| | | |-- aff: float (nullable = true)
| | | |-- pref_id: string (nullable = true)
| | | |-- cr_date: string (nullable = true)
| | | |-- up_date: string (nullable = true)
| | | |-- pref_attrib: map (nullable = true)
| | | | |-- key: string
| | | | |-- value: string (valueContainsNull = true)
| |-- brandC: array (nullable = true)
| | |-- element: struct (containsNull = false)
| | | |-- pref_type: string (nullable = true)
| | | |-- tp_id: string (nullable = true)
| | | |-- aff: float (nullable = true)
| | | |-- pref_id: string (nullable = true)
| | | |-- cr_date: string (nullable = true)
| | | |-- up_date: string (nullable = true)
| | | |-- pref_attrib: map (nullable = true)
| | | | |-- key: string
| | | | |-- value: string (valueContainsNull = true)
The processing can be done based on the brand attribute under preferences(preferences.brand
)
I have written the below code for that:
def modifyBrands(inputDf: DataFrame): DataFrame ={
val PreferenceProps = Array("pref_type", "tp_id", "aff", "pref_id", "cr_date", "up_date", "pref_attrib")
import org.apache.spark.sql.functions._
val explodedDf = inputDf.select(col("id"), explode(col("pref")))
.select(
col("id"),
col("col.pref_type"),
col("col.brand"),
col("col.tp_id"),
col("col.aff"),
col("col.pre_id"),
col("col.cr_dt"),
col("col.up_dt"),
col("col.pref_attrib")
).cache()
val brandAddedDf = explodedDf
.withColumn("brandA", when(col("brand") === "brandA", struct(PreferenceProps.head, PreferenceProps.tail:_*)).as("brandA"))
.withColumn("brandB", when(col("brand") === "brandB", struct(PreferenceProps.head, PreferenceProps.tail:_*)).as("brandB"))
.withColumn("brandC", when(col("brand") === "brandC", struct(PreferenceProps.head, PreferenceProps.tail:_*)).as("brandC"))
.cache()
explodedDf.unpersist()
val groupedDf = brandAddedDf.groupBy("id").agg(
collect_list("brandA").alias("brandA"),
collect_list("brandB").alias("brandB"),
collect_list("brandC").alias("brandC")
).withColumn("preferences", struct(
when(size(col("brandA")).notEqual(0), col("brandA")).alias("brandA"),
when(size(col("brandB")).notEqual(0), col("brandB")).alias("brandB"),
when(size(col("brandC")).notEqual(0), col("brandC")).alias("brandC"),
)).drop("brandA", "brandB", "brandC")
.cache()
brandAddedDf.unpersist()
val idAttributesDf = inputDf.select("id", "attrib").cache()
val joinedDf = idAttributesDf.join(groupedDf, "id")
groupedDf.unpersist()
idAttributesDf.unpersist()
joinedDf.printSchema()
joinedDf // returning joined df which will be wrote as paquet file.
}
Solution
You can simplify your code using higher-order function filter
on arrays. Just map through brand names and for-each one return a filtered array from pref
. This way you avoid the exploding / grouping part.
Here’s a complete example:
val data = """{"id":1,"attrib":{"key":"k","value":"v"},"pref":[{"pref_type":"type1","brand":"brandA","tp_id":"id1","aff":"aff1","pre_id":"pre_id1","cr_date":"2021-01-06","up_date":"2021-01-06","pref_attrib":{"key":"k","value":"v"}},{"pref_type":"type1","brand":"brandB","tp_id":"id1","aff":"aff1","pre_id":"pre_id1","cr_date":"2021-01-06","up_date":"2021-01-06","pref_attrib":{"key":"k","value":"v"}},{"pref_type":"type1","brand":"brandC","tp_id":"id1","aff":"aff1","pre_id":"pre_id1","cr_date":"2021-01-06","up_date":"2021-01-06","pref_attrib":{"key":"k","value":"v"}}]}"""
val inputDf = spark.read.json(Seq(data).toDS)
val brands = Seq("brandA", "brandB", "brandC")
// or getting them from input dataframe
// val brands = inputDf.select("pref.brand").as[Seq[String]].collect.flatten
val brandAddedDf = inputDf.withColumn(
"pref",
struct(brands.map(b => expr(s"filter(pref, x -> x.brand = '$b')").as(b)): _*)
)
brandAddedDf.printSchema
//root
// |-- attrib: struct (nullable = true)
// | |-- key: string (nullable = true)
// | |-- value: string (nullable = true)
// |-- id: long (nullable = true)
// |-- pref: struct (nullable = false)
// | |-- brandA: array (nullable = true)
// | | |-- element: struct (containsNull = true)
// | | | |-- aff: string (nullable = true)
// | | | |-- brand: string (nullable = true)
// | | | |-- cr_date: string (nullable = true)
// | | | |-- pre_id: string (nullable = true)
// | | | |-- pref_attrib: struct (nullable = true)
// | | | | |-- key: string (nullable = true)
// | | | | |-- value: string (nullable = true)
// | | | |-- pref_type: string (nullable = true)
// | | | |-- tp_id: string (nullable = true)
// | | | |-- up_date: string (nullable = true)
// | |-- brandB: array (nullable = true)
// | | |-- element: struct (containsNull = true)
// | | | |-- aff: string (nullable = true)
// | | | |-- brand: string (nullable = true)
// | | | |-- cr_date: string (nullable = true)
// | | | |-- pre_id: string (nullable = true)
// | | | |-- pref_attrib: struct (nullable = true)
// | | | | |-- key: string (nullable = true)
// | | | | |-- value: string (nullable = true)
// | | | |-- pref_type: string (nullable = true)
// | | | |-- tp_id: string (nullable = true)
// | | | |-- up_date: string (nullable = true)
// | |-- brandC: array (nullable = true)
// | | |-- element: struct (containsNull = true)
// | | | |-- aff: string (nullable = true)
// | | | |-- brand: string (nullable = true)
// | | | |-- cr_date: string (nullable = true)
// | | | |-- pre_id: string (nullable = true)
// | | | |-- pref_attrib: struct (nullable = true)
// | | | | |-- key: string (nullable = true)
// | | | | |-- value: string (nullable = true)
// | | | |-- pref_type: string (nullable = true)
// | | | |-- tp_id: string (nullable = true)
// | | | |-- up_date: string (nullable = true)
Answered By – blackbishop
Answer Checked By – David Goodson (BugsFixing Volunteer)