調整 Spark

由於大多數 Spark 計算的內存性質,Spark 程式可能會受到叢集中的任何資源(CPU、網路頻寬或記憶體)的瓶頸。最常見的情況是,如果資料符合記憶體,則瓶頸是網路頻寬,但有時您還需要進行一些調整,例如 將 RDD 儲存在序列化的形式,以減少記憶體使用。本指南將涵蓋兩個主要主題:資料序列化,這對於良好的網路效能至關重要,並且可以減少記憶體使用,以及記憶體調整。我們也會概述幾個較小的主題。

資料序列化

序列化在任何分散式應用程式的效能中扮演重要的角色。序列化物件很慢或使用大量位元組的格式,會大幅降低計算速度。通常,這是您在最佳化 Spark 應用程式時應該調整的第一件事。Spark 旨在取得便利性(讓您可以在運算中使用任何 Java 類型)和效能之間的平衡。它提供兩個序列化函式庫

您可以透過使用 SparkConf 來初始化您的工作,並呼叫 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 來切換使用 Kryo。此設定會設定用於在工作節點之間交換資料,以及將 RDD 序列化至磁碟時所使用的序列化器。Kryo 之所以不是預設值,唯一的原因是需要自訂註冊,但我們建議在任何網路密集型應用程式中嘗試使用它。自 Spark 2.0.0 以來,我們在使用簡單類型、簡單類型陣列或字串類型交換 RDD 時,會在內部使用 Kryo 序列化器。

Spark 會自動包含 Kryo 序列化器,用於 Twitter chill 函式庫中的 AllScalaRegistrar 所涵蓋的許多常用核心 Scala 類別。

若要使用 Kryo 註冊您自己的自訂類別,請使用 registerKryoClasses 方法。

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

Kryo 文件 說明了更進階的註冊選項,例如新增自訂序列化程式碼。

如果您的物件很大,您可能也需要增加 spark.kryoserializer.buffer 設定。此值必須足夠大,才能容納您要序列化的最大物件。

最後,如果您沒有註冊您的自訂類別,Kryo 仍然會運作,但它必須將完整的類別名稱與每個物件一起儲存,這很浪費。

記憶體調整

在調整記憶體使用量時,有三個考量因素:您的物件所使用的記憶體(您可能希望您的整個資料集都符合記憶體)、存取這些物件的成本,以及垃圾回收的開銷(如果您有大量的物件流動)。

預設情況下,Java 物件存取速度很快,但很容易消耗比其欄位內的「原始」資料多 2-5 倍的空間。這是由於以下幾個原因

本節將從 Spark 中的記憶體管理概述開始,然後討論使用者可以採取的具體策略,以更有效率地使用其應用程式中的記憶體。特別是,我們將說明如何確定您物件的記憶體使用量,以及如何改善它 – 無論是透過變更您的資料結構,或以序列化格式儲存資料。然後,我們將介紹調整 Spark 的快取大小和 Java 垃圾回收機制。

記憶體管理概觀

Spark 中的記憶體使用大致可分為兩類:執行和儲存。執行記憶體是指用於洗牌、聯結、排序和聚合的計算,而儲存記憶體是指用於快取和在叢集間傳播內部資料。在 Spark 中,執行和儲存共用一個統一區域 (M)。當沒有使用執行記憶體時,儲存可以取得所有可用記憶體,反之亦然。執行可能會在必要時驅逐儲存,但僅在總儲存記憶體使用量低於特定閾值 (R) 時才會驅逐。換句話說,R 描述了 M 中的子區域,其中快取區塊永不驅逐。由於實作的複雜性,儲存可能不會驅逐執行。

此設計確保了多項理想的特性。首先,不使用快取的應用程式可以使用整個空間進行執行,避免不必要的磁碟溢位。其次,使用快取的應用程式可以保留一個最小儲存空間 (R),其資料區塊不會被驅逐。最後,此方法為各種工作負載提供了合理的開箱即用效能,而不需要使用者具備記憶體在內部如何劃分的專業知識。

儘管有兩個相關的設定,但一般使用者不需調整它們,因為預設值適用於大多數工作負載

應設定 spark.memory.fraction 的值,以便將此堆積空間舒適地放入 JVM 的舊或「長期」世代。有關詳細資訊,請參閱下方進階 GC 調整的討論。

判斷記憶體使用量

調整資料集所需記憶體消耗量的最佳方式是建立 RDD,將其放入快取,並查看網路 UI 中的「儲存」頁面。此頁面會告訴您 RDD 佔用了多少記憶體。

若要估計特定物件的記憶體消耗量,請使用 SizeEstimatorestimate 方法。這對於使用不同的資料配置來減少記憶體使用量,以及確定廣播變數在每個執行器堆積中佔用的空間量很有用。

調整資料結構

減少記憶體消耗的第一個方法是避免使用會增加額外負擔的 Java 功能,例如基於指標的資料結構和包裝器物件。有幾種方法可以做到這一點

  1. 設計您的資料結構以優先考慮物件陣列和原始型別,而不是標準的 Java 或 Scala 集合類別 (例如 HashMap)。fastutil 函式庫提供與 Java 標準函式庫相容的原始型別集合類別。
  2. 盡可能避免使用包含大量小型物件和指標的巢狀結構。
  3. 考慮使用數字 ID 或列舉物件,而非字串作為金鑰。
  4. 如果您的 RAM 小於 32 GiB,請設定 JVM 旗標 -XX:+UseCompressedOops,讓指標為四位元組,而非八位元組。您可以在 spark-env.sh 中新增這些選項。

序列化 RDD 儲存

即使進行此調整,您的物件仍然過大,無法有效儲存,一個更簡單的減少記憶體使用量的方式是將它們儲存在序列化格式中,使用 RDD 持久化 API 中的序列化儲存層級,例如 MEMORY_ONLY_SER。Spark 會將每個 RDD 分割儲存為一個大型位元組陣列。以序列化格式儲存資料的唯一缺點是存取時間較慢,因為必須動態解除序列化的每個物件。如果您想要以序列化格式快取資料,我們強烈建議 使用 Kryo,因為它會產生比 Java 序列化(當然也比原始 Java 物件)更小的尺寸。

垃圾回收調整

當您的程式儲存的 RDD 產生大量「變動」時,JVM 垃圾回收可能會成為一個問題。(在僅讀取 RDD 一次,然後對其執行多項操作的程式中,通常不會產生問題。)當 Java 需要驅逐舊物件以騰出空間給新物件時,它需要追蹤您所有的 Java 物件並找出未使用的物件。這裡要記住的主要重點是垃圾回收的成本與 Java 物件的數量成正比,因此使用具有較少物件的資料結構(例如 Int 陣列,而非 LinkedList)會大幅降低此成本。一個更好的方法是以序列化格式儲存物件,如上所述:現在每個 RDD 分割只會有一個物件(位元組陣列)。在嘗試其他技術之前,如果 GC 是問題,第一個要嘗試的方法是使用 序列化快取

GC 也可能是個問題,因為您的工作任務記憶體(執行任務所需的空間量)與節點上快取的 RDD 之間會產生干擾。我們將討論如何控制分配給 RDD 快取的空間,以減輕此問題。

衡量 GC 的影響

GC 調整的第一步是收集垃圾回收發生的頻率和 GC 花費時間的統計資料。這可以透過將 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 加入 Java 選項來完成。(請參閱 設定指南,以取得將 Java 選項傳遞給 Spark 任務的資訊。)下次執行 Spark 任務時,你會在每次發生垃圾回收時看到訊息印在工作人員的記錄中。請注意,這些記錄會在叢集的工作人員節點上 (在工作目錄的 stdout 檔案中),而非 在驅動程式上。

進階 GC 調整

若要進一步調整垃圾回收,我們首先需要了解 JVM 中記憶體管理的一些基本資訊

Spark 中 GC 調整的目標是確保只有生命週期長的 RDD 儲存在 Old 世代,且 Young 世代的大小足以儲存短暫存在的物件。這將有助於避免完整 GC 收集在任務執行期間建立的暫時物件。一些可能有用的步驟包括

我們的經驗表明,GC 調整的效果取決於您的應用程式和可用的記憶體量。線上說明了 更多調整選項,但在高層級中,管理完整 GC 進行的頻率有助於減少開銷。

執行器的 GC 調整旗標可以透過在工作的組態中設定 spark.executor.defaultJavaOptionsspark.executor.extraJavaOptions 來指定。

