快速入門

本教學課程提供有關如何使用 Spark 的快速入門。我們將先透過 Spark 的互動式 shell (使用 Python 或 Scala) 介紹 API,然後說明如何在 Java、Scala 和 Python 中撰寫應用程式。

若要按照本指南進行操作,請先從 Spark 網站 下載 Spark 的封裝版本。由於我們不會使用 HDFS,因此您可以為任何版本的 Hadoop 下載封裝。

請注意,在 Spark 2.0 之前,Spark 的主要程式設計介面是彈性分散式資料集 (RDD)。在 Spark 2.0 之後,RDD 已被資料集取代,資料集與 RDD 類似,具有強類型,但底層有更豐富的最佳化。RDD 介面仍受支援,您可以在 RDD 程式設計指南 中取得更詳細的參考。不過,我們強烈建議您改用資料集,其效能優於 RDD。請參閱 SQL 程式設計指南 以取得有關資料集的更多資訊。

使用 Spark Shell 進行互動式分析

基礎

Spark 的 shell 提供學習 API 的簡單方式,以及分析資料的強大工具。它可以在 Scala(在 Java VM 上執行,因此是使用現有 Java 函式庫的好方法)或 Python 中使用。在 Spark 目錄中執行下列指令以啟動它

./bin/pyspark

或者,如果在您目前的環境中使用 pip 安裝 PySpark

pyspark

Spark 的主要抽象化是稱為資料集的分散式項目集合。資料集可以從 Hadoop InputFormats(例如 HDFS 檔案)建立,或透過轉換其他資料集建立。由於 Python 的動態特性,我們不需要在 Python 中將資料集設定為強類型。因此,Python 中的所有資料集都是 Dataset[Row],我們稱之為 DataFrame,以符合 Pandas 和 R 中的資料框概念。讓我們從 Spark 來源目錄中的 README 檔案文字建立新的 DataFrame

>>> textFile = spark.read.text("README.md")

您可以直接從 DataFrame 取得值,方法是呼叫一些動作,或轉換 DataFrame 以取得新的 DataFrame。如需更多詳細資料,請閱讀 API 文件

>>> textFile.count()  # Number of rows in this DataFrame
126

>>> textFile.first()  # First row in this DataFrame
Row(value=u'# Apache Spark')

現在,讓我們將此 DataFrame 轉換為新的 DataFrame。我們呼叫 filter 以傳回新的 DataFrame,其中包含檔案中的一組子行。

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

我們可以串連轉換和動作

>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
15
./bin/spark-shell

Spark 的主要抽象化是稱為資料集的分散式項目集合。資料集可以從 Hadoop InputFormats(例如 HDFS 檔案)建立,或透過轉換其他資料集建立。讓我們從 Spark 來源目錄中的 README 檔案文字建立新的資料集

scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

您可以直接從資料集取得值,方法是呼叫一些動作,或轉換資料集以取得新的資料集。如需更多詳細資料,請閱讀 API 文件

scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark

現在,讓我們將此資料集轉換為新的資料集。我們呼叫 filter 以傳回新的資料集,其中包含檔案中的一組子項目。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

我們可以串連轉換和動作

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

更多資料集操作

資料集動作和轉換可用於更複雜的運算。假設我們想要找出字數最多的那一行

>>> from pyspark.sql import functions as sf
>>> textFile.select(sf.size(sf.split(textFile.value, "\s+")).name("numWords")).agg(sf.max(sf.col("numWords"))).collect()
[Row(max(numWords)=15)]

這會先將一行對應到一個整數值,並將它命名為「numWords」,建立一個新的 DataFrame。在該 DataFrame 上呼叫 agg 來找出最大的字數。傳遞給 selectagg 的參數都是 Column,我們可以使用 df.colName 從 DataFrame 取得一欄。我們也可以匯入 pyspark.sql.functions,它提供了許多方便的功能,可以從舊的 Column 建立新的 Column。

MapReduce 是一種常見的資料流模式,由 Hadoop 推廣。Spark 可以輕鬆實作 MapReduce 流程

>>> wordCounts = textFile.select(sf.explode(sf.split(textFile.value, "\s+")).alias("word")).groupBy("word").count()

在此,我們在 select 中使用 explode 函數,將一組行的 Dataset 轉換成一組字詞的 Dataset,然後結合 groupBycount 來計算檔案中每個字詞的計數,形成一個有 2 欄的 DataFrame:「word」和「count」。若要收集 shell 中的字詞計數,我們可以呼叫 collect

>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 15

這會先將一行對應到一個整數值,建立一個新的 Dataset。在該 Dataset 上呼叫 reduce 來找出最大的字數。傳遞給 mapreduce 的參數是 Scala 函數文字 (封閉),可以使用任何語言功能或 Scala/Java 函式庫。例如,我們可以輕鬆呼叫在其他地方宣告的函數。我們將使用 Math.max() 函數,讓這段程式碼更容易理解

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

MapReduce 是一種常見的資料流模式,由 Hadoop 推廣。Spark 可以輕鬆實作 MapReduce 流程

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

