RDD 程式設計指南
概觀
在高層級上,每個 Spark 應用程式都包含一個執行使用者 main
函數並在叢集上執行各種 平行運算 的 驅動程式。Spark 提供的主要抽象化是 彈性分散式資料集 (RDD),它是一個在叢集節點上分區的元素集合,可以在平行運算中操作。RDD 是透過從 Hadoop 檔案系統(或任何其他 Hadoop 支援的檔案系統)中的檔案,或驅動程式中現有的 Scala 集合開始,並對其進行轉換而建立的。使用者也可以要求 Spark 在記憶體中 保留 RDD,讓它可以在平行運算中有效地重複使用。最後,RDD 會自動從節點故障中復原。
Spark 中的第二個抽象化是 共用變數,可以在平行運算中使用。預設情況下,當 Spark 在不同節點上以一組工作執行平行函數時,它會將函數中使用的每個變數的副本傳送給每個工作。有時,需要在工作之間,或在工作與驅動程式之間共用變數。Spark 支援兩種共用變數:廣播變數,可用於將值快取在所有節點的記憶體中,以及 累加器,這是一種只能「新增」的變數,例如計數器和總和。
本指南在 Spark 支援的每種語言中顯示這些功能。如果您啟動 Spark 的互動式外殼程式,最容易遵循,Scala 外殼程式為 bin/spark-shell
,Python 外殼程式為 bin/pyspark
。
與 Spark 連結
Spark 3.5.1 可搭配 Python 3.8+ 使用。它可以使用標準的 CPython 解譯器,因此可以搭配 NumPy 等 C 函式庫使用。它也可以搭配 PyPy 7.3.6+ 使用。
Python 中的 Spark 應用程式可以搭配 bin/spark-submit
執行,它包含執行時期的 Spark,或將其包含在您的 setup.py 中,如下所示
install_requires=[
'pyspark=={site.SPARK_VERSION}'
]
若要在不安裝 PySpark 的情況下執行 Python 中的 Spark 應用程式,請使用位於 Spark 目錄中的 bin/spark-submit
執行。此執行會載入 Spark 的 Java/Scala 函式庫,並讓您將應用程式提交至叢集。您也可以使用 bin/pyspark
啟動互動式 Python shell。
如果您想要存取 HDFS 資料,您需要使用連結到您 HDFS 版本的 PySpark 建置。您也可以在 Spark 首頁取得常見 HDFS 版本的 預先建置套件。
最後,您需要將一些 Spark 類別匯入您的程式中。加入下列程式碼列
from pyspark import SparkContext, SparkConf
PySpark 需要在驅動程式和工作人員中使用相同次要版本的 Python。它會使用 PATH 中的預設 Python 版本,您可以透過 PYSPARK_PYTHON
指定您想要使用的 Python 版本,例如
$ PYSPARK_PYTHON=python3.8 bin/pyspark
$ PYSPARK_PYTHON=/path-to-your-pypy/pypy bin/spark-submit examples/src/main/python/pi.py
Spark 3.5.1 預設建置和發佈時會搭配 Scala 2.12 使用。(Spark 也可以建置為搭配其他版本的 Scala 使用。) 若要撰寫 Scala 中的應用程式,您需要使用相容的 Scala 版本 (例如 2.12.X)。
若要撰寫 Spark 應用程式,您需要在 Spark 中加入 Maven 相依性。Spark 可透過 Maven Central 取得,網址為
groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.5.1
此外,如果您想要存取 HDFS 叢集,您需要為您的 HDFS 版本加入 hadoop-client
的相依性。
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最後,您需要將一些 Spark 類別匯入您的程式中。加入下列程式碼列
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
(在 Spark 1.3.0 之前,您需要明確 import org.apache.spark.SparkContext._
來啟用必要的隱式轉換。)
Spark 3.5.1 支援 lambda 運算式,以便簡潔地撰寫函式,否則您可以使用 org.apache.spark.api.java.function 套件中的類別。
請注意,Spark 2.2.0 已移除對 Java 7 的支援。
若要撰寫 Java 中的 Spark 應用程式,您需要在 Spark 中加入相依性。Spark 可透過 Maven Central 取得,網址為
groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.5.1
此外,如果您想要存取 HDFS 叢集,您需要為您的 HDFS 版本加入 hadoop-client
的相依性。
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最後,您需要將一些 Spark 類別匯入您的程式中。加入下列程式碼列
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
初始化 Spark
Spark 程式必須執行的第一件事是建立 SparkContext 物件,它會告訴 Spark 如何存取叢集。若要建立 SparkContext
,您首先需要建置一個 SparkConf 物件,其中包含有關您的應用程式的資訊。
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
Spark 程式必須做的第一件事就是建立一個 SparkContext 物件,這會告訴 Spark 如何存取叢集。若要建立 SparkContext
,您首先需要建立一個 SparkConf 物件,其中包含有關您應用程式的資訊。
每個 JVM 只應有一個 SparkContext 處於作用中。您必須在建立新的 SparkContext 之前 stop()
正在作用中的 SparkContext。
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
Spark 程式必須做的第一件事就是建立一個 JavaSparkContext 物件,這會告訴 Spark 如何存取叢集。若要建立 SparkContext
,您首先需要建立一個 SparkConf 物件,其中包含有關您應用程式的資訊。
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);
appName
參數是您應用程式在叢集 UI 上顯示的名稱。master
是 Spark、Mesos 或 YARN 叢集 URL,或是在本地模式下執行的特殊「local」字串。實際上,在叢集上執行時,您不會希望在程式中硬編碼 master
,而是使用 spark-submit
啟動應用程式並在那裡接收它。不過,對於本地測試和單元測試,您可以傳遞「local」以在處理序中執行 Spark。
使用 Shell
在 PySpark shell 中,一個特別的解釋器感知 SparkContext 已為您建立,在變數 sc
中。建立您自己的 SparkContext 將無法運作。您可以使用 --master
參數設定 context 連線的 master,並可透過傳遞逗號分隔的清單給 --py-files
來將 Python .zip、.egg 或 .py 檔案新增至執行時間路徑。對於第三方 Python 相依性,請參閱 Python 套件管理。您也可以透過提供逗號分隔的 Maven 座標清單給 --packages
參數,將相依性 (例如 Spark 套件) 新增至您的 shell 會話。任何可能存在相依性的其他儲存庫 (例如 Sonatype) 都可以傳遞給 --repositories
參數。例如,要在四個核心上執行 bin/pyspark
,請使用
$ ./bin/pyspark --master local[4]
或者,若要將 code.py
也新增至搜尋路徑 (以便稍後可以 import code
),請使用
$ ./bin/pyspark --master local[4] --py-files code.py
如需完整選項清單,請執行 pyspark --help
。在幕後,pyspark
會呼叫更通用的 spark-submit
腳本。
也可以在 IPython 中啟動 PySpark shell,這是增強的 Python 解釋器。PySpark 可與 IPython 1.0.0 及後續版本搭配使用。若要使用 IPython,請在執行 bin/pyspark
時將 PYSPARK_DRIVER_PYTHON
變數設定為 ipython
$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
若要使用 Jupyter notebook (以前稱為 IPython notebook),
$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
您可以透過設定 PYSPARK_DRIVER_PYTHON_OPTS
來自訂 ipython
或 jupyter
命令。
啟動 Jupyter Notebook 伺服器後,您可以從「檔案」標籤建立新的筆記本。在筆記本中,您可以在開始從 Jupyter notebook 嘗試 Spark 之前,將命令 %pylab inline
作為筆記本的一部分輸入。
在 Spark shell 中,一個特別的解釋器感知 SparkContext 已為您建立,在變數 sc
中。建立您自己的 SparkContext 將無法運作。您可以使用 --master
參數設定 context 連線的 master,並可透過傳遞逗號分隔的清單給 --jars
參數來將 JAR 新增至類別路徑。您也可以透過提供逗號分隔的 Maven 座標清單給 --packages
參數,將相依性 (例如 Spark 套件) 新增至您的 shell 會話。任何可能存在相依性的其他儲存庫 (例如 Sonatype) 都可以傳遞給 --repositories
參數。例如,要在四個核心上執行 bin/spark-shell
,請使用
$ ./bin/spark-shell --master local[4]
或者,若要將 code.jar
也新增至其類別路徑,請使用
$ ./bin/spark-shell --master local[4] --jars code.jar
若要使用 Maven 座標包含相依性
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
如需完整選項清單,請執行 spark-shell --help
。在幕後,spark-shell
會呼叫更通用的 spark-submit
腳本。
彈性分散式資料集 (RDD)
Spark 圍繞著有復原能力的分布式資料集 (RDD) 的概念,這是一個具有容錯能力的元素集合,可以並行運算。有兩種方式可以建立 RDD:並行化驅動程式中的現有集合,或參照外部儲存系統中的資料集,例如共享檔案系統、HDFS、HBase 或提供 Hadoop InputFormat 的任何資料來源。
平行化集合
並行化集合是透過在驅動程式中呼叫 SparkContext
的 parallelize
方法,對現有的 iterable 或集合建立。集合的元素會複製以形成一個分布式資料集,可以並行運算。例如,以下是如何建立一個包含數字 1 到 5 的並行化集合
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
建立後,可以並行運算分布式資料集 (distData
)。例如,我們可以呼叫 distData.reduce(lambda a, b: a + b)
來加總清單中的元素。我們稍後會說明分布式資料集的運算。
並行化集合是透過在驅動程式中呼叫 SparkContext
的 parallelize
方法,對現有集合 (Scala Seq
) 建立。集合的元素會複製以形成一個分布式資料集,可以並行運算。例如,以下是如何建立一個包含數字 1 到 5 的並行化集合
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
建立後,分布式資料集 (distData
) 可以並行運作。例如,我們可以呼叫 distData.reduce((a, b) => a + b)
來加總陣列中的元素。我們稍後會說明分布式資料集的運算。
並行化集合是透過在驅動程式中呼叫 JavaSparkContext
的 parallelize
方法,針對現有的 Collection
建立的。集合的元素會複製形成一個分布式資料集,以便並行運作。例如,以下是建立一個包含數字 1 到 5 的並行化集合的方法
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
建立後,分布式資料集 (distData
) 可以並行運作。例如,我們可以呼叫 distData.reduce((a, b) -> a + b)
來加總清單中的元素。我們稍後會說明分布式資料集的運算。
並行化集合的一個重要參數是將資料集切成多少個分割區。Spark 會為叢集的每個分割區執行一個工作。通常,您會希望每個叢集的 CPU 有 2-4 個分割區。一般來說,Spark 會根據您的叢集自動設定分割區的數量。不過,您也可以手動設定,方法是將分割區的數量傳遞為 parallelize
的第二個參數 (例如 sc.parallelize(data, 10)
)。注意:程式碼中有些地方會使用區段 (分割區的同義詞) 一詞,以維持向後相容性。
外部資料集
PySpark 可以從 Hadoop 支援的任何儲存來源建立分布式資料集,包括您的本機檔案系統、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支援文字檔、SequenceFiles,以及任何其他 Hadoop InputFormat。
文字檔 RDD 可以使用 SparkContext
的 textFile
方法建立。此方法會取得檔案的 URI (機器上的本機路徑,或 hdfs://
、s3a://
等 URI),並將其讀取為一組行。以下是呼叫範例
>>> distFile = sc.textFile("data.txt")
建立後,distFile
可以透過資料集運算來執行動作。例如,我們可以使用 map
和 reduce
運算來加總所有行的長度,如下所示:distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
。
使用 Spark 讀取檔案的一些注意事項
-
如果使用本地檔案系統上的路徑,檔案也必須可以在工作節點上的相同路徑存取。將檔案複製到所有工作節點,或使用網路掛載的共用檔案系統。
-
Spark 的所有基於檔案的輸入方法,包括
textFile
,也支援在目錄、壓縮檔案和萬用字元上執行。例如,您可以使用textFile("/my/directory")
、textFile("/my/directory/*.txt")
和textFile("/my/directory/*.gz")
。 -
textFile
方法也採用一個選用的第二個引數來控制檔案的分區數。預設情況下,Spark 會為檔案的每個區塊建立一個分區(在 HDFS 中,區塊預設為 128MB),但您也可以透過傳遞較大的值來要求更多的分區數。請注意,您無法擁有少於區塊數的分區數。
除了文字檔案之外,Spark 的 Python API 也支援其他數種資料格式
-
SparkContext.wholeTextFiles
讓您可以讀取包含多個小型文字檔案的目錄,並將它們每個都傳回為 (檔名,內容) 配對。這與textFile
不同,後者會傳回每個檔案中每行的記錄。 -
RDD.saveAsPickleFile
和SparkContext.pickleFile
支援以包含醃製 Python 物件的簡單格式儲存 RDD。批次處理用於醃製序列化,預設批次大小為 10。 -
SequenceFile 和 Hadoop 輸入/輸出格式
注意此功能目前標記為 實驗性
,並供進階使用者使用。它可能會在未來被基於 Spark SQL 的讀取/寫入支援取代,在這種情況下,Spark SQL 是首選方法。
可寫入支援
PySpark SequenceFile 支援在 Java 中載入 RDD 的鍵值對,將 Writable 轉換為基本 Java 類型,並使用 pickle 將產生的 Java 物件序列化。當將 RDD 的鍵值對儲存到 SequenceFile 時,PySpark 會執行相反的動作。它會將 Python 物件反序列化為 Java 物件,然後將它們轉換為 Writable。下列 Writable 會自動轉換
Writable 類型 | Python 類型 |
---|---|
文字 | str |
IntWritable | int |
FloatWritable | float |
DoubleWritable | float |
BooleanWritable | bool |
BytesWritable | bytearray |
NullWritable | None |
MapWritable | dict |
陣列並未預設處理。使用者需要在讀取或寫入時指定自訂 ArrayWritable
子類型。在寫入時,使用者也需要指定自訂轉換器,將陣列轉換為自訂 ArrayWritable
子類型。在讀取時,預設轉換器會將自訂 ArrayWritable
子類型轉換為 Java Object[]
,然後將其序列化為 Python 元組。若要取得基本類型陣列的 Python array.array
,使用者需要指定自訂轉換器。
儲存和載入 SequenceFile
類似於文字檔案,SequenceFile 可以透過指定路徑來儲存和載入。可以指定鍵和值類別,但對於標準 Writable 來說,這並非必要。
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
儲存和載入其他 Hadoop 輸入/輸出格式
PySpark 也能讀取任何 Hadoop InputFormat 或寫入任何 Hadoop OutputFormat,適用於「新」和「舊」Hadoop MapReduce API。如有需要,Hadoop 組態可以作為 Python dict 傳入。以下是使用 Elasticsearch ESInputFormat 的範例
$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
"org.apache.hadoop.io.NullWritable",
"org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=conf)
>>> rdd.first() # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
{u'field1': True,
u'field2': u'Some Text',
u'field3': 12345})
請注意,如果 InputFormat 僅依賴於 Hadoop 組態和/或輸入路徑,而且鍵和值類別可以根據上表輕鬆轉換,那麼這種方法應該很適合這種情況。
如果您有自訂序列化二進位資料(例如從 Cassandra/HBase 載入資料),則您需要先在 Scala/Java 端將該資料轉換為 pickle 的 pickler 可以處理的內容。有一個 Converter 特性可供使用。只要延伸此特性,並在 convert
方法中實作轉換程式碼即可。請務必確保此類別連同存取 InputFormat
所需的任何依賴項,都打包到您的 Spark 任務 jar 中,並包含在 PySpark classpath 中。
請參閱 Python 範例 和 Converter 範例,以取得使用 Cassandra/HBase InputFormat
和 OutputFormat
搭配自訂轉換器的範例。
Spark 可以從 Hadoop 支援的任何儲存來源建立分散式資料集,包括您的本機檔案系統、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支援文字檔案、SequenceFiles,以及任何其他 Hadoop InputFormat。
文字檔 RDD 可以使用 SparkContext
的 textFile
方法建立。此方法會取得檔案的 URI (機器上的本機路徑,或 hdfs://
、s3a://
等 URI),並將其讀取為一組行。以下是呼叫範例
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
建立後,distFile
可由資料集操作執行。例如,我們可以使用 map
和 reduce
操作,將所有列的大小加總,如下所示:distFile.map(s => s.length).reduce((a, b) => a + b)
。
使用 Spark 讀取檔案的一些注意事項
-
如果使用本地檔案系統上的路徑,檔案也必須可以在工作節點上的相同路徑存取。將檔案複製到所有工作節點,或使用網路掛載的共用檔案系統。
-
Spark 的所有基於檔案的輸入方法,包括
textFile
,也支援執行目錄、壓縮檔案和萬用字元。例如,您可以使用textFile("/my/directory")
、textFile("/my/directory/*.txt")
和textFile("/my/directory/*.gz")
。當讀取多個檔案時,分割區的順序取決於檔案從檔案系統傳回的順序。例如,它可能不會遵循檔案路徑的字順排序。在分割區內,元素會根據其在底層檔案中的順序排序。 -
textFile
方法也採用一個選用的第二個引數來控制檔案的分區數。預設情況下,Spark 會為檔案的每個區塊建立一個分區(在 HDFS 中,區塊預設為 128MB),但您也可以透過傳遞較大的值來要求更多的分區數。請注意,您無法擁有少於區塊數的分區數。
除了文字檔案之外,Spark 的 Scala API 也支援其他幾種資料格式
-
SparkContext.wholeTextFiles
讓您可以讀取包含多個小型文字檔案的目錄,並將它們每個傳回為 (檔名、內容) 配對。這與textFile
不同,後者會傳回每個檔案中每行的記錄。分割是由資料區域性決定的,在某些情況下,可能會導致分割區太少。對於這些情況,wholeTextFiles
提供一個控制分割區最小數量的第二個選用引數。 -
對於 SequenceFiles,請使用 SparkContext 的
sequenceFile[K, V]
方法,其中K
和V
是檔案中金鑰和值的類型。這些應該是 Hadoop 的 Writable 介面的子類別,例如 IntWritable 和 Text。此外,Spark 允許您為一些常見的 Writable 指定原生類型;例如,sequenceFile[Int, String]
會自動讀取 IntWritable 和 Text。 -
對於其他 Hadoop InputFormat,您可以使用
SparkContext.hadoopRDD
方法,它會採用任意的JobConf
和輸入格式類別、金鑰類別和值類別。請以與使用輸入來源的 Hadoop 工作相同的設定方式設定這些。您也可以使用SparkContext.newAPIHadoopRDD
,針對基於「新」MapReduce API (org.apache.hadoop.mapreduce
) 的 InputFormat。 -
RDD.saveAsObjectFile
和SparkContext.objectFile
支援以由序列化 Java 物件組成的簡單格式儲存 RDD。雖然這不如 Avro 等專門格式有效率,但它提供了一種儲存任何 RDD 的簡單方法。
Spark 可以從 Hadoop 支援的任何儲存來源建立分散式資料集,包括您的本機檔案系統、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支援文字檔案、SequenceFiles,以及任何其他 Hadoop InputFormat。
文字檔 RDD 可以使用 SparkContext
的 textFile
方法建立。此方法會取得檔案的 URI (機器上的本機路徑,或 hdfs://
、s3a://
等 URI),並將其讀取為一組行。以下是呼叫範例
JavaRDD<String> distFile = sc.textFile("data.txt");
建立後,distFile
可以由資料集操作處理。例如,我們可以使用 map
和 reduce
操作來加總所有行的長度,如下所示:distFile.map(s -> s.length()).reduce((a, b) -> a + b)
。
使用 Spark 讀取檔案的一些注意事項
-
如果使用本地檔案系統上的路徑,檔案也必須可以在工作節點上的相同路徑存取。將檔案複製到所有工作節點,或使用網路掛載的共用檔案系統。
-
Spark 的所有基於檔案的輸入方法,包括
textFile
,也支援在目錄、壓縮檔案和萬用字元上執行。例如,您可以使用textFile("/my/directory")
、textFile("/my/directory/*.txt")
和textFile("/my/directory/*.gz")
。 -
textFile
方法也採用一個選用的第二個引數來控制檔案的分區數。預設情況下,Spark 會為檔案的每個區塊建立一個分區(在 HDFS 中,區塊預設為 128MB),但您也可以透過傳遞較大的值來要求更多的分區數。請注意,您無法擁有少於區塊數的分區數。
除了文字檔案,Spark 的 Java API 也支援其他幾種資料格式
-
JavaSparkContext.wholeTextFiles
讓您可以讀取包含多個小型文字檔案的目錄,並將每個檔案回傳為 (檔案名稱、內容) 配對。這與textFile
不同,後者會回傳每個檔案中每行的單一記錄。 -
對於 SequenceFiles,請使用 SparkContext 的
sequenceFile[K, V]
方法,其中K
和V
是檔案中金鑰和值的類型。這些應該是 Hadoop 的 Writable 介面的子類別,例如 IntWritable 和 Text。 -
對於其他 Hadoop InputFormats,您可以使用
JavaSparkContext.hadoopRDD
方法,此方法會採用任意JobConf
和輸入格式類別、金鑰類別和值類別。請以與 Hadoop 作業使用輸入來源相同的方式設定這些內容。您也可以使用JavaSparkContext.newAPIHadoopRDD
來處理基於「新」MapReduce API (org.apache.hadoop.mapreduce
) 的 InputFormats。 -
JavaRDD.saveAsObjectFile
和JavaSparkContext.objectFile
支援以包含序列化 Java 物件的簡單格式儲存 RDD。雖然這不如 Avro 等專門格式有效率,但它提供了一個儲存任何 RDD 的簡單方法。
RDD 操作
RDD 支援兩種類型的操作:轉換,從現有資料集建立新的資料集,以及動作,在資料集上執行運算後回傳值給驅動程式。例如,map
是一個轉換,它會將每個資料集元素傳遞給一個函式,並回傳表示結果的新 RDD。另一方面,reduce
是一個動作,它會使用某個函式彙總 RDD 的所有元素,並將最終結果回傳給驅動程式(儘管還有一個並行的 reduceByKey
,它會回傳一個分散式資料集)。
Spark 中的所有轉換都是延遲的,這表示它們不會立即運算其結果。相反地,它們只會記住套用至某個基礎資料集(例如檔案)的轉換。只有當動作需要將結果回傳給驅動程式時,才會運算轉換。此設計讓 Spark 能夠更有效率地執行。例如,我們可以了解到透過 map
建立的資料集將會用於 reduce
,並只將 reduce
的結果回傳給驅動程式,而不是較大的已對應資料集。
預設情況下,每次對轉換後的 RDD 執行動作時,可能會重新計算。不過,您也可以使用 persist
(或 cache
)方法將 RDD 保留在記憶體中,這樣 Spark 會將元素保留在叢集中,以便下次查詢時能更快存取。此外,也支援將 RDD 保存在磁碟上,或複製到多個節點。
基礎
為了說明 RDD 的基礎知識,請考慮以下簡單程式
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
第一行定義來自外部檔案的基本 RDD。此資料集不會載入記憶體或執行其他動作: lines
僅為指向檔案的指標。第二行定義 lineLengths
為 map
轉換的結果。同樣地,由於惰性, lineLengths
不會 立即計算。最後,我們執行 reduce
,這是一個動作。此時,Spark 會將計算分解成在個別機器上執行的任務,而每部機器都會執行其對應的地圖部分和區域性縮減,只傳回其答案給驅動程式。
如果我們稍後也想再次使用 lineLengths
,我們可以在 reduce
之前新增
lineLengths.persist()
,這會導致 lineLengths
在第一次計算後儲存在記憶體中。
為了說明 RDD 的基礎知識,請考慮以下簡單程式
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
第一行定義來自外部檔案的基本 RDD。此資料集不會載入記憶體或執行其他動作: lines
僅為指向檔案的指標。第二行定義 lineLengths
為 map
轉換的結果。同樣地,由於惰性, lineLengths
不會 立即計算。最後,我們執行 reduce
,這是一個動作。此時,Spark 會將計算分解成在個別機器上執行的任務,而每部機器都會執行其對應的地圖部分和區域性縮減,只傳回其答案給驅動程式。
如果我們稍後也想再次使用 lineLengths
,我們可以在 reduce
之前新增
lineLengths.persist()
,這會導致 lineLengths
在第一次計算後儲存在記憶體中。
為了說明 RDD 的基礎知識,請考慮以下簡單程式
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
第一行定義來自外部檔案的基本 RDD。此資料集不會載入記憶體或執行其他動作: lines
僅為指向檔案的指標。第二行定義 lineLengths
為 map
轉換的結果。同樣地,由於惰性, lineLengths
不會 立即計算。最後,我們執行 reduce
,這是一個動作。此時,Spark 會將計算分解成在個別機器上執行的任務,而每部機器都會執行其對應的地圖部分和區域性縮減,只傳回其答案給驅動程式。
如果我們稍後也想再次使用 lineLengths
,我們可以在 reduce
之前新增
lineLengths.persist(StorageLevel.MEMORY_ONLY());
,這會導致 lineLengths
在第一次計算後儲存在記憶體中。
傳遞函數給 Spark
Spark 的 API 大量依賴在驅動程式中傳遞函式在叢集中執行。有三個建議的執行方式
- Lambda 運算式,用於可以寫成運算式的簡單函式。(Lambda 不支援多敘述函式或未傳回值的敘述。)
- 呼叫 Spark 函式內的區域
def
,用於較長的程式碼。 - 模組中的頂層函式。
例如,若要傳遞比使用 lambda
所支援的更長的函式,請考慮以下程式碼
"""MyScript.py"""
if __name__ == "__main__":
def myFunc(s):
words = s.split(" ")
return len(words)
sc = SparkContext(...)
sc.textFile("file.txt").map(myFunc)
請注意,雖然也可以傳遞對類別執行個體中方法的參考(與單例物件相反),但這需要傳送包含該類別的物件以及方法。例如,請考慮
class MyClass(object):
def func(self, s):
return s
def doStuff(self, rdd):
return rdd.map(self.func)
在此,如果我們建立一個 new MyClass
並對其呼叫 doStuff
,則其中的 map
參照 MyClass
執行個體的 func
方法,因此需要將整個物件傳送到叢集。
以類似的方式,存取外部物件的欄位會參照整個物件
class MyClass(object):
def __init__(self):
self.field = "Hello"
def doStuff(self, rdd):
return rdd.map(lambda s: self.field + s)
若要避免此問題,最簡單的方法是將 field
複製到一個區域變數,而不是從外部存取它
def doStuff(self, rdd):
field = self.field
return rdd.map(lambda s: field + s)
Spark 的 API 大量依賴於將驅動程式中的函式傳遞到叢集上執行。有兩種建議的方法可以執行此操作
- 匿名函式語法,可用於簡短的程式碼片段。
- 全域單例物件中的靜態方法。例如,您可以定義
object MyFunctions
,然後傳遞MyFunctions.func1
,如下所示
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
請注意,雖然也可以傳遞對類別執行個體中方法的參考(與單例物件相反),但這需要傳送包含該類別的物件以及方法。例如,請考慮
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
在此,如果我們建立一個新的 MyClass
執行個體並對其呼叫 doStuff
,則其中的 map
參照 MyClass
執行個體的 func1
方法,因此需要將整個物件傳送到叢集。這類似於撰寫 rdd.map(x => this.func1(x))
。
以類似的方式,存取外部物件的欄位會參照整個物件
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
等於撰寫 rdd.map(x => this.field + x)
,它參照所有 this
。若要避免此問題,最簡單的方法是將 field
複製到一個區域變數,而不是從外部存取它
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
Spark 的 API 大量依賴傳遞驅動程式中的函式在叢集上執行。在 Java 中,函式由實作 org.apache.spark.api.java.function 套件中介面的類別表示。有兩種方式可以建立此類函式
- 在您自己的類別中實作 Function 介面,做為匿名內部類別或命名類別,並傳遞其執行個體給 Spark。
- 使用 lambda 表達式 簡潔地定義實作。
雖然本指南的大部分內容都使用 lambda 語法以求簡潔,但所有相同的 API 都很容易使用長格式。例如,我們可以將上述程式碼寫成如下
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
或者,如果內嵌撰寫函式很笨重
class GetLength implements Function<String, Integer> {
public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) { return a + b; }
}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());
請注意,Java 中的匿名內部類別也可以存取封裝範圍中的變數,只要它們標記為 final
。Spark 會將這些變數的複本傳送給每個工作節點,就像它對其他語言所做的一樣。
瞭解封閉
關於 Spark 較困難的事情之一是在叢集上執行程式碼時瞭解變數和方法的範圍和生命週期。修改其範圍外變數的 RDD 作業可能是造成混淆的常見來源。在以下範例中,我們將檢視使用 foreach()
來遞增計數器的程式碼,但其他作業也可能發生類似問題。
範例
考慮以下天真的 RDD 元素總和,其行為可能因執行是否發生在同一個 JVM 中而有所不同。一個常見的範例是在 local
模式 (--master = local[n]
) 下執行 Spark,而不是將 Spark 應用程式部署到叢集(例如透過 spark-submit 到 YARN)
counter = 0
rdd = sc.parallelize(data)
# Wrong: Don't do this!!
def increment_counter(x):
global counter
counter += x
rdd.foreach(increment_counter)
print("Counter value: ", counter)
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
println("Counter value: " + counter);
本地模式與叢集模式
上述程式碼的行為未定義,可能無法按預期運作。為了執行作業,Spark 會將 RDD 作業的處理分解成任務,每個任務由執行器執行。在執行之前,Spark 會計算任務的封閉。封閉是執行器在 RDD 上執行其運算(在本例中為 foreach()
)時必須可見的變數和方法。此封閉會序列化並傳送給每個執行器。
傳送給每個執行器的封閉中的變數現在是複本,因此,當在 foreach
函式中參照 counter 時,它不再是驅動程式節點上的 counter。驅動程式節點的記憶體中仍然有一個 counter,但執行器不再能看到它!執行器只會看到序列化封閉中的複本。因此,counter 的最終值仍然會是零,因為對 counter 的所有作業都參照序列化封閉中的值。
在某些情況下,在 local 模式中,foreach
函式實際上會在與驅動程式相同的 JVM 中執行,並會參照同一個原始 counter,並且實際上可能會更新它。
為了確保在這些類型的場景中行為明確,應該使用 Accumulator
。Spark 中的 Accumulator 專門用於提供一種機制,以便在叢集中的工作節點之間分割執行時安全地更新變數。本指南的 Accumulator 區段會更詳細地討論這些內容。
一般而言,不應使用封閉結構(例如迴圈或局部定義的方法)來變更某些全域狀態。Spark 沒有定義或保證從封閉結構外部參照的物件變更行為。有些程式碼在本地模式下可能會執行,但這只是意外,而且此類程式碼在分散式模式下不會按照預期執行。如果需要進行某些全域聚合,請改用累加器。
列印 RDD 的元素
另一種常見的慣用語法是嘗試使用 rdd.foreach(println)
或 rdd.map(println)
列印 RDD 的元素。在單一機器上,這會產生預期的輸出並列印所有 RDD 的元素。然而,在 cluster
模式中,執行器呼叫的 stdout
輸出現在會寫入執行器的 stdout
,而不是驅動程式上的 stdout
,因此驅動程式上的 stdout
不會顯示這些輸出!若要在驅動程式上列印所有元素,可以使用 collect()
方法,先將 RDD 帶到驅動程式節點,如下所示:rdd.collect().foreach(println)
。不過,這可能會導致驅動程式記憶體不足,因為 collect()
會將整個 RDD 取用至單一機器;如果您只需要列印 RDD 的幾個元素,較安全的做法是使用 take()
:rdd.take(100).foreach(println)
。
使用鍵值對
雖然大多數 Spark 作業都針對包含任何類型物件的 RDD,但只有少數特殊作業僅適用於鍵值配對的 RDD。最常見的是分散式「洗牌」作業,例如依據某個鍵對元素進行分組或聚合。
在 Python 中,這些作業會針對包含內建 Python 元組的 RDD 執行,例如 (1, 2)
。只要建立此類元組,然後呼叫您想要的作業即可。
例如,下列程式碼對鍵值配對使用 reduceByKey
作業,以計算檔案中每一行文字出現的次數
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
我們也可以使用 counts.sortByKey()
,例如,按字母順序對配對進行排序,最後使用 counts.collect()
將它們作為物件清單帶回驅動程式。
雖然大多數 Spark 作業都針對包含任何類型物件的 RDD,但只有少數特殊作業僅適用於鍵值配對的 RDD。最常見的是分散式「洗牌」作業,例如依據某個鍵對元素進行分組或聚合。
在 Scala 中,這些操作會自動在包含 Tuple2 物件 (語言中的內建元組,透過簡單撰寫 (a, b)
) 的 RDD 上提供。在 PairRDDFunctions 類別中提供鍵值配對操作,該類別會自動封裝在元組 RDD 中。
例如,下列程式碼對鍵值配對使用 reduceByKey
作業,以計算檔案中每一行文字出現的次數
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
我們也可以使用 counts.sortByKey()
,例如,按字母順序對配對進行排序,最後使用 counts.collect()
將它們作為物件陣列帶回驅動程式。
注意:在鍵值配對操作中將自訂物件用作鍵時,您必須確定自訂 equals()
方法會搭配相符的 hashCode()
方法。有關完整詳細資料,請參閱 Object.hashCode() 文件 中概述的合約。
雖然大多數 Spark 作業都針對包含任何類型物件的 RDD,但只有少數特殊作業僅適用於鍵值配對的 RDD。最常見的是分散式「洗牌」作業,例如依據某個鍵對元素進行分組或聚合。
在 Java 中,鍵值配對會使用 Scala 標準函式庫中的 scala.Tuple2 類別表示。您只要呼叫 new Tuple2(a, b)
即可建立元組,並使用 tuple._1()
和 tuple._2()
在稍後存取其欄位。
鍵值配對的 RDD 會由 JavaPairRDD 類別表示。您可以使用 map
操作的特殊版本,例如 mapToPair
和 flatMapToPair
,從 JavaRDD 建立 JavaPairRDD。JavaPairRDD 會同時具備標準 RDD 函式和特殊鍵值函式。
例如,下列程式碼對鍵值配對使用 reduceByKey
作業,以計算檔案中每一行文字出現的次數
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
我們也可以使用 counts.sortByKey()
,例如,按字母順序對配對進行排序,最後使用 counts.collect()
將它們作為物件陣列帶回驅動程式。
注意:在鍵值配對操作中將自訂物件用作鍵時,您必須確定自訂 equals()
方法會搭配相符的 hashCode()
方法。有關完整詳細資料,請參閱 Object.hashCode() 文件 中概述的合約。
轉換
下表列出 Spark 支援的一些常見轉換。有關詳細資料,請參閱 RDD API 文件 (Scala、Java、Python、R) 和配對 RDD 函式文件 (Scala、Java)。
動作
下列表格列出 Spark 支援的一些常見動作。請參閱 RDD API 文件 (Scala, Java, Python, R)
和配對 RDD 函數文件 (Scala, Java) 以取得詳細資訊。
動作 | 意義 |
---|---|
reduce(func) | 使用函數 func (它接收兩個引數並傳回一個引數) 來彙總資料集的元素。該函數應具有交換性和關聯性,以便能正確地並行運算。 |
collect() | 將資料集的所有元素以陣列形式傳回驅動程式。這通常在過濾或其他傳回足夠小資料子集的運算後會很有用。 |
count() | 傳回資料集中元素的數量。 |
first() | 傳回資料集的第一個元素 (類似於 take(1))。 |
take(n) | 傳回一個陣列,其中包含資料集的前 n 個元素。 |
takeSample(withReplacement, num, [seed]) | 傳回一個陣列,其中包含資料集中 num 個元素的隨機樣本,可選擇是否替換,並可選擇預先指定亂數產生器種子。 |
takeOrdered(n, [ordering]) | 使用其自然順序或自訂比較器傳回 RDD 的前 n 個元素。 |
saveAsTextFile(path) | 將資料集的元素寫入給定目錄中的文字檔 (或一組文字檔),該目錄位於本機檔案系統、HDFS 或任何其他 Hadoop 支援的檔案系統中。Spark 會對每個元素呼叫 toString,將其轉換為檔案中的一行文字。 |
saveAsSequenceFile(path) (Java 和 Scala) |
將資料集的元素寫入給定路徑中的 Hadoop SequenceFile,該路徑位於本機檔案系統、HDFS 或任何其他 Hadoop 支援的檔案系統中。這適用於實作 Hadoop 的 Writable 介面的鍵值配對 RDD。在 Scala 中,它也適用於可隱式轉換為 Writable 的類型 (Spark 包含基本類型 (如 Int、Double、String 等) 的轉換)。 |
saveAsObjectFile(path) (Java 和 Scala) |
使用 Java 序列化以簡單格式撰寫資料集的元素,然後可以使用 SparkContext.objectFile() 載入。 |
countByKey() | 僅可用於類型為 (K, V) 的 RDD。傳回包含每個金鑰計數的 (K, Int) 對應的雜湊映射。 |
foreach(func) | 對資料集的每個元素執行函式 func。這通常用於副作用,例如更新 累加器 或與外部儲存系統互動。
注意:修改 foreach() 外部的變數(累加器除外)可能會導致未定義的行為。請參閱 了解封閉 以取得更多詳細資訊。 |
Spark RDD API 也公開某些動作的非同步版本,例如 foreach
的 foreachAsync
,它會立即傳回 FutureAction
給呼叫者,而不是在動作完成時封鎖。這可用於管理或等待動作的非同步執行。
洗牌運算
Spark 內的某些操作會觸發稱為洗牌的事件。洗牌是 Spark 重新分配資料的機制,以便在分割區中以不同的方式分組。這通常涉及在執行器和機器之間複製資料,使洗牌成為複雜且昂貴的操作。
背景
為了了解洗牌期間發生的情況,我們可以考慮 reduceByKey
操作的範例。reduceByKey
操作會產生一個新的 RDD,其中單一金鑰的所有值都會合併成一個元組 - 金鑰和針對與該金鑰關聯的所有值執行還原函式的結果。挑戰在於,單一金鑰的所有值不一定都位於同一個分割區,甚至同一個機器上,但它們必須並置才能計算結果。
在 Spark 中,資料通常不會分散在分區中,以供特定作業使用。在運算期間,單一工作會在單一分區上執行 - 因此,若要整理所有資料,以便執行單一 reduceByKey
縮減工作,Spark 需要執行全對全作業。它必須從所有分區讀取,以找出所有金鑰的所有值,然後彙整分區中的值,以計算每個金鑰的最終結果 - 這稱為洗牌。
儘管新洗牌資料的每個分區中的元素集合會是確定的,分區本身的順序也是如此,但這些元素的順序卻不確定。如果希望在洗牌後取得可預測順序的資料,則可以使用
mapPartitions
使用.sorted
等方法對每個分區進行排序repartitionAndSortWithinPartitions
在重新分區的同時有效率地對分區進行排序sortBy
建立全域順序的 RDD
可能導致洗牌的作業包括 repartition
和 coalesce
等重新分區作業、groupByKey
和 reduceByKey
等「依金鑰」作業(計數除外),以及 cogroup
和 join
等聯結作業。
效能影響
洗牌是一種昂貴的作業,因為它涉及磁碟 I/O、資料序列化和網路 I/O。為了整理洗牌資料,Spark 會產生工作集合 - 對應工作用於整理資料,而一組縮減工作則用於彙總資料。此術語源自 MapReduce,與 Spark 的 map
和 reduce
作業無直接關聯。
在內部,個別對應工作產生的結果會保留在記憶體中,直到無法容納為止。然後,這些結果會根據目標分區進行排序,並寫入單一檔案。在縮減端,工作會讀取相關的已排序區塊。
某些混洗操作會消耗大量的堆積記憶體,因為它們會使用記憶體中的資料結構來整理記錄,在傳輸記錄之前或之後。特別是,reduceByKey
和 aggregateByKey
會在映射端建立這些結構,而 'ByKey
操作會在縮減端產生這些結構。當資料無法放入記憶體時,Spark 會將這些表格溢位到磁碟,產生額外的磁碟 I/O 負擔和增加垃圾回收。
混洗也會在磁碟上產生大量的中間檔案。從 Spark 1.3 開始,這些檔案會保留到對應的 RDD 不再使用並被垃圾回收為止。這樣做是為了讓混洗檔案在重新計算血緣關係時不需要重新建立。垃圾回收可能只會在很長一段時間後發生,如果應用程式保留對這些 RDD 的參照,或者如果 GC 沒有頻繁啟動。這表示執行時間很長的 Spark 作業可能會消耗大量的磁碟空間。在設定 Spark 環境時,暫時儲存目錄會由 spark.local.dir
設定參數指定。
可以透過調整各種設定參數來調整混洗行為。請參閱 Spark 設定指南 中的「混洗行為」區段。
RDD 持久性
Spark 中最重要的功能之一是在作業之間將資料集持續(或快取)在記憶體中。當您持續一個 RDD 時,每個節點會將它計算的任何分區儲存在記憶體中,並在對該資料集(或從該資料集衍生的資料集)執行的其他動作中重複使用它們。這讓後續的動作可以快很多(通常快 10 倍以上)。快取是反覆運算演算法和快速互動使用的關鍵工具。
您可以使用 RDD 上的 persist()
或 cache()
方法來標記 RDD 以持續。在動作中第一次計算它時,它會保留在節點的記憶體中。Spark 的快取具有容錯能力,如果 RDD 的任何分區遺失,它會自動使用最初建立它的轉換重新計算。
此外,每個持續的 RDD 都可以使用不同的儲存層級儲存,例如,讓您可以將資料集持續儲存在磁碟中、將其持續儲存在記憶體中(但以序列化 Java 物件儲存,以節省空間)、將其複製到各個節點。這些層級是透過將 StorageLevel
物件(Scala、Java、Python)傳遞給 persist()
來設定的。 cache()
方法是使用預設儲存層級的簡寫,也就是 StorageLevel.MEMORY_ONLY
(將反序列化的物件儲存在記憶體中)。完整的儲存層級如下:
儲存層級 | 意義 |
---|---|
MEMORY_ONLY | 將 RDD 以反序列化的 Java 物件儲存在 JVM 中。如果 RDD 無法放入記憶體,某些分割區將不會快取,且每次需要時都會重新計算。這是預設層級。 |
MEMORY_AND_DISK | 將 RDD 以反序列化的 Java 物件儲存在 JVM 中。如果 RDD 無法放入記憶體,將無法放入的分割區儲存在磁碟中,並在需要時從磁碟中讀取。 |
MEMORY_ONLY_SER (Java 和 Scala) |
將 RDD 以序列化的 Java 物件儲存(每個分割區一個位元組陣列)。這通常比反序列化的物件更省空間,特別是在使用快速序列化器時,但讀取時會更耗費 CPU。 |
MEMORY_AND_DISK_SER (Java 和 Scala) |
類似於 MEMORY_ONLY_SER,但將無法放入記憶體的分割區溢位到磁碟中,而不是每次需要時重新計算。 |
DISK_ONLY | 只將 RDD 分割區儲存在磁碟中。 |
MEMORY_ONLY_2、MEMORY_AND_DISK_2 等 | 與上述層級相同,但將每個分割區複製到兩個叢集節點上。 |
OFF_HEAP(實驗性質) | 類似於 MEMORY_ONLY_SER,但將資料儲存在堆外記憶體中。這需要啟用堆外記憶體。 |
注意:在 Python 中,儲存的物件將永遠使用 Pickle 函式庫序列化,因此您選擇哪個序列化層級並無影響。Python 中可用的儲存層級包括 MEMORY_ONLY
、MEMORY_ONLY_2
、MEMORY_AND_DISK
、MEMORY_AND_DISK_2
、DISK_ONLY
、DISK_ONLY_2
和 DISK_ONLY_3
。
Spark 也會在混和操作中自動持續儲存一些中間資料(例如 reduceByKey
),即使使用者沒有呼叫 persist
。這樣做是為了避免在混和期間節點發生故障時重新計算整個輸入。我們仍建議使用者在結果 RDD 上呼叫 persist
,如果他們打算重複使用它。
選擇哪種儲存層級?
Spark 的儲存層級旨在提供記憶體使用量和 CPU 效率之間的不同權衡。我們建議透過以下流程來選擇一個
-
如果您的 RDD 舒適地符合預設儲存層級 (
MEMORY_ONLY
),請保持它們這樣。這是最省 CPU 的選項,允許對 RDD 的操作盡可能快地執行。 -
如果不符合,請嘗試使用
MEMORY_ONLY_SER
和 選擇一個快速的序列化函式庫,以使物件更省空間,但仍然可以合理的速度存取。(Java 和 Scala) -
除非計算您的資料集的函式很昂貴,或它們會過濾大量資料,否則不要溢位到磁碟。否則,重新計算一個分割區可能會和從磁碟讀取它一樣快。
-
如果您想要快速的故障復原(例如,如果使用 Spark 來處理來自 Web 應用程式的要求),請使用複製儲存層級。所有儲存層級都透過重新計算遺失的資料來提供完整的容錯能力,但複製儲存層級讓您可以在不等待重新計算遺失分割區的情況下繼續在 RDD 上執行任務。
移除資料
Spark 會自動監控每個節點上的快取使用量,並以最近最少使用 (LRU) 的方式刪除舊的資料分割區。如果您想手動移除 RDD 而不是等待它從快取中移除,請使用 RDD.unpersist()
方法。請注意,此方法預設不會封鎖。若要封鎖直到資源釋放,請在呼叫此方法時指定 blocking=true
。
共用變數
通常,當傳遞給 Spark 操作的函式(例如 map
或 reduce
)在遠端叢集節點上執行時,它會處理函式中使用的所有變數的獨立副本。這些變數會複製到每部機器,而且遠端機器上的變數更新不會傳播回驅動程式。支援跨任務的一般讀寫共用變數會很沒有效率。然而,Spark 確實提供兩種有限類型的共用變數,以供兩種常見使用模式使用:廣播變數和累加器。
廣播變數
廣播變數讓程式設計師可以在每部機器上快取唯讀變數,而不是隨任務運送其副本。例如,它們可用於以有效率的方式為每個節點提供大型輸入資料集的副本。Spark 也會嘗試使用有效率的廣播演算法來散佈廣播變數,以降低通訊成本。
Spark 動作會透過一組階段執行,並以分散式「洗牌」作業分隔。Spark 會自動廣播每個階段內任務所需的共用資料。以這種方式廣播的資料會以序列化形式快取,並在執行每個任務前解除序列化。這表示只有當跨多個階段的任務需要相同的資料,或快取解除序列化形式的資料很重要時,才需要明確建立廣播變數。
廣播變數會從變數 v
建立,方法是呼叫 SparkContext.broadcast(v)
。廣播變數是 v
的包裝器,而其值可透過呼叫 value
方法存取。以下程式碼顯示此內容
>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>> broadcastVar.value
[1, 2, 3]
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]
建立廣播變數後,應在叢集上執行的任何函式中使用它,而不是值 v
,如此一來 v
就不會運送到節點超過一次。此外,在廣播物件 v
後,不應修改它,以確保所有節點取得廣播變數相同的值(例如,如果變數稍後運送到新節點)。
若要釋放廣播變數複製到執行器的資源,請呼叫 .unpersist()
。如果稍後再次使用廣播,它將會重新廣播。若要永久釋放廣播變數使用的所有資源,請呼叫 .destroy()
。之後無法使用廣播變數。請注意,這些方法預設不會封鎖。若要在資源釋放前封鎖,請在呼叫它們時指定 blocking=true
。
累加器
累加器是僅透過關聯和交換運算「新增」的變數,因此可以在平行中有效率地支援它們。它們可用於實作計數器(如 MapReduce)或總和。Spark 本機支援數值類型的累加器,而程式設計師可以新增對新類型的支援。
作為使用者,您可以建立命名或未命名的累加器。如以下圖片所示,命名累加器(在本例中為 counter
)會顯示在修改該累加器的階段的網路使用者介面中。Spark 會在「工作」表格中顯示由工作修改的每個累加器的值。
追蹤使用者介面中的累加器有助於了解執行階段的進度(注意:Python 中尚未支援此功能)。
累加器是由初始值 v
透過呼叫 SparkContext.accumulator(v)
建立。然後,執行於叢集上的工作可以使用 add
方法或 +=
算子新增至累加器。不過,工作無法讀取其值。只有驅動程式可以使用其 value
方法讀取累加器的值。
以下程式碼顯示累加器用於加總陣列元素
>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
>>> accum.value
10
雖然此程式碼使用內建的 Int 類型累加器支援,但程式設計師也可以透過建立 AccumulatorParam 的子類別來建立自己的類型。AccumulatorParam 介面有兩個方法:zero
用於提供資料類型的「零值」,以及 addInPlace
用於將兩個值加總。例如,假設我們有一個代表數學向量的 Vector
類別,我們可以撰寫
class VectorAccumulatorParam(AccumulatorParam):
def zero(self, initialValue):
return Vector.zeros(initialValue.size)
def addInPlace(self, v1, v2):
v1 += v2
return v1
# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
可以透過呼叫 SparkContext.longAccumulator()
或 SparkContext.doubleAccumulator()
來建立數字累加器,分別累積 Long 或 Double 類型的值。然後,執行於叢集上的工作可以使用 add
方法新增至累加器。不過,工作無法讀取其值。只有驅動程式可以使用其 value
方法讀取累加器的值。
以下程式碼顯示累加器用於加總陣列元素
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
雖然此程式碼使用內建的 Long 型累加器支援,但程式設計人員也可以透過建立 AccumulatorV2 的子類別來建立自己的型別。AccumulatorV2 抽象類別有數個必須覆寫的方法:reset
用於將累加器重設為零,add
用於將另一個值加入累加器,merge
用於將另一個相同型別的累加器合併到此累加器。必須覆寫的其他方法包含在 API 文件 中。例如,假設我們有一個表示數學向量的 MyVector
類別,我們可以撰寫
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
private val myVector: MyVector = MyVector.createZeroVector
def reset(): Unit = {
myVector.reset()
}
def add(v: MyVector): Unit = {
myVector.add(v)
}
...
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
請注意,當程式設計人員定義自己的 AccumulatorV2 型別時,產生的型別可能與所加入元素的型別不同。
可以透過呼叫 SparkContext.longAccumulator()
或 SparkContext.doubleAccumulator()
來建立數字累加器,分別累積 Long 或 Double 類型的值。然後,執行於叢集上的工作可以使用 add
方法新增至累加器。不過,工作無法讀取其值。只有驅動程式可以使用其 value
方法讀取累加器的值。
以下程式碼顯示累加器用於加總陣列元素
LongAccumulator accum = jsc.sc().longAccumulator();
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
accum.value();
// returns 10
雖然此程式碼使用內建的 Long 型累加器支援,但程式設計人員也可以透過建立 AccumulatorV2 的子類別來建立自己的型別。AccumulatorV2 抽象類別有數個必須覆寫的方法:reset
用於將累加器重設為零,add
用於將另一個值加入累加器,merge
用於將另一個相同型別的累加器合併到此累加器。必須覆寫的其他方法包含在 API 文件 中。例如,假設我們有一個表示數學向量的 MyVector
類別,我們可以撰寫
class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {
private MyVector myVector = MyVector.createZeroVector();
public void reset() {
myVector.reset();
}
public void add(MyVector v) {
myVector.add(v);
}
...
}
// Then, create an Accumulator of this type:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// Then, register it into spark context:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");
請注意,當程式設計人員定義自己的 AccumulatorV2 型別時,產生的型別可能與所加入元素的型別不同。
警告:當 Spark 工作完成時,Spark 會嘗試將此工作中的累積更新合併到累加器。如果失敗,Spark 會忽略失敗並仍將工作標記為成功,並繼續執行其他工作。因此,有錯誤的累加器不會影響 Spark 工作,但即使 Spark 工作成功,它也可能無法正確更新。
對於僅在動作中執行的累加器更新,Spark 保證每個工作對累加器的更新只會套用一次,亦即重新啟動的工作不會更新值。在轉換中,使用者應注意,如果工作或工作階段重新執行,每個工作的更新可能會套用多次。
累加器不會變更 Spark 的延遲評估模型。如果它們在對 RDD 的操作中更新,它們的值只會在 RDD 作為動作的一部分計算時更新一次。因此,不保證累加器更新會在延遲轉換(例如 map()
)中執行。以下程式碼片段示範此屬性
accum = sc.accumulator(0)
def g(x):
accum.add(x)
return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.
val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.
部署至叢集
應用程式提交指南 說明如何將應用程式提交到叢集。簡而言之,一旦您將應用程式打包成 JAR(適用於 Java/Scala)或一組 .py
或 .zip
檔案(適用於 Python),bin/spark-submit
指令碼便讓您將其提交到任何受支援的叢集管理員。
從 Java/Scala 啟動 Spark 工作
org.apache.spark.launcher 套件提供使用簡單的 Java API 將 Spark 工作啟動為子程式的類別。
單元測試
Spark 對任何流行的單元測試架構都友善。只要在你的測試中建立一個 SparkContext
,將主控 URL 設定為 local
,執行你的操作,然後呼叫 SparkContext.stop()
來終止它。請務必在 finally
區塊或測試架構的 tearDown
方法中停止內容,因為 Spark 不支援在同一個程式中同時執行兩個內容。
後續步驟
你可以在 Spark 網站上看到一些 範例 Spark 程式。此外,Spark 在 examples
目錄中包含了幾個範例 (Scala、Java、Python、R)。你可以透過將類別名稱傳遞給 Spark 的 bin/run-example
腳本來執行 Java 和 Scala 範例;例如
./bin/run-example SparkPi
對於 Python 範例,請改用 spark-submit
./bin/spark-submit examples/src/main/python/pi.py
對於 R 範例,請改用 spark-submit
./bin/spark-submit examples/src/main/r/dataframe.R
若要取得最佳化程式方面的說明,組態 和 調整 指南提供了最佳實務的資訊。它們對於確保你的資料以有效率的格式儲存在記憶體中特別重要。若要取得部署方面的說明,叢集模式概觀 說明了分散式操作中所涉及的元件和支援的叢集管理員。