效能調整

對於某些工作負載,透過將資料快取在記憶體中或啟用一些實驗選項,可以提升效能。

將資料快取在記憶體中

Spark SQL 可以使用內建記憶體欄位格式快取表格,方法是呼叫 spark.catalog.cacheTable("tableName")dataFrame.cache()。然後,Spark SQL 只會掃描必要的欄位,並會自動調整壓縮以將記憶體使用量和 GC 壓力降到最低。您可以呼叫 spark.catalog.uncacheTable("tableName")dataFrame.unpersist() 將表格從記憶體中移除。

可以使用 SparkSession 上的 setConf 方法,或使用 SQL 執行 SET key=value 指令來設定內建記憶體快取。

屬性名稱預設值意義版本
spark.sql.inMemoryColumnarStorage.compressed true 設定為 true 時,Spark SQL 會根據資料統計自動為每個欄位選取壓縮編解碼器。 1.0.1
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制欄位快取的批次大小。較大的批次大小可以提升記憶體使用率和壓縮,但快取資料時有發生 OOM 的風險。 1.1.1

其他設定選項

以下選項也可以用來調整查詢執行的效能。隨著更多最佳化自動執行,這些選項在未來版本中可能會被棄用。

屬性名稱預設值意義版本
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 讀取檔案時,封裝到單一分區中的最大位元組數。此設定僅在使用檔案為基礎的來源(例如 Parquet、JSON 和 ORC)時有效。 2.0.0
spark.sql.files.openCostInBytes 4194304 (4 MB) 開啟檔案的估計成本,以在相同時間內可以掃描的位元組數來衡量。在將多個檔案放入分區時會使用此設定。最好高估,然後包含小檔案的分區會比包含較大檔案的分區(會先排程)快。此設定僅在使用檔案為基礎的來源(例如 Parquet、JSON 和 ORC)時有效。 2.0.0
spark.sql.files.minPartitionNum 預設並行度 建議(不保證)分割檔案分區的最小數目。如果未設定,預設值為 `spark.sql.leafNodeDefaultParallelism`。此組態僅在使用基於檔案的來源(例如 Parquet、JSON 和 ORC)時有效。 3.1.0
spark.sql.files.maxPartitionNum 建議(不保證)分割檔案分區的最大數目。如果設定,則 Spark 會重新調整每個分區,以使分區數目接近此值(如果分區的初始數目超過此值)。此組態僅在使用基於檔案的來源(例如 Parquet、JSON 和 ORC)時有效。 3.5.0
spark.sql.broadcastTimeout 300

廣播連接中廣播等待時間(以秒為單位)的逾時時間

1.3.0
spark.sql.autoBroadcastJoinThreshold 10485760(10 MB) 設定在執行連接時將廣播到所有工作節點的資料表的最大大小(以位元組為單位)。將此值設定為 -1,即可停用廣播。請注意,目前僅支援已執行指令 `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` 的 Hive Metastore 資料表的統計資料。 1.1.0
spark.sql.shuffle.partitions 200 設定在為連接或聚合交換資料時要使用的分區數目。 1.1.0
spark.sql.sources.parallelPartitionDiscovery.threshold 32 設定啟用工作輸入路徑平行清單的閾值。如果輸入路徑的數目大於此閾值,Spark 會使用 Spark 分散式工作來列出檔案。否則,它會退回到順序清單。此組態僅在使用基於檔案的資料來源(例如 Parquet、ORC 和 JSON)時有效。 1.5.0
spark.sql.sources.parallelPartitionDiscovery.parallelism 10000 設定工作輸入路徑的最大清單平行度。如果輸入路徑的數目大於此值,它會節流到使用此值。此組態僅在使用基於檔案的資料來源(例如 Parquet、ORC 和 JSON)時有效。 2.1.1

SQL 查詢的聯結策略提示

連接策略提示,即 BROADCASTMERGESHUFFLE_HASHSHUFFLE_REPLICATE_NL,指示 Spark 在與另一個關係連接時對每個指定的關係使用提示的策略。例如,當在資料表「t1」上使用 BROADCAST 提示時,Spark 會優先考慮將「t1」作為建置側的廣播連接(視是否有任何等值連接金鑰而定,可能是廣播雜湊連接或廣播巢狀迴圈連接),即使統計資料建議的資料表「t1」大小高於組態 spark.sql.autoBroadcastJoinThreshold

