
How to explode column with csv string in PySpark?

I have a DataFrame like this:

+---+---------------------+
| id|                  csv|
+---+---------------------+
|  1|a,b,c\n1,2,3\n2,3,4\n|
|  2|a,b,c\n3,4,5\n4,5,6\n|
|  3|a,b,c\n5,6,7\n6,7,8\n|
+---+---------------------+

I want to explode the string-typed csv column; in fact, I'm only interested in this column. So I'm looking for a way to obtain the following DataFrame from the one above:

+--+--+--+
| a| b| c|
+--+--+--+
| 1| 2| 3|
| 2| 3| 4|
| 3| 4| 5|
| 4| 5| 6|
| 5| 6| 7|
| 6| 7| 8|
+--+--+--+

Looking at the from_csv documentation, it seems that the input CSV string can contain only one row of data, which I found stated more clearly here. So that's not an option on its own.

I guess I could loop over the individual rows of the input DataFrame, extract and parse the CSV string from each row, and then stitch everything together:

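# Bring every row to the driver (this is the expensive part)
rows = df.collect()

for i, row in enumerate(rows):
    # The csv values contain a literal backslash-n, so split on '\\n'
    data = row['csv'].split('\\n')
    rdd = spark.sparkContext.parallelize(data)

    # spark.read.csv also accepts an RDD of CSV lines; the first line is the header
    df_row = (spark.read
              .option('header', 'true')
              .schema('a int, b int, c int')
              .csv(rdd))

    # Accumulate the per-row DataFrames with union
    if i == 0:
        df_new = df_row
    else:
        df_new = df_new.union(df_row)

df_new.show()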

But that seems awfully inefficient: it collects every row to the driver and builds and unions one DataFrame per input row. Is there a better way to achieve the desired result?

Solution:

Using the split and from_csv functions together with transform (F.transform with a Python lambda requires Spark 3.1 or later), you can do something like this:

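from pyspark.sql import functions as F


# Raw strings: the csv values contain a literal backslash-n, not real newlines
df = spark.createDataFrame([
    (1, r"a,b,c\n1,2,3\n2,3,4\n"), (2, r"a,b,c\n3,4,5\n4,5,6\n"),
    (3, r"a,b,c\n5,6,7\n6,7,8\n")], ["id", "csv"]
)

df1 = df.withColumn(
    "csv",
    F.transform(
        # Strip the "a,b,c\n" header and the trailing "\n", then split the
        # remaining text on the literal backslash-n separator
        F.split(F.regexp_replace("csv", r"^a,b,c\\n|\\n$", ""), r"\\n"),
        # Parse each line into a struct<a:int, b:int, c:int>
        lambda x: F.from_csv(x, "a int, b int, c int")
    )
).selectExpr("inline(csv)")  # one output row per struct in the array

df1.show()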

# +---+---+---+
# |  a|  b|  c|
# +---+---+---+
# |  1|  2|  3|
# |  2|  3|  4|
# |  3|  4|  5|
# |  4|  5|  6|
# |  5|  6|  7|
# |  6|  7|  8|
# +---+---+---+