I am streaming into a BigQuery table using Apache Beam (Python) and having a problem with the timestamp format.
I have an event (JSON) with an epoch timestamp (int), and I want to insert it into a BigQuery table with a timestamp column.
What is the best way to do this? Can I do it without parsing each event?
For example:
event = {'ts': 1630494181342, 'user': 'ash_max'}
into a table:
{ts: TIMESTAMP, user: STRING}
>Solution :
Apache Beam applies transformations to every element that passes through the pipeline, so the pipeline steps defined below convert the epoch time to a datetime on a per-element basis before ingesting it into BigQuery.
The following Apache Beam pipeline code converts an epoch time in seconds to a datetime, i.e. a timestamp.
Sample code:
import apache_beam as beam

class GetTimestamp(beam.DoFn):
    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        # TimestampParam exposes the event timestamp attached to the element
        yield '{}'.format(timestamp.to_utc_datetime())

with beam.Pipeline() as pipeline:
    event_timestamps = (
        pipeline
        | 'My Time' >> beam.Create([
            {'ts': 1633013727, 'user': 'ash_max'},
            {'ts': 1590969600, 'user': 'samwilliam'},
        ])
        # Attach each element's epoch seconds as its event timestamp
        | 'With timestamps' >> beam.Map(
            lambda event: beam.window.TimestampedValue(event, event['ts']))
        | 'Get timestamp' >> beam.ParDo(GetTimestamp())
        | beam.Map(print)
    )
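Note that the question's example carries epoch milliseconds (1630494181342), while beam.window.TimestampedValue expects seconds. If the end goal is simply to format each row for a BigQuery TIMESTAMP column before a WriteToBigQuery step, the conversion can also be done in a plain per-element map, with no window timestamps involved. A minimal sketch of that conversion (the helper name is mine, and it assumes BigQuery accepts ISO-8601-style timestamp strings for TIMESTAMP columns, which it does for streaming inserts):

```python
from datetime import datetime, timezone

def ms_epoch_to_bq_timestamp(ms):
    """Convert epoch milliseconds to an ISO-8601 UTC string for a TIMESTAMP column."""
    sec, ms_rem = divmod(ms, 1000)
    dt = datetime.fromtimestamp(sec, tz=timezone.utc).replace(microsecond=ms_rem * 1000)
    return dt.isoformat(sep=' ')

# In a pipeline this would run per element, e.g.:
#   | beam.Map(lambda e: {**e, 'ts': ms_epoch_to_bq_timestamp(e['ts'])})
#   | beam.io.WriteToBigQuery(...)
event = {'ts': 1630494181342, 'user': 'ash_max'}
row = {**event, 'ts': ms_epoch_to_bq_timestamp(event['ts'])}
print(row)
```

This avoids attaching event-time timestamps at all, which is only needed when you actually want windowing semantics.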