Subject: Using existing distribution for join when subset of keys


Is the following what you are trying to do?

// disable broadcast joins so the plan depends on the tables' distribution
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("x", "y")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("x", "y")
// bucket both tables on the join keys, with the same number of buckets
df1.write.format("parquet").bucketBy(8, "x", "y").saveAsTable("t1")
df2.write.format("parquet").bucketBy(8, "x", "y").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, Seq("x", "y"))
joined.explain

Since both tables are bucketed on (x, y) into the same number of buckets, I see no Exchange in the plan:

== Physical Plan ==
*(3) Project [x#342, y#343]
+- *(3) SortMergeJoin [x#342, y#343], [x#346, y#347], Inner
   :- *(1) Sort [x#342 ASC NULLS FIRST, y#343 ASC NULLS FIRST], false, 0
   :  +- *(1) Project [x#342, y#343]
   :     +- *(1) Filter (isnotnull(x#342) && isnotnull(y#343))
   :        +- *(1) FileScan parquet default.t1[x#342,y#343] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int>, SelectedBucketsCount: 8 out of 8
   +- *(2) Sort [x#346 ASC NULLS FIRST, y#347 ASC NULLS FIRST], false, 0
      +- *(2) Project [x#346, y#347]
         +- *(2) Filter (isnotnull(x#346) && isnotnull(y#347))
            +- *(2) FileScan parquet default.t2[x#346,y#347] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int>, SelectedBucketsCount: 8 out of 8
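
One caveat on the "subset of keys" part of your question (this is my expectation, not something I verified above): if you join on only a subset of the bucket columns, I'd expect the Exchange to come back, since hash-partitioning on (x, y) does not co-locate rows by x alone. A quick sketch, using the same tables as above:

```scala
// Same session/tables as above. Joining on x alone: bucketing by (x, y)
// hashes on both columns, so rows with equal x can land in different
// buckets. The plan should show an Exchange (hashpartitioning on x)
// on both sides of the SortMergeJoin in this case.
val joinedSubset = t1.join(t2, Seq("x"))
joinedSubset.explain
```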

On Sun, May 31, 2020 at 2:38 PM Patrick Woody <[EMAIL PROTECTED]>
wrote: