I have a dictionary like this:
sample_dict = {
"A": ["aaaa\.com", "aaaa\.es"],
"B": ["bbbb\.com", "bbbb\.es", "bbbb\.net"],
"C": ["ccccc\.com"],
# many more entries here
}
I would like to add a column in a Spark DataFrame which performs the following operation:
(
df
.withColumn(
"new_col",
F.when(
(F.col("filter_col").rlike("aaaa\.com")) |
(F.col("filter_col").rlike("aaaa\.es")),
F.lit("A")
)
.when(
(F.col("filter_col").rlike("bbbb\.com")) |
(F.col("filter_col").rlike("bbbb\.es")) |
(F.col("filter_col").rlike("bbbb\.net")),
F.lit("B")
)
.when(
(F.col("filter_col").rlike("cccc\.com")),
F.lit("C")
)
.otherwise(None)
)
)
But, of course, I would like it to be dynamical, so that I may add new components to my dictionary and the column would automatically consider them and add a new category based on the rules.
Is this possible?
>Solution :
If you could alter you column such that you could look for exact matches you could use df.replace():
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
Row(filter_col='aaa.de'),
Row(filter_col='aaa.es'),
Row(filter_col='bbb.de'),
Row(filter_col='bbb.es'),
])
d = {
'aaa.de': 'A',
'aaa.es': 'A',
'bbb.de': 'B',
'bbb.es': 'B',
}
(
df
.withColumn('new_col', F.col('filter_col'))
.withColumn('new_col', F.when(F.col('new_col').isin(list(d.keys())), F.col('new_col')))
.replace(d, None, subset='new_col')
.show()
)
# Output:
+----------+-------+
|filter_col|new_col|
+----------+-------+
| aaa.de| A|
| aaa.es| A|
| bbb.de| B|
| bbb.es| B|
| foo| null|
+----------+-------+
There might be a more performant way to replace values not mentioned in your dictionary with "None" (your "otherwise" condition).
Update:
If the reformatting is not possible, you would have to iterate through your dict:
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
Row(filter_col='aaa.de/foo'),
Row(filter_col='aaa.es/foo'),
Row(filter_col='bbb.de/foo'),
Row(filter_col='bbb.es/foo'),
Row(filter_col='foo'),
])
d = {
'aaa\.de': 'A',
'aaa\.es': 'A',
'bbb\.de': 'B',
'bbb\.es': 'B',
}
df = df.withColumn('new_col', F.lit(None).cast('string'))
for k,v in d.items():
df = df.withColumn('new_col', F.when(F.col('filter_col').rlike(k), v).otherwise(F.col('new_col')))
df.show()
# Output
+----------+-------+
|filter_col|new_col|
+----------+-------+
|aaa.de/foo| A|
|aaa.es/foo| A|
|bbb.de/foo| B|
|bbb.es/foo| B|
| foo| null|
+----------+-------+