SparkR (R on Spark)

概觀

SparkR 是 R 套件,提供輕量級前端,可從 R 使用 Apache Spark。在 Spark 3.5.1 中,SparkR 提供分散式資料框實作,支援選取、篩選、聚合等作業(類似 R 資料框、dplyr),但適用於大型資料集。SparkR 也支援使用 MLlib 進行分散式機器學習。

SparkDataFrame

SparkDataFrame 是分散式資料集合,組織成命名欄。它在概念上等同於關係資料庫中的表格或 R 中的資料框,但底層有更豐富的最佳化。SparkDataFrames 可從各種來源建構,例如:結構化資料檔案、Hive 中的表格、外部資料庫或現有的本機 R 資料框。

此頁面上的所有範例都使用 R 或 Spark 發行版中包含的範例資料,而且可以使用 ./bin/sparkR shell 執行。

啟動:SparkSession

SparkR 的進入點是 SparkSession,它會將 R 程式連線到 Spark 群集。你可以使用 sparkR.session 建立 SparkSession,並傳入選項,例如應用程式名稱、任何依賴的 Spark 套件等。此外,你也可以透過 SparkSession 使用 SparkDataFrames。如果你從 sparkR shell 執行,SparkSession 應該已經為你建立,你不需要呼叫 sparkR.session

sparkR.session()

從 RStudio 啟動

你也可以從 RStudio 啟動 SparkR。你可以從 RStudio、R shell、Rscript 或其他 R IDE 將 R 程式連線到 Spark 群集。首先,請確定環境中已設定 SPARK_HOME(你可以檢查 Sys.getenv),載入 SparkR 套件,並呼叫 sparkR.session,如下所示。它會檢查 Spark 安裝,如果找不到,它會自動下載並快取。或者,你也可以手動執行 install.spark

除了呼叫 sparkR.session 之外,你還可以指定某些 Spark 驅動程式屬性。通常,這些 應用程式屬性執行時期環境 無法以程式設計方式設定,因為驅動程式 JVM 程序已經啟動,在這種情況下,SparkR 會為你處理這件事。若要設定它們,請將它們傳遞給你會在 sparkR.session()sparkConfig 參數中傳遞的其他組態屬性。

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))

以下 Spark 驅動程式屬性可以在 sparkConfig 中使用 sparkR.session 從 RStudio 設定

屬性名稱屬性群組spark-submit 等效
spark.master 應用程式屬性 --master
spark.kerberos.keytab 應用程式屬性 --keytab
spark.kerberos.principal 應用程式屬性 --principal
spark.driver.memory 應用程式屬性 --driver-memory
spark.driver.extraClassPath 執行時期環境 --driver-class-path
spark.driver.extraJavaOptions 執行時期環境 --driver-java-options
spark.driver.extraLibraryPath 執行時期環境 --driver-library-path

建立 SparkDataFrames

使用 SparkSession,應用程式可以從本機 R 資料格架、Hive 表格 或其他 資料來源 建立 SparkDataFrame

從本機資料框

建立資料格架最簡單的方法是將本機 R 資料格架轉換成 SparkDataFrame。具體來說,我們可以使用 as.DataFramecreateDataFrame,並傳入本機 R 資料格架以建立 SparkDataFrame。舉例來說,下列範例使用 R 中的 faithful 資料集建立 SparkDataFrame

df <- as.DataFrame(faithful)

# Displays the first part of the SparkDataFrame
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74

從資料來源

SparkR 支援透過 SparkDataFrame 介面操作各種資料來源。本節說明使用資料來源載入和儲存資料的通用方法。您可以查看 Spark SQL 程式設計指南,以取得內建資料來源可用的更多 特定選項

從資料來源建立 SparkDataFrames 的通用方法是 read.df。此方法會接收要載入的檔案路徑和資料來源類型,且目前作用中的 SparkSession 會自動使用。SparkR 支援原生讀取 JSON、CSV 和 Parquet 檔案,且透過 第三方專案 等來源提供的套件,您可以找到熱門檔案格式(例如 Avro)的資料來源連接器。這些套件可以透過指定 --packages 搭配 spark-submitsparkR 指令來新增,或者在互動式 R shell 或 RStudio 中使用 sparkPackages 參數初始化 SparkSession 時新增。

