Spark Connect 概觀
建置用戶端 Spark 應用程式
在 Apache Spark 3.4 中,Spark Connect 導入了解耦的用戶端-伺服器架構,讓您可以使用 DataFrame API 和未解析的邏輯計畫作為通訊協定,遠端連線到 Spark 集群。用戶端和伺服器之間的分離,讓 Spark 和其開放生態系可以從各處加以運用。它可以內嵌在現代資料應用程式、IDE、筆記本和程式設計語言中。
若要開始,請參閱 快速入門:Spark Connect。
Spark Connect 的運作方式
Spark Connect 函式庫旨在簡化 Spark 應用程式的開發。它是一個輕量級 API,可內嵌在任何地方:應用程式伺服器、IDE、筆記本和程式語言中。Spark Connect API 建立在 Spark 的 DataFrame API 上,使用未解析的邏輯計畫作為客戶端和 Spark 驅動程式之間的語言非相依協定。
Spark Connect 函式庫將 DataFrame 作業轉換為未解析的邏輯查詢計畫,並使用通訊協定緩衝區編碼。這些計畫會使用 gRPC 架構傳送至伺服器。
內嵌在 Spark 伺服器上的 Spark Connect 端點會接收未解析的邏輯計畫,並將其轉換為 Spark 的邏輯計畫運算子。這類似於剖析 SQL 查詢,其中會剖析屬性和關聯,並建立初始剖析計畫。從這裡開始,標準 Spark 執行程序就會啟動,確保 Spark Connect 能夠利用 Spark 的所有最佳化和強化功能。結果會透過 gRPC 以 Apache Arrow 編碼的行批次串流回傳至客戶端。
Spark Connect 的營運優點
有了這個新架構,Spark Connect 可以緩解多個多租戶營運問題
穩定性:使用過多記憶體的應用程式現在只會影響自己的環境,因為它們可以在自己的程序中執行。使用者可以在客戶端定義自己的依賴項,而不用擔心與 Spark 驅動程式發生潛在衝突。
可升級性:Spark 驅動程式現在可以獨立於應用程式無縫升級,例如受益於效能改善和安全性修正。這表示應用程式可以向前相容,只要伺服器端 RPC 定義被設計為向後相容即可。
可除錯性和可觀察性:Spark Connect 讓您可以在開發期間直接從您最愛的 IDE 進行互動式除錯。同樣地,應用程式可以使用應用程式的架構原生指標和記錄函式庫進行監控。
如何使用 Spark Connect
從 Spark 3.4 開始,Spark Connect 可用,並支援 PySpark 和 Scala 應用程式。我們將逐步說明如何使用 Spark Connect 執行 Apache Spark 伺服器,並使用 Spark Connect 客户端程式庫從客户端應用程式連線到伺服器。
下載並使用 Spark Connect 啟動 Spark 伺服器
首先,從 下載 Apache Spark 頁面下載 Spark。Spark Connect 在 Apache Spark 版本 3.4 中推出,因此請務必在頁面頂端的版本下拉式選單中選擇 3.4.0 或更新版本。然後選擇您的套件類型,通常為「預先建置給 Apache Hadoop 3.3 及更新版本」,然後按一下連結下載。
現在,在您的電腦上解壓縮剛下載的 Spark 套件,例如
tar -xvf spark-3.5.1-bin-hadoop3.tgz
在終端機視窗中,前往您先前解壓縮 Spark 的位置中的 spark
資料夾,並執行 start-connect-server.sh
腳本,以使用 Spark Connect 啟動 Spark 伺服器,如下面的範例所示
./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.1
請注意,我們在啟動 Spark 伺服器時,會包含一個 Spark Connect 套件 (spark-connect_2.12:3.5.1
)。這是使用 Spark Connect 的必要條件。請務必使用與您先前下載的 Spark 版本相同的套件版本。在此範例中,為 Spark 3.5.1 搭配 Scala 2.12。
現在,Spark 伺服器正在執行,並準備接受來自客户端應用程式的 Spark Connect 會話。在下一節中,我們將逐步說明在撰寫客户端應用程式時如何使用 Spark Connect。
使用 Spark Connect 進行互動式分析
在建立 Spark 會話時,您可以指定要使用 Spark Connect,而且有幾種方法可以這樣做,如下所述。
如果您沒有使用這裡概述的其中一種機制,您的 Spark 會話將像以前一樣執行,而不會使用 Spark Connect。
設定 SPARK_REMOTE 環境變數
如果您在執行 Spark 客户端應用程式的客户端電腦上設定 SPARK_REMOTE
環境變數,並建立一個新的 Spark 會話,如下面的範例所示,則該會話將會是 Spark Connect 會話。使用此方法,無需變更程式碼即可開始使用 Spark Connect。
在終端機視窗中,將 SPARK_REMOTE
環境變數設定為指向您先前在電腦上啟動的本機 Spark 伺服器
export SPARK_REMOTE="sc://127.0.0.1"
然後像往常一樣啟動 Spark shell
./bin/pyspark
PySpark shell 現在已使用 Spark Connect 連線到 Spark,如歡迎訊息中所示
Client connected to the Spark Connect server at localhost
在建立 Spark 會話時指定 Spark Connect
您也可以在建立 Spark 會話時明確指定要使用 Spark Connect。
例如,您可以使用 Spark Connect 啟動 PySpark shell,如這裡所示。
要使用 Spark Connect 啟動 PySpark shell,只需包含 remote
參數並指定 Spark 伺服器的位址。在此範例中,我們使用 localhost
連線到先前啟動的本機 Spark 伺服器
./bin/pyspark --remote "sc://127.0.0.1"
您會注意到 PySpark shell 歡迎訊息會告訴您已使用 Spark Connect 連線到 Spark
Client connected to the Spark Connect server at localhost
您也可以檢查 Spark 會話類型。如果它包含 .connect.
,表示您正在使用 Spark Connect,如本範例所示
SparkSession available as 'spark'.
>>> type(spark)
<class 'pyspark.sql.connect.session.SparkSession'>
現在您可以在 shell 中執行 PySpark 程式碼,以查看 Spark Connect 的實際運作
>>> columns = ["id","name"]
>>> data = [(1,"Sarah"),(2,"Maria")]
>>> df = spark.createDataFrame(data).toDF(*columns)
>>> df.show()
+---+-----+
| id| name|
+---+-----+
| 1|Sarah|
| 2|Maria|
+---+-----+
對於 Scala shell,我們使用目前未包含在 Apache Spark 套件中的 Ammonite 為基礎的 REPL。
要設定新的 Scala shell,請先下載並安裝 Coursier CLI。然後,在終端機視窗中使用下列指令安裝 REPL
cs install –-contrib spark-connect-repl
現在,您可以啟動以 Ammonite 為基礎的 Scala REPL/shell,以連線到您的 Spark 伺服器,如下所示
spark-connect-repl
當 REPL 成功初始化時,會出現歡迎訊息
Spark session available as 'spark'.
_____ __ ______ __
/ ___/____ ____ ______/ /__ / ____/___ ____ ____ ___ _____/ /_
\__ \/ __ \/ __ `/ ___/ //_/ / / / __ \/ __ \/ __ \/ _ \/ ___/ __/
___/ / /_/ / /_/ / / / ,< / /___/ /_/ / / / / / / / __/ /__/ /_
/____/ .___/\__,_/_/ /_/|_| \____/\____/_/ /_/_/ /_/\___/\___/\__/
/_/
預設情況下,REPL 會嘗試連線到本機 Spark 伺服器。在 shell 中執行下列 Scala 程式碼,以查看 Spark Connect 的實際運作
@ spark.range(10).count
res0: Long = 10L
設定用戶端伺服器連線
預設情況下,REPL 會嘗試連線到埠 15002 上的本機 Spark 伺服器。不過,連線可以透過多種方式設定,如本設定 參考 所述。
設定 SPARK_REMOTE 環境變數
可以在用戶端機器上設定 SPARK_REMOTE 環境變數,以自訂在 REPL 啟動時初始化的用戶端伺服器連線。
export SPARK_REMOTE="sc://myhost.com:443/;token=ABCDEFG"
spark-connect-repl
或
SPARK_REMOTE="sc://myhost.com:443/;token=ABCDEFG" spark-connect-repl
使用 CLI 參數
也可以透過 CLI 參數傳遞自訂設定,如下所示
spark-connect-repl --host myhost.com --port 443 --token ABCDEFG
支援的 CLI 參數清單可以在 這裡 找到。
使用連線字串以程式化方式設定
連線也可以使用 SparkSession#builder 以程式化方式建立,如下例所示
@ import org.apache.spark.sql.SparkSession
@ val spark = SparkSession.builder.remote("sc://127.0.0.1:443/;token=ABCDEFG").build()
在獨立應用程式中使用 Spark Connect
首先,使用 pip install pyspark[connect]==3.5.0
安裝 PySpark,或者如果要建置封裝的 PySpark 應用程式/函式庫,請將其新增到 setup.py 檔案中,如下所示
install_requires=[
'pyspark[connect]==3.5.0'
]
撰寫自己的程式碼時,請在建立 Spark 會話時,包含 remote
函式,並參照您的 Spark 伺服器,如下例所示
from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://127.0.0.1").getOrCreate()
為了說明,我們將建立一個簡單的 Spark Connect 應用程式 SimpleApp.py
"""SimpleApp.py"""
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
spark = SparkSession.builder.remote("sc://127.0.0.1").appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
spark.stop()
此程式只計算文字檔中包含「a」的行數和包含「b」的行數。請注意,您需要將 YOUR_SPARK_HOME 替換為安裝 Spark 的位置。
我們可以使用一般的 Python 解譯器執行此應用程式,如下所示
# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 72, lines with b: 39
若要將 Spark Connect 用於 Scala 應用程式/專案的一部分,我們首先需要包含正確的相依性。使用 sbt
建置系統為例,我們將下列相依性新增到 build.sbt
檔案中
libraryDependencies += "org.apache.spark" %% "spark-sql-api" % "3.5.0"
libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.0"
撰寫自己的程式碼時,請在建立 Spark 會話時,包含 remote
函式,並參照您的 Spark 伺服器,如下例所示
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().remote("sc://127.0.0.1").build()
注意:參照使用者定義程式碼(例如 UDF、filter、map 等)的作業需要註冊 ClassFinder 以擷取和上傳任何必要的類別檔案。此外,任何 JAR 相依性都必須使用 SparkSession#AddArtifact
上傳到伺服器。
範例
import org.apache.spark.sql.connect.client.REPLClassDirMonitor
// Register a ClassFinder to monitor and upload the classfiles from the build output.
val classFinder = new REPLClassDirMonitor(<ABSOLUTE_PATH_TO_BUILD_OUTPUT_DIR>)
spark.registerClassFinder(classfinder)
// Upload JAR dependencies
spark.addArtifact(<ABSOLUTE_PATH_JAR_DEP>)
在此,ABSOLUTE_PATH_TO_BUILD_OUTPUT_DIR
是建置系統將類別檔案寫入的輸出目錄,而 ABSOLUTE_PATH_JAR_DEP
是 JAR 在本機檔案系統中的位置。
REPLClassDirMonitor
是 ClassFinder
提供的實作,用於監控特定目錄,但使用者可以實作自己的類別,延伸 ClassFinder
以進行自訂搜尋和監控。
用戶端應用程式驗證
雖然 Spark Connect 沒有內建驗證,但它設計為可以與您現有的驗證基礎架構無縫運作。其 gRPC HTTP/2 介面允許使用驗證代理程式,這使得在 Spark 中不用實作驗證邏輯,就能保護 Spark Connect。
Spark 3.4 支援哪些功能
PySpark:在 Spark 3.4 中,Spark Connect 支援大部分 PySpark API,包括 DataFrame、Functions 和 Column。不過,某些 API 例如 SparkContext 和 RDD 不受支援。您可以在 API 參考 文件中查看目前支援哪些 API。受支援的 API 會標示為「支援 Spark Connect」,因此在將現有程式碼移轉到 Spark Connect 之前,您可以檢查您正在使用的 API 是否可用。
Scala:在 Spark 3.5 中,Spark Connect 支援大部分 Scala API,包括 Dataset、functions、Column、Catalog 和 KeyValueGroupedDataset。
預設情況下,殼層和獨立應用程式支援使用者定義函數 (UDF),但有額外的設定需求。
大部分串流 API 受支援,包括 DataStreamReader、DataStreamWriter、StreamingQuery 和 StreamingQueryListener。
在所有 Spark Connect 版本中,SparkContext 和 RDD 等 API 已棄用。
後續的 Spark 版本計畫支援更多 API。