Subject: PyArrow Exception in Pandas UDF GROUPEDAGG()

Hi everyone,

I'm running a job that uses a Pandas UDF to GROUP BY and aggregate a large matrix.

The first column of the dataset contains the string labels that are grouped on. The remaining columns hold numeric values that are aggregated in the Pandas UDF. The dataset is very wide: 50,000 columns and 3 million rows.

| label_col | num_col_0 | num_col_1 | num_col_2 | ... | num_col_50000 |
| label_a   | 2.0       | 5.6       | 7.123     | ... | ...           |
| label_b   | 11.0      | 1.4       | 2.345     | ... | ...           |
| label_a   | 3.1       | 6.2       | 5.444     | ... | ...           |

My job runs fine on smaller datasets, with the same number of columns but fewer rows. However, when run on a dataset with 3 million rows, I see the following exception:

20/05/05 23:36:27 ERROR Executor: Exception in task 66.1 in stage 12.0 (TID 2358)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/", line 377, in main
  File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/", line 286, in dump_stream
    for series in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/", line 303, in load_stream
    for batch in reader:
  File "pyarrow/ipc.pxi", line 266, in __iter__
  File "pyarrow/ipc.pxi", line 282, in pyarrow.lib._CRecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: read length must be positive or -1

Looking at a related issue report, it appears that PyArrow has a 2GB limit on each shard that is sent to the grouping function.
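
As a rough sanity check on that limit, here is a back-of-the-envelope estimate of how many rows a single group could hold before it exceeds 2GB. This is only a sketch under assumptions: I'm treating every numeric column as a float64 (8 bytes), reading "2GB" as 2 GiB, and ignoring the label column and any Arrow overhead. The variable names are purely illustrative.

```python
# Rough estimate only: assumes every numeric column is a float64 (8 bytes)
# and ignores the label column and any Arrow metadata overhead.
NUM_COLS = 50_000                 # numeric columns, from the dataset description
BYTES_PER_VALUE = 8               # assumption: float64 values
ARROW_LIMIT_BYTES = 2 * 1024**3   # assumption: the 2GB limit taken as 2 GiB

# Size of one row of numeric values, in bytes.
bytes_per_row = NUM_COLS * BYTES_PER_VALUE

# Largest number of rows that fits under the limit in a single group.
max_rows_per_group = ARROW_LIMIT_BYTES // bytes_per_row

# Approximate raw size of the full 3-million-row dataset.
total_rows = 3_000_000
total_bytes = total_rows * bytes_per_row

print(f"bytes per row:        {bytes_per_row:,}")
print(f"max rows per group:   {max_rows_per_group:,}")
print(f"full dataset (bytes): {total_bytes:,}")
```

Under those assumptions, each row is about 400 KB, so any label with more than roughly 5,000 rows would push a single group past the limit. That would be consistent with the job succeeding on smaller inputs and failing at 3 million rows.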

I'm currently running this job on 4 nodes, each with 16 cores and 64GB of memory.

I've attached the full error log here as well. What workarounds could I use to get this job running? Unfortunately, we are approaching a production release and this has become a severe blocker.