How to rewrite your code in OOP using PySpark

I have a simple DataFrame:

sdf0 = spark.createDataFrame(
    [
        ("eng", "BlackBerry sells legacy patents of mobile devices"),
        ("eng", "Amazon to shut down publishing house Westland Books"),
    ],
    ["lang", "title"],
)
+----+---------------------------------------------------+
|lang|title                                              |
+----+---------------------------------------------------+
|eng |BlackBerry sells legacy patents of mobile devices  |
|eng |Amazon to shut down publishing house Westland Books|
+----+---------------------------------------------------+

I also have code that extracts the filtered words from the text:

import pyspark.sql.functions as F
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from functools import reduce

# to lower
sdf = sdf0.withColumn("low_title", F.lower(F.col("title")))

# tokenize
tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
sdf1 = tokenizer.transform(sdf)

# filter stopwords
import stopwordsiso

available_langs = {"eng": "en"}
stopwords_iso = {}
for lang in available_langs:
    stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
stopwords = {k: list(v) for k, v in stopwords_iso.items()}
sdf_filtered = reduce(
    lambda a, b: a.unionAll(b),
    (
        StopWordsRemover(
            inputCol="tokens", outputCol="filtered_words", stopWords=value
        ).transform(sdf1.where(F.col("lang") == key))
        for key, value in stopwords.items()
    ),
)

# explode
sdf_exp = (
    sdf_filtered.withColumn("filtered_word", F.explode("filtered_words"))
    .select("lang", "filtered_word")
    .withColumn(
        "filtered_word",
        F.regexp_replace(
            "filtered_word", r'[!"«»#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~–—0-9]', ""
        ),
    )
    .filter(F.length(F.col("filtered_word")) > 0)
)

Output:


+----+-------------+
|lang|filtered_word|
+----+-------------+
| eng|   blackberry|
| eng|        sells|
| eng|       legacy|
| eng|      patents|
| eng|       mobile|
| eng|      devices|
| eng|       amazon|
| eng|         shut|
| eng|   publishing|
| eng|        house|
| eng|     westland|
| eng|        books|
+----+-------------+

I tried to rewrite it as a class, but I keep getting an error that a column does not exist. How do I pass the intermediate DataFrames from one method to the next? Or is there a simpler way that avoids writing so many functions? Thank you.

import pyspark.sql.functions as F
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from functools import reduce
import stopwordsiso


class Filter_words:
    def __init__(self, sdf):
        self.sdf = sdf

    def lower(self):
        self.sdf = self.sdf.withColumn("low_title", F.lower(F.col("title")))

    def tokenize(self):
        tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
        self.sdf = tokenizer.transform(sdf)

    def stop_words(self):
        available_lang = {"eng": "en"}
        stopwords_iso = {}
        for lang in available_langs:
            stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
        stopwords = {k: list(v) for k, v in stopwords_iso.items()}
        self.sdf = reduce(
            lambda a, b: a.unionAll(b),
            (
                StopWordsRemover(
                    inputCol="tokens", outputCol="filtered_words", stopWords=value
                ).transform(sdf.where(F.col("lang") == key))
                for key, value in stopwords.items()
            ),
        )

    def explode_column(self):
        self.sdf = (
            self.sdf.withColumn("filtered_word", F.explode("tokens"))
            .select("lang", "filtered_word")
            .withColumn(
                "filtered_word",
                F.regexp_replace(
                    "filtered_word", r'[!"«»#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~–—0-9]', ""
                ),
            )
            .filter(F.length(F.col("filtered_word")) > 0)
        )


sdf = Filter_words(sdf0)

Solution:

Found it: you have at least three errors in your code. Run it in an IDE in a fresh session and you will see all of them; in a long-lived session, stale globals left over from your earlier run (such as sdf) can hide them.


  1. tokenize

    def tokenize(self):
        tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
        self.sdf = tokenizer.transform(sdf)

# should be 

    def tokenize(self):
        tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
        self.sdf = tokenizer.transform(self.sdf) # self missing

  2. stop_words

    def stop_words(self):
        available_lang = {"eng": "en"}
        stopwords_iso = {}
        for lang in available_langs:
            stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
        stopwords = {k: list(v) for k, v in stopwords_iso.items()}
        self.sdf = reduce(
            lambda a, b: a.unionAll(b),
            (
                StopWordsRemover(
                    inputCol="tokens", outputCol="filtered_words", stopWords=value
                ).transform(sdf.where(F.col("lang") == key))
                for key, value in stopwords.items()
            ),
        )

# should be 

    def stop_words(self):
        available_langs = {"eng": "en"} # final -s missing
        stopwords_iso = {}
        for lang in available_langs:
            stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
        stopwords = {k: list(v) for k, v in stopwords_iso.items()}
        self.sdf = reduce(
            lambda a, b: a.unionAll(b),
            (
                StopWordsRemover(
                    inputCol="tokens", outputCol="filtered_words", stopWords=value
                ).transform(self.sdf.where(F.col("lang") == key))  # self missing
                for key, value in stopwords.items()
            ),
        )
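
One more slip worth flagging: in the posted class, explode_column explodes "tokens", while the working procedural version explodes "filtered_words", so the stopword removal would be silently thrown away. With that corrected as well, here is a minimal usage sketch (my addition, not part of the original post); each method overwrites self.sdf with the next intermediate DataFrame, so you just call them in order:

# Minimal usage sketch, assuming the fixes above plus exploding
# "filtered_words" instead of "tokens".
fw = Filter_words(sdf0)
fw.lower()           # adds low_title
fw.tokenize()        # adds tokens
fw.stop_words()      # adds filtered_words, split and unioned per language
fw.explode_column()  # one row per cleaned, non-empty word
fw.sdf.show()

And on "is there a simpler way": a common pattern, sketched here as an assumption rather than something from the post, is to end every method with return self so the steps chain without intermediate variables:

# Hypothetical chaining variant: each method gains a trailing `return self`,
# e.g.
#     def lower(self):
#         self.sdf = self.sdf.withColumn("low_title", F.lower(F.col("title")))
#         return self
result = Filter_words(sdf0).lower().tokenize().stop_words().explode_column().sdf
result.show()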