在此,我們呼叫 flatMap 將一組行的 Dataset 轉換成一組字詞的 Dataset,然後結合 groupByKeycount 來計算檔案中每個字詞的計數,形成一個 (String, Long) 成對的 Dataset。若要收集 shell 中的字詞計數,我們可以呼叫 collect

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

快取

Spark 也支援將資料集拉入叢集範圍內記憶體快取中。當資料重複存取時,這會非常有用,例如查詢小型「熱門」資料集或執行像 PageRank 等反覆運算演算法時。舉個簡單的例子,我們將 linesWithSpark 資料集標記為快取

>>> linesWithSpark.cache()

>>> linesWithSpark.count()
15

>>> linesWithSpark.count()
15

使用 Spark 來探索和快取 100 行文字檔可能看起來很傻。有趣的是,這些相同的功能可以用於非常大的資料集,即使這些資料集分佈在數十或數百個節點上也是如此。您也可以透過將 bin/pyspark 連線到叢集來互動執行此操作,如 RDD 程式設計指南 中所述。

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

使用 Spark 來探索和快取 100 行文字檔可能看起來很傻。有趣的是,這些相同的功能可以用於非常大的資料集,即使這些資料集分佈在數十或數百個節點上也是如此。您也可以透過將 bin/spark-shell 連線到叢集來互動執行此操作,如 RDD 程式設計指南 中所述。

獨立應用程式

假設我們希望使用 Spark API 編寫自給自足的應用程式。我們將逐步示範使用 Scala (搭配 sbt)、Java (搭配 Maven) 和 Python (pip) 編寫簡單的應用程式。

現在我們將示範如何使用 Python API (PySpark) 編寫應用程式。

如果您正在建置封裝的 PySpark 應用程式或函式庫,您可以將其新增到 setup.py 檔案中,如下所示

    install_requires=[
        'pyspark==3.5.1'
    ]

舉例來說,我們將建立一個簡單的 Spark 應用程式,SimpleApp.py

"""SimpleApp.py"""
from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

這個程式只計算文字檔中包含「a」和包含「b」的行數。請注意,您需要將 YOUR_SPARK_HOME 替換為 Spark 安裝的位置。與 Scala 和 Java 範例一樣,我們使用 SparkSession 來建立資料集。對於使用自訂類別或第三方函式庫的應用程式,我們也可以透過 spark-submit--py-files 參數新增程式碼相依性,方法是將它們封裝到 .zip 檔案中 (請參閱 spark-submit --help 以取得詳細資料)。SimpleApp 夠簡單,我們不需要指定任何程式碼相依性。

我們可以使用 bin/spark-submit 指令碼執行這個應用程式

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23

如果您已將 PySpark pip 安裝到您的環境中 (例如,pip install pyspark),您可以使用一般 Python 解譯器執行您的應用程式,或使用提供的「spark-submit」,依您的喜好而定。

# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23

我們將建立一個非常簡單的 Spark 應用程式,使用 Scala 撰寫,事實上,它簡單到取名為 SimpleApp.scala

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

請注意,應用程式應定義 main() 方法,而不是延伸 scala.Appscala.App 的子類別可能無法正常運作。

這個程式只計算 Spark README 中包含「a」和包含「b」的行數。請注意,您需要將 YOUR_SPARK_HOME 替換為 Spark 安裝的位置。與先前使用 Spark shell 的範例不同,Spark shell 會初始化自己的 SparkSession,我們會在程式中初始化 SparkSession。

我們呼叫 SparkSession.builder 來建構 SparkSession,然後設定應用程式名稱,最後呼叫 getOrCreate 來取得 SparkSession 執行個體。

我們的應用程式仰賴 Spark API,因此我們也會包含一個 sbt 設定檔 build.sbt,說明 Spark 是相依性。這個檔案也新增 Spark 所依賴的儲存庫

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.18"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1"

為了讓 sbt 正常運作,我們需要根據典型的目錄結構配置 SimpleApp.scalabuild.sbt。配置完成後,我們可以建立包含應用程式程式碼的 JAR 套件,然後使用 spark-submit 指令碼來執行我們的程式。

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23

這個範例將使用 Maven 來編譯應用程式 JAR,但任何類似的建置系統都可行。

我們將建立一個非常簡單的 Spark 應用程式 SimpleApp.java

/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}

這個程式只計算 Spark README 中包含「a」和包含「b」的行數。請注意,您需要將 YOUR_SPARK_HOME 替換為 Spark 安裝的位置。與先前使用 Spark shell 的範例不同,Spark shell 會初始化自己的 SparkSession,我們會在程式中初始化 SparkSession。

為了建置這個程式,我們也撰寫一個 Maven pom.xml 檔案,將 Spark 列為相依性。請注意,Spark 人工製品會標記為 Scala 版本。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.5.1</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>

我們根據正規的 Maven 目錄結構配置這些檔案

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

現在,我們可以使用 Maven 封裝應用程式,並使用 ./bin/spark-submit 執行它。

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

其他相依性管理工具,例如 Conda 和 pip,也可使用於自訂類別或第三方函式庫。另請參閱 Python 套件管理

後續步驟

恭喜您執行第一個 Spark 應用程式!

# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R