Unfortunately, the "write" portion of the reshuffle cannot be
parallelized beyond the parallelism of the source it's reading from. In
my experience the read is generally the bottleneck in this case, but
it's possible (e.g. if the input compresses extremely well) that the
write is the slow part, which is what your observation of the UI seems
to indicate, right?

It could be that materializing to temporary files is cheaper than
materializing to shuffle under random keys (especially on pre-portable
Python). In that case you could instead force a fusion break with a
side input, e.g.

import apache_beam as beam

class FusionBreak(beam.PTransform):
    def expand(self, pcoll):
        # Create an empty PCollection that depends on pcoll.
        empty = pcoll | beam.FlatMap(lambda x: ())
        # Use this empty PCollection as a side input, which will force
        # a fusion break.
        return pcoll | beam.Map(lambda x, unused: x,
                                beam.pvalue.AsIterable(empty))

which could be used in place of Reshuffle like

    p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...

You'll probably also want to pass the use_fastavro experiment.

On Wed, May 15, 2019 at 6:53 AM Niels Basjes <[EMAIL PROTECTED]> wrote: