Subject: [PySpark] How to write HFiles as an 'append' to the same directory?


I have an Apache Spark process that writes HFiles to S3 in batches. I want all of the resulting HFiles in the same directory, since they belong to the same column family. However, when I run this on AWS EMR, the write fails with a FileAlreadyExistsException ('Output directory ... already exists'). How can I write HFiles via Spark as an 'append', the way I can with a CSV?
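
For comparison, this kind of append is exactly what the DataFrame writer already gives me for CSV (illustrative sketch, not my actual code; processed_chunk stands in for one batched DataFrame):

# Each call adds new part files to the same directory instead of failing
processed_chunk.write.mode('append').csv(output_path)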

The batch-writing loop looks like this:

for col_group in split_cols:
    # Aggregate one group of columns via a pandas UDF
    processed_chunk = batch_write_pandas_udf_for_col_aggregation(joined_dataframe, col_group, pandas_udf_func, group_by_args)

    # Every iteration writes its HFiles to the same output_path
    hfile_writer.write_hfiles(processed_chunk, output_path,
                              zookeeper_ip, table_name, constants.DEFAULT_COL_FAMILY)
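
The only workaround I can think of is giving each chunk a unique subdirectory so the paths never collide (hypothetical sketch; the batch_{} suffix is something I'd add), but then the HFiles for a single column family end up scattered across sibling directories instead of one:

for i, col_group in enumerate(split_cols):
    processed_chunk = batch_write_pandas_udf_for_col_aggregation(joined_dataframe, col_group, pandas_udf_func, group_by_args)

    # A unique per-batch path avoids FileAlreadyExistsException,
    # but splits the column family's HFiles across directories
    hfile_writer.write_hfiles(processed_chunk, '{}/batch_{}'.format(output_path, i),
                              zookeeper_ip, table_name, constants.DEFAULT_COL_FAMILY)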

The function that actually writes the HFiles boils down to this call:

rdd.saveAsNewAPIHadoopFile(output_path,
                           constants.OUTPUT_FORMAT_CLASS,
                           keyClass=constants.KEY_CLASS,
                           valueClass=constants.VALUE_CLASS,
                           keyConverter=constants.KEY_CONVERTER,
                           valueConverter=constants.VALUE_CONVERTER,
                           conf=conf)
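
For context, conf is built from the ZooKeeper IP and the table name, roughly like this (reconstructed for illustration; the real keys live in my constants module, and the table-name key in particular varies by HBase version):

conf = {
    # ZooKeeper quorum for the target HBase cluster
    'hbase.zookeeper.quorum': zookeeper_ip,
    # Target table, used by HFileOutputFormat2 when writing HFiles
    'hbase.mapreduce.hfileoutputformat.table.name': table_name,
}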

The exception I'm getting:
Called with arguments: Namespace(job_args=['matrix_path=/tmp/matrix.csv', 'metadata_path=/tmp/metadata.csv', 'output_path=s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles', 'group_by_args=cluster_id', 'zookeeper_ip=ip-172-30-5-36.ec2.internal', 'table_name=test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a'], job_name='matrix_transformations')

job_args_tuples: [['matrix_path', '/tmp/matrix.csv'], ['metadata_path', '/tmp/metadata.csv'], ['output_path', 's3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles'], ['group_by_args', 'cluster_id'], ['zookeeper_ip', 'ip-172-30-5-36.ec2.internal'], ['table_name', 'test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a']]

Traceback (most recent call last):
  File "/mnt/var/lib/hadoop/steps/s-2ZIOR335HH9TR/main.py", line 56, in <module>
    job_module.transform(spark, **job_args)
  File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", line 93, in transform
  File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", line 73, in write_split_columnwise_transform
  File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/output_handler/hfile_writer.py", line 44, in write_hfiles
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1438, in saveAsNewAPIHadoopFile
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles/median already exists
        at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
        at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:393)
        at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1000)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:991)
        at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:584)
        at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.C