當在聯結的兩側指定不同的聯結策略提示時,Spark 會優先考慮 BROADCAST 提示,優先於 MERGE 提示,優先於 SHUFFLE_HASH 提示,優先於 SHUFFLE_REPLICATE_NL 提示。當兩側都指定 BROADCAST 提示或 SHUFFLE_HASH 提示時,Spark 會根據聯結類型和關係大小來選擇建立側。

請注意,Spark 不保證會選擇提示中指定的聯結策略,因為特定策略可能不支援所有聯結類型。

spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show();
src <- sql("SELECT * FROM src")
records <- sql("SELECT * FROM records")
head(join(src, hint(records, "broadcast"), src$key == records$key))
-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key

如需更多詳細資訊,請參閱 聯結提示 文件。

SQL 查詢的合併提示

合併提示允許 Spark SQL 使用者控制輸出檔案數量,就像 Dataset API 中的 coalescerepartitionrepartitionByRange,它們可用於效能調整和減少輸出檔案數量。「COALESCE」提示只有一個分割區號碼作為參數。「REPARTITION」提示有一個分割區號碼、欄位,或兩者皆有/皆無作為參數。「REPARTITION_BY_RANGE」提示必須有欄位名稱,分割區號碼則為選用。「REBALANCE」提示有一個初始分割區號碼、欄位,或兩者皆有/皆無作為參數。

SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(3) */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
SELECT /*+ REBALANCE(3, c) */ * FROM t;

如需更多詳細資訊,請參閱 分割區提示 文件。

適應式查詢執行

適應式查詢執行 (AQE) 是 Spark SQL 中的一項最佳化技術,它利用執行時期統計資料來選擇最有效的查詢執行計畫,自 Apache Spark 3.2.0 起預設啟用。Spark SQL 可以透過 spark.sql.adaptive.enabled 作為總設定來開啟和關閉 AQE。截至 Spark 3.0,AQE 中有三個主要功能:包括合併後洗牌分割區、將排序合併聯結轉換為廣播聯結,以及傾斜聯結最佳化。

合併後洗牌分區

spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 設定都為 true 時,此功能會根據地圖輸出統計資料合併後洗牌分割區。在執行查詢時,此功能簡化了洗牌分割區號碼的調整。您不需要設定適當的洗牌分割區號碼以符合您的資料集。一旦您透過 spark.sql.adaptive.coalescePartitions.initialPartitionNum 設定設定足夠大的初始洗牌分割區號碼,Spark 就可以在執行時期選擇適當的洗牌分割區號碼。

屬性名稱預設值意義版本
spark.sql.adaptive.coalescePartitions.enabled true 當為 true 且 spark.sql.adaptive.enabled 為 true 時,Spark 會根據目標大小(由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定)合併連續的洗牌分割區,以避免太多小型工作。 3.0.0
spark.sql.adaptive.coalescePartitions.parallelismFirst true 當為 true 時,Spark 會在合併連續的洗牌分割區時忽略由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定的目標大小(預設為 64MB),並且只遵守由 spark.sql.adaptive.coalescePartitions.minPartitionSize 指定的最小分割區大小(預設為 1MB),以最大化並行度。這是為了避免在啟用適應式查詢執行時效能下降。建議將此設定設為 false,並遵守由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定的目標大小。 3.2.0
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB 合併後洗牌分區的最小大小。其值最多為 spark.sql.adaptive.advisoryPartitionSizeInBytes 的 20%。當在分區合併期間忽略目標大小(這是預設情況)時,這很有用。 3.2.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum (無) 合併前的洗牌分區的初始數量。如果未設定,則等於 spark.sql.shuffle.partitions。此設定僅在 spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 都啟用時才有效。 3.0.0
spark.sql.adaptive.advisoryPartitionSizeInBytes 64 MB 在適應性最佳化期間(當 spark.sql.adaptive.enabled 為 true 時)洗牌分區的建議大小(以位元組為單位)。當 Spark 合併小型洗牌分區或分割傾斜的洗牌分區時,它會生效。 3.0.0

