Convert Python pandas iterator and string concat into PySpark

I am attempting to move a process from pandas into PySpark, but I am a complete novice in the latter. Note: this is an EDA process, so I am not too worried about having it as a loop for now; I can optimise that at a later date.

Setup:

import pandas as pd
import numpy as np
import pyspark.pandas as ps

Dummy Data:

df = ps.DataFrame({'id': ['ID_01', 'ID_02', 'ID_02', 'ID_03', 'ID_03'], 'name': ['Jack', 'John', 'John', 'James', 'Jamie']})
df_pandas = df.to_pandas()
df_spark = df.to_spark()
df
id name
ID_01 Jack
ID_02 John
ID_02 John
ID_03 James
ID_03 Jamie

Pandas code:

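unique_ids = df_pandas['id'].unique()
for unique_id in unique_ids:
    # Distinct names for this id, sorted and joined with '; '
    names = '; '.join(sorted(df_pandas[df_pandas['id'] == unique_id]['name'].unique()))
    # Write the joined string back to every row with this id
    df_pandas.loc[df_pandas['id'] == unique_id, 'name'] = names
df_pandas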
id name
ID_01 Jack
ID_02 John
ID_02 John
ID_03 James; Jamie
ID_03 James; Jamie
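For comparison, pandas can produce the same result without the explicit loop. This is a minimal sketch, assuming the same `id`/`name` columns as the dummy data (rebuilt here as a plain pandas DataFrame): `groupby(...).transform` applies the function to each group and broadcasts the returned string back to every row of that group, which is what the `loc` assignment inside the loop does.

```python
import pandas as pd

# Same dummy data as above, built directly in pandas (assumption for self-containment)
df_pandas = pd.DataFrame({
    'id': ['ID_01', 'ID_02', 'ID_02', 'ID_03', 'ID_03'],
    'name': ['Jack', 'John', 'John', 'James', 'Jamie'],
})

# For each id: distinct names, sorted, joined with '; '.
# transform() returns a Series aligned with the original rows,
# so every row in a group receives that group's joined string.
df_pandas['name'] = (df_pandas.groupby('id')['name']
                     .transform(lambda s: '; '.join(sorted(s.unique()))))
```

The same group-then-broadcast shape is what the PySpark version needs to reproduce.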

This last table is the desired output. However, I am having issues achieving this in PySpark. This is where I have got to:

unique_ids = df_spark.select('id').distinct().collect()
for unique_id in unique_ids:
    names = df_spark.filter(df_spark.id == unique_id.id).select('name').distinct()

I am unsure how to do the next steps, i.e. how to concatenate the values of the resulting single-column DataFrame into one string, and how to write that string back to the matching rows.

I have investigated the following sources, with no success (likely due to my inexperience in PySpark):

  • This answer shows how to concatenate columns, not rows
  • This answer might be helpful for the loc conversion (but I have not managed to get that far yet)
  • This answer initially looked promising, since it would also remove the need for the loop, but I could not figure out how to apply the distinct and sort equivalents to the collect_list output

Solution:

Try:

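import pyspark.sql.functions as f

# One row per id: drop duplicate (name, id) pairs, collect the names,
# sort them (collect_list does not guarantee order) and join with '; '
new_df = (df_spark.select(['name', 'id'])
                  .distinct()
                  .groupby('id')
                  .agg(f.concat_ws('; ', f.sort_array(f.collect_list('name')))
                        .alias('name')))

# Attach the joined names to every original row, replacing the old column
out_df = (df_spark.drop('name')
                  .join(new_df, on='id', how='left'))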

Output:

>>> out_df.show()

+-----+------------+
|   id|        name|
+-----+------------+
|ID_01|        Jack|
|ID_02|        John|
|ID_02|        John|
|ID_03|James; Jamie|
|ID_03|James; Jamie|
+-----+------------+