Error when filtering for rows with array size = 1 in PySpark

Previously, the dataframe looked like this:

+----------+--------------------+
|     appId|                lang|
+----------+--------------------+
|1000098520|              ["EN"]|
|1001449696|              ["EN"]|
|1001780528|["AR","ZH","CS","...|
|1001892954|              ["EN"]|
|1001892954|              ["EN"]|
|1001976488|["EN","FR","DE","...|
|1002028916|              ["EN"]|
|1002908393|              ["EN"]|
|1003066972|["EN","FR","DE","...|
|1004217104|              ["EN"]|
|1004552566|              ["EN"]|
|1005192468|              ["EN"]|
|1005488142|["EN","JA","KO","...|
root
 |-- appId: string (nullable = true)
 |-- lang: string (nullable = true)

I tried to use json.loads() to convert the lang column to an array of strings, but I think some values do not conform to JSON. How can I convert it to an array of strings?

+----------+--------------------+--------+
|     appId|                lang|len_lang|
+----------+--------------------+--------+
|1000098520|                [EN]|       1|
|1001449696|                [EN]|       1|
|1001780528|[AR, ZH, CS, NL, ...|      25|
|1001892954|                [EN]|       1|
|1001892954|                [EN]|       1|
|1001976488|    [EN, FR, DE, ES]|       4|
|1002028916|                [EN]|       1|
|1002908393|                [EN]|       1|

I have this dataframe. The lang column was previously of string type, but I converted it to array type using a UDF that calls json.loads(). I now want to filter for appIds with only 'EN' as the language, i.e. the array size is 1 and its only element is 'EN'.

I tried a where() statement with F.size(F.col('lang'))==1 & F.array_contains(F.col('lang','EN')), but I get this error:

21/11/19 10:07:32 ERROR PythonUDFRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/../server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 362, in main
    eval_type = read_int(infile)
  File "/../spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 724, in read_int
    raise EOFError
EOFError

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:260)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 352, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 142, in dump_stream
    for obj in iterator:
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 341, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/var/folders/w1/16mcxl3d3zg1831vc7k73fnh0000gn/T/ipykernel_49586/324191451.py", line 5, in <lambda>
  File "/var/folders/w1/16mcxl3d3zg1831vc7k73fnh0000gn/T/ipykernel_49586/324191451.py", line 2, in parse_array_from_string
  File "/../.pyenv/versions/3.7.9/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/../.pyenv/versions/3.7.9/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/../.pyenv/versions/3.7.9/lib/python3.7/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
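
The last frame of the traceback shows json.loads() raising "Expecting value: line 1 column 1 (char 0)", which is what the standard library reports when the input has no JSON value at its first character, for example an empty string. The body of the asker's UDF is not shown, so the following is only a sketch: it assumes the column holds some such values and shows a tolerant parser that returns None instead of raising. The name `safe_parse_array` is hypothetical.

```python
import json
from typing import List, Optional


def safe_parse_array(s: Optional[str]) -> Optional[List[str]]:
    """Parse a JSON array string; return None for anything that is not one.

    Hypothetical helper for illustration, not the UDF from the question.
    """
    if s is None:
        # json.loads(None) would raise TypeError, so handle nulls up front.
        return None
    try:
        value = json.loads(s)
    except json.JSONDecodeError:
        # Empty strings and non-JSON text such as a bare EN end up here.
        return None
    # Only accept a JSON array; objects, numbers, etc. are rejected.
    return value if isinstance(value, list) else None


print(safe_parse_array('["EN","FR"]'))
print(safe_parse_array(""))
```

Because Spark evaluates lazily, a UDF that raises on a bad row may only fail once a later action such as a filter or show() forces those rows to be processed, which would explain why the error surfaces at the where() step.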

>Solution :

Use the built-in from_json function instead of a json.loads() UDF, as in the following example.

import pyspark.sql.functions as F

.....
# Parse the JSON string column into array<string>, then add the array length as len_lang.
df = df.withColumn('lang', F.expr('from_json(lang,"array<string>")')).select('*', F.size('lang').alias('len_lang'))
df.printSchema()
df.show(truncate=False)