I have a pipeline that:
- Reads messages from Pub/Sub
- Converts them to a domain object
- Applies a fixed window
- Sends the data back to a Pub/Sub topic
I would like to process only specific messages (for example, only messages that have a specific attribute) and discard all others. How can this be done in Beam?
Can I simply skip `c.outputWithTimestamp(…);` for the messages that should be discarded?
My code:
```java
pipeline.apply("Read PubSub messages",
        PubsubIO.readStrings().fromSubscription(pubsubSub))
    .apply("Convert to DeviceData",
        ParDo.of(new DoFn<String, KV<String, DeviceData>>() {
            @Override
            public Duration getAllowedTimestampSkew() {
                return new Duration(Long.MAX_VALUE);
            }

            @ProcessElement
            public void processElement(ProcessContext c) {
                String message = c.element();
                DeviceData data = new Gson().fromJson(message, DeviceData.class);
                String sourceId = data.getSensorId() != null ? data.getSensorId() : data.getFormulaId();
                // use timestamp from payload
                Long timeInNanoSeconds = data.getTimeInNanoSeconds();
                Instant timestamp = ClockUtil.fromNanos(timeInNanoSeconds);
                long millis = timestamp.toEpochMilli();
                c.outputWithTimestamp(KV.of(sourceId, data), new org.joda.time.Instant(millis));
            }
        }))
    .apply("Apply fixed window", window)
    .apply("Group by inputId", GroupByKey.create())
    .apply("Collect created buckets", ParDo.of(new GatherBuckets(options.getWindowSize())))
    .apply("Send to Pub/sub", PubsubIO.writeStrings().to(topic));
```
Solution:

> Can I simply skip `c.outputWithTimestamp(…);` for the messages that should be discarded?
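Yes. A `DoFn` can emit any number of outputs per input element, including zero. If `processElement` returns without calling `c.output(…)` or `c.outputWithTimestamp(…)`, that element is simply dropped from the output `PCollection`.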
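To make the zero-output behavior concrete without pulling in the Beam SDK, here is a plain-Java sketch that models the `DoFn` contract: `Receiver` stands in for `ProcessContext`, and `processElement` drops an element by returning before it emits anything. `DeviceData` (reduced to three fields), its `kind` field, `Receiver`, and `runParDo` are hypothetical names for illustration only; they are not Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of a Beam DoFn's output contract (no Beam dependency).
class FilteringDoFnSketch {

    /** Hypothetical, reduced stand-in for the question's DeviceData. */
    static final class DeviceData {
        final String sensorId;
        final String kind;            // hypothetical field used for filtering
        final long timestampMillis;

        DeviceData(String sensorId, String kind, long timestampMillis) {
            this.sensorId = sensorId;
            this.kind = kind;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Hypothetical stand-in for ProcessContext: records whatever gets emitted. */
    static final class Receiver<T> {
        final List<T> emitted = new ArrayList<>();
        final List<Long> timestamps = new ArrayList<>();

        void outputWithTimestamp(T value, long timestampMillis) {
            emitted.add(value);
            timestamps.add(timestampMillis);
        }
    }

    /** Mirrors a @ProcessElement method: emitting nothing discards the element. */
    static void processElement(DeviceData data, Predicate<DeviceData> keep, Receiver<DeviceData> out) {
        if (!keep.test(data)) {
            return; // zero outputs for this input element
        }
        out.outputWithTimestamp(data, data.timestampMillis);
    }

    /** Applies processElement to every input element, roughly like a ParDo. */
    static Receiver<DeviceData> runParDo(List<DeviceData> input, Predicate<DeviceData> keep) {
        Receiver<DeviceData> out = new Receiver<>();
        for (DeviceData d : input) {
            processElement(d, keep, out);
        }
        return out;
    }

    public static void main(String[] args) {
        List<DeviceData> input = List.of(
            new DeviceData("s1", "temperature", 1_000L),
            new DeviceData("s2", "humidity", 2_000L),
            new DeviceData("s3", "temperature", 3_000L));
        Receiver<DeviceData> out = runParDo(input, d -> "temperature".equals(d.kind));
        for (DeviceData d : out.emitted) {
            System.out.println(d.sensorId); // only the "temperature" elements survive
        }
    }
}
```

In the question's `DoFn`, the same idea is a guard such as `if (!shouldKeep(data)) { return; }` placed before `c.outputWithTimestamp(…)`, where `shouldKeep` is a hypothetical predicate. Beam also provides a `Filter` transform (`Filter.by(…)`) that applies a predicate as its own step, which keeps the conversion `DoFn` focused on parsing. If "attribute" means a Pub/Sub message attribute rather than a payload field, note that `PubsubIO.readStrings()` yields only the payload; `PubsubIO.readMessagesWithAttributes()` yields `PubsubMessage` objects whose `getAttribute(…)` can be checked.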