工作排程

概觀

Spark 有多種資源排程設施,用於在運算之間進行排程。首先,請回想一下,如 叢集模式概觀 中所述,每個 Spark 應用程式(SparkContext 的執行個體)執行一組獨立的執行器程序。Spark 執行的叢集管理員提供 跨應用程式排程 的設施。其次,在每個 Spark 應用程式內部,如果由不同的執行緒提交,多個「工作」(Spark 動作)可能會並行執行。如果您的應用程式透過網路提供服務,這很常見。Spark 包含一個 公平排程器,用於在每個 SparkContext 中排程資源。

跨應用程式排程

在叢集上執行時,每個 Spark 應用程式都會取得一組獨立的執行器 JVM,這些 JVM 僅執行工作並儲存該應用程式的資料。如果多個使用者需要共用您的叢集,則有不同的選項可管理配置,具體取決於叢集管理員。

所有叢集管理員都可以使用最簡單的選項,即資源的靜態分割。使用此方法,每個應用程式都會獲得可使用的最大資源量,並在整個期間保留這些資源。這是 Spark 的 獨立YARN 模式中使用的途徑,以及 粗粒度 Mesos 模式。可以根據叢集類型配置資源配置,如下所示

Mesos 上提供的第二個選項為 CPU 核心動態共用。在此模式中,每個 Spark 應用程式仍有固定且獨立的記憶體配置(由 spark.executor.memory 設定),但當應用程式未在機器上執行工作時,其他應用程式可以在這些核心上執行工作。當您預期大量的不太活躍的應用程式,例如來自不同使用者的 shell 會話時,此模式非常有用。但是,它會帶來延遲較難預測的風險,因為應用程式可能需要一段時間才能在一個節點上重新獲得核心以執行工作。若要使用此模式,只需使用 mesos:// URL,並將 spark.mesos.coarse 設定為 false 即可。

請注意,目前沒有任何模式提供跨應用程式的記憶體共用。如果您想以這種方式共用資料,我們建議執行單一伺服器應用程式,該應用程式可以透過查詢相同的 RDD 來處理多個請求。

動態資源配置

Spark 提供了一種機制,可根據工作負載動態調整應用程式佔用的資源。這表示您的應用程式可以在不再使用資源時將資源交還給叢集,並在有需求時再次請求資源。如果多個應用程式在您的 Spark 叢集中共用資源,此功能特別有用。

此功能預設為停用,且在所有粗粒度叢集管理員上可用,即 獨立模式YARN 模式Mesos 粗粒度模式K8s 模式

注意事項

組態和設定

有數種方法可以使用此功能。不論您選擇哪一種方法,您的應用程式都必須先將 spark.dynamicAllocation.enabled 設為 true,此外,

外部洗牌服務或洗牌追蹤或 ShuffleDriverComponents 支援可靠儲存的目的是允許移除執行器,而不刪除它們所寫入的洗牌檔案(更多詳細資訊說明 如下)。雖然啟用洗牌追蹤很簡單,但設定外部洗牌服務的方式會因叢集管理員而異

在獨立模式中,只需將您的工作節點設為 spark.shuffle.service.enabledtrue 即可啟動。

在 Mesos 粗粒度模式中,在所有工作節點上執行 $SPARK_HOME/sbin/start-mesos-shuffle-service.sh,並將 spark.shuffle.service.enabled 設為 true。例如,您可以透過 Marathon 執行。

在 YARN 模式中,請遵循 此處 的說明。

所有其他相關設定都是選用的,且在 spark.dynamicAllocation.*spark.shuffle.service.* 名稱空間中。更多詳細資訊,請參閱 設定頁面

資源配置政策

在高層級中,Spark 應該在不再使用執行器時放棄執行器,並在需要時取得執行器。由於沒有明確的方法可以預測即將移除的執行器是否會在不久的將來執行任務,或即將新增的新執行器是否會實際閒置,因此我們需要一套啟發法來決定何時移除和要求執行器。

要求政策

