與雲端基礎架構整合

簡介

所有主要雲端供應商都在物件儲存中提供持續性資料儲存。這些不是傳統的「POSIX」檔案系統。為了在沒有任何單點故障的情況下儲存數百 PB 的資料,物件儲存會以更簡單的 物件名稱 => 資料 模型取代傳統的檔案系統目錄樹。為了啟用遠端存取,物件上的操作通常以(較慢的)HTTP REST 操作提供。

Spark 可以透過 Hadoop 中實作的檔案系統連接器或由基礎設施供應商本身提供的檔案系統連接器,在物件儲存中讀取和寫入資料。這些連接器會讓物件儲存看起來幾乎像檔案系統,具有目錄和檔案,以及對它們的傳統操作,例如列出、刪除和重新命名。

重要:雲端物件儲存空間並非真正的檔案系統

雖然這些儲存看起來像是檔案系統,但它們在底層仍然是物件儲存,而且差異很大

它們不能用於直接取代群集檔案系統(例如 HDFS),除非明確說明

主要差異如下

這對 Spark 有什麼影響?

  1. 讀取和寫入資料可能會比使用一般檔案系統慢很多。
  2. 某些目錄結構在查詢分割計算期間掃描起來可能非常沒有效率。
  3. Spark 在儲存 RDD、資料框或資料集時通常會透過基於重新命名的演算法來提交工作,這種演算法可能又慢又不穩定。

由於這些原因,將物件儲存用作查詢的直接目的地或查詢鏈中的中間儲存並不總是安全的。請參閱物件儲存及其連接器的文件,以確定哪些用途被視為安全的。

一致性

截至 2021 年,Amazon (S3)、Google Cloud (GCS) 和 Microsoft (Azure Storage、ADLS Gen1、ADLS Gen2) 的物件儲存都是一致的

這表示檔案寫入或更新後,其他程序可以立即列出、檢視和開啟該檔案,而且會擷取最新版本。這是 AWS S3 的已知問題,特別是在物件建立之前對 HEAD 要求進行 404 快取時。

即使如此:沒有任何儲存連接器提供任何擔保,說明其客戶端如何應對在串流讀取物件時被覆寫的物件。不要假設舊檔案可以安全讀取,也不要假設變更可見的時間範圍有限 - 或者,如果正在讀取的檔案被覆寫,客戶端不會單純失敗。

因此:避免覆寫已知或可能由其他客戶端主動讀取的檔案。

其他物件儲存是不一致

這包括 OpenStack Swift

此類儲存不總是安全可用於工作的目的地 - 請查閱每個儲存的特定文件。

安裝

在類別路徑和 Spark 中設定有效憑證後,可以使用物件的 URL 作為資料路徑來讀取或寫入物件。例如 sparkContext.textFile("s3a://landsat-pds/scene_list.gz") 將使用 s3a 連接器建立 S3 中儲存的檔案 scene_list.gz 的 RDD。

若要將相關函式庫新增到應用程式的類別路徑,請包含 hadoop-cloud 模組及其相依性。

在 Maven 中,將下列內容新增到 pom.xml 檔案,假設 spark.version 設定為所選的 Spark 版本

<dependencyManagement>
  ...
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hadoop-cloud_2.12</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  ...
</dependencyManagement>

基於 Apache Spark 的商業產品通常會直接設定類別路徑以與雲端基礎架構對話,這種情況下可能不需要此模組。

驗證

Spark 工作必須使用物件儲存進行驗證,才能存取其中的資料。

  1. 當 Spark 在雲端基礎架構中執行時,通常會自動設定憑證。
  2. spark-submit 讀取 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYAWS_SESSION_TOKEN 環境變數,並設定 s3ns3a 連接器對應的驗證選項,以連線 Amazon S3。
  3. 在 Hadoop 群集中,可以在 core-site.xml 檔案中設定設定。
  4. 驗證詳細資料可以手動新增到 spark-defaults.conf 中的 Spark 設定。
  5. 或者,可以在用於設定應用程式 SparkContextSparkConf 執行個體中以程式方式設定這些詳細資料。

重要:切勿將驗證機密資訊提交至原始碼存放庫,尤其是公開的存放庫

參閱 Hadoop 文件 以取得相關組態和安全性選項。

組態

每個雲端連接器都有自己的一組組態參數,請再次參閱相關文件。

對於一致性模型表示基於重新命名的提交是安全的物件儲存體,請使用 FileOutputCommitter v2 演算法以提升效能;v1 則用於安全性。

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

這會在工作結束時減少比「版本 1」演算法更少的重新命名。由於它仍使用 rename() 來提交檔案,因此當物件儲存體沒有相符的資料/清單時,使用它是不安全的。

提交器也可以設定為在清理暫時檔案時忽略失敗;這會降低暫時網路問題升級為工作失敗的風險

spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true

原始 v1 提交演算法會將成功工作的輸出重新命名為工作嘗試目錄,然後在工作提交階段將該目錄中的所有檔案重新命名為最終目的地

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1

Amazon S3 上模擬重新命名的速度很慢,這使得此演算法非常非常慢。建議的解決方案是切換到 S3「零重新命名」提交器(請見下方)。

供您參考,以下是重新命名目錄時不同儲存體和連接器的效能和安全性特徵

儲存體 連接器 目錄重新命名安全性 重新命名效能
Amazon S3 s3a 不安全 O(資料)
Azure Storage wasb 安全 O(檔案)
Azure Datalake Gen 2 abfs 安全 O(1)
Google Cloud Storage gs 混合 O(檔案)
  1. 由於儲存暫時檔案會產生費用;請定期刪除名為 "_temporary" 的目錄。
  2. 對於 AWS S3,請設定多部分上傳可以保留未完成狀態的時間限制。這可避免因未完成的上傳而產生費用。
  3. 對於 Google Cloud,目錄重新命名是逐檔案進行。請考慮使用 v2 提交器,並且僅撰寫會產生冪等輸出的程式碼,包括檔名,因為它不再比 v1 提交器不安全,而且速度更快。

Parquet I/O 設定

在使用 Parquet 資料時,請使用下列設定以獲得最佳效能

spark.hadoop.parquet.enable.summary-metadata false
spark.sql.parquet.mergeSchema false
spark.sql.parquet.filterPushdown true
spark.sql.hive.metastorePartitionPruning true

這些設定會將在查詢期間讀取的資料量減到最少。

ORC I/O 設定

在使用 ORC 資料時,請使用下列設定以獲得最佳效能

spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.hive.metastorePartitionPruning true

同樣地,這些設定會將在查詢期間讀取的資料量減到最少。

Spark 串流處理和物件儲存

Spark Streaming 可以透過建立 FileInputDStream 來監控物件儲存體中新增的檔案,並透過呼叫 StreamingContext.textFileStream() 來監控儲存體中的路徑。

  1. 掃描新檔案的時間與路徑下的檔案數量成正比,而非檔案的數量,因此可能會變成一個緩慢的作業。視窗的大小需要設定為處理這個問題。

  2. 檔案只有在完全寫入後才會出現在物件儲存體中;不需要寫入然後重新命名的工作流程來確保檔案不會在寫入時被選取。應用程式可以直接寫入到受監控的目錄。

  3. 如果預設檢查點檔案管理員稱為 FileContextBasedCheckpointFileManager,串流應該只檢查點到實作快速且原子 rename() 作業的儲存體。否則,檢查點可能會很慢,而且潛在不可靠。在 AWS S3 上使用 Hadoop 3.3.1 或更新版本,並使用 S3A 連接器,可以將可中止串流為基礎的檢查點檔案管理員(透過將 spark.sql.streaming.checkpointFileManagerClass 設定為 org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager)用於消除緩慢的重新命名。在這種情況下,使用者必須特別小心,避免在平行執行的多個查詢之間重複使用檢查點位置,因為這可能會導致檢查點資料損毀。

安全快速地將工作提交到雲端儲存。

如前所述,在任何表現出最終一致性的物件儲存體(例如:S3)上,透過重新命名提交是很危險的,而且通常比傳統檔案系統重新命名更慢。

有些物件儲存體連接器提供自訂提交器,可以在不使用重新命名的情況下提交工作和作業。

Hadoop S3A 提交器

在使用 Hadoop 3.1 或更新版本建置的 Spark 版本中,hadoop-aws JAR 包含可安全用於透過 s3a 連接器存取的 S3 儲存體的提交器。

