PySpark window functions: preceding and following event

I have the following dataframe in pyspark:

+--------------------+-------------------+---------+-----------------------+-----------+
|device_id           |order_creation_time|order_id |status_check_time      |status_code|
+--------------------+-------------------+---------+-----------------------+-----------+
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:33.858|200        |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:13.1  |200        |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:57.682|200        |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:36.676|200        |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:21.293|200        |
+--------------------+-------------------+---------+-----------------------+-----------+

I need to get the status_check_time values immediately before and immediately after the order_creation_time.

The order_creation_time column is always constant within the same order_id (so each order_id has exactly one order_creation_time).

In this case, the output should be:

+--------------------+-------------------+---------+---------------------------+-----------------------+
|device_id           |order_creation_time|order_id |previous_status_check_time |next_status_check_time |
+--------------------+-------------------+---------+---------------------------+-----------------------+
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:36.676    |2022-11-26 23:54:57.682|
+--------------------+-------------------+---------+---------------------------+-----------------------+

I was trying to use lag and lead functions, but I’m not getting the desired output:

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import lag, lead

ss = (
    SparkSession.
    builder.
    appName("test").
    master("local[2]").
    getOrCreate()
)
data = [
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time":"2022-11-26 23:55:33.858", "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time":"2022-11-26 23:55:13.1"  , "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time":"2022-11-26 23:54:57.682", "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time":"2022-11-26 23:54:36.676", "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time":"2022-11-26 23:54:21.293", "status_code": 200}  
]
     
df = ss.createDataFrame(data)

windowSpec  = Window.partitionBy("device_id").orderBy("status_check_time")

(
   df.withColumn(
    "previous_status_check_time", lag("status_check_time").over(windowSpec)
   ).withColumn(
    "next_status_check_time", lead("status_check_time").over(windowSpec)
   ).show(truncate=False) 
)

Any ideas on how to fix this?

Solution:

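We can calculate the difference between status_check_time and order_creation_time in seconds, then keep the check with the closest negative difference (the previous one) and the check with the closest non-negative difference (the next one). This assumes both columns are of timestamp type; casting a timestamp to long gives whole epoch seconds.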

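from pyspark.sql import functions as func

# data_sdf is the input dataframe. Both time columns are assumed to be timestamps;
# string columns would first need converting, e.g. with func.to_timestamp.
# ts_diff < 0 means the check happened before the order was created.
# max/min on a struct compares ts_diff first, then status_check_time.
data_sdf. \
    withColumn('ts_diff', func.col('status_check_time').cast('long') - func.col('order_creation_time').cast('long')). \
    groupBy([k for k in data_sdf.columns if k != 'status_check_time']). \
    agg(func.max(func.when(func.col('ts_diff') < 0, func.struct('ts_diff', 'status_check_time'))).status_check_time.alias('previous_status_check_time'),
        func.min(func.when(func.col('ts_diff') >= 0, func.struct('ts_diff', 'status_check_time'))).status_check_time.alias('next_status_check_time')
        ). \
    show(truncate=False)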

# +--------------------+-------------------+---------+-----------+--------------------------+-----------------------+
# |device_id           |order_creation_time|order_id |status_code|previous_status_check_time|next_status_check_time |
# +--------------------+-------------------+---------+-----------+--------------------------+-----------------------+
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|200        |2022-11-26 23:54:36.676   |2022-11-26 23:54:57.682|
# +--------------------+-------------------+---------+-----------+--------------------------+-----------------------+
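
To see why max and min over a struct pick the right rows, here is a minimal plain-Python sketch of the same selection, using tuples in place of Spark structs. The function name closest_before_and_after is made up for illustration, and the ts_diff values are hardcoded for the sample order (status_check_time minus order_creation_time, in whole seconds). Tuples, like structs, are compared field by field, so ts_diff decides first.

```python
# (ts_diff, status_check_time) pairs for the sample order.
# ts_diff = status_check_time - order_creation_time, in whole seconds.
rows = [
    (52, "2022-11-26 23:55:33.858"),
    (32, "2022-11-26 23:55:13.1"),
    (16, "2022-11-26 23:54:57.682"),
    (-5, "2022-11-26 23:54:36.676"),
    (-20, "2022-11-26 23:54:21.293"),
]


def closest_before_and_after(pairs):
    """Return (previous, next) check times; None where no candidate exists."""
    # Checks before the order creation have a negative difference.
    before = [p for p in pairs if p[0] < 0]
    # Checks at or after the order creation have a non-negative difference.
    after = [p for p in pairs if p[0] >= 0]
    # The largest negative difference is the closest check before.
    previous = max(before)[1] if before else None
    # The smallest non-negative difference is the closest check after.
    following = min(after)[1] if after else None
    return previous, following


previous, following = closest_before_and_after(rows)
print(previous)   # 2022-11-26 23:54:36.676
print(following)  # 2022-11-26 23:54:57.682
```

If an order has no check on one side, that side comes back as None here, which corresponds to a null in the aggregated Spark column.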

The timestamp difference (status_check_time minus order_creation_time, in whole seconds) results in the following; negative values are checks made before the order was created:

# +--------------------+-------------------+---------+-----------------------+-----------+-------+
# |device_id           |order_creation_time|order_id |status_check_time      |status_code|ts_diff|
# +--------------------+-------------------+---------+-----------------------+-----------+-------+
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:33.858|200        |52     |
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:13.1  |200        |32     |
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:57.682|200        |16     |
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:36.676|200        |-5     |
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:21.293|200        |-20    |
# +--------------------+-------------------+---------+-----------------------+-----------+-------+
# +--------------------+-------------------+---------+-----------------------+-----------+-------+
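
As a sanity check on the sign convention, the ts_diff arithmetic can be reproduced in plain Python. This is only a sketch: the helper names to_whole_seconds and ts_diff are made up here, and dropping the fractional part is meant to mimic what casting a timestamp to long does for these dates, not to reproduce Spark's implementation.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S.%f"


def to_whole_seconds(ts: str) -> datetime:
    """Parse a timestamp string and drop its fractional seconds."""
    if "." not in ts:
        ts += ".0"  # allow values without a fractional part
    return datetime.strptime(ts, FMT).replace(microsecond=0)


def ts_diff(status_check_time: str, order_creation_time: str) -> int:
    """status_check_time minus order_creation_time, in whole seconds."""
    delta = to_whole_seconds(status_check_time) - to_whole_seconds(order_creation_time)
    return int(delta.total_seconds())


order_creation_time = "2022-11-26 23:54:41"
status_check_times = [
    "2022-11-26 23:55:33.858",
    "2022-11-26 23:55:13.1",
    "2022-11-26 23:54:57.682",
    "2022-11-26 23:54:36.676",
    "2022-11-26 23:54:21.293",
]

diffs = [ts_diff(t, order_creation_time) for t in status_check_times]
print(diffs)  # [52, 32, 16, -5, -20]
```

One thing to keep in mind: because the comparison is on whole seconds, a check that falls in the same second as an order_creation_time with a fractional part gets a difference of 0 and is treated as "next", even if it was slightly earlier. In the sample data order_creation_time has no fractional seconds, so this does not come up.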