已啟用動態分配的 Spark 應用程式會在有待排程的任務等待時,要求額外的執行器。此條件必然表示現有的執行器組不足以同時滿足所有已提交但尚未完成的任務。

Spark 依回合請求執行器。實際請求會在有待處理任務持續 spark.dynamicAllocation.schedulerBacklogTimeout 秒後觸發,然後在待處理任務佇列持續存在的情況下,每隔 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 秒再次觸發。此外,每回合請求的執行器數量會從前一回合呈指數增加。例如,應用程式會在第一回合新增 1 個執行器,然後在後續回合新增 2、4、8 個執行器,以此類推。

指數增加政策的動機有兩個。首先,應用程式應該在開始時謹慎請求執行器,以防僅需要少數幾個額外的執行器就足夠。這呼應了 TCP 慢速啟動的理由。其次,如果實際上需要許多執行器,應用程式應該能夠適時地增加其資源使用量。

移除政策

移除執行器的政策簡單許多。Spark 應用程式會在執行器閒置超過 spark.dynamicAllocation.executorIdleTimeout 秒時移除執行器。請注意,在大多數情況下,此條件與請求條件互斥,因為如果仍有待排程的任務,執行器不應處於閒置狀態。

執行緒的優雅除役

在動態配置之前,如果 Spark 執行器在關聯應用程式也已結束時結束,則與執行器關聯的所有狀態都不再需要,可以安全地捨棄。然而,在動態配置中,當執行器被明確移除時,應用程式仍在執行。如果應用程式嘗試存取執行器中儲存或寫入的狀態,它必須重新計算狀態。因此,Spark 需要一種機制,在移除執行器之前透過保留其狀態來優雅地停用執行器。

此需求對於洗牌尤其重要。在洗牌期間,Spark 執行器會先將其自己的映射輸出寫入本機磁碟,然後在其他執行器嘗試擷取這些檔案時充當這些檔案的伺服器。在落後者(執行時間遠遠超過其同儕的任務)的情況下,動態配置可能會在洗牌完成之前移除執行器,這種情況下,由該執行器寫入的洗牌檔案必須不必要地重新計算。

保留洗牌檔案的解決方案是使用外部洗牌服務,這也是 Spark 1.2 中引入的。此服務是指在叢集的每個節點上獨立於 Spark 應用程式及其執行器執行的長期執行程序。如果啟用此服務,Spark 執行器會從服務中擷取洗牌檔案,而不是彼此擷取。這表示由執行器寫入的任何洗牌狀態都可能在執行器的生命週期結束後繼續提供服務。

除了寫入混排檔案,執行器也會將資料快取在磁碟或記憶體中。然而,當執行器被移除時,所有快取資料都將無法存取。為了減輕這個問題,預設情況下包含快取資料的執行器永遠不會被移除。你可以使用 spark.dynamicAllocation.cachedExecutorIdleTimeout 來設定這個行為。當將 spark.shuffle.service.fetch.rdd.enabled 設定為 true 時,Spark 可以使用 ExternalShuffleService 來擷取磁碟上持續存在的 RDD 區塊。在動態配置的情況下,如果啟用這個功能,只有磁碟持續存在區塊的執行器會在 spark.dynamicAllocation.executorIdleTimeout 之後被視為閒置,並會適時釋放。在未來的版本中,快取資料可能會透過類似於混排檔案透過外部混排服務保留的方式,保留在堆外儲存空間中。

應用程式內的排程

在給定的 Spark 應用程式(SparkContext 實例)中,如果從不同的執行緒提交多個並行工作,這些工作可以同時執行。在這個部分中,我們將「工作」定義為 Spark 動作(例如 savecollect)以及需要執行才能評估該動作的任何工作。Spark 的排程器完全執行緒安全,並支援這個使用案例,以啟用服務多個請求(例如多個使用者的查詢)的應用程式。

