Convert python pandas iterator and string concat into pyspark

I am attempting to move a process from pandas into PySpark, but I am a complete novice in the latter. Note: this is an EDA process, so I am not too worried about having it as a loop for now; I can optimise that at a later date. Set up: import pandas as pd import… Read More Convert python pandas iterator and string concat into pyspark
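The excerpt is cut off before the loop itself, but assuming the pandas code concatenates strings per group, a minimal sketch of the idiomatic PySpark equivalent (the id/word columns are hypothetical stand-ins for the truncated set-up) aggregates instead of iterating:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical stand-in for the truncated set-up: one row per (id, word).
df = spark.createDataFrame([(1, "foo"), (1, "bar"), (2, "baz")], ["id", "word"])

# Rather than iterating rows and building strings in Python, group and
# aggregate: collect_list gathers the strings, concat_ws joins them.
result = df.groupBy("id").agg(
    F.concat_ws(" ", F.collect_list("word")).alias("joined")
)
result.show()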

why am I not able to convert string type column to date format in pyspark?

I have a column which is in the "20130623" format. I am trying to convert it into dd-mm-YYYY. I have seen various posts online, including here, but I only found one solution, shown below: from datetime import datetime df = df2.withColumn("col_name", datetime.utcfromtimestamp(int("col_name")).strftime('%d-%m-%y')) However, it throws an error that the input should be int type, not… Read More why am I not able to convert string type column to date format in pyspark?
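The error comes from mixing driver-side Python with Spark columns: datetime.utcfromtimestamp runs immediately on the literal string "col_name" rather than on the column's values. A minimal sketch of the column-oriented approach, using Spark's own date functions (note the pattern strings follow Java's conventions, not Python's strftime):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df2 = spark.createDataFrame([("20130623",)], ["col_name"])

# to_date parses the "yyyyMMdd" string into a DateType column;
# date_format then renders it back as dd-MM-yyyy text.
result = df2.withColumn(
    "col_name",
    F.date_format(F.to_date("col_name", "yyyyMMdd"), "dd-MM-yyyy"),
)
result.show()  # 23-06-2013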

Pyspark calculate average of non-zero elements for each column

from pyspark.sql import SparkSession from pyspark.sql import functions as F spark = SparkSession.builder.getOrCreate() df = spark.createDataFrame([(0.0, 1.2, -1.3), (0.0, 0.0, 0.0), (-17.2, 20.3, 15.2), (23.4, 1.4, 0.0),], ['col1', 'col2', 'col3']) df1 = df.agg(F.avg('col1')) df2 = df.agg(F.avg('col2')) df3 = df.agg(F.avg('col3')) If I have a dataframe, ID COL1 COL2 COL3 1 0.0 1.2 -1.3 2 0.0 0.0… Read More Pyspark calculate average of non-zero elements for each column
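A sketch of one way to fold this into a single aggregation: avg ignores nulls, so mapping zeros to null via when gives the non-zero mean of every column in one pass, instead of one agg call per column.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(0.0, 1.2, -1.3), (0.0, 0.0, 0.0), (-17.2, 20.3, 15.2), (23.4, 1.4, 0.0)],
    ["col1", "col2", "col3"],
)

# when() without otherwise() yields null for zeros, and avg skips nulls,
# so each column's average is taken over its non-zero values only.
result = df.agg(
    *[F.avg(F.when(F.col(c) != 0, F.col(c))).alias(c) for c in df.columns]
)
result.show()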

Difference between alias and withColumnRenamed

What is the difference between: my_df = my_df.select(col('age').alias('age2')) and my_df = my_df.select(col('age').withColumnRenamed('age', 'age2')) Solution: The second expression is not going to work; you need to call withColumnRenamed() on your DataFrame. I assume you mean: my_df = my_df.withColumnRenamed('age', 'age2') And to answer your question, there is no difference.
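A quick sketch confirming the equivalence on a one-column DataFrame:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
my_df = spark.createDataFrame([(30,)], ["age"])

# Both produce a DataFrame whose only column is age2.
via_alias = my_df.select(col("age").alias("age2"))
via_rename = my_df.withColumnRenamed("age", "age2")
assert via_alias.columns == via_rename.columns == ["age2"]

One practical caveat: on a DataFrame with several columns, withColumnRenamed keeps all the other columns, whereas select(col('age').alias('age2')) projects away everything not listed.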

How do I append new rows to a PySpark DataFrame guaranteeing a unique ID?

I have two PySpark DataFrame objects that I wish to concatenate. One of the DataFrames, df_a, has a column unique_id derived using pyspark.sql.functions.monotonically_increasing_id(). The other DataFrame, df_b, does not. I want to append the rows of df_b to df_a, but I need to generate values for the unique_id column that do not coincide with any… Read More How do I append new rows to a PySpark DataFrame guaranteeing a unique ID?
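A sketch of one common approach (df_a and df_b here are hypothetical stand-ins): since monotonically_increasing_id() guarantees uniqueness only within a single DataFrame, shift df_b's generated ids above df_a's current maximum before the union.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical stand-ins: df_a already carries unique_id, df_b does not.
df_a = spark.createDataFrame([("x",), ("y",)], ["val"]).withColumn(
    "unique_id", F.monotonically_increasing_id()
)
df_b = spark.createDataFrame([("z",)], ["val"])

# Offset df_b's ids so they start strictly above df_a's maximum.
max_id = df_a.agg(F.max("unique_id")).first()[0]
df_b = df_b.withColumn(
    "unique_id", F.monotonically_increasing_id() + F.lit(max_id + 1)
)

combined = df_a.unionByName(df_b)
combined.show()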

Selecting only one column from the right dataframe when joining

I have two huge DataFrames that even share some column names despite having no connection whatsoever. I do have two join keys, though, and I want to add just one column from data_right to data_left. I tried: output_df = data_left.join(data_right, on=["join_key_1", "join_key_2"], how="left").select("data_left.*", "data_right.extraColumn") But it does not recognize the * even after importing… Read More Selecting only one column from the right dataframe when joining
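The "data_left.*" string fails because it names the Python variable, which Spark knows nothing about. A sketch of two workarounds, using hypothetical stand-in DataFrames: either narrow the right side to the join keys plus the wanted column before joining, or give both sides explicit aliases that Spark can resolve.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical stand-ins sharing an unrelated column name ("shared").
data_left = spark.createDataFrame(
    [(1, 2, "a")], ["join_key_1", "join_key_2", "shared"]
)
data_right = spark.createDataFrame(
    [(1, 2, "b", "x")], ["join_key_1", "join_key_2", "shared", "extraColumn"]
)

# Option 1: trim the right side first, so the clashing column never
# enters the join.
output_df = data_left.join(
    data_right.select("join_key_1", "join_key_2", "extraColumn"),
    on=["join_key_1", "join_key_2"],
    how="left",
)

# Option 2: register DataFrame aliases and qualify the star with them.
output_df2 = (
    data_left.alias("l")
    .join(data_right.alias("r"), on=["join_key_1", "join_key_2"], how="left")
    .select("l.*", "r.extraColumn")
)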

Why isn't PySpark JSON writing all keys to df when a custom schema is defined?

I’m trying to read approx. 2000 files in an s3 bucket, parse the data in each file and then write the parsed output to another bucket. Each file is made up of arrays of dictionaries, e.g. [ {'region_code': "UK", 'city': "London", 'country_name': "England", …etc.} ] I have the following schema for my input data. These… Read More Why isn't PySpark JSON writing all keys to df when a custom schema is defined?
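Without the full schema it is hard to say for certain, but two things commonly bite here, sketched below with hypothetical paths: a field missing from a file harmlessly comes back null, but a type mismatch silently nulls the entire record under the default PERMISSIVE mode; and files holding one multi-line JSON array need multiLine=True for Spark to turn each array element into a row.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

# Schema matching the example records (field names from the excerpt).
schema = StructType([
    StructField("region_code", StringType(), True),
    StructField("city", StringType(), True),
    StructField("country_name", StringType(), True),
])

# multiLine=True: each file is one JSON array spanning several lines,
# and Spark flattens its elements into rows. Bucket paths are hypothetical.
df = (
    spark.read.schema(schema)
    .option("multiLine", True)
    .json("s3://input-bucket/raw/")
)

df.write.mode("overwrite").json("s3://output-bucket/parsed/")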