I have the following DataFrame in PySpark:
+--------------------+-------------------+---------+-----------------------+-----------+
|device_id |order_creation_time|order_id |status_check_time |status_code|
+--------------------+-------------------+---------+-----------------------+-----------+
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:33.858|200 |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:13.1 |200 |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:57.682|200 |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:36.676|200 |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:21.293|200 |
+--------------------+-------------------+---------+-----------------------+-----------+
I need to get the status_check_time immediately preceding and the status_check_time immediately following the order_creation_time.
The order_creation_time is always constant within an order_id (each order_id has exactly one order_creation_time).
In this case, the output should be:
+--------------------+-------------------+---------+---------------------------+-----------------------+
|device_id |order_creation_time|order_id |previous_status_check_time |next_status_check_time |
+--------------------+-------------------+---------+---------------------------+-----------------------+
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:36.676 |2022-11-26 23:54:57.682|
+--------------------+-------------------+---------+---------------------------+-----------------------+
I tried using the lag and lead window functions, but I'm not getting the desired output:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import lag, lead

ss = (
    SparkSession.builder
    .appName("test")
    .master("local[2]")
    .getOrCreate()
)
data = [
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time": "2022-11-26 23:55:33.858", "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time": "2022-11-26 23:55:13.1", "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time": "2022-11-26 23:54:57.682", "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time": "2022-11-26 23:54:36.676", "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time": "2022-11-26 23:54:21.293", "status_code": 200},
]
df = ss.createDataFrame(data)
# lag/lead give each row the neighbouring status check of the same device, ordered by status_check_time
windowSpec = Window.partitionBy("device_id").orderBy("status_check_time")
(
    df.withColumn(
        "previous_status_check_time", lag("status_check_time").over(windowSpec)
    ).withColumn(
        "next_status_check_time", lead("status_check_time").over(windowSpec)
    ).show(truncate=False)
)
Any ideas how to fix this?
Solution:
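We can calculate the difference between the two timestamps in seconds (status_check_time minus order_creation_time) and keep the check with the closest negative difference as the previous one and the check with the smallest non-negative difference as the next one. Wrapping ts_diff and status_check_time in a struct lets max/min compare on ts_diff first, and the winning struct's status_check_time field is then extracted.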
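from pyspark.sql import functions as func
# cast('long') needs timestamp columns; in the question's df they are strings, so convert first
data_sdf = df. \
    withColumn('order_creation_time', func.to_timestamp('order_creation_time')). \
    withColumn('status_check_time', func.to_timestamp('status_check_time'))
# ts_diff < 0 means the check happened before the order was created
data_sdf. \
    withColumn('ts_diff', func.col('status_check_time').cast('long') - func.col('order_creation_time').cast('long')). \
    groupBy([k for k in data_sdf.columns if k != 'status_check_time']). \
    agg(func.max(func.when(func.col('ts_diff') < 0, func.struct('ts_diff', 'status_check_time'))).status_check_time.alias('previous_status_check_time'),
        func.min(func.when(func.col('ts_diff') >= 0, func.struct('ts_diff', 'status_check_time'))).status_check_time.alias('next_status_check_time')
        ). \
    show(truncate=False)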
# +--------------------+-------------------+---------+-----------+--------------------------+-----------------------+
# |device_id |order_creation_time|order_id |status_code|previous_status_check_time|next_status_check_time |
# +--------------------+-------------------+---------+-----------+--------------------------+-----------------------+
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|200 |2022-11-26 23:54:36.676 |2022-11-26 23:54:57.682|
# +--------------------+-------------------+---------+-----------+--------------------------+-----------------------+
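To check the selection logic without a Spark session, here is a minimal plain-Python sketch of the same idea, assuming one order_id group is held as a list of datetime values. Python tuples compare element by element, much like Spark structs, so max/min over (ts_diff, status_check_time) pairs mirrors the struct trick above. The names prev_and_next and to_epoch_seconds are hypothetical helpers, not part of PySpark.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def to_epoch_seconds(dt):
    # Floor to whole seconds, similar to Spark's timestamp cast('long')
    return (dt - EPOCH) // timedelta(seconds=1)


def prev_and_next(order_creation_time, status_check_times):
    created = to_epoch_seconds(order_creation_time)
    # (ts_diff, status_check_time) pairs play the role of func.struct('ts_diff', 'status_check_time')
    pairs = [(to_epoch_seconds(t) - created, t) for t in status_check_times]
    before = [p for p in pairs if p[0] < 0]   # like func.when(ts_diff < 0, ...)
    after = [p for p in pairs if p[0] >= 0]   # like func.when(ts_diff >= 0, ...)
    # Max of the negatives is the closest earlier check, min of the rest the closest later one;
    # None stands in for Spark's null when one side is empty
    previous = max(before)[1] if before else None
    following = min(after)[1] if after else None
    return previous, following


FMT = "%Y-%m-%d %H:%M:%S.%f"
order_creation_time = datetime(2022, 11, 26, 23, 54, 41)
status_check_times = [
    datetime.strptime(s, FMT)
    for s in [
        "2022-11-26 23:55:33.858",
        "2022-11-26 23:55:13.1",
        "2022-11-26 23:54:57.682",
        "2022-11-26 23:54:36.676",
        "2022-11-26 23:54:21.293",
    ]
]

previous, following = prev_and_next(order_creation_time, status_check_times)
print(previous, following)  # 2022-11-26 23:54:36.676000 2022-11-26 23:54:57.682000
```

Because ties in the whole-second ts_diff fall back to comparing status_check_time, the closest check still wins on both sides, just as with the Spark struct.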
For reference, the intermediate ts_diff column (status_check_time minus order_creation_time, in whole seconds) looks like this:
# +--------------------+-------------------+---------+-----------------------+-----------+-------+
# |device_id           |order_creation_time|order_id |status_check_time      |status_code|ts_diff|
# +--------------------+-------------------+---------+-----------------------+-----------+-------+
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:33.858|200        |52     |
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:13.1  |200        |32     |
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:57.682|200        |16     |
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:36.676|200        |-5     |
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:21.293|200        |-20    |
# +--------------------+-------------------+---------+-----------------------+-----------+-------+
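The lag/lead attempt in the question orders each device's checks by status_check_time, so every row receives its neighbouring status check and order_creation_time never enters the comparison. If you would rather keep lead, one alternative (not part of the answer above) is to take the latest check before order_creation_time as the previous value and that row's lead as the next value. A minimal plain-Python sketch of that idea, with hypothetical helper names lag_lead and prev_and_next_via_lead:

```python
from datetime import datetime


def lag_lead(values):
    # Hypothetical helper: (value, lag, lead) per element, None at the edges,
    # like lag()/lead() over a window ordered by that value
    return [
        (v, values[i - 1] if i > 0 else None, values[i + 1] if i + 1 < len(values) else None)
        for i, v in enumerate(values)
    ]


def prev_and_next_via_lead(order_creation_time, status_check_times):
    rows = lag_lead(sorted(status_check_times))  # like orderBy("status_check_time")
    before = [row for row in rows if row[0] < order_creation_time]
    if not before:
        # No check precedes the order, so "next" is simply the earliest check (if any)
        return None, rows[0][0] if rows else None
    value, _lag, lead = before[-1]  # rows are sorted: this is the latest earlier check
    return value, lead


order_creation_time = datetime(2022, 11, 26, 23, 54, 41)
status_check_times = [
    datetime.strptime(s, "%Y-%m-%d %H:%M:%S.%f")
    for s in ["2022-11-26 23:55:33.858", "2022-11-26 23:55:13.1", "2022-11-26 23:54:57.682",
              "2022-11-26 23:54:36.676", "2022-11-26 23:54:21.293"]
]
previous, following = prev_and_next_via_lead(order_creation_time, status_check_times)
print(previous, following)  # 2022-11-26 23:54:36.676000 2022-11-26 23:54:57.682000
```

In Spark this would roughly mean computing lead first, then filtering on status_check_time < order_creation_time and keeping the latest remaining row per order_id; the case where no check precedes the order needs separate handling there as well. That translation is a suggestion and has not been tested here.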