預設情況下,Spark 的排程器以 FIFO 方式執行工作。每個工作被分為「階段」(例如對應和簡約階段),第一個工作會優先使用所有可用的資源,直到其階段有工作要啟動,然後第二個工作會優先,依此類推。如果佇列開頭的工作不需要使用整個叢集,後面的工作可以立即開始執行,但如果佇列開頭的工作很大,後面的工作可能會延遲很長一段時間。

從 Spark 0.8 開始,也可以設定工作之間的公平共享。在公平共享下,Spark 會以「循環」的方式在工作之間分配工作,讓所有工作都能獲得大約相等的叢集資源。這表示在長工作執行時提交的短工作可以立即開始接收資源,並仍然獲得良好的回應時間,而不用等到長工作完成。此模式最適合多使用者設定。

此功能預設為停用,並在所有粗粒度叢集管理員上可用,即 獨立模式YARN 模式K8s 模式Mesos 粗粒度模式。若要啟用公平排程器,只需在設定 SparkContext 時將 spark.scheduler.mode 屬性設定為 FAIR 即可

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

公平排程器池

公平排程器也支援將工作群組成,並為每個池設定不同的排程選項(例如權重)。這可協助建立「高優先順序」池,以供更重要的工作使用,例如,或將每個使用者的工作群組在一起,並提供使用者相等的份額,而不論他們有多少並行的工作,而不是提供工作相等的份額。此方法是根據 Hadoop 公平排程器 建模。

在沒有任何介入的情況下,新提交的工作會進入預設池,但工作池可透過將 spark.scheduler.pool「本機屬性」新增至提交工作的執行緒中的 SparkContext 來設定。執行方式如下

// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

設定此本機屬性後,在此執行緒中提交的所有工作(透過此執行緒呼叫 RDD.savecountcollect 等)將使用此池名稱。此設定為每個執行緒,以簡化執行緒代表同一個使用者執行多項工作。如果您想要清除與執行緒關聯的池,只需呼叫

sc.setLocalProperty("spark.scheduler.pool", null)

池的預設行為

預設情況下,每個池會取得叢集的相等份額(與預設池中的每個工作相等的份額),但在每個池中,工作會以先進先出順序執行。例如,如果您為每個使用者建立一個池,這表示每個使用者將取得叢集的相等份額,且每個使用者的查詢將依序執行,而不是讓後面的查詢取得該使用者的前面查詢的資源。

設定池屬性

也可透過組態檔修改特定池的屬性。每個池支援三個屬性

可以透過建立一個 XML 檔案來設定池屬性,類似於 conf/fairscheduler.xml.template,然後將名為 fairscheduler.xml 的檔案放在類別路徑中,或是在 SparkConf 中設定 spark.scheduler.allocation.file 屬性。檔案路徑會遵循 Hadoop 設定,可以是本機檔案路徑或 HDFS 檔案路徑。

// scheduler file at local
conf.set("spark.scheduler.allocation.file", "file:///path/to/file")
// scheduler file at hdfs
conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")

XML 檔案的格式只是一個 <pool> 元素(代表每個池),其中包含不同元素,用於各種設定。例如

<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

完整的範例也提供在 conf/fairscheduler.xml.template 中。請注意,XML 檔案中未設定的任何池,其所有設定都會取得預設值(排程模式為 FIFO、權重為 1,以及 minShare 為 0)。

使用 JDBC 連線進行排程

若要為 JDBC 用戶端工作階段設定 公平排程器 池,使用者可以設定 spark.sql.thriftserver.scheduler.pool 變數

SET spark.sql.thriftserver.scheduler.pool=accounting;

PySpark 中的並行工作

PySpark 預設不支援將 PVM 執行緒與 JVM 執行緒同步,而且在多個 PVM 執行緒中啟動多個工作並不能保證在每個對應的 JVM 執行緒中啟動每個工作。由於這個限制,無法在個別的 PVM 執行緒中透過 sc.setJobGroup 設定不同的工作群組,這也禁止稍後透過 sc.cancelJobGroup 取消工作。

建議將 pyspark.InheritableThread 與 PVM 執行緒一起使用,以繼承 JVM 執行緒中的可繼承屬性,例如本機屬性。