SparkR (R on Spark)
- 概觀
- SparkDataFrame
- 機器學習
- R 和 Spark 之間的資料類型對應
- 結構化串流
- SparkR 中的 Apache Arrow
- R 函數名稱衝突
- 遷移指南
概觀
SparkR 是 R 套件,提供輕量級前端,可從 R 使用 Apache Spark。在 Spark 3.5.1 中,SparkR 提供分散式資料框實作,支援選取、篩選、聚合等作業(類似 R 資料框、dplyr),但適用於大型資料集。SparkR 也支援使用 MLlib 進行分散式機器學習。
SparkDataFrame
SparkDataFrame 是分散式資料集合,組織成命名欄。它在概念上等同於關係資料庫中的表格或 R 中的資料框,但底層有更豐富的最佳化。SparkDataFrames 可從各種來源建構,例如:結構化資料檔案、Hive 中的表格、外部資料庫或現有的本機 R 資料框。
此頁面上的所有範例都使用 R 或 Spark 發行版中包含的範例資料,而且可以使用 ./bin/sparkR
shell 執行。
啟動:SparkSession
SparkR 的進入點是 SparkSession
,它會將 R 程式連線到 Spark 群集。你可以使用 sparkR.session
建立 SparkSession
,並傳入選項,例如應用程式名稱、任何依賴的 Spark 套件等。此外,你也可以透過 SparkSession
使用 SparkDataFrames。如果你從 sparkR
shell 執行,SparkSession
應該已經為你建立,你不需要呼叫 sparkR.session
。
sparkR.session()
從 RStudio 啟動
你也可以從 RStudio 啟動 SparkR。你可以從 RStudio、R shell、Rscript 或其他 R IDE 將 R 程式連線到 Spark 群集。首先,請確定環境中已設定 SPARK_HOME(你可以檢查 Sys.getenv),載入 SparkR 套件,並呼叫 sparkR.session
,如下所示。它會檢查 Spark 安裝,如果找不到,它會自動下載並快取。或者,你也可以手動執行 install.spark
。
除了呼叫 sparkR.session
之外,你還可以指定某些 Spark 驅動程式屬性。通常,這些 應用程式屬性 和 執行時期環境 無法以程式設計方式設定,因為驅動程式 JVM 程序已經啟動,在這種情況下,SparkR 會為你處理這件事。若要設定它們,請將它們傳遞給你會在 sparkR.session()
的 sparkConfig
參數中傳遞的其他組態屬性。
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
以下 Spark 驅動程式屬性可以在 sparkConfig
中使用 sparkR.session
從 RStudio 設定
屬性名稱 | 屬性群組 | spark-submit 等效 |
---|---|---|
spark.master |
應用程式屬性 | --master |
spark.kerberos.keytab |
應用程式屬性 | --keytab |
spark.kerberos.principal |
應用程式屬性 | --principal |
spark.driver.memory |
應用程式屬性 | --driver-memory |
spark.driver.extraClassPath |
執行時期環境 | --driver-class-path |
spark.driver.extraJavaOptions |
執行時期環境 | --driver-java-options |
spark.driver.extraLibraryPath |
執行時期環境 | --driver-library-path |
建立 SparkDataFrames
使用 SparkSession
,應用程式可以從本機 R 資料格架、Hive 表格 或其他 資料來源 建立 SparkDataFrame
。
從本機資料框
建立資料格架最簡單的方法是將本機 R 資料格架轉換成 SparkDataFrame。具體來說,我們可以使用 as.DataFrame
或 createDataFrame
,並傳入本機 R 資料格架以建立 SparkDataFrame。舉例來說,下列範例使用 R 中的 faithful
資料集建立 SparkDataFrame
。
df <- as.DataFrame(faithful)
# Displays the first part of the SparkDataFrame
head(df)
## eruptions waiting
##1 3.600 79
##2 1.800 54
##3 3.333 74
從資料來源
SparkR 支援透過 SparkDataFrame
介面操作各種資料來源。本節說明使用資料來源載入和儲存資料的通用方法。您可以查看 Spark SQL 程式設計指南,以取得內建資料來源可用的更多 特定選項。
從資料來源建立 SparkDataFrames 的通用方法是 read.df
。此方法會接收要載入的檔案路徑和資料來源類型,且目前作用中的 SparkSession 會自動使用。SparkR 支援原生讀取 JSON、CSV 和 Parquet 檔案,且透過 第三方專案 等來源提供的套件,您可以找到熱門檔案格式(例如 Avro)的資料來源連接器。這些套件可以透過指定 --packages
搭配 spark-submit
或 sparkR
指令來新增,或者在互動式 R shell 或 RStudio 中使用 sparkPackages
參數初始化 SparkSession 時新增。
sparkR.session(sparkPackages = "org.apache.spark:spark-avro_2.12:3.5.1")
我們可以使用範例 JSON 輸入檔案來了解如何使用資料來源。請注意,這裡使用的檔案不是典型的 JSON 檔案。檔案中的每一行都必須包含一個獨立、自含的有效 JSON 物件。如需更多資訊,請參閱 JSON Lines 文字格式,也稱為換行區隔 JSON。因此,一般的多行 JSON 檔案最常會失敗。
people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
## age name
##1 NA Michael
##2 30 Andy
##3 19 Justin
# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))
資料來源 API 原生支援 CSV 格式的輸入檔案。如需更多資訊,請參閱 SparkR read.df API 文件。
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")
資料來源 API 也可用於將 SparkDataFrames 儲存到多種檔案格式。例如,我們可以使用 write.df
將前一個範例的 SparkDataFrame 儲存到 Parquet 檔案中。
write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")
從 Hive 表格
您也可以從 Hive 資料表建立 SparkDataFrames。為此,我們需要建立一個具有 Hive 支援的 SparkSession,它可以存取 Hive MetaStore 中的資料表。請注意,Spark 應該已使用 Hive 支援 建置,而且可以在 SQL 程式設計指南 中找到更多詳細資料。在 SparkR 中,它會預設嘗試建立一個已啟用 Hive 支援的 SparkSession (enableHiveSupport = TRUE
)。
sparkR.session()
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")
# results is now a SparkDataFrame
head(results)
## key value
## 1 238 val_238
## 2 86 val_86
## 3 311 val_311
SparkDataFrame 操作
SparkDataFrames 支援多項功能來執行結構化資料處理。在此,我們會納入一些基本範例,而完整的清單可以在 API 文件中找到。
選取列、欄
# Create the SparkDataFrame
df <- as.DataFrame(faithful)
# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]
# Select only the "eruptions" column
head(select(df, df$eruptions))
## eruptions
##1 3.600
##2 1.800
##3 3.333
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
## eruptions waiting
##1 1.750 47
##2 1.750 47
##3 1.867 48
群組、聚合
SparkR 資料框支援多項常用功能,以便在分組後彙總資料。例如,我們可以計算 faithful
資料集中 waiting
時間的直方圖,如下所示
# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
## waiting count
##1 70 4
##2 67 1
##3 69 2
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
## waiting count
##1 78 15
##2 83 14
##3 81 13
除了標準彙總之外,SparkR 還支援 OLAP 立方體 運算子 cube
head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 NA 140.8 4 22.8
##2 4 75.7 4 30.4
##3 8 400.0 3 19.2
##4 8 318.0 3 15.5
##5 NA 351.0 NA 15.8
##6 NA 275.8 NA 16.3
和 rollup
head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 4 75.7 4 30.4
##2 8 400.0 3 19.2
##3 8 318.0 3 15.5
##4 4 78.7 NA 32.4
##5 8 304.0 3 15.2
##6 4 79.0 NA 27.3
運算欄
SparkR 也提供多項功能,可在資料處理和彙總期間直接套用至欄位。以下範例顯示基本算術功能的用法。
# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
套用使用者定義函數
在 SparkR 中,我們支援多種使用者定義函式
使用 dapply
或 dapplyCollect
在大型資料集上執行特定函數
dapply
將函式套用至 SparkDataFrame
的每個分割區。要套用至 SparkDataFrame
的每個分割區的函式,而且應該只有一個參數,對應到每個分割區的 data.frame
將會傳遞。函式的輸出應該是一個 data.frame
。Schema 會指定結果 SparkDataFrame
的列格式。它必須符合傳回值的 資料類型。
# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
##4 2.283 62 3720
##5 4.533 85 5100
##6 2.883 55 3300
dapplyCollect
類似 dapply
,將函式套用至 SparkDataFrame
的每個分割區,並收集結果。函式的輸出應為 data.frame
。但是,不需要傳遞 Schema。請注意,如果無法將在所有分割區執行的 UDF 輸出拉取至驅動程式並放入驅動程式記憶體中,則 dapplyCollect
可能會失敗。
# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
使用 gapply
或 gapplyCollect
在大型資料集上執行特定函數,並依輸入欄分組
gapply
將函式套用至 SparkDataFrame
的每個群組。函式應套用至 SparkDataFrame
的每個群組,且應僅有兩個參數:群組金鑰和對應於該金鑰的 R data.frame
。群組會從 SparkDataFrame
的欄位中選取。函式的輸出應為 data.frame
。Schema 會指定結果 SparkDataFrame
的列格式。它必須根據 Spark 資料類型 來表示 R 函式的輸出 Schema。已傳回 data.frame
的欄位名稱是由使用者設定。
# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900
gapplyCollect
類似 gapply
,將函式套用至 SparkDataFrame
的每個分割區,並將結果收集回 R data.frame。函式的輸出應為 data.frame
。但是,不需要傳遞 Schema。請注意,如果無法將在所有分割區執行的 UDF 輸出拉取至驅動程式並放入驅動程式記憶體中,則 gapplyCollect
可能會失敗。
# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900
使用 spark.lapply
分散執行本機 R 函數
spark.lapply
類似於原生 R 中的 lapply
,spark.lapply
會對元素清單執行函數,並透過 Spark 分散運算。以類似於 doParallel
或 lapply
的方式,將函數套用至清單的元素。所有運算的結果都應該能放入單一機器中。若非如此,他們可以執行類似 df <- createDataFrame(list)
的動作,然後使用 dapply
# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# Print the summary of each model
print(model.summaries)
立即執行
如果啟用立即執行,資料會在建立 SparkDataFrame
時立即傳回 R 客戶端。預設情況下,立即執行並未啟用,可以在啟動 SparkSession
時,將組態屬性 spark.sql.repl.eagerEval.enabled
設定為 true
來啟用。
可以分別透過 spark.sql.repl.eagerEval.maxNumRows
和 spark.sql.repl.eagerEval.truncate
組態屬性,來控制要顯示資料的最大列數和每欄的最大字元數。這些屬性僅在啟用立即執行時才有效。如果未明確設定這些屬性,預設會顯示最多 20 列和每欄最多 20 個字元的資料。
# Start up spark session with eager execution enabled
sparkR.session(master = "local[*]",
sparkConfig = list(spark.sql.repl.eagerEval.enabled = "true",
spark.sql.repl.eagerEval.maxNumRows = as.integer(10)))
# Create a grouped and sorted SparkDataFrame
df <- createDataFrame(faithful)
df2 <- arrange(summarize(groupBy(df, df$waiting), count = n(df$waiting)), "waiting")
# Similar to R data.frame, displays the data returned, instead of SparkDataFrame class string
df2
##+-------+-----+
##|waiting|count|
##+-------+-----+
##| 43.0| 1|
##| 45.0| 3|
##| 46.0| 5|
##| 47.0| 4|
##| 48.0| 3|
##| 49.0| 5|
##| 50.0| 5|
##| 51.0| 6|
##| 52.0| 5|
##| 53.0| 7|
##+-------+-----+
##only showing top 10 rows
請注意,若要在 sparkR
shell 中啟用立即執行,請將 spark.sql.repl.eagerEval.enabled=true
組態屬性新增至 --conf
選項。
從 SparkR 執行 SQL 查詢
SparkDataFrame 也可以註冊為 Spark SQL 中的暫時檢視,這可讓您對其資料執行 SQL 查詢。sql
函數可讓應用程式以程式化方式執行 SQL 查詢,並將結果傳回為 SparkDataFrame
。
# Load a JSON file
people <- read.df("./examples/src/main/resources/people.json", "json")
# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")
# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
##1 Justin
機器學習
演算法
SparkR 目前支援下列機器學習演算法
分類
spark.logit
:邏輯迴歸
spark.mlp
:多層感知器 (MLP)
spark.naiveBayes
:朴素貝氏
spark.svmLinear
:線性支援向量機
spark.fmClassifier
:因子分解機分類器
迴歸
spark.survreg
:加速失效時間 (AFT) 生存模型
spark.glm
或glm
:廣義線性模型 (GLM)
spark.isoreg
:等值回歸
spark.lm
:線性回歸
spark.fmRegressor
:因子分解機回歸器
樹狀結構
spark.decisionTree
:決策樹用於
回歸
和
分類
spark.gbt
:梯度提升樹用於
回歸
和
分類
spark.randomForest
:隨機森林用於
回歸
和
分類
叢集
spark.bisectingKmeans
:二分 k 平均
spark.gaussianMixture
:高斯混合模型 (GMM)
spark.kmeans
:K 平均
spark.lda
:潛在狄利克雷配置 (LDA)
spark.powerIterationClustering (PIC)
:冪次迭代聚類 (PIC)
協同過濾
頻繁模式探勘
統計
spark.kstest
:柯爾莫哥洛夫-史密諾夫檢定
在底層,SparkR 使用 MLlib 來訓練模型。請參閱 MLlib 使用者指南的對應區段以取得範例程式碼。使用者可以呼叫 summary
來列印已配適模型的摘要、predict 來對新資料進行預測,以及 write.ml/read.ml 來儲存/載入已配適模型。SparkR 支援模型配適的一組可用 R 公式運算子,包括 ‘~’、‘.’、‘:’、‘+’ 和 ‘-’。
模型持久性
下列範例顯示如何使用 SparkR 儲存/載入 MLlib 模型。
training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")
# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)
# Check model summary
summary(gaussianGLM2)
# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)
unlink(modelPath)
R 和 Spark 之間的資料類型對應
R | Spark |
---|---|
位元組 | 位元組 |
整數 | 整數 |
浮點數 | 浮點數 |
雙精度浮點數 | 雙精度浮點數 |
數值 | 雙精度浮點數 |
字元 | 字串 |
字串 | 字串 |
二進位 | 二進位 |
原始 | 二進位 |
邏輯 | 布林 |
POSIXct | 時間戳記 |
POSIXlt | 時間戳記 |
日期 | 日期 |
陣列 | 陣列 |
清單 | 陣列 |
環境 | 對應 |
結構化串流
SparkR 支援結構化串流 API。結構化串流是建立在 Spark SQL 引擎上的可擴充且容錯的串流處理引擎。如需更多資訊,請參閱 結構化串流程式設計指南 中的 R API
SparkR 中的 Apache Arrow
Apache Arrow 是一種內存中欄位資料格式,用於 Spark 在 JVM 和 R 程序之間有效率地傳輸資料。另請參閱 PySpark 最佳化已完成,使用 Apache Arrow 的 PySpark 使用指南,適用於 Pandas。本指南旨在說明如何使用 SparkR 中的 Arrow 最佳化,並提供一些重點。
確保已安裝 Arrow
Arrow R 函式庫可在 CRAN 上取得,且可如下所述安裝。
Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'
請參閱 Apache Arrow 官方文件 以取得更多詳細資訊。
請注意,您必須確保 Arrow R 套件已安裝並在所有叢集節點上可用。目前支援的最低版本為 1.0.0;但是,由於 SparkR 中的 Arrow 最佳化為實驗性質,因此此版本可能會在次要版本之間變更。
啟用轉換至/從 R DataFrame、dapply
和 gapply
在使用呼叫 collect(spark_df)
將 Spark DataFrame 轉換為 R DataFrame、使用 createDataFrame(r_df)
從 R DataFrame 建立 Spark DataFrame、透過 dapply(...)
將 R 原生函式套用至每個分割區,以及透過 gapply(...)
將 R 原生函式套用至群組資料時,Arrow 最佳化可用。若要在執行這些操作時使用 Arrow,使用者需要先將 Spark 組態設定為「spark.sql.execution.arrow.sparkr.enabled」為「true」。此設定預設為停用。
不論最佳化是否啟用,SparkR 都會產生相同的結果。此外,在最佳化因任何原因在實際運算前失敗時,Spark DataFrame 和 R DataFrame 之間的轉換會自動改回非 Arrow 最佳化實作。
# Start up spark session with Arrow optimization enabled
sparkR.session(master = "local[*]",
sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))
# Converts Spark DataFrame from an R DataFrame
spark_df <- createDataFrame(mtcars)
# Converts Spark DataFrame to an R DataFrame
collect(spark_df)
# Apply an R native function to each partition.
collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))
# Apply an R native function to grouped data.
collect(gapply(spark_df,
"gear",
function(key, group) {
data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
},
structType("gear double, disp boolean")))
請注意,即使使用 Arrow,collect(spark_df)
仍會將 DataFrame 中的所有記錄收集到驅動程式,且應在資料的少量子集上執行。此外,gapply(...)
和 dapply(...)
中指定的輸出架構應與給定函式傳回的 R DataFrame 相符。
支援的 SQL 類型
目前,所有 Spark SQL 資料類型都支援基於 Arrow 的轉換,但 FloatType
、BinaryType
、ArrayType
、StructType
和 MapType
除外。
R 函數名稱衝突
在 R 中載入並附加新的套件時,可能會發生名稱 衝突,其中一個函式會遮蔽另一個函式。
下列函式會被 SparkR 套件遮蔽
遮蔽函式 | 如何存取 |
---|---|
package:stats 中的 cov |
|
package:stats 中的 filter |
|
package:base 中的 sample |
base::sample(x, size, replace = FALSE, prob = NULL) |
由於 SparkR 的一部分是以 dplyr
套件為範本,因此 SparkR 中的某些函式與 dplyr
中的函式名稱相同。根據這兩個套件的載入順序,載入第一個套件中的某些函式會被載入第二個套件中的函式遮蔽。在此情況下,請在這些呼叫前面加上套件名稱,例如 SparkR::cume_dist(x)
或 dplyr::cume_dist(x)
。
您可以使用 search()
來檢查 R 中的搜尋路徑。
遷移指南
移轉指南現已 在此頁面上封存。