Reverse of itertools.groupby?

I’m composing generators together for some data processing. First I batch the data generator so the batches can be threaded through an API call:

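from itertools import groupby, count
from typing import Any, Iterable
def batch(data: Iterable[Any], size=4):
    # the key ignores the item and returns 0,0,0,0,1,1,1,1,...
    # so groupby starts a new group every size items
    c = count()
    for _, g in groupby(data, lambda _: next(c)//size):
        yield g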
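To see what batch actually produces, here is a small check. One caveat worth knowing: every group from groupby shares the same underlying iterator, so a group has to be consumed before the next one is requested. If you call list(batch(...)) first and read the groups afterwards, the earlier groups will come back empty. The batch_lists function below is a hypothetical alternative (not part of the question) that sidesteps this by materializing each batch with islice; on Python 3.12+ itertools.batched does the same job and yields tuples.

```python
from itertools import count, groupby, islice
from typing import Any, Iterable, Iterator, List


def batch(data: Iterable[Any], size: int = 4) -> Iterator[Iterator[Any]]:
    # The key ignores the item and returns 0,0,0,0,1,1,1,1,...
    # so groupby starts a new group every `size` items.
    c = count()
    for _, g in groupby(data, lambda _: next(c) // size):
        yield g


def batch_lists(data: Iterable[Any], size: int = 4) -> Iterator[List[Any]]:
    # Hypothetical alternative: each batch is a real list, so it stays
    # valid even after the generator has moved on to the next batch.
    it = iter(data)
    while chunk := list(islice(it, size)):
        yield chunk


# Safe: each group is turned into a list before the next one is requested.
print([list(g) for g in batch(range(10), 4)])  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
print(list(batch_lists(range(10), 4)))         # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```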

which I then feed to the threader to make the API calls:

from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable
def thread(data: Iterable, func: Callable, n=4):
    with ThreadPoolExecutor(max_workers=n) as executor:
        for batch in data:
            # executor.map returns an iterator over this batch's results
            yield executor.map(func, batch)
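A quick way to see why the output is still nested: each value thread yields is the iterator returned by executor.map for one batch, not an individual result. The sketch below uses fake_api_call as a hypothetical stand-in for the real API call.

```python
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import ThreadPoolExecutor
from typing import Any


def thread(data: Iterable[Iterable[Any]], func: Callable[[Any], Any], n: int = 4) -> Iterator[Iterator[Any]]:
    with ThreadPoolExecutor(max_workers=n) as executor:
        for batch in data:
            # executor.map schedules func on every item of the batch and
            # returns an iterator over the results (in input order).
            yield executor.map(func, batch)


def fake_api_call(x: int) -> int:
    # Hypothetical stand-in for the real API call.
    return x * 10


for results in thread([[1, 2], [3, 4], [5]], fake_api_call):
    # Each item yielded by thread is a whole batch's worth of results.
    print(isinstance(results, Iterator), list(results))
# True [10, 20]
# True [30, 40]
# True [50]
```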

Now I’m trying to merge the batches back into a single list/generator for use downstream in the generator pipeline. I tried this:

from itertools import chain
from typing import Iterable
def flat_map(batches: Iterable):
    for i in list(chain(batches)):
        yield i

But i still seems to be a generator rather than an item from the list. What am I missing?

Answer

You wanted chain(*batches) or chain.from_iterable(batches). chain(batches) yields exactly the same values as iterating over batches directly; it just adds a layer of wrapping, so each i is still one batch's iterator of results. So the correct code (without listifying, which is almost certainly wrong here because it forces the whole upstream pipeline to run to completion before anything is passed downstream) is just:

from itertools import chain
from typing import Iterable
def flat_map(batches: Iterable):
    # chain(*batches) would also work, but if batches is itself an iterator, unpacking
    # forces it to run to completion first; chain.from_iterable can start on the first batch
    return chain.from_iterable(batches)
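Here is a small demonstration of the three variants on plain lists, plus a check of the laziness point in the comment. The gen_batches generator and the pulled list are purely illustrative: they record the moment each batch is pulled from the outer iterator.

```python
from itertools import chain

batches = [[1, 2], [3, 4], [5]]

# chain(batches) only walks the outer iterable, so each item is still a batch.
print(list(chain(batches)))                # [[1, 2], [3, 4], [5]]

# chain.from_iterable(batches) walks each batch in turn: one level flatter.
print(list(chain.from_iterable(batches)))  # [1, 2, 3, 4, 5]

# chain(*batches) flattens too...
print(list(chain(*batches)))               # [1, 2, 3, 4, 5]

# ...but unpacking drains the outer iterator before chain even starts.
pulled = []  # records each batch at the moment it is pulled


def gen_batches():
    for b in batches:
        pulled.append(b)
        yield b


lazy = chain.from_iterable(gen_batches())
first = next(lazy)
print(first, pulled)  # 1 [[1, 2]]  (only the first batch has been pulled)

pulled.clear()
eager = chain(*gen_batches())
print(pulled)         # [[1, 2], [3, 4], [5]]  (everything pulled up front)
```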

You don’t even need yield, since chain.from_iterable already returns an iterator that produces exactly what you want. If you need it to be a true generator, just replace return with yield from for a similar result.
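For comparison, both versions side by side. They produce the same values; the difference is only the type of object returned. A chain object is an iterator but not a generator, so it has no .send(), .throw() or .close(); the yield from version returns a real generator. The name flat_map_gen is just for illustration.

```python
import inspect
from itertools import chain
from typing import Any, Iterable, Iterator


def flat_map(batches: Iterable[Iterable[Any]]) -> Iterator[Any]:
    # Returns a chain object: an iterator, but not a generator.
    return chain.from_iterable(batches)


def flat_map_gen(batches: Iterable[Iterable[Any]]) -> Iterator[Any]:
    # Same values, but the yield from makes this a generator function.
    yield from chain.from_iterable(batches)


print(inspect.isgenerator(flat_map([[1], [2]])))      # False
print(inspect.isgenerator(flat_map_gen([[1], [2]])))  # True
print(list(flat_map_gen([[1, 2], [3]])))              # [1, 2, 3]
```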

Also note: you might avoid the need for the function entirely by changing:

yield executor.map(func, batch) 

to:

yield from executor.map(func, batch) 

so that thread flattens the results as it goes in the first place.
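A minimal end-to-end sketch of that pipeline, reusing the groupby-based batch from the question with the yield from version of thread. As before, fake_api_call is a hypothetical stand-in for the real API call. Because executor.map returns results in input order, the flattened stream keeps the original item order.

```python
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import ThreadPoolExecutor
from itertools import count, groupby
from typing import Any


def batch(data: Iterable[Any], size: int = 4) -> Iterator[Iterator[Any]]:
    c = count()
    for _, g in groupby(data, lambda _: next(c) // size):
        yield g


def thread(data: Iterable[Iterable[Any]], func: Callable[[Any], Any], n: int = 4) -> Iterator[Any]:
    with ThreadPoolExecutor(max_workers=n) as executor:
        for chunk in data:
            # yield from re-yields each result individually, so callers see
            # a flat stream instead of one results-iterator per batch.
            yield from executor.map(func, chunk)


def fake_api_call(x: int) -> int:
    # Hypothetical stand-in for the real API call.
    return x * 10


print(list(thread(batch(range(1, 6), 2), fake_api_call)))  # [10, 20, 30, 40, 50]
```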