sparkR.session(sparkPackages = "org.apache.spark:spark-avro_2.12:3.5.1")

我們可以使用範例 JSON 輸入檔案來了解如何使用資料來源。請注意,這裡使用的檔案不是典型的 JSON 檔案。檔案中的每一行都必須包含一個獨立、自含的有效 JSON 物件。如需更多資訊,請參閱 JSON Lines 文字格式,也稱為換行區隔 JSON。因此,一般的多行 JSON 檔案最常會失敗。

people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin

# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))

資料來源 API 原生支援 CSV 格式的輸入檔案。如需更多資訊,請參閱 SparkR read.df API 文件。

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

資料來源 API 也可用於將 SparkDataFrames 儲存到多種檔案格式。例如,我們可以使用 write.df 將前一個範例的 SparkDataFrame 儲存到 Parquet 檔案中。

write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")

從 Hive 表格

您也可以從 Hive 資料表建立 SparkDataFrames。為此,我們需要建立一個具有 Hive 支援的 SparkSession,它可以存取 Hive MetaStore 中的資料表。請注意,Spark 應該已使用 Hive 支援 建置,而且可以在 SQL 程式設計指南 中找到更多詳細資料。在 SparkR 中,它會預設嘗試建立一個已啟用 Hive 支援的 SparkSession (enableHiveSupport = TRUE)。

sparkR.session()

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")

# results is now a SparkDataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311

SparkDataFrame 操作

SparkDataFrames 支援多項功能來執行結構化資料處理。在此,我們會納入一些基本範例,而完整的清單可以在 API 文件中找到。

選取列、欄

# Create the SparkDataFrame
df <- as.DataFrame(faithful)

# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]

# Select only the "eruptions" column
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333

# You can also pass in column name as strings
head(select(df, "eruptions"))

# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48

群組、聚合

SparkR 資料框支援多項常用功能,以便在分組後彙總資料。例如,我們可以計算 faithful 資料集中 waiting 時間的直方圖,如下所示

# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      70     4
##2      67     1
##3      69     2

# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
##   waiting count
##1      78    15
##2      83    14
##3      81    13

除了標準彙總之外,SparkR 還支援 OLAP 立方體 運算子 cube

head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1  NA 140.8    4     22.8
##2   4  75.7    4     30.4
##3   8 400.0    3     19.2
##4   8 318.0    3     15.5
##5  NA 351.0   NA     15.8
##6  NA 275.8   NA     16.3

rollup

head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1   4  75.7    4     30.4
##2   8 400.0    3     19.2
##3   8 318.0    3     15.5
##4   4  78.7   NA     32.4
##5   8 304.0    3     15.2
##6   4  79.0   NA     27.3

運算欄

SparkR 也提供多項功能,可在資料處理和彙總期間直接套用至欄位。以下範例顯示基本算術功能的用法。

# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

套用使用者定義函數

在 SparkR 中,我們支援多種使用者定義函式

使用 dapplydapplyCollect 在大型資料集上執行特定函數

dapply

將函式套用至 SparkDataFrame 的每個分割區。要套用至 SparkDataFrame 的每個分割區的函式,而且應該只有一個參數,對應到每個分割區的 data.frame 將會傳遞。函式的輸出應該是一個 data.frame。Schema 會指定結果 SparkDataFrame 的列格式。它必須符合傳回值的 資料類型

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300
dapplyCollect

類似 dapply,將函式套用至 SparkDataFrame 的每個分割區,並收集結果。函式的輸出應為 data.frame。但是,不需要傳遞 Schema。請注意,如果無法將在所有分割區執行的 UDF 輸出拉取至驅動程式並放入驅動程式記憶體中,則 dapplyCollect 可能會失敗。

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

使用 gapplygapplyCollect 在大型資料集上執行特定函數,並依輸入欄分組

gapply

將函式套用至 SparkDataFrame 的每個群組。函式應套用至 SparkDataFrame 的每個群組,且應僅有兩個參數:群組金鑰和對應於該金鑰的 R data.frame。群組會從 SparkDataFrame 的欄位中選取。函式的輸出應為 data.frame。Schema 會指定結果 SparkDataFrame 的列格式。它必須根據 Spark 資料類型 來表示 R 函式的輸出 Schema。已傳回 data.frame 的欄位名稱是由使用者設定。

# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900
gapplyCollect

類似 gapply,將函式套用至 SparkDataFrame 的每個分割區,並將結果收集回 R data.frame。函式的輸出應為 data.frame。但是,不需要傳遞 Schema。請注意,如果無法將在所有分割區執行的 UDF 輸出拉取至驅動程式並放入驅動程式記憶體中,則 gapplyCollect 可能會失敗。

# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900

使用 spark.lapply 分散執行本機 R 函數

spark.lapply

類似於原生 R 中的 lapplyspark.lapply 會對元素清單執行函數,並透過 Spark 分散運算。以類似於 doParallellapply 的方式,將函數套用至清單的元素。所有運算的結果都應該能放入單一機器中。若非如此,他們可以執行類似 df <- createDataFrame(list) 的動作,然後使用 dapply

# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# Print the summary of each model
print(model.summaries)

立即執行

如果啟用立即執行,資料會在建立 SparkDataFrame 時立即傳回 R 客戶端。預設情況下,立即執行並未啟用,可以在啟動 SparkSession 時,將組態屬性 spark.sql.repl.eagerEval.enabled 設定為 true 來啟用。

可以分別透過 spark.sql.repl.eagerEval.maxNumRowsspark.sql.repl.eagerEval.truncate 組態屬性,來控制要顯示資料的最大列數和每欄的最大字元數。這些屬性僅在啟用立即執行時才有效。如果未明確設定這些屬性,預設會顯示最多 20 列和每欄最多 20 個字元的資料。

# Start up spark session with eager execution enabled
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.sql.repl.eagerEval.enabled = "true",
                                  spark.sql.repl.eagerEval.maxNumRows = as.integer(10)))

# Create a grouped and sorted SparkDataFrame
df <- createDataFrame(faithful)
df2 <- arrange(summarize(groupBy(df, df$waiting), count = n(df$waiting)), "waiting")

# Similar to R data.frame, displays the data returned, instead of SparkDataFrame class string
df2

##+-------+-----+
##|waiting|count|
##+-------+-----+
##|   43.0|    1|
##|   45.0|    3|
##|   46.0|    5|
##|   47.0|    4|
##|   48.0|    3|
##|   49.0|    5|
##|   50.0|    5|
##|   51.0|    6|
##|   52.0|    5|
##|   53.0|    7|
##+-------+-----+
##only showing top 10 rows

請注意,若要在 sparkR shell 中啟用立即執行,請將 spark.sql.repl.eagerEval.enabled=true 組態屬性新增至 --conf 選項。

從 SparkR 執行 SQL 查詢

SparkDataFrame 也可以註冊為 Spark SQL 中的暫時檢視,這可讓您對其資料執行 SQL 查詢。sql 函數可讓應用程式以程式化方式執行 SQL 查詢,並將結果傳回為 SparkDataFrame

# Load a JSON file
people <- read.df("./examples/src/main/resources/people.json", "json")

# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")

# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##    name
##1 Justin

機器學習

演算法

SparkR 目前支援下列機器學習演算法

分類

迴歸

樹狀結構

叢集

協同過濾

頻繁模式探勘

統計

在底層,SparkR 使用 MLlib 來訓練模型。請參閱 MLlib 使用者指南的對應區段以取得範例程式碼。使用者可以呼叫 summary 來列印已配適模型的摘要、predict 來對新資料進行預測,以及 write.ml/read.ml 來儲存/載入已配適模型。SparkR 支援模型配適的一組可用 R 公式運算子,包括 ‘~’、‘.’、‘:’、‘+’ 和 ‘-’。

模型持久性

下列範例顯示如何使用 SparkR 儲存/載入 MLlib 模型。

training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")

# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)

# Check model summary
summary(gaussianGLM2)

# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)

unlink(modelPath)
在 Spark 儲存庫的 "examples/src/main/r/ml/ml.R" 中尋找完整的範例程式碼。

R 和 Spark 之間的資料類型對應

