I have a simple dataframe
# Two-row sample DataFrame: language label + news headline.
sdf0 = spark.createDataFrame(
[
("eng", "BlackBerry sells legacy patents of mobile devices"),
("eng", "Amazon to shut down publishing house Westland Books"),
],
["lang", "title"],
)
| lang | title |
|---|---|
| eng | BlackBerry sells legacy patents of mobile devices |
| eng | Amazon to shut down publishing house Westland Books |
I also have code that extracts filtered words from the text:
# to lower
sdf = sdf0.withColumn("low_title", F.lower(F.col("title")))

# tokenize
tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
sdf1 = tokenizer.transform(sdf)

# filter stopwords
import stopwordsiso

# Map our language labels to the ISO codes stopwordsiso expects.
# FIX: was defined as `available_lang` while the loop below reads
# `available_langs` -- a NameError in a fresh session.
available_langs = {"eng": "en"}
stopwords_iso = {}
for lang in available_langs:
    stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
# StopWordsRemover wants a list, stopwordsiso returns a set.
stopwords = {k: list(v) for k, v in stopwords_iso.items()}

# One StopWordsRemover per language (the stopword list differs),
# applied to that language's rows only, then unioned back together.
sdf_filtered = reduce(
    lambda a, b: a.unionAll(b),
    (
        StopWordsRemover(
            inputCol="tokens", outputCol="filtered_words", stopWords=value
        ).transform(sdf1.where(F.col("lang") == key))
        for key, value in stopwords.items()
    ),
)

# explode: one row per remaining token; strip punctuation/digits and
# drop tokens that become empty after stripping.
sdf_exp = (
    sdf_filtered.withColumn("filtered_word", F.explode("filtered_words"))
    .select("lang", "filtered_word")
    .withColumn(
        "filtered_word",
        F.regexp_replace(
            "filtered_word", r'[!"«»#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~–—0-9]', ""
        ),
    )
    .filter(F.length(F.col("filtered_word")) > 0)
)
Output:
+----+-------------+
|lang|filtered_word|
+----+-------------+
| eng| blackberry|
| eng| sells|
| eng| legacy|
| eng| patents|
| eng| mobile|
| eng| devices|
| eng| amazon|
| eng| shut|
| eng| publishing|
| eng| house|
| eng| westland|
| eng| books|
+----+-------------+
I tried to rewrite it as a class, but I keep getting an error that a column does not exist. How do I bind the intermediate data frames together and feed them into the methods? Or is there a simpler approach that avoids writing so many functions? Thank you.
import pyspark.sql.functions as F
import stopwordsiso
# NOTE(review): class wrapper around the pipeline above. The bugs the
# answer below identifies are flagged inline with BUG comments, plus two
# it does not mention.
class Filter_words:
def __init__(self, sdf):
self.sdf = sdf
def lower(self):
self.sdf = self.sdf.withColumn("low_title", F.lower(F.col("title")))
def tokenize(self):
tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
# BUG: `sdf` is not defined in this scope -- should be `self.sdf`.
self.sdf = tokenizer.transform(sdf)
def stop_words(self):
# BUG: defined as `available_lang` but read as `available_langs` below.
available_lang = {"eng": "en"}
stopwords_iso = {}
for lang in available_langs:
stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
stopwords = {k: list(v) for k, v in stopwords_iso.items()}
self.sdf = reduce(
lambda a, b: a.unionAll(b),
(
StopWordsRemover(
inputCol="tokens", outputCol="filtered_words", stopWords=value
# BUG: `sdf` is not defined here either -- should be `self.sdf`.
).transform(sdf.where(F.col("lang") == key))
for key, value in stopwords.items()
),
)
def explode_column(self):
self.sdf = (
# BUG (not covered by the answer): explodes "tokens", the pre-filter
# column -- the script version explodes "filtered_words", so here the
# stopwords would reappear in the output.
self.sdf.withColumn("filtered_word", F.explode("tokens"))
.select("lang", "filtered_word")
.withColumn(
"filtered_word",
F.regexp_replace(
"filtered_word", r'[!"«»#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~–—0-9]', ""
),
)
.filter(F.length(F.col("filtered_word")) > 0)
)
# NOTE(review): this only constructs the wrapper -- none of the methods
# (lower/tokenize/stop_words/explode_column) are ever called -- and the
# result is a Filter_words object, not a DataFrame, despite the name `sdf`.
sdf = Filter_words(sdf0)
>Solution :
Found it – you have at least 3 errors in your code. Run it in an IDE in a fresh session and you'll see all of them.
tokenize
# Broken version from the question:
def tokenize(self):
tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
self.sdf = tokenizer.transform(sdf)
# should be
def tokenize(self):
tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
self.sdf = tokenizer.transform(self.sdf) # self missing
stop_words
# Broken version from the question:
def stop_words(self):
available_lang = {"eng": "en"}
stopwords_iso = {}
for lang in available_langs:
stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
stopwords = {k: list(v) for k, v in stopwords_iso.items()}
self.sdf = reduce(
lambda a, b: a.unionAll(b),
(
StopWordsRemover(
inputCol="tokens", outputCol="filtered_words", stopWords=value
).transform(sdf.where(F.col("lang") == key))
for key, value in stopwords.items()
),
)
# should be
def stop_words(self):
available_langs = {"eng": "en"} # final -s missing
stopwords_iso = {}
for lang in available_langs:
stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
stopwords = {k: list(v) for k, v in stopwords_iso.items()}
self.sdf = reduce(
lambda a, b: a.unionAll(b),
(
StopWordsRemover(
inputCol="tokens", outputCol="filtered_words", stopWords=value
).transform(self.sdf.where(F.col("lang") == key)) # self missing
for key, value in stopwords.items()
),
)