Apache Spark 範例

此頁面會透過簡單的範例,說明如何使用不同的 Apache Spark API。

Spark 是適用於小型和大型資料集的絕佳引擎。它可用於單一節點/本機環境,或分散式叢集。Spark 的擴充 API、絕佳效能和彈性,使其成為許多分析的理想選擇。本指南會說明以下 Spark API 的範例

  • 資料框
  • SQL
  • 結構化串流
  • RDD

範例使用小型資料集,以便於理解。

Spark 資料框範例

本節說明如何建立 Spark 資料框並執行簡單的運算。範例使用小型資料框,以便您輕鬆查看功能。

我們從建立 Spark 會話開始

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("demo").getOrCreate()

部分 Spark 執行時期環境附帶預先建立的 Spark 會話。getOrCreate() 方法會使用現有的 Spark 會話,或在尚未存在時建立新的 Spark 會話。

建立 Spark 資料框

先建立一個資料框,其中包含 first_nameage 欄位,以及四列資料

df = spark.createDataFrame(
    [
        ("sue", 32),
        ("li", 3),
        ("bob", 75),
        ("heo", 13),
    ],
    ["first_name", "age"],
)

使用 show() 方法查看資料框的內容

df.show()

+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+

現在,我們要在資料框上執行一些資料處理運算。

將欄位新增至 Spark 資料框

我們將 life_stage 欄位新增至資料框,如果年齡為 12 歲或以下,則傳回「child」;如果年齡介於 13 到 19 歲之間,則傳回「teenager」;如果年齡為 20 歲或以上,則傳回「adult」。

from pyspark.sql.functions import col, when

df1 = df.withColumn(
    "life_stage",
    when(col("age") < 13, "child")
    .when(col("age").between(13, 19), "teenager")
    .otherwise("adult"),
)

將欄位新增至 Spark 資料框很簡單。我們來查看 df1 的內容。

df1.show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|        li|  3|     child|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+

請注意,原始 DataFrame 沒有變更

df.show()

+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+

Spark 操作不會改變 DataFrame。您必須將結果指定給新的變數,才能存取 DataFrame 變更以進行後續操作。

篩選 Spark DataFrame

現在,篩選 DataFrame,使其僅包含青少年和成人。

df1.where(col("life_stage").isin(["teenager", "adult"])).show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+

依 Spark DataFrame 的彙總進行分組

現在,讓我們計算資料集中所有人的平均年齡

from pyspark.sql.functions import avg

df1.select(avg("age")).show()

+--------+
|avg(age)|
+--------+
|   30.75|
+--------+

您也可以計算每個 life_stage 的平均年齡

df1.groupBy("life_stage").avg().show()

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+

如果您不想使用程式設計 API,Spark 讓您可以使用 SQL 對 DataFrame 執行查詢。

使用 SQL 查詢 DataFrame

以下是使用 SQL 計算所有人平均年齡的方式

spark.sql("select avg(age) from {df1}", df1=df1).show()

+--------+
|avg(age)|
+--------+
|   30.75|
+--------+

以下是使用 SQL 計算依 life_stage 的平均年齡的方式

spark.sql("select life_stage, avg(age) from {df1} group by life_stage", df1=df1).show()

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+

Spark 讓您可以使用程式設計 API、SQL API 或兩者的組合。這種彈性讓 Spark 能夠讓各種使用者存取,並具有強大的表達力。

Spark SQL 範例

讓我們將 DataFrame 持久儲存在命名的 Parquet 表格中,以便透過 SQL API 輕鬆存取。

df1.write.saveAsTable("some_people")

請確定可透過表格名稱存取表格

spark.sql("select * from some_people").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       heo| 13|  teenager|
|       sue| 32|     adult|
|       bob| 75|     adult|
|        li|  3|     child|
+----------+---+----------+

現在,讓我們使用 SQL 在表格中插入幾列資料

spark.sql("INSERT INTO some_people VALUES ('frank', 4, 'child')")