RSpark
位元組 位元組
整數 整數
浮點數 浮點數
雙精度浮點數 雙精度浮點數
數值 雙精度浮點數
字元 字串
字串 字串
二進位 二進位
原始 二進位
邏輯 布林
POSIXct 時間戳記
POSIXlt 時間戳記
日期 日期
陣列 陣列
清單 陣列
環境 對應

結構化串流

SparkR 支援結構化串流 API。結構化串流是建立在 Spark SQL 引擎上的可擴充且容錯的串流處理引擎。如需更多資訊,請參閱 結構化串流程式設計指南 中的 R API

SparkR 中的 Apache Arrow

Apache Arrow 是一種內存中欄位資料格式,用於 Spark 在 JVM 和 R 程序之間有效率地傳輸資料。另請參閱 PySpark 最佳化已完成,使用 Apache Arrow 的 PySpark 使用指南,適用於 Pandas。本指南旨在說明如何使用 SparkR 中的 Arrow 最佳化,並提供一些重點。

確保已安裝 Arrow

Arrow R 函式庫可在 CRAN 上取得,且可如下所述安裝。

Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'

請參閱 Apache Arrow 官方文件 以取得更多詳細資訊。

請注意,您必須確保 Arrow R 套件已安裝並在所有叢集節點上可用。目前支援的最低版本為 1.0.0;但是,由於 SparkR 中的 Arrow 最佳化為實驗性質,因此此版本可能會在次要版本之間變更。

啟用轉換至/從 R DataFrame、dapplygapply

在使用呼叫 collect(spark_df) 將 Spark DataFrame 轉換為 R DataFrame、使用 createDataFrame(r_df) 從 R DataFrame 建立 Spark DataFrame、透過 dapply(...) 將 R 原生函式套用至每個分割區,以及透過 gapply(...) 將 R 原生函式套用至群組資料時,Arrow 最佳化可用。若要在執行這些操作時使用 Arrow,使用者需要先將 Spark 組態設定為「spark.sql.execution.arrow.sparkr.enabled」為「true」。此設定預設為停用。

不論最佳化是否啟用,SparkR 都會產生相同的結果。此外,在最佳化因任何原因在實際運算前失敗時,Spark DataFrame 和 R DataFrame 之間的轉換會自動改回非 Arrow 最佳化實作。

# Start up spark session with Arrow optimization enabled
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))

# Converts Spark DataFrame from an R DataFrame
spark_df <- createDataFrame(mtcars)

# Converts Spark DataFrame to an R DataFrame
collect(spark_df)

# Apply an R native function to each partition.
collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))

# Apply an R native function to grouped data.
collect(gapply(spark_df,
               "gear",
               function(key, group) {
                 data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
               },
               structType("gear double, disp boolean")))

請注意,即使使用 Arrow,collect(spark_df) 仍會將 DataFrame 中的所有記錄收集到驅動程式,且應在資料的少量子集上執行。此外,gapply(...)dapply(...) 中指定的輸出架構應與給定函式傳回的 R DataFrame 相符。

支援的 SQL 類型

目前,所有 Spark SQL 資料類型都支援基於 Arrow 的轉換,但 FloatTypeBinaryTypeArrayTypeStructTypeMapType 除外。

R 函數名稱衝突

在 R 中載入並附加新的套件時,可能會發生名稱 衝突,其中一個函式會遮蔽另一個函式。

下列函式會被 SparkR 套件遮蔽

遮蔽函式如何存取
package:stats 中的 cov
stats::cov(x, y = NULL, use = "everything",
           method = c("pearson", "kendall", "spearman"))
package:stats 中的 filter
stats::filter(x, filter, method = c("convolution", "recursive"),
              sides = 2, circular = FALSE, init)
package:base 中的 sample base::sample(x, size, replace = FALSE, prob = NULL)

由於 SparkR 的一部分是以 dplyr 套件為範本,因此 SparkR 中的某些函式與 dplyr 中的函式名稱相同。根據這兩個套件的載入順序,載入第一個套件中的某些函式會被載入第二個套件中的函式遮蔽。在此情況下,請在這些呼叫前面加上套件名稱,例如 SparkR::cume_dist(x)dplyr::cume_dist(x)

您可以使用 search() 來檢查 R 中的搜尋路徑。

遷移指南

移轉指南現已 在此頁面上封存。