Hi all,

We've just had a case where we suspect that consumption of messages
is delayed under certain circumstances. I don't necessarily think
this is a bug, so I have not opened a Jira yet, but I wanted to discuss
it here; there's probably a best practice that I just don't know about.

The scenario is a single consumer that is subscribed to a large
number of partitions, some of which are very busy and some of which
only receive messages sporadically. When the consumer sends a
fetch request for all subscribed partitions, the broker fills the
response partition by partition while honoring two parameters:

max.partition.fetch.bytes - controls the maximum size of the data that
is returned for one individual partition - default: 1 * 1024 * 1024 =
1048576 bytes
fetch.max.bytes - controls the overall maximum size of data that is
returned for the entire fetchrequest - default: 50 * 1024 * 1024 =
52428800 bytes
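
To make the interaction of the two limits concrete, here is a minimal
sketch of the arithmetic. It assumes a simplified model in which every
partition in the response returns its full per-partition maximum; the
function name is made up for illustration and is not part of any Kafka
client.

```python
# Defaults quoted in this mail.
MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024   # max.partition.fetch.bytes
FETCH_MAX_BYTES = 50 * 1024 * 1024            # fetch.max.bytes


def full_partitions_per_response(fetch_max_bytes: int,
                                 max_partition_fetch_bytes: int) -> int:
    """Partitions that fit into one response if each one returns its
    per-partition maximum (simplified model, not broker code)."""
    if fetch_max_bytes <= 0 or max_partition_fetch_bytes <= 0:
        raise ValueError("both limits must be positive")
    # Integer division: a partially filled slot does not count as "full".
    # In this model at least one partition is always served.
    return max(1, fetch_max_bytes // max_partition_fetch_bytes)


print(full_partitions_per_response(FETCH_MAX_BYTES, MAX_PARTITION_FETCH_BYTES))  # 50
print(full_partitions_per_response(FETCH_MAX_BYTES, 1024))                       # 51200
```

The bound only bites when many partitions have at least
max.partition.fetch.bytes of data waiting; partitions with less data
leave room for more partitions in the same response.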

So by default, if every partition has at least 1 MiB of data waiting,
a fetch response can contain data from a maximum of 50 partitions
(52428800 / 1048576), which creates the possibility of "freezing out"
partitions if there are a lot of busy partitions in the subscription.
I've created a small test for this to illustrate my concern:

000 - 1 partition - 1 message
aaa - 100 partitions - 10 million messages
bbb - 1000 partitions - 50 million messages
mmm - 1 partition - 1 message
zzz - 100 partitions - 10 million messages

When I consume from these with default settings and simply print the
time at which I first receive a message from each topic, I get the
following:

Got first record from topic aaa after 747 ms
Got first record from topic bbb after 2764 ms
Got first record from topic zzz after 15068 ms
Got first record from topic 000 after 16588 ms
Got first record from topic mmm after 16588 ms

So as we can see the topics with only one partition get stuck behind
the larger topics with data to be read. I am unsure in what order the
broker iterates over the partitions, but I've always seen the same
general order in the output, so there seems to be some factor
influencing this.
One potential fix that I identified was to reduce the
max.partition.fetch.bytes parameter, so that more partitions can be
included in a fetchresponse. If I rerun the test with a value of 1024
I get:

Got first record from topic aaa after 5446 ms
Got first record from topic bbb after 5469 ms
Got first record from topic zzz after 5744 ms
Got first record from topic mmm after 5762 ms
Got first record from topic 000 after 5762 ms

Which looks much better, but I have doubts whether this is the actual
solution, as this could lead to an increase in the number of fetch
requests that are being sent when only a few partitions have new
data. For example, 5 partitions with 10 MB of new data each would fit
in 10 requests with default settings, but need 10240 with my adjusted
settings.
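
The request counts above can be reproduced with a short calculation.
This is only a sketch under simplifying assumptions: all partitions
carry the same amount of new data, every response is filled to its
limits, and "10mb" means 10 * 1024 * 1024 bytes. The helper name is
hypothetical.

```python
MIB = 1024 * 1024


def fetch_requests_needed(partitions: int,
                          new_bytes_per_partition: int,
                          max_partition_fetch_bytes: int,
                          fetch_max_bytes: int = 50 * MIB) -> int:
    """Requests needed to drain equally sized backlogs (simplified model)."""
    # Bytes one response can carry: limited per partition and overall.
    per_response = min(fetch_max_bytes,
                       partitions * max_partition_fetch_bytes)
    total = partitions * new_bytes_per_partition
    # Ceiling division without floating point.
    return -(-total // per_response)


# 5 partitions with 10 MiB of new data each.
print(fetch_requests_needed(5, 10 * MIB, 1 * MIB))  # 10
print(fetch_requests_needed(5, 10 * MIB, 1024))     # 10240
```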

This topic is currently also being discussed in the thread on KIP-349
[1] but consensus seems to be that there is no real need for a feature
like this.

Are there common patterns to get around this? The obvious solution
would be scaling the load across more consumers of course, either by
adding them to the consumer group or by splitting the topics over
consumers, but that sort of just makes it a question of scale until it
may happen again.

Would it potentially be worthwhile looking into code changes to
improve handling for these edge cases?
For example, the broker could keep track of the last time each
partition returned data for a consumer group and prioritize the
"oldest" partitions. This would need memory on the broker though,
which might turn out to be quite a lot since it would scale with
partition count and consumer groups.
Alternatively, some sort of feedback to the consumer could be added
about partitions that were not checked due to the limits, but that
would need a wire protocol change.
Another option might be a little consumer-side logic that starts
splitting fetch requests if it notices that responses always contain
data from the maximum number of partitions.
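
To illustrate the "oldest partitions first" idea, here is a toy
simulation. It is not how the broker is implemented; it assumes a
made-up model in which partitions are visited in one fixed order, data
is counted in abstract units (per-partition limit 1, overall limit 50),
and all names are hypothetical.

```python
def first_served_round(backlogs, target, prioritize_oldest,
                       fetch_max=50, partition_max=1, max_rounds=10_000):
    """Return the 0-based fetch round in which `target` first gets data.

    backlogs: dict of partition name -> pending units; its insertion
    order is the fixed iteration order of this toy broker.
    """
    pending = dict(backlogs)
    last_served = {name: -1 for name in pending}  # -1 = never served
    names = list(pending)
    for round_no in range(max_rounds):
        if prioritize_oldest:
            # Least recently served first; sorted() is stable, so ties
            # keep the fixed order.
            order = sorted(names, key=lambda n: last_served[n])
        else:
            order = names
        budget = fetch_max
        for name in order:
            if budget <= 0:
                break
            take = min(pending[name], partition_max, budget)
            if take == 0:
                continue  # nothing to read from this partition
            pending[name] -= take
            budget -= take
            last_served[name] = round_no
            if name == target:
                return round_no
    return None


# 60 busy partitions with 100 units each, then one sparse partition.
backlogs = {f"busy-{i}": 100 for i in range(60)}
backlogs["sparse-0"] = 1

print(first_served_round(backlogs, "sparse-0", prioritize_oldest=False))  # 100
print(first_served_round(backlogs, "sparse-0", prioritize_oldest=True))   # 1
```

In this model the sparse partition waits until the first 50 busy
partitions are drained when the order is fixed, but is reached in the
second round when never-served partitions go first. The last_served
map is the per-partition, per-group state whose memory cost I was
worried about above.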

Best regards,

[1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics