
PySpark: Merge Multiple Columns into a JSON Column

I asked this question a while back for Python, but now I need to do the same thing in PySpark. I have a dataframe (df) with the columns cust_id, address, store_id, email, sales_channel, and category, and I want to keep cust_id and address as regular columns while merging the remaining columns into a single JSON column.

Solution 1:

Use the to_json function (wrapping the columns in struct) to create the JSON object!

Example:

from pyspark.sql.functions import struct, to_json

# sample data
df = spark.createDataFrame(
    [('1234567', '123 Main St', '10SjtT', 'idk@gmail.com', 'ecom', 'direct')],
    ['cust_id', 'address', 'store_id', 'email', 'sales_channel', 'category'])

df.select("cust_id", "address",
          to_json(struct("store_id", "category", "sales_channel", "email")).alias("metadata")
          ).show(10, False)

#result
+-------+-----------+----------------------------------------------------------------------------------------+
|cust_id|address    |metadata                                                                                |
+-------+-----------+----------------------------------------------------------------------------------------+
|1234567|123 Main St|{"store_id":"10SjtT","category":"direct","sales_channel":"ecom","email":"idk@gmail.com"}|
+-------+-----------+----------------------------------------------------------------------------------------+
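Since metadata is just a JSON string column, you can also parse it back into separate columns with from_json and a matching schema. A quick round-trip sketch (the schema and variable names here are just for illustration):

from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

meta_schema = StructType([
    StructField("store_id", StringType()),
    StructField("category", StringType()),
    StructField("sales_channel", StringType()),
    StructField("email", StringType())])

json_df = df.select("cust_id", "address",
                    to_json(struct("store_id", "category", "sales_channel", "email")).alias("metadata"))

# parse the JSON string back into a struct, then flatten it
json_df.select("cust_id", "address", from_json("metadata", meta_schema).alias("m")) \
       .select("cust_id", "address", "m.*") \
       .show(truncate=False)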

You can also pass a list of columns to struct and to_json:

ll = ['store_id', 'email', 'sales_channel', 'category']

# truncate=False so the full JSON string is shown
df.withColumn("metadata", to_json(struct(*ll))).drop(*ll).show(truncate=False)

#result
+-------+-----------+----------------------------------------------------------------------------------------+
|cust_id|address    |metadata                                                                                |
+-------+-----------+----------------------------------------------------------------------------------------+
|1234567|123 Main St|{"store_id":"10SjtT","email":"idk@gmail.com","sales_channel":"ecom","category":"direct"}|
+-------+-----------+----------------------------------------------------------------------------------------+
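If the set of metadata columns isn't fixed, you can build the list from df.columns instead of hard-coding it. A small sketch that keeps cust_id and address and packs everything else:

# columns to keep as regular columns; everything else goes into the JSON
keep = {'cust_id', 'address'}
meta_cols = [c for c in df.columns if c not in keep]

df.withColumn("metadata", to_json(struct(*meta_cols))) \
  .drop(*meta_cols) \
  .show(truncate=False)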

Solution 2:

@Shu gives a good answer; here's a variant that works out slightly better for my use case. I'm going from Kafka -> Spark -> Kafka, and this one-liner does exactly what I want. The struct(*) packs up all the columns in the dataframe.

# Pack up all the fields in preparation for sending to the Kafka sink
kafka_df = df.selectExpr('cast(id as string) as key', 'to_json(struct(*)) as value')
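For completeness, here is roughly what the write side looks like (the bootstrap servers and topic name are placeholders, and this assumes the spark-sql-kafka connector package is on the classpath):

# batch write of key/value pairs to a Kafka topic
kafka_df.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "customers") \
    .save()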
