
Apache Beam: how to discard messages?

I have a pipeline that:

  1. Reads messages from Pub/Sub
  2. Converts them to a domain object
  3. Applies a fixed window
  4. Sends the data back to a Pub/Sub topic

I would like to process only specific messages (for example, those that have a specific attribute) and discard all other messages. How can this be done in Beam?

Can I simply skip c.outputWithTimestamp(…); for the messages that should be discarded?

My code:

        pipeline.apply("Read PubSub messages",
                    PubsubIO.readStrings().fromSubscription(pubsubSub))

            .apply("Convert to DeviceData",
                    ParDo.of(new DoFn<String, KV<String, DeviceData>>() {
                        @Override
                        public Duration getAllowedTimestampSkew() {
                            return new Duration(Long.MAX_VALUE);
                        }

                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            String message = c.element();
                            DeviceData data = new Gson().fromJson(message, DeviceData.class);
                            String sourceId = data.getSensorId() != null ? data.getSensorId() : data.getFormulaId();

                            // use timestamp from payload
                            Long timeInNanoSeconds = data.getTimeInNanoSeconds();
                            Instant timestamp = ClockUtil.fromNanos(timeInNanoSeconds);
                            long millis = timestamp.toEpochMilli();

                            c.outputWithTimestamp(KV.of(sourceId, data), new org.joda.time.Instant(millis));
                        }
                    }))

            .apply("Apply fixed window", window)
            .apply("Group by inputId", GroupByKey.create())
            .apply("Collect created buckets", ParDo.of(new GatherBuckets(options.getWindowSize())))
            .apply("Send to Pub/sub", PubsubIO.writeStrings().to(topic));

Solution:

Can I simply skip c.outputWithTimestamp(…); for the messages that should be discarded?

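Yes. A DoFn can emit any number of output elements per input element, including zero. If processElement returns without calling c.outputWithTimestamp(…) (or any other output method) for a message, that message is simply dropped from the pipeline.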
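A minimal sketch of that idea, assuming the attribute to check is named `type` and the wanted value is `sensor` (both hypothetical placeholders). Because `PubsubIO.readStrings()` exposes only the payload, the sketch reads with `PubsubIO.readMessagesWithAttributes()` so the DoFn can inspect `PubsubMessage.getAttribute(...)`. The class name `KeepSensorMessagesFn` and its helpers are illustrative, not part of Beam.

```java
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

/**
 * Hypothetical DoFn: keeps only messages whose attribute "type" equals
 * "sensor" and discards all others by emitting nothing for them.
 */
class KeepSensorMessagesFn extends DoFn<PubsubMessage, String> {

  // Hypothetical attribute name and wanted value; adjust to your messages.
  static final String ATTRIBUTE = "type";
  static final String WANTED_VALUE = "sensor";

  /** Returns true if the message should continue down the pipeline. */
  static boolean shouldKeep(PubsubMessage message) {
    // getAttribute returns null when the attribute is missing, so
    // messages without it are treated as "do not keep".
    return WANTED_VALUE.equals(message.getAttribute(ATTRIBUTE));
  }

  @ProcessElement
  public void processElement(@Element PubsubMessage message, OutputReceiver<String> out) {
    if (!shouldKeep(message)) {
      return; // Zero outputs for this element: it is dropped.
    }
    // Kept messages are passed on as their UTF-8 payload string.
    out.output(new String(message.getPayload(), StandardCharsets.UTF_8));
  }

  /**
   * Wiring sketch: read with attributes (readStrings() exposes only the
   * payload), then apply the filtering DoFn.
   */
  static PCollection<String> readKeptMessages(Pipeline pipeline, String subscription) {
    return pipeline
        .apply("Read PubSub messages",
            PubsubIO.readMessagesWithAttributes().fromSubscription(subscription))
        .apply("Keep sensor messages", ParDo.of(new KeepSensorMessagesFn()));
  }
}
```

The same early `return` also works inside the question's "Convert to DeviceData" DoFn, placed before `c.outputWithTimestamp(…)`, if the decision depends on a field of DeviceData rather than on a Pub/Sub attribute. For a pure keep/drop decision with no conversion, Beam's built-in `Filter` transform (`org.apache.beam.sdk.transforms.Filter`) is an alternative to a hand-written DoFn.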
