效能調整
對於某些工作負載,透過將資料快取在記憶體中或啟用一些實驗選項,可以提升效能。
將資料快取在記憶體中
Spark SQL 可以使用內建記憶體欄位格式快取表格,方法是呼叫 spark.catalog.cacheTable("tableName")
或 dataFrame.cache()
。然後,Spark SQL 只會掃描必要的欄位,並會自動調整壓縮以將記憶體使用量和 GC 壓力降到最低。您可以呼叫 spark.catalog.uncacheTable("tableName")
或 dataFrame.unpersist()
將表格從記憶體中移除。
可以使用 SparkSession
上的 setConf
方法,或使用 SQL 執行 SET key=value
指令來設定內建記憶體快取。
屬性名稱 | 預設值 | 意義 | 版本 |
---|---|---|---|
spark.sql.inMemoryColumnarStorage.compressed |
true | 設定為 true 時,Spark SQL 會根據資料統計自動為每個欄位選取壓縮編解碼器。 | 1.0.1 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | 控制欄位快取的批次大小。較大的批次大小可以提升記憶體使用率和壓縮,但快取資料時有發生 OOM 的風險。 | 1.1.1 |
其他設定選項
以下選項也可以用來調整查詢執行的效能。隨著更多最佳化自動執行,這些選項在未來版本中可能會被棄用。
屬性名稱 | 預設值 | 意義 | 版本 |
---|---|---|---|
spark.sql.files.maxPartitionBytes |
134217728 (128 MB) | 讀取檔案時,封裝到單一分區中的最大位元組數。此設定僅在使用檔案為基礎的來源(例如 Parquet、JSON 和 ORC)時有效。 | 2.0.0 |
spark.sql.files.openCostInBytes |
4194304 (4 MB) | 開啟檔案的估計成本,以在相同時間內可以掃描的位元組數來衡量。在將多個檔案放入分區時會使用此設定。最好高估,然後包含小檔案的分區會比包含較大檔案的分區(會先排程)快。此設定僅在使用檔案為基礎的來源(例如 Parquet、JSON 和 ORC)時有效。 | 2.0.0 |
spark.sql.files.minPartitionNum |
預設並行度 | 建議(不保證)分割檔案分區的最小數目。如果未設定,預設值為 `spark.sql.leafNodeDefaultParallelism`。此組態僅在使用基於檔案的來源(例如 Parquet、JSON 和 ORC)時有效。 | 3.1.0 |
spark.sql.files.maxPartitionNum |
無 | 建議(不保證)分割檔案分區的最大數目。如果設定,則 Spark 會重新調整每個分區,以使分區數目接近此值(如果分區的初始數目超過此值)。此組態僅在使用基於檔案的來源(例如 Parquet、JSON 和 ORC)時有效。 | 3.5.0 |
spark.sql.broadcastTimeout |
300 |
廣播連接中廣播等待時間(以秒為單位)的逾時時間 |
1.3.0 |
spark.sql.autoBroadcastJoinThreshold |
10485760(10 MB) | 設定在執行連接時將廣播到所有工作節點的資料表的最大大小(以位元組為單位)。將此值設定為 -1,即可停用廣播。請注意,目前僅支援已執行指令 `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` 的 Hive Metastore 資料表的統計資料。 | 1.1.0 |
spark.sql.shuffle.partitions |
200 | 設定在為連接或聚合交換資料時要使用的分區數目。 | 1.1.0 |
spark.sql.sources.parallelPartitionDiscovery.threshold |
32 | 設定啟用工作輸入路徑平行清單的閾值。如果輸入路徑的數目大於此閾值,Spark 會使用 Spark 分散式工作來列出檔案。否則,它會退回到順序清單。此組態僅在使用基於檔案的資料來源(例如 Parquet、ORC 和 JSON)時有效。 | 1.5.0 |
spark.sql.sources.parallelPartitionDiscovery.parallelism |
10000 | 設定工作輸入路徑的最大清單平行度。如果輸入路徑的數目大於此值,它會節流到使用此值。此組態僅在使用基於檔案的資料來源(例如 Parquet、ORC 和 JSON)時有效。 | 2.1.1 |
SQL 查詢的聯結策略提示
連接策略提示,即 BROADCAST
、MERGE
、SHUFFLE_HASH
和 SHUFFLE_REPLICATE_NL
,指示 Spark 在與另一個關係連接時對每個指定的關係使用提示的策略。例如,當在資料表「t1」上使用 BROADCAST
提示時,Spark 會優先考慮將「t1」作為建置側的廣播連接(視是否有任何等值連接金鑰而定,可能是廣播雜湊連接或廣播巢狀迴圈連接),即使統計資料建議的資料表「t1」大小高於組態 spark.sql.autoBroadcastJoinThreshold
。
當在聯結的兩側指定不同的聯結策略提示時,Spark 會優先考慮 BROADCAST
提示,優先於 MERGE
提示,優先於 SHUFFLE_HASH
提示,優先於 SHUFFLE_REPLICATE_NL
提示。當兩側都指定 BROADCAST
提示或 SHUFFLE_HASH
提示時,Spark 會根據聯結類型和關係大小來選擇建立側。
請注意,Spark 不保證會選擇提示中指定的聯結策略,因為特定策略可能不支援所有聯結類型。
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show();
src <- sql("SELECT * FROM src")
records <- sql("SELECT * FROM records")
head(join(src, hint(records, "broadcast"), src$key == records$key))
-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
如需更多詳細資訊,請參閱 聯結提示 文件。
SQL 查詢的合併提示
合併提示允許 Spark SQL 使用者控制輸出檔案數量,就像 Dataset API 中的 coalesce
、repartition
和 repartitionByRange
,它們可用於效能調整和減少輸出檔案數量。「COALESCE」提示只有一個分割區號碼作為參數。「REPARTITION」提示有一個分割區號碼、欄位,或兩者皆有/皆無作為參數。「REPARTITION_BY_RANGE」提示必須有欄位名稱,分割區號碼則為選用。「REBALANCE」提示有一個初始分割區號碼、欄位,或兩者皆有/皆無作為參數。
SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(3) */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
SELECT /*+ REBALANCE(3, c) */ * FROM t;
如需更多詳細資訊,請參閱 分割區提示 文件。
適應式查詢執行
適應式查詢執行 (AQE) 是 Spark SQL 中的一項最佳化技術,它利用執行時期統計資料來選擇最有效的查詢執行計畫,自 Apache Spark 3.2.0 起預設啟用。Spark SQL 可以透過 spark.sql.adaptive.enabled
作為總設定來開啟和關閉 AQE。截至 Spark 3.0,AQE 中有三個主要功能:包括合併後洗牌分割區、將排序合併聯結轉換為廣播聯結,以及傾斜聯結最佳化。
合併後洗牌分區
當 spark.sql.adaptive.enabled
和 spark.sql.adaptive.coalescePartitions.enabled
設定都為 true 時,此功能會根據地圖輸出統計資料合併後洗牌分割區。在執行查詢時,此功能簡化了洗牌分割區號碼的調整。您不需要設定適當的洗牌分割區號碼以符合您的資料集。一旦您透過 spark.sql.adaptive.coalescePartitions.initialPartitionNum
設定設定足夠大的初始洗牌分割區號碼,Spark 就可以在執行時期選擇適當的洗牌分割區號碼。
屬性名稱 | 預設值 | 意義 | 版本 |
---|---|---|---|
spark.sql.adaptive.coalescePartitions.enabled |
true |
當為 true 且 spark.sql.adaptive.enabled 為 true 時,Spark 會根據目標大小(由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定)合併連續的洗牌分割區,以避免太多小型工作。
|
3.0.0 |
spark.sql.adaptive.coalescePartitions.parallelismFirst |
true |
當為 true 時,Spark 會在合併連續的洗牌分割區時忽略由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定的目標大小(預設為 64MB),並且只遵守由 spark.sql.adaptive.coalescePartitions.minPartitionSize 指定的最小分割區大小(預設為 1MB),以最大化並行度。這是為了避免在啟用適應式查詢執行時效能下降。建議將此設定設為 false,並遵守由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定的目標大小。
|
3.2.0 |
spark.sql.adaptive.coalescePartitions.minPartitionSize |
1MB |
合併後洗牌分區的最小大小。其值最多為 spark.sql.adaptive.advisoryPartitionSizeInBytes 的 20%。當在分區合併期間忽略目標大小(這是預設情況)時,這很有用。
|
3.2.0 |
spark.sql.adaptive.coalescePartitions.initialPartitionNum |
(無) |
合併前的洗牌分區的初始數量。如果未設定,則等於 spark.sql.shuffle.partitions 。此設定僅在 spark.sql.adaptive.enabled 和 spark.sql.adaptive.coalescePartitions.enabled 都啟用時才有效。
|
3.0.0 |
spark.sql.adaptive.advisoryPartitionSizeInBytes |
64 MB |
在適應性最佳化期間(當 spark.sql.adaptive.enabled 為 true 時)洗牌分區的建議大小(以位元組為單位)。當 Spark 合併小型洗牌分區或分割傾斜的洗牌分區時,它會生效。
|
3.0.0 |
分割傾斜的洗牌分區
屬性名稱 | 預設值 | 意義 | 版本 |
---|---|---|---|
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled |
true |
當為 true 且 spark.sql.adaptive.enabled 為 true 時,Spark 會最佳化 RebalancePartitions 中傾斜的洗牌分區,並根據目標大小(由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定)將它們分割為較小的分區,以避免資料傾斜。
|
3.2.0 |
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor |
0.2 |
如果分區大小小於此因子乘以 spark.sql.adaptive.advisoryPartitionSizeInBytes ,則會在分割期間合併該分區。
|
3.3.0 |
將排序合併聯結轉換成廣播聯結
當任何聯結側的執行階段統計資料小於適應性廣播雜湊聯結閾值時,AQE 會將排序合併聯結轉換為廣播雜湊聯結。這不如一開始就規劃廣播雜湊聯結有效率,但比持續執行排序合併聯結好,因為我們可以節省兩個聯結側的排序,並在本地讀取洗牌檔案以節省網路流量(如果 spark.sql.adaptive.localShuffleReader.enabled
為 true)。
屬性名稱 | 預設值 | 意義 | 版本 |
---|---|---|---|
spark.sql.adaptive.autoBroadcastJoinThreshold |
(無) |
設定在執行聯結時將廣播至所有工作節點的資料表的最大大小(以位元組為單位)。將此值設定為 -1,可以停用廣播。預設值與 spark.sql.autoBroadcastJoinThreshold 相同。請注意,此設定僅在適應性架構中使用。
|
3.2.0 |
spark.sql.adaptive.localShuffleReader.enabled |
true |
當為 true 且 spark.sql.adaptive.enabled 為 true 時,Spark 會嘗試使用本機洗牌讀取器在不需要洗牌分區時讀取洗牌資料,例如在將排序合併聯結轉換為廣播雜湊聯結之後。
|
3.0.0 |
將排序合併聯結轉換成洗牌雜湊聯結
當所有後洗牌分區都小於閾值時,AQE 會將排序合併聯結轉換為洗牌雜湊聯結,最大閾值可以在設定 spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold
中看到。
屬性名稱 | 預設值 | 意義 | 版本 |
---|---|---|---|
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold |
0 |
設定每個分區允許建立本機雜湊映射的最大大小(以位元組為單位)。如果此值不小於 spark.sql.adaptive.advisoryPartitionSizeInBytes ,且所有分區大小都不大於此設定,則聯結選擇會優先使用洗牌雜湊聯結,而不是排序合併聯結,而不論 spark.sql.join.preferSortMergeJoin 的值為何。
|
3.2.0 |
最佳化傾斜聯結
資料傾斜可能會嚴重降低聯結查詢的效能。此功能透過將傾斜的任務分割(並在需要時複製)成大致大小相等的任務,動態處理排序合併聯結中的傾斜。當 spark.sql.adaptive.enabled
和 spark.sql.adaptive.skewJoin.enabled
設定都啟用時,它就會生效。
屬性名稱 | 預設值 | 意義 | 版本 |
---|---|---|---|
spark.sql.adaptive.skewJoin.enabled |
true |
當為 true 且 spark.sql.adaptive.enabled 為 true 時,Spark 會透過分割(並在需要時複製)傾斜分區來動態處理排序合併聯結中的傾斜。
|
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor |
5.0 |
如果分區大小大於此因子乘以中位數分區大小,且大於 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes ,則視為傾斜分區。
|
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes |
256MB |
如果分區大小(以位元組為單位)大於此閾值,且大於 spark.sql.adaptive.skewJoin.skewedPartitionFactor 乘以中位數分區大小,則視為傾斜分區。理想情況下,此設定應大於 spark.sql.adaptive.advisoryPartitionSizeInBytes 。
|
3.0.0 |
spark.sql.adaptive.forceOptimizeSkewedJoin |
false | 當為 true 時,強制啟用 OptimizeSkewedJoin,這是一種適應性規則,用於最佳化傾斜聯結以避免落後工作,即使會產生額外的洗牌。 | 3.3.0 |
其他
屬性名稱 | 預設值 | 意義 | 版本 |
---|---|---|---|
spark.sql.adaptive.optimizer.excludedRules |
(無) | 設定在適應性最佳化器中要停用的規則清單,其中規則以規則名稱指定並以逗號分隔。最佳化器會記錄實際上已排除的規則。 | 3.1.0 |
spark.sql.adaptive.customCostEvaluatorClass |
(無) |
要使用於適應性執行的自訂成本評估器類別。如果未設定,Spark 預設會使用自己的 SimpleCostEvaluator 。
|
3.2.0 |