其他考量

平行度

除非您將每個作業的平行度設定得夠高,否則叢集將無法充分利用。Spark 會根據檔案大小自動設定在每個檔案上執行的「對應」工作數(不過,您可以透過 SparkContext.textFile 等的選用參數來控制它),而對於分散式「縮減」作業(例如 groupByKeyreduceByKey),它會使用最大的父 RDD 的分割數。您可以傳遞平行度作為第二個引數(請參閱 spark.PairRDDFunctions 文件),或設定組態屬性 spark.default.parallelism 來變更預設值。一般來說,我們建議在叢集中的每個 CPU 核心使用 2-3 個工作。

輸入路徑上的平行列示

有時,當工作輸入有大量的目錄時,您可能也需要增加目錄清單並行性,否則此程序可能會花費很長的時間,特別是在針對 S3 等物件儲存時。如果您的工作在具有 Hadoop 輸入格式的 RDD 上執行(例如,透過 SparkContext.sequenceFile),則並行性會透過 spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads 控制(目前預設為 1)。

對於具有檔案為基礎資料來源的 Spark SQL,您可以調整 spark.sql.sources.parallelPartitionDiscovery.thresholdspark.sql.sources.parallelPartitionDiscovery.parallelism 來改善清單並行性。請參閱 Spark SQL 效能調整指南 以取得更多詳細資料。

縮減任務的記憶體使用

有時,您會收到 OutOfMemoryError,並不是因為您的 RDD 不符合記憶體,而是因為您的其中一項工作,例如 groupByKey 中的其中一項縮減工作,其工作組太大。Spark 的混洗操作(sortByKeygroupByKeyreduceByKeyjoin 等)會在每個工作中建立一個雜湊表來執行分組,這通常會很大。最簡單的解決方法是增加並行性等級,讓每個工作的輸入組更小。Spark 可以有效支援短至 200 毫秒的工作,因為它會在許多工作中重複使用一個執行器 JVM,而且它的工作啟動成本很低,因此您可以安全地將並行性等級增加到群集中核心數量的兩倍以上。

廣播大型變數

使用 SparkContext 中提供的 廣播功能 可以大幅減少每個序列化工作的規模,以及在群集上啟動工作的成本。如果您的工作使用驅動程式內部的任何大型物件(例如靜態查詢表),請考慮將其轉換成廣播變數。Spark 會在主控端列印每個工作的序列化大小,因此您可以查看它來決定您的工作是否太大;一般來說,大於約 20 KiB 的工作可能值得最佳化。

資料區域性

資料區域性會對 Spark 工作的效能造成重大影響。如果資料和對其進行操作的程式碼在一起,則運算往往會很快。但是,如果程式碼和資料分開,則必須從一個移到另一個。通常,從一個地方運送序列化程式碼比運送一塊資料更快,因為程式碼大小遠小於資料。Spark 的排程就是圍繞著資料區域性的這個一般原則建立的。

資料區域性是指資料與處理它的程式碼的接近程度。根據資料的目前位置,有數個區域性等級。從最接近到最遠的順序排列

Spark 會優先在最佳區域層級排程所有工作,但這並不總是可行。在任何閒置執行器上都沒有未處理資料的情況下,Spark 會切換到較低的區域層級。有兩個選項:a) 等待忙碌的 CPU 釋放,以在同一個伺服器上的資料上啟動工作,或 b) 立即在需要將資料移至較遠位置的新工作中啟動工作。

Spark 通常會稍等一會兒,希望忙碌的 CPU 能釋放。一旦逾時,它就會開始將資料從遠處移至空閒的 CPU。每個層級之間的後備等待逾時可以單獨設定,或在一個參數中全部設定;請參閱 設定頁面 上的 spark.locality 參數以取得詳細資訊。如果工作很長且區域性不佳,您應該增加這些設定,但預設值通常運作良好。

摘要

這是一個簡短的指南,說明在調整 Spark 應用程式時您應該知道的重點,最重要的是資料序列化和記憶體調整。對於大多數程式,切換到 Kryo 序列化並以序列化形式保留資料,將可以解決大多數常見的效能問題。歡迎在 Spark 郵件清單 上詢問有關其他最佳調整實務。