分割傾斜的洗牌分區

屬性名稱預設值意義版本
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled true 當為 true 且 spark.sql.adaptive.enabled 為 true 時,Spark 會最佳化 RebalancePartitions 中傾斜的洗牌分區,並根據目標大小(由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定)將它們分割為較小的分區,以避免資料傾斜。 3.2.0
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 0.2 如果分區大小小於此因子乘以 spark.sql.adaptive.advisoryPartitionSizeInBytes,則會在分割期間合併該分區。 3.3.0

將排序合併聯結轉換成廣播聯結

當任何聯結側的執行階段統計資料小於適應性廣播雜湊聯結閾值時,AQE 會將排序合併聯結轉換為廣播雜湊聯結。這不如一開始就規劃廣播雜湊聯結有效率,但比持續執行排序合併聯結好,因為我們可以節省兩個聯結側的排序,並在本地讀取洗牌檔案以節省網路流量(如果 spark.sql.adaptive.localShuffleReader.enabled 為 true)。

屬性名稱預設值意義版本
spark.sql.adaptive.autoBroadcastJoinThreshold (無) 設定在執行聯結時將廣播至所有工作節點的資料表的最大大小(以位元組為單位)。將此值設定為 -1,可以停用廣播。預設值與 spark.sql.autoBroadcastJoinThreshold 相同。請注意,此設定僅在適應性架構中使用。 3.2.0
spark.sql.adaptive.localShuffleReader.enabled true 當為 true 且 spark.sql.adaptive.enabled 為 true 時,Spark 會嘗試使用本機洗牌讀取器在不需要洗牌分區時讀取洗牌資料,例如在將排序合併聯結轉換為廣播雜湊聯結之後。 3.0.0

將排序合併聯結轉換成洗牌雜湊聯結

當所有後洗牌分區都小於閾值時,AQE 會將排序合併聯結轉換為洗牌雜湊聯結,最大閾值可以在設定 spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 中看到。

屬性名稱預設值意義版本
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 0 設定每個分區允許建立本機雜湊映射的最大大小(以位元組為單位)。如果此值不小於 spark.sql.adaptive.advisoryPartitionSizeInBytes,且所有分區大小都不大於此設定,則聯結選擇會優先使用洗牌雜湊聯結,而不是排序合併聯結,而不論 spark.sql.join.preferSortMergeJoin 的值為何。 3.2.0

最佳化傾斜聯結

資料傾斜可能會嚴重降低聯結查詢的效能。此功能透過將傾斜的任務分割(並在需要時複製)成大致大小相等的任務,動態處理排序合併聯結中的傾斜。當 spark.sql.adaptive.enabledspark.sql.adaptive.skewJoin.enabled 設定都啟用時,它就會生效。

屬性名稱預設值意義版本
spark.sql.adaptive.skewJoin.enabled true 當為 true 且 spark.sql.adaptive.enabled 為 true 時,Spark 會透過分割(並在需要時複製)傾斜分區來動態處理排序合併聯結中的傾斜。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5.0 如果分區大小大於此因子乘以中位數分區大小,且大於 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,則視為傾斜分區。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB 如果分區大小(以位元組為單位)大於此閾值,且大於 spark.sql.adaptive.skewJoin.skewedPartitionFactor 乘以中位數分區大小,則視為傾斜分區。理想情況下,此設定應大於 spark.sql.adaptive.advisoryPartitionSizeInBytes 3.0.0
spark.sql.adaptive.forceOptimizeSkewedJoin false 當為 true 時,強制啟用 OptimizeSkewedJoin,這是一種適應性規則,用於最佳化傾斜聯結以避免落後工作,即使會產生額外的洗牌。 3.3.0

其他

屬性名稱預設值意義版本
spark.sql.adaptive.optimizer.excludedRules (無) 設定在適應性最佳化器中要停用的規則清單,其中規則以規則名稱指定並以逗號分隔。最佳化器會記錄實際上已排除的規則。 3.1.0
spark.sql.adaptive.customCostEvaluatorClass (無) 要使用於適應性執行的自訂成本評估器類別。如果未設定,Spark 預設會使用自己的 SimpleCostEvaluator 3.2.0