PySpark – Combine a list of filtering conditions

For starters, let me define a sample dataframe and import the sql functions:

import pyspark.sql.functions as func

row_data = [(1, 1, 1), (1, 1, 2), (1, 1, 3),
           (1, 2, 1), (1, 2, 2), (1, 2, 3),
           (2, 1, 1), (2, 1, 2), (2, 1, 3),
           (2, 2, 1), (2, 2, 2), (2, 2, 3),
           (2, 2, 4), (2, 2, 5), (2, 2, 6)]

test_df = spark.createDataFrame(row_data, ["A", "B", "C"])

test_df.show()

This returns the following dataframe:

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  1|  1|  1|
|  1|  1|  2|
|  1|  1|  3|
|  1|  2|  1|
|  1|  2|  2|
|  1|  2|  3|
|  2|  1|  1|
|  2|  1|  2|
|  2|  1|  3|
|  2|  2|  1|
|  2|  2|  2|
|  2|  2|  3|
|  2|  2|  4|
|  2|  2|  5|
|  2|  2|  6|
+---+---+---+

Now lets say I have a list of filtering conditions, for example, a list of filtering conditions detailing that columns A and B shall be equal to 1

l = [func.col("A") == 1, func.col("B") == 1]

I can combine these two conditions as follows and then filter the dataframe, obtaining the following result:

t = l[0] & l[1]
test_df.filter(t).show()

Result:

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  1|  1|  1|
|  1|  1|  2|
|  1|  1|  3|
+---+---+---+

MY QUESTION

If l is a list of unknown length n (that is, a list of n filtering conditions) instead of only two, which is the most pythonic way, or a one-liner way to logically combine them in and & or | manner?

all() and any() will not work, because they are designed for simple lists of [True, False] elements.

As an example, let us say that l = [func.col("A") == 1, func.col("B") == 1, func.col("C") == 2].

Help would be much appreciated.

>Solution :

You could use reduce, or a loop. The execution plan in spark will be the same for both, so I believe it’s just a matter of preference

for c in l:
  test_df = test_df.where(c)

test_df.explain()

Produces

== Physical Plan ==
*(1) Filter ((isnotnull(A#11487L) AND isnotnull(B#11488L)) AND ((A#11487L = 1) AND (B#11488L = 1)))
+- *(1) Scan ExistingRDD[A#11487L,B#11488L,C#11489L]

and

test_df = test_df.where(reduce(lambda x, y: x & y, l))
test_df.explain()

Produces

== Physical Plan ==
*(1) Filter ((isnotnull(A#11487L) AND isnotnull(B#11488L)) AND ((A#11487L = 1) AND (B#11488L = 1)))
+- *(1) Scan ExistingRDD[A#11487L,B#11488L,C#11489L]

Leave a Reply