Iterating Throws Rows Of A Dataframe And Setting Value In Spark
Solution 1:
This is approaching the limit of complexity that's possible with the DataFrame API. Someone else may be able to suggest a method of doing this with DataFrames, but personally I think the RDD API is much more suited to this. Here's an example to give you an idea of how to structure your algorithms for Spark:
data = [(datetime(2017, 1, 1, 0, 0, 1), 'customer1', 'user1', 'app1'),
(datetime(2017, 1, 1, 0, 1, 1), 'customer1', 'user2', 'app2'),
(datetime(2017, 1, 1, 0, 2, 1), 'customer1', 'user1', 'app2'),
(datetime(2017, 1, 1, 0, 3, 1), 'customer1', 'user1', 'app1')]
rdd = sc.parallelize(data)
def toTimeSince(row):
cust_user_app, timestamps =row
timestamps = sorted(timestamps)
result= [(timestamps[0], *cust_user_app, timedelta(30))]
previous_timestamp = timestamps[0]
fortimestampin sorted(timestamps)[1:]:
result.append((timestamp, *cust_user_app, timestamp- previous_timestamp))
returnresult
(rdd
.map(lambda row: (row[1:], [row[0]])) # Data looks like ((customer, user, app), [timestamp])
.reduceByKey(lambda a, b: a + b) # Data looks like ((customer, user, app), list_of_timestamps)
.flatMap(toTimeSince) # Data looks like (timestamp, customer, user, app, time_since_previous)
.collect())
Result:
[(datetime.datetime(2017, 1, 1, 0, 1, 1), 'customer1', 'user2', 'app2', datetime.timedelta(30)),
(datetime.datetime(2017, 1, 1, 0, 2, 1), 'customer1', 'user1', 'app2', datetime.timedelta(30)),
(datetime.datetime(2017, 1, 1, 0, 0, 1), 'customer1', 'user1', 'app1', datetime.timedelta(30)),
(datetime.datetime(2017, 1, 1, 0, 3, 1), 'customer1', 'user1', 'app1', datetime.timedelta(0, 180))]
The key points are:
- The algorithm as you've described it is not inherently suited to Spark - there is a strong dependence between rows (every row must be calculated by comparing to another row), which is difficult to parallelize.
- My suggestion uses Spark to aggregate a list of timestamps for records with the same customer, user and app. Following this, it's easy to sort the timestamps for each customer-user-app combination and expand back out into the dataset you want.
Solution 2:
Its possible you have to use window function in pyspark and partition by for that your window will be user and app for this. you have to give rank then and if the rank is one then set to your default value otherwise current time - previous time. I think that's what you wanted to do.
In sql terms you have to use partition by clause but to use this in pyspark you have to use window. Hope this will solve your problem a bit lazy to write the code sorry for that.
Post a Comment for "Iterating Throws Rows Of A Dataframe And Setting Value In Spark"