I have a connector which feeds me millions of entries. Some of those entries have a field
date whose value is the same for all of them, but others don’t. I need to get this value and set it to every entry on my pipeline.
If it was not a distributed pipepline, I would do something like this:
def set_date(entries): for entry in entries: mydate = entry.get('date') if mydate: break for entry in entries: entry['date'] = mydate
However, I have to treat each entry individually and not as a set, so I can not write the logic like this.
Moreover, we can not assure that every set of PCollection has at least one entry with the desired
date field. It is unlikely, but it could still happen that all the entries on a PCollection does not have the
date field, so the logic I have written above is not valid. I need to write some kind of PTransform which saves the
date value to a class attribute, and on some type of “finally” clause, set the value on each entry.
What is the correct way of implementing this?
Great question! For Beam in a Batch pipeline you can do this with a side input. You would do something like this:
def get_date(entry): if 'date' in entry: yield entry['date'] def put_date(entry, date): entry['date'] = date return entry with beam.Pipeline(...) as p: entries = p | ReadEntries(....) # First obtain the date from the entires. # We do this by outputting the date from the entries with a date, # and then we 'sample' one (this is a simple combiner that lets us # pick one of the dates - since they're all the same). date_pc = beam.pvalue.AsList( entries | beam.FlatMap(get_date) | beam.Combine(Sample.FixedSizeGlobally(1)) # We then use the date_pc side input to join it into entries. dated_entries = entries | beam.Map(put_date, date_pc)
Let me know if this helps.
If none of the entries has a
date attribute, then we may need to do something else. Happy to iterate!