Follow

Keep Up to Date with the Most Important News

By pressing the Subscribe button, you confirm that you have read and are agreeing to our Privacy Policy and Terms of Use
Contact

Pyspark RDD create 2 rows from one row into new Dataframe

I have a dataframe like:

data = [('valorant','web', 'start'),
  ('counter-strike','android', 'start'),
  ('sims','web', 'finished'), 
]

columns = ["game","platform", "type"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()
+--------------+--------+--------+
|          game|platform|    type|
+--------------+--------+--------+
|      valorant|     web|   start|
|counter-strike| android|   start|
|          sims|     web|finished|
+--------------+--------+--------+

Which I want to turn into:

+--------------+-----+
|          game|count|
+--------------+-----+
|      valorant|    1|
|counter-strike|    1|
|          sims|    1|
|          sims|    1|
+--------------+-----+

So that if type == 'finished' the new RDD should have 2 rows with value 1 instead of just one row with value 1.
Is there any way I can do this without having to map dataframe 2 times and then merge those RDDs?

MEDevel.com: Open-source for Healthcare and Education

Collecting and validating open-source software for healthcare, education, enterprise, development, medical imaging, medical records, and digital pathology.

Visit Medevel

If I do:

def func1(x):
  if x.type == "start":
    return (x.game, 1)
  elif x.type == "finished":
    return ((x.game, 1), (x.game, 1))

rdd2=df.rdd.map(lambda x: func1(x))
df2=rdd2.toDF(['game',  'value'])
df2.show(truncate=False)
+---------------------------+-----+
|game                       |value|
+---------------------------+-----+
|valorant                   |1    |
|counter                    |1    |
|[Ljava.lang.Object;@4b01785|null |
+---------------------------+-----+

It does not work obviously since func1 expects one value in return. Any ideas?

>Solution :

when expression + explode literal array:

from pyspark.sql import functions as F

df1 = df.withColumn(
    "count",
    F.explode(
        F.when(F.col("type") == "start", F.array(F.lit(1)))
            .when(F.col("type") == "finished", F.array(F.lit(1), F.lit(1)))
    )
).drop("platform", "type")

df1.show()

#+--------------+-----+
#|          game|count|
#+--------------+-----+
#|      valorant|    1|
#|counter-strike|    1|
#|          sims|    1|
#|          sims|    1|
#+--------------+-----+
Add a comment

Leave a Reply

Keep Up to Date with the Most Important News

By pressing the Subscribe button, you confirm that you have read and are agreeing to our Privacy Policy and Terms of Use

Discover more from Dev solutions

Subscribe now to keep reading and get access to the full archive.

Continue reading