這些提交器不會將資料寫入儲存體上的暫時目錄以進行重新命名,而是將檔案寫入最終目的地,但不會發出最後的 POST 指令,以使大型「多部分」上傳可見。這些作業會延後到作業提交本身。因此,工作和作業提交快很多,而且工作失敗不會影響結果。

若要切換到 S3A 提交器,請使用使用 Hadoop 3.1 或更新版本建置的 Spark 版本,並透過下列選項切換提交器。

spark.hadoop.fs.s3a.committer.name directory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

已使用 Spark 支援的最常見格式測試過。

mydataframe.write.format("parquet").save("s3a://bucket/destination")

可以在 最新的 Hadoop 文件 中找到更多關於這些提交器的詳細資訊,其中包含 使用 S3A 提交器提交工作到 S3 的 S3A 提交器詳細資料。

注意:根據所使用的提交器,在 Hadoop 版本 3.3.1 之前,進行中的統計資料可能會被低報。

Amazon EMR:針對 S3 最佳化的 EMRFS 提交器

Amazon EMR 有其自己的 S3 感知提交器,適用於 Parquet 資料。有關使用說明,請參閱 EMRFS S3 最佳化提交器

有關實作和效能詳細資訊,請參閱 [“使用 EMRFS S3 最佳化提交器改善 Apache Parquet 格式的 Apache Spark 寫入效能”](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/

Azure 和 Google 雲端儲存:MapReduce 中間清單提交器。

2022 年 9 月之後發布的 hadoop-mapreduce-core JAR 版本(3.3.5 及更新版本)包含一個針對 Azure ADLS 第 2 代和 Google Cloud Storage 的效能和復原力最佳化的提交器。

此提交器(「明細提交器」)使用明細檔案,將目錄清單資訊從工作提交器傳播到工作提交器。這些明細可以原子化地寫入,而不需要依賴原子目錄重新命名,這是 GCS 所缺乏的。

工作提交器會讀取這些明細,並將檔案從工作輸出目錄直接重新命名到目標目錄,同時進行並行處理,並可選擇限制速率以避免節流 IO。這可在物件儲存體上提供效能和可擴充性。

對於工作正確性來說,使用 Azure 儲存體並非必要;傳統的 FileOutputCommitter 在那裡是安全的 - 不過,這個新的提交器對於具有深度和廣度目錄樹的大型工作來說,可擴充性更好。

由於 Google GCS 不支援原子目錄重新命名,因此應在可用的情況下使用明細提交器。

此提交器支援「動態分割覆寫」(請參閱下方)。

有關此提交器的可用性和使用詳細資訊,請參閱所使用的 Hadoop 版本的 Hadoop 文件。

它在 Hadoop 3.3.4 或更早版本中不可用。

IBM 雲端物件儲存:Stocator

IBM 提供 Stocator 輸出提交器,適用於 IBM Cloud Object Storage 和 OpenStack Swift。

原始碼、文件和版本可以在 https://github.com/CODAIT/stocator 找到。

雲端提交器和 INSERT OVERWRITE TABLE

Spark 有一個稱為「動態分割覆寫」的功能;可以更新表格,並且只會將新增資料的分割內容取代。

這用於 INSERT OVERWRITE TABLE 形式的 SQL 陳述,以及在以「覆寫」模式寫入資料集時

eventDataset.write
  .mode("overwrite")
  .partitionBy("year", "month")
  .format("parquet")
  .save(tablePath)

此功能使用檔案重新命名,並且對提交器和檔案系統都有特定要求

  1. 提交器的工作目錄必須在目標檔案系統中。
  2. 目標檔案系統必須有效地支援檔案重新命名。

S3A 提交者和 AWS S3 儲存未符合這些條件。

其他雲端儲存的提交者可能會支援此功能,並宣告與 Spark 相容。如果在透過 Hadoop 提交者寫入資料時需要動態分割覆寫,Spark 會在使用原始 FileOutputCommitter 時永遠允許此功能。對於其他提交者,Spark 會在實例化後探查其相容性宣告,並在宣告相容時允許此操作。

如果提交者不相容,操作會失敗,並顯示錯誤訊息 PathOutputCommitter 不支援 dynamicPartitionOverwrite

除非目標檔案系統有相容的提交者,否則唯一的解決方案是使用雲端友善格式來儲存資料。

延伸閱讀

以下是來自 Apache 和雲端供應商的標準連接器的文件。

雲端提交者問題和與 Hive 相容的解決方案