So I have this weird issue. I’m working with a huge dataset in which dates and times are represented by a single string. Each string converts easily with datetime.strptime(), but the dataset is so large that I need PySpark to do the conversion. No problem, I thought: I scoured Stack Overflow, found UDFs, and wrote one. Unfortunately, the values stored in the DataFrame don’t match what the function actually produces. I assumed the function would be executed row by row as Spark reads the data, but that doesn’t seem to be happening.
Here’s what I have (the data is a PySpark DataFrame called result; I’m showing the first 5 rows):
| timestamp | node_id | subsys | sensor | par | val_raw | val_hrf |
|---|---|---|---|---|---|---|
| 2018/01/01 00:00:06 | 001e0610e532 | chemsense | lps25h | temp | -954 | -9.54 |
| 2018/01/01 00:00:30 | 001e0610e532 | chemsense | lps25h | temp | -954 | -9.54 |
| 2018/01/01 00:00:54 | 001e0610e532 | chemsense | lps25h | temp | -957 | -9.57 |
| 2018/01/01 00:01:18 | 001e0610e532 | chemsense | lps25h | temp | -961 | -9.61 |
| 2018/01/01 00:01:42 | 001e0610e532 | chemsense | lps25h | temp | -962 | -9.62 |
To convert the timestamp column into actionable data, I convert it to a float using a custom function:
```python
from datetime import datetime

def timeswap(x: str):
    print(x)
    utime = datetime.timestamp(datetime.strptime(x, "%Y/%m/%d %H:%M:%S"))
    print(utime)
    return utime
```
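(For reference, one way to sanity-check the function outside Spark is a round-trip, which holds regardless of the machine’s timezone; this is a self-contained copy for illustration:)

```python
from datetime import datetime

def timeswap(x: str):
    # Parse the string as a naive local datetime and convert to Unix time.
    return datetime.timestamp(datetime.strptime(x, "%Y/%m/%d %H:%M:%S"))

# Round-trip check: formatting the epoch back should reproduce the input.
ts = timeswap("2018/01/01 00:00:06")
assert datetime.fromtimestamp(ts).strftime("%Y/%m/%d %H:%M:%S") == "2018/01/01 00:00:06"
```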
I have confirmed this function works properly. So I go ahead and run it on the entire column, storing the result in a new column called unixTime:
```python
from pyspark.sql.types import FloatType

timeUDF = spark.udf.register('timeUDF', timeswap, FloatType())
result_conv = result.withColumn('unixTime', timeUDF('timestamp'))
```
It seemed to work. I spent weeks thinking this was accurate, running algorithms on the data, only to discover recently that the data clumps in a way it shouldn’t: multiple readings share a single value. So I tell Spark to print the column, which actually causes the function to be called for each row. I knew this would happen, which is why I put in the print statements as a sanity check:

```python
result_conv.select('unixTime').head(5)
```
It output this (comments are mine):
```
2018/01/01 00:00:06   # the original date string
1514782806.0          # the correct output from the function
2018/01/01 00:00:30
1514782830.0
2018/01/01 00:00:54
1514782854.0
2018/01/01 00:01:18
1514782878.0
2018/01/01 00:01:42
1514782902.0
[Row(unixTime=1514782848.0),  # I don't know what this value is
 Row(unixTime=1514782848.0),
 Row(unixTime=1514782848.0),
 Row(unixTime=1514782848.0),
 Row(unixTime=1514782848.0)]
```
Does anyone know what I’m missing here? I’ve confirmed that the float in the Row list doesn’t appear anywhere in the printed output when running more than 5 lines, so I don’t know where that value comes from or why it’s duplicated across rows. It’s neither the mean nor the median (and neither should be used anyway), and the number of duplicates isn’t consistent over longer stretches of rows. I’d really like to avoid converting this to a pandas DataFrame and back to a Spark DataFrame just to do this. Bottom line: I need to convert each date string into a Unix-time float that is unique per row for this sensor (as it is in the data).
Thanks for any help!
> Solution:
You don’t have to use a UDF here. Built-in PySpark functions can do the same task; UDFs are a last resort because they slow your program down.
Here’s what I did. I think the minor difference in values is probably a timezone issue, though I’m not sure; if you can specify the problem further, I can help more. For example, 1514782806 (your machine) − 1514745006 (my machine) = 37800 seconds = 10.5 hours: your machine produces a larger epoch for the same local string, which means your timezone is 10.5 hours behind mine.
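Separately, one likely cause of the identical 1514782848 values: the UDF was registered with FloatType(), which is a 32-bit float. Near 1.5 × 10⁹, adjacent float32 values are 128 apart, so many nearby timestamps collapse to the same stored value, and all five sample timestamps round to exactly 1514782848. A quick check with only the standard library:

```python
import struct

def as_float32(x: float) -> float:
    # Round-trip a Python float through 32-bit IEEE-754 storage,
    # mimicking what FloatType() does to the UDF's return value.
    return struct.unpack("f", struct.pack("f", x))[0]

timestamps = [1514782806.0, 1514782830.0, 1514782854.0, 1514782878.0, 1514782902.0]
for t in timestamps:
    print(as_float32(t))  # every one prints 1514782848.0
```

If a UDF were kept, registering it with DoubleType() (64-bit) would avoid the collapse; the built-in approach below sidesteps the issue entirely since unix_timestamp returns an integral type.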
```python
import pyspark.sql.functions as F
from pyspark import SparkContext, SQLContext

sc = SparkContext('local')
sqlContext = SQLContext(sc)

# This setting is very important if you want the legacy time-parser behaviour
sqlContext.setConf("spark.sql.legacy.timeParserPolicy", "LEGACY")

data1 = [
    ["2018/01/01 00:00:06"],
    ["2018/01/01 00:00:30"],
    ["2018/01/01 00:00:54"],
    ["2018/01/01 00:01:18"],
    ["2018/01/01 00:01:42"],
]
df1Columns = ["time_col"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

# 1514782806.0  # the correct output from the function
df1 = df1.withColumn("integer_value",
                     F.unix_timestamp(F.to_timestamp('time_col', 'yyyy/MM/dd HH:mm:ss')))
df1.show(n=100, truncate=False)
```
Output:

```
+-------------------+-------------+
|time_col           |integer_value|
+-------------------+-------------+
|2018/01/01 00:00:06|1514745006   |
|2018/01/01 00:00:30|1514745030   |
|2018/01/01 00:00:54|1514745054   |
|2018/01/01 00:01:18|1514745078   |
|2018/01/01 00:01:42|1514745102   |
+-------------------+-------------+
```