調整 Spark
由於大多數 Spark 計算的內存性質,Spark 程式可能會受到叢集中的任何資源(CPU、網路頻寬或記憶體)的瓶頸。最常見的情況是,如果資料符合記憶體,則瓶頸是網路頻寬,但有時您還需要進行一些調整,例如 將 RDD 儲存在序列化的形式,以減少記憶體使用。本指南將涵蓋兩個主要主題:資料序列化,這對於良好的網路效能至關重要,並且可以減少記憶體使用,以及記憶體調整。我們也會概述幾個較小的主題。
資料序列化
序列化在任何分散式應用程式的效能中扮演重要的角色。序列化物件很慢或使用大量位元組的格式,會大幅降低計算速度。通常,這是您在最佳化 Spark 應用程式時應該調整的第一件事。Spark 旨在取得便利性(讓您可以在運算中使用任何 Java 類型)和效能之間的平衡。它提供兩個序列化函式庫
- Java 序列化:預設情況下,Spark 使用 Java 的
ObjectOutputStream
架構序列化物件,並且可以與您建立的任何實作java.io.Serializable
的類別一起使用。您也可以透過延伸java.io.Externalizable
來更嚴密地控制序列化的效能。Java 序列化很靈活,但通常很慢,並且會導致許多類別產生大型序列化格式。 - Kryo 序列化:Spark 也可以使用 Kryo 函式庫(版本 4)更快速地序列化物件。Kryo 比 Java 序列化快很多且更精簡(通常快上 10 倍),但並不支援所有
Serializable
類型,而且需要您預先在程式中註冊您要使用的類別才能獲得最佳效能。
您可以透過使用 SparkConf 來初始化您的工作,並呼叫 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
來切換使用 Kryo。此設定會設定用於在工作節點之間交換資料,以及將 RDD 序列化至磁碟時所使用的序列化器。Kryo 之所以不是預設值,唯一的原因是需要自訂註冊,但我們建議在任何網路密集型應用程式中嘗試使用它。自 Spark 2.0.0 以來,我們在使用簡單類型、簡單類型陣列或字串類型交換 RDD 時,會在內部使用 Kryo 序列化器。
Spark 會自動包含 Kryo 序列化器,用於 Twitter chill 函式庫中的 AllScalaRegistrar 所涵蓋的許多常用核心 Scala 類別。
若要使用 Kryo 註冊您自己的自訂類別,請使用 registerKryoClasses
方法。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
Kryo 文件 說明了更進階的註冊選項,例如新增自訂序列化程式碼。
如果您的物件很大,您可能也需要增加 spark.kryoserializer.buffer
設定。此值必須足夠大,才能容納您要序列化的最大物件。
最後,如果您沒有註冊您的自訂類別,Kryo 仍然會運作,但它必須將完整的類別名稱與每個物件一起儲存,這很浪費。
記憶體調整
在調整記憶體使用量時,有三個考量因素:您的物件所使用的記憶體量(您可能希望您的整個資料集都符合記憶體)、存取這些物件的成本,以及垃圾回收的開銷(如果您有大量的物件流動)。
預設情況下,Java 物件存取速度很快,但很容易消耗比其欄位內的「原始」資料多 2-5 倍的空間。這是由於以下幾個原因
- 每個不同的 Java 物件都有「物件標頭」,大小約為 16 位元組,其中包含指向其類別的指標等資訊。對於一個資料很少的物件(例如一個
Int
欄位),這可能會比資料還大。 - Java
String
在原始字串資料上大約有 40 位元組的開銷(因為它們將其儲存在Char
陣列中,並保留長度等額外資料),並且由於String
內部使用 UTF-16 編碼,因此將每個字元儲存為兩個位元組。因此,一個 10 個字元的字串很容易消耗 60 個位元組。 - 常見的集合類別,例如
HashMap
和LinkedList
,使用連結資料結構,其中每個項目都有「包裝器」物件(例如Map.Entry
)。此物件不僅有一個標頭,還有指向清單中下一個物件的指標(通常每個指標 8 個位元組)。 - 基本類型的集合通常將它們儲存為「已封裝」的物件,例如
java.lang.Integer
。
本節將從 Spark 中的記憶體管理概述開始,然後討論使用者可以採取的具體策略,以更有效率地使用其應用程式中的記憶體。特別是,我們將說明如何確定您物件的記憶體使用量,以及如何改善它 – 無論是透過變更您的資料結構,或以序列化格式儲存資料。然後,我們將介紹調整 Spark 的快取大小和 Java 垃圾回收機制。
記憶體管理概觀
Spark 中的記憶體使用大致可分為兩類:執行和儲存。執行記憶體是指用於洗牌、聯結、排序和聚合的計算,而儲存記憶體是指用於快取和在叢集間傳播內部資料。在 Spark 中,執行和儲存共用一個統一區域 (M)。當沒有使用執行記憶體時,儲存可以取得所有可用記憶體,反之亦然。執行可能會在必要時驅逐儲存,但僅在總儲存記憶體使用量低於特定閾值 (R) 時才會驅逐。換句話說,R
描述了 M
中的子區域,其中快取區塊永不驅逐。由於實作的複雜性,儲存可能不會驅逐執行。
此設計確保了多項理想的特性。首先,不使用快取的應用程式可以使用整個空間進行執行,避免不必要的磁碟溢位。其次,使用快取的應用程式可以保留一個最小儲存空間 (R),其資料區塊不會被驅逐。最後,此方法為各種工作負載提供了合理的開箱即用效能,而不需要使用者具備記憶體在內部如何劃分的專業知識。
儘管有兩個相關的設定,但一般使用者不需調整它們,因為預設值適用於大多數工作負載
spark.memory.fraction
表示M
的大小,為 (JVM 堆積空間 - 300MiB) 的一部分 (預設為 0.6)。其餘空間 (40%) 保留給使用者資料結構、Spark 中的內部元資料,以及在稀疏且異常大的記錄情況下防止 OOM 錯誤。spark.memory.storageFraction
表示R
的大小,為M
的一部分 (預設為 0.5)。R
是M
中的儲存空間,其中快取區塊不會被執行驅逐。
應設定 spark.memory.fraction
的值,以便將此堆積空間舒適地放入 JVM 的舊或「長期」世代。有關詳細資訊,請參閱下方進階 GC 調整的討論。
判斷記憶體使用量
調整資料集所需記憶體消耗量的最佳方式是建立 RDD,將其放入快取,並查看網路 UI 中的「儲存」頁面。此頁面會告訴您 RDD 佔用了多少記憶體。
若要估計特定物件的記憶體消耗量,請使用 SizeEstimator
的 estimate
方法。這對於使用不同的資料配置來減少記憶體使用量,以及確定廣播變數在每個執行器堆積中佔用的空間量很有用。
調整資料結構
減少記憶體消耗的第一個方法是避免使用會增加額外負擔的 Java 功能,例如基於指標的資料結構和包裝器物件。有幾種方法可以做到這一點
- 設計您的資料結構以優先考慮物件陣列和原始型別,而不是標準的 Java 或 Scala 集合類別 (例如
HashMap
)。fastutil 函式庫提供與 Java 標準函式庫相容的原始型別集合類別。 - 盡可能避免使用包含大量小型物件和指標的巢狀結構。
- 考慮使用數字 ID 或列舉物件,而非字串作為金鑰。
- 如果您的 RAM 小於 32 GiB,請設定 JVM 旗標
-XX:+UseCompressedOops
,讓指標為四位元組,而非八位元組。您可以在spark-env.sh
中新增這些選項。
序列化 RDD 儲存
即使進行此調整,您的物件仍然過大,無法有效儲存,一個更簡單的減少記憶體使用量的方式是將它們儲存在序列化格式中,使用 RDD 持久化 API 中的序列化儲存層級,例如 MEMORY_ONLY_SER
。Spark 會將每個 RDD 分割儲存為一個大型位元組陣列。以序列化格式儲存資料的唯一缺點是存取時間較慢,因為必須動態解除序列化的每個物件。如果您想要以序列化格式快取資料,我們強烈建議 使用 Kryo,因為它會產生比 Java 序列化(當然也比原始 Java 物件)更小的尺寸。
垃圾回收調整
當您的程式儲存的 RDD 產生大量「變動」時,JVM 垃圾回收可能會成為一個問題。(在僅讀取 RDD 一次,然後對其執行多項操作的程式中,通常不會產生問題。)當 Java 需要驅逐舊物件以騰出空間給新物件時,它需要追蹤您所有的 Java 物件並找出未使用的物件。這裡要記住的主要重點是垃圾回收的成本與 Java 物件的數量成正比,因此使用具有較少物件的資料結構(例如 Int
陣列,而非 LinkedList
)會大幅降低此成本。一個更好的方法是以序列化格式儲存物件,如上所述:現在每個 RDD 分割只會有一個物件(位元組陣列)。在嘗試其他技術之前,如果 GC 是問題,第一個要嘗試的方法是使用 序列化快取。
GC 也可能是個問題,因為您的工作任務記憶體(執行任務所需的空間量)與節點上快取的 RDD 之間會產生干擾。我們將討論如何控制分配給 RDD 快取的空間,以減輕此問題。
衡量 GC 的影響
GC 調整的第一步是收集垃圾回收發生的頻率和 GC 花費時間的統計資料。這可以透過將 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
加入 Java 選項來完成。(請參閱 設定指南,以取得將 Java 選項傳遞給 Spark 任務的資訊。)下次執行 Spark 任務時,你會在每次發生垃圾回收時看到訊息印在工作人員的記錄中。請注意,這些記錄會在叢集的工作人員節點上 (在工作目錄的 stdout
檔案中),而非 在驅動程式上。
進階 GC 調整
若要進一步調整垃圾回收,我們首先需要了解 JVM 中記憶體管理的一些基本資訊
-
Java 堆積空間分為兩個區域:Young 和 Old。Young 世代用於存放短暫存在的物件,而 Old 世代則用於存放生命週期較長的物件。
-
Young 世代進一步分為三個區域 [Eden、Survivor1、Survivor2]。
-
垃圾回收程序的簡化說明:當 Eden 已滿時,會在 Eden 上執行一次小型 GC,且 Eden 和 Survivor1 中仍存在的物件會複製到 Survivor2。Survivor 區域會互換。如果物件夠舊或 Survivor2 已滿,它會移到 Old。最後,當 Old 接近已滿時,會呼叫一次完整 GC。
Spark 中 GC 調整的目標是確保只有生命週期長的 RDD 儲存在 Old 世代,且 Young 世代的大小足以儲存短暫存在的物件。這將有助於避免完整 GC 收集在任務執行期間建立的暫時物件。一些可能有用的步驟包括
-
透過收集 GC 統計資料,檢查是否有太多垃圾回收。如果在任務完成前呼叫了多次完整 GC,表示沒有足夠的記憶體可執行任務。
-
如果小型收集太多,但大型 GC 不多,則為 Eden 分配更多記憶體會有幫助。你可以將 Eden 的大小設定為每個任務所需記憶體的過高估計值。如果 Eden 的大小確定為
E
,則可以使用選項-Xmn=4/3*E
設定 Young 世代的大小。(放大 4/3 倍是為了考量倖存者區域使用的空間。) -
在列印的 GC 統計資料中,如果 OldGen 接近滿載,請降低
spark.memory.fraction
來減少用於快取的記憶體量;快取較少的物件比減慢工作執行速度來得更好。或者,考慮縮小 Young 世代的大小。這表示降低-Xmn
(如果您已如上設定)。如果不是,請嘗試變更 JVM 的NewRatio
參數值。許多 JVM 預設為 2,表示 Old 世代佔用堆積的 2/3。它應足夠大,使此分數超過spark.memory.fraction
。 -
使用
-XX:+UseG1GC
嘗試 G1GC 垃圾收集器。在某些垃圾收集為瓶頸的情況下,它可以改善效能。請注意,對於大型執行器堆積大小,可能需要使用-XX:G1HeapRegionSize
來增加 G1 區域大小。 -
例如,如果您的工作從 HDFS 讀取資料,則可以使用從 HDFS 讀取的資料區塊大小來估計工作使用的記憶體量。請注意,解壓縮區塊的大小通常是區塊大小的 2 或 3 倍。因此,如果我們希望有 3 或 4 個工作的作業空間,而 HDFS 區塊大小為 128 MiB,我們可以估計 Eden 的大小為
4*3*128MiB
。 -
監控垃圾收集的頻率和時間如何隨新設定而變更。
我們的經驗表明,GC 調整的效果取決於您的應用程式和可用的記憶體量。線上說明了 更多調整選項,但在高層級中,管理完整 GC 進行的頻率有助於減少開銷。
執行器的 GC 調整旗標可以透過在工作的組態中設定 spark.executor.defaultJavaOptions
或 spark.executor.extraJavaOptions
來指定。
其他考量
平行度
除非您將每個作業的平行度設定得夠高,否則叢集將無法充分利用。Spark 會根據檔案大小自動設定在每個檔案上執行的「對應」工作數(不過,您可以透過 SparkContext.textFile
等的選用參數來控制它),而對於分散式「縮減」作業(例如 groupByKey
和 reduceByKey
),它會使用最大的父 RDD 的分割數。您可以傳遞平行度作為第二個引數(請參閱 spark.PairRDDFunctions
文件),或設定組態屬性 spark.default.parallelism
來變更預設值。一般來說,我們建議在叢集中的每個 CPU 核心使用 2-3 個工作。
輸入路徑上的平行列示
有時,當工作輸入有大量的目錄時,您可能也需要增加目錄清單並行性,否則此程序可能會花費很長的時間,特別是在針對 S3 等物件儲存時。如果您的工作在具有 Hadoop 輸入格式的 RDD 上執行(例如,透過 SparkContext.sequenceFile
),則並行性會透過 spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads
控制(目前預設為 1)。
對於具有檔案為基礎資料來源的 Spark SQL,您可以調整 spark.sql.sources.parallelPartitionDiscovery.threshold
和 spark.sql.sources.parallelPartitionDiscovery.parallelism
來改善清單並行性。請參閱 Spark SQL 效能調整指南 以取得更多詳細資料。
縮減任務的記憶體使用
有時,您會收到 OutOfMemoryError,並不是因為您的 RDD 不符合記憶體,而是因為您的其中一項工作,例如 groupByKey
中的其中一項縮減工作,其工作組太大。Spark 的混洗操作(sortByKey
、groupByKey
、reduceByKey
、join
等)會在每個工作中建立一個雜湊表來執行分組,這通常會很大。最簡單的解決方法是增加並行性等級,讓每個工作的輸入組更小。Spark 可以有效支援短至 200 毫秒的工作,因為它會在許多工作中重複使用一個執行器 JVM,而且它的工作啟動成本很低,因此您可以安全地將並行性等級增加到群集中核心數量的兩倍以上。
廣播大型變數
使用 SparkContext
中提供的 廣播功能 可以大幅減少每個序列化工作的規模,以及在群集上啟動工作的成本。如果您的工作使用驅動程式內部的任何大型物件(例如靜態查詢表),請考慮將其轉換成廣播變數。Spark 會在主控端列印每個工作的序列化大小,因此您可以查看它來決定您的工作是否太大;一般來說,大於約 20 KiB 的工作可能值得最佳化。
資料區域性
資料區域性會對 Spark 工作的效能造成重大影響。如果資料和對其進行操作的程式碼在一起,則運算往往會很快。但是,如果程式碼和資料分開,則必須從一個移到另一個。通常,從一個地方運送序列化程式碼比運送一塊資料更快,因為程式碼大小遠小於資料。Spark 的排程就是圍繞著資料區域性的這個一般原則建立的。
資料區域性是指資料與處理它的程式碼的接近程度。根據資料的目前位置,有數個區域性等級。從最接近到最遠的順序排列
PROCESS_LOCAL
資料與執行中的程式碼在同一個 JVM 中。這是最佳的區域性。NODE_LOCAL
資料在同一個節點上。範例可能是同一個節點上的 HDFS,或同一個節點上的另一個執行器。這比PROCESS_LOCAL
慢一點,因為資料必須在程序之間傳輸。NO_PREF
資料從任何地方存取的速度都一樣快,而且沒有區域偏好。RACK_LOCAL
資料在同一個伺服器機架上。資料在同一個機架上的不同伺服器上,因此需要透過網路傳送,通常透過單一交換器。ANY
資料在網路上的其他地方,不在同一個機架上。
Spark 會優先在最佳區域層級排程所有工作,但這並不總是可行。在任何閒置執行器上都沒有未處理資料的情況下,Spark 會切換到較低的區域層級。有兩個選項:a) 等待忙碌的 CPU 釋放,以在同一個伺服器上的資料上啟動工作,或 b) 立即在需要將資料移至較遠位置的新工作中啟動工作。
Spark 通常會稍等一會兒,希望忙碌的 CPU 能釋放。一旦逾時,它就會開始將資料從遠處移至空閒的 CPU。每個層級之間的後備等待逾時可以單獨設定,或在一個參數中全部設定;請參閱 設定頁面 上的 spark.locality
參數以取得詳細資訊。如果工作很長且區域性不佳,您應該增加這些設定,但預設值通常運作良好。
摘要
這是一個簡短的指南,說明在調整 Spark 應用程式時您應該知道的重點,最重要的是資料序列化和記憶體調整。對於大多數程式,切換到 Kryo 序列化並以序列化形式保留資料,將可以解決大多數常見的效能問題。歡迎在 Spark 郵件清單 上詢問有關其他最佳調整實務。