pyspark udf function storing incorrect data despite function producing correct result

So I have this weird issue. I'm working with a huge dataset in which dates and times are represented by a single string. The strings are easy to convert with datetime.strptime(), but the data is so large that I need to use pyspark to do the conversion. No problem, I thought: I scoured Stack Overflow, saw UDFs, and made one. Unfortunately, the values stored in the dataframe don't match what the function actually produces. I assumed the function would be executed row by row as Spark sees the data, but that doesn't seem to be happening.

Here’s what I have (data is a pyspark dataframe called result and I’m showing the first 5 rows):

timestamp            node_id       subsys     sensor  par   val_raw  val_hrf
2018/01/01 00:00:06  001e0610e532  chemsense  lps25h  temp  -954     -9.54
2018/01/01 00:00:30  001e0610e532  chemsense  lps25h  temp  -954     -9.54
2018/01/01 00:00:54  001e0610e532  chemsense  lps25h  temp  -957     -9.57
2018/01/01 00:01:18  001e0610e532  chemsense  lps25h  temp  -961     -9.61
2018/01/01 00:01:42  001e0610e532  chemsense  lps25h  temp  -962     -9.62

To convert the timestamp column into actionable data, I convert it to a float using a custom function:


from datetime import datetime

def timeswap(x: str):
    print(x)
    utime = datetime.timestamp(datetime.strptime(x, "%Y/%m/%d %H:%M:%S"))
    print(utime)
    return utime

I have confirmed this function works properly. So I went ahead and ran it on the entire column, storing the result in a new column called unixTime:

from pyspark.sql.types import FloatType

timeUDF = spark.udf.register('timeUDF', timeswap, FloatType())
result_conv = result.withColumn('unixTime', timeUDF('timestamp'))

It seemed to work. I spent weeks assuming it was accurate and running algorithms on the data, only to discover recently that the data was clumping in a way it shouldn't: multiple readings on a single date. So I told Spark to print the column, which actually causes the function to be called for each row. I knew this would happen, which is why I put in the print statements as a sanity check:

result_conv.select('unixTime').head(5)

It output this (the # comments are mine):

2018/01/01 00:00:06 #The original string date
1514782806.0 #the correct output from the function
2018/01/01 00:00:30
1514782830.0
2018/01/01 00:00:54
1514782854.0
2018/01/01 00:01:18
1514782878.0
2018/01/01 00:01:42
1514782902.0
[Row(unixTime=1514782848.0), #I don't know what this value is
 Row(unixTime=1514782848.0),
 Row(unixTime=1514782848.0),
 Row(unixTime=1514782848.0),
 Row(unixTime=1514782848.0)]

Does anyone know what I'm missing here? I've confirmed that the float in the row list never appears in the printed output when running more than 5 lines, so I don't know where that value comes from or why it's duplicated across rows. It's neither the mean nor the median (and neither should be used here anyway), and the number of duplicates isn't consistent when I look at longer stretches of rows. I'd really like to avoid converting this to a pandas DF and back to a Spark DF just to do this. Bottom line: I need to convert the date string into a Unix-time float that is unique per line for this sensor (as it is in the data).

Thanks for any help!

Solution:

You don't have to use a UDF here. The built-in pyspark functions can do the same task; UDFs are a last resort because they slow your program down.

Here's what I did. I think the minor difference in value is probably due to timezones, though I'm not sure; if you can pin the problem down further, I can help more. For example, 1514782806 (your machine) - 1514745006 (my machine) = 37800 seconds = 10.5 hours, so our local timezones are 10.5 hours apart.
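You can sanity-check the timezone theory without Spark. The UTC-5 and UTC+5:30 offsets below are my guesses (they happen to reproduce both numbers), not something the question states; the point is that the same wall-clock string yields different epoch values under different fixed offsets:

```python
from datetime import datetime, timezone, timedelta

s = "2018/01/01 00:00:06"
naive = datetime.strptime(s, "%Y/%m/%d %H:%M:%S")

# Attach two hypothetical fixed offsets to the same wall-clock time:
# UTC-5 (consistent with the asker's numbers) and UTC+5:30 (consistent with mine).
west = naive.replace(tzinfo=timezone(timedelta(hours=-5)))
east = naive.replace(tzinfo=timezone(timedelta(hours=5, minutes=30)))

print(west.timestamp())                      # 1514782806.0 -- the asker's value
print(east.timestamp())                      # 1514745006.0 -- the value in my output
print(west.timestamp() - east.timestamp())   # 37800.0 seconds = 10.5 hours
```

Setting `spark.sql.session.timeZone` explicitly is one way to make results like this reproducible across machines.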

import pyspark.sql.functions as F
from pyspark import SparkContext, SQLContext

sc = SparkContext('local')
sqlContext = SQLContext(sc)
# This setting is important if you want the legacy (Spark 2.x) time-parser behaviour
sqlContext.setConf("spark.sql.legacy.timeParserPolicy", "LEGACY")

data1 = [
    ["2018/01/01 00:00:06"],
    ["2018/01/01 00:00:30"],
    ["2018/01/01 00:00:54"],
    ["2018/01/01 00:01:18"],
    ["2018/01/01 00:01:42"],
]

df1Columns = ["time_col"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

# 1514782806.0  # the correct output from the function

df1 = df1.withColumn("integer_value", F.unix_timestamp(F.to_timestamp('time_col', 'yyyy/MM/dd HH:mm:ss')))
df1.show(n=100, truncate=False)

Output:

+-------------------+-------------+
|time_col           |integer_value|
+-------------------+-------------+
|2018/01/01 00:00:06|1514745006   |
|2018/01/01 00:00:30|1514745030   |
|2018/01/01 00:00:54|1514745054   |
|2018/01/01 00:01:18|1514745078   |
|2018/01/01 00:01:42|1514745102   |
+-------------------+-------------+
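As for the mysterious repeated 1514782848.0 in the question: it looks like a 32-bit rounding artifact. The UDF was registered with FloatType(), i.e. single precision, and at magnitudes around 1.5 billion a 32-bit float can only represent every 128th integer, so all five of those timestamps collapse to the same representable value. A quick check in plain Python:

```python
import struct

def to_float32(x):
    # Round-trip a Python float through IEEE-754 single precision,
    # which is the storage format FloatType() implies.
    return struct.unpack('f', struct.pack('f', x))[0]

# The five correct UDF outputs printed in the question:
for ts in [1514782806.0, 1514782830.0, 1514782854.0,
           1514782878.0, 1514782902.0]:
    print(ts, '->', to_float32(ts))   # every one prints 1514782848.0
```

If you do stick with a UDF, registering it with DoubleType() instead of FloatType() avoids this rounding, but the built-in functions above remain the better choice.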