檢查表格內容以確認已插入列

spark.sql("select * from some_people").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       heo| 13|  teenager|
|       sue| 32|     adult|
|       bob| 75|     adult|
|        li|  3|     child|
|     frank|  4|     child|
+----------+---+----------+

執行查詢以傳回青少年

spark.sql("select * from some_people where life_stage='teenager'").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       heo| 13|  teenager|
+----------+---+----------+

Spark 讓您可以輕鬆註冊表格並使用純粹的 SQL 查詢表格。

Spark 結構化串流範例

Spark 也有結構化串流 API,讓您可以建立批次或即時串流應用程式。

讓我們看看如何使用 Spark 結構化串流從 Kafka 讀取資料,並每小時將其寫入 Parquet 表格。

假設您有一個 Kafka 串流,會持續填入下列資料

{"student_name":"someXXperson", "graduation_year":"2023", "major":"math"}
{"student_name":"liXXyao", "graduation_year":"2025", "major":"physics"}

以下是將 Kafka 來源讀取到 Spark DataFrame 的方式

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", subscribeTopic)
    .load()
)

建立一個用於清除輸入資料的函數。

schema = StructType([
 StructField("student_name", StringType()),
 StructField("graduation_year", StringType()),
 StructField("major", StringType()),
])

def with_normalized_names(df, schema):
    parsed_df = (
        df.withColumn("json_data", from_json(col("value").cast("string"), schema))
        .withColumn("student_name", col("json_data.student_name"))
        .withColumn("graduation_year", col("json_data.graduation_year"))
        .withColumn("major", col("json_data.major"))
        .drop(col("json_data"))
        .drop(col("value"))
    )
    split_col = split(parsed_df["student_name"], "XX")
    return (
        parsed_df.withColumn("first_name", split_col.getItem(0))
        .withColumn("last_name", split_col.getItem(1))
        .drop("student_name")
    )

現在,建立一個函數,用於在每次執行時讀取 Kafka 中所有新的資料。

def perform_available_now_update():
    checkpointPath = "data/tmp_students_checkpoint/"
    path = "data/tmp_students"
    return df.transform(lambda df: with_normalized_names(df)).writeStream.trigger(
        availableNow=True
    ).format("parquet").option("checkpointLocation", checkpointPath).start(path)

呼叫 perform_available_now_update() 函數,並查看 Parquet 表格的內容。

您可以設定一個 cron 工作,每小時執行一次 perform_available_now_update() 函數,讓您的 Parquet 表格定期更新。

Spark RDD 範例

Spark RDD API 適用於非結構化資料。

Spark DataFrame API 對於結構化資料來說較為容易且效能較佳。

假設您有一個名為 some_text.txt 的文字檔案,其中包含下列三行資料

these are words
these are more words
words in english

您想要計算文字檔案中每個字詞的數量。以下是使用 Spark RDD 執行此計算的方式

text_file = spark.sparkContext.textFile("some_words.txt")

counts = (
    text_file.flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)

讓我們看看結果

counts.collect()

[('these', 2),
 ('are', 2),
 ('more', 1),
 ('in', 1),
 ('words', 3),
 ('english', 1)]

Spark 允許有效執行查詢,因為它將此計算平行化。許多其他查詢引擎無法平行化計算。

結論

這些範例顯示了 Spark 如何提供友善的使用者 API,用於對小型資料集進行計算。Spark 可以將這些相同的程式碼範例擴充到分散式叢集中的大型資料集。Spark 能夠處理大型和小型資料集,真是太棒了。

與其他查詢引擎相比,Spark 也有廣泛的 API。Spark 讓您可以使用程式設計 API 執行 DataFrame 操作、撰寫 SQL、執行串流分析,並進行機器學習。Spark 讓您不必學習多個架構,也不必拼湊各種函式庫來執行分析。

其他範例

Spark 提供許多其他範例

最新消息

檔案