此頁面會透過簡單的範例,說明如何使用不同的 Apache Spark API。
Spark 是適用於小型和大型資料集的絕佳引擎。它可用於單一節點/本機環境,或分散式叢集。Spark 的擴充 API、絕佳效能和彈性,使其成為許多分析的理想選擇。本指南會說明以下 Spark API 的範例
範例使用小型資料集,以便於理解。
本節說明如何建立 Spark 資料框並執行簡單的運算。範例使用小型資料框,以便您輕鬆查看功能。
我們從建立 Spark 會話開始
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()
部分 Spark 執行時期環境附帶預先建立的 Spark 會話。getOrCreate()
方法會使用現有的 Spark 會話,或在尚未存在時建立新的 Spark 會話。
建立 Spark 資料框
先建立一個資料框,其中包含 first_name
和 age
欄位,以及四列資料
df = spark.createDataFrame(
[
("sue", 32),
("li", 3),
("bob", 75),
("heo", 13),
],
["first_name", "age"],
)
使用 show()
方法查看資料框的內容
df.show()
+----------+---+
|first_name|age|
+----------+---+
| sue| 32|
| li| 3|
| bob| 75|
| heo| 13|
+----------+---+
現在,我們要在資料框上執行一些資料處理運算。
將欄位新增至 Spark 資料框
我們將 life_stage
欄位新增至資料框,如果年齡為 12 歲或以下,則傳回「child」;如果年齡介於 13 到 19 歲之間,則傳回「teenager」;如果年齡為 20 歲或以上,則傳回「adult」。
from pyspark.sql.functions import col, when
df1 = df.withColumn(
"life_stage",
when(col("age") < 13, "child")
.when(col("age").between(13, 19), "teenager")
.otherwise("adult"),
)
將欄位新增至 Spark 資料框很簡單。我們來查看 df1
的內容。
df1.show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| sue| 32| adult|
| li| 3| child|
| bob| 75| adult|
| heo| 13| teenager|
+----------+---+----------+
請注意,原始 DataFrame 沒有變更
df.show()
+----------+---+
|first_name|age|
+----------+---+
| sue| 32|
| li| 3|
| bob| 75|
| heo| 13|
+----------+---+
Spark 操作不會改變 DataFrame。您必須將結果指定給新的變數,才能存取 DataFrame 變更以進行後續操作。
篩選 Spark DataFrame
現在,篩選 DataFrame,使其僅包含青少年和成人。
df1.where(col("life_stage").isin(["teenager", "adult"])).show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| sue| 32| adult|
| bob| 75| adult|
| heo| 13| teenager|
+----------+---+----------+
依 Spark DataFrame 的彙總進行分組
現在,讓我們計算資料集中所有人的平均年齡
from pyspark.sql.functions import avg
df1.select(avg("age")).show()
+--------+
|avg(age)|
+--------+
| 30.75|
+--------+
您也可以計算每個 life_stage
的平均年齡
df1.groupBy("life_stage").avg().show()
+----------+--------+
|life_stage|avg(age)|
+----------+--------+
| adult| 53.5|
| child| 3.0|
| teenager| 13.0|
+----------+--------+
如果您不想使用程式設計 API,Spark 讓您可以使用 SQL 對 DataFrame 執行查詢。
使用 SQL 查詢 DataFrame
以下是使用 SQL 計算所有人平均年齡的方式
spark.sql("select avg(age) from {df1}", df1=df1).show()
+--------+
|avg(age)|
+--------+
| 30.75|
+--------+
以下是使用 SQL 計算依 life_stage
的平均年齡的方式
spark.sql("select life_stage, avg(age) from {df1} group by life_stage", df1=df1).show()
+----------+--------+
|life_stage|avg(age)|
+----------+--------+
| adult| 53.5|
| child| 3.0|
| teenager| 13.0|
+----------+--------+
Spark 讓您可以使用程式設計 API、SQL API 或兩者的組合。這種彈性讓 Spark 能夠讓各種使用者存取,並具有強大的表達力。
讓我們將 DataFrame 持久儲存在命名的 Parquet 表格中,以便透過 SQL API 輕鬆存取。
df1.write.saveAsTable("some_people")
請確定可透過表格名稱存取表格
spark.sql("select * from some_people").show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| heo| 13| teenager|
| sue| 32| adult|
| bob| 75| adult|
| li| 3| child|
+----------+---+----------+
現在,讓我們使用 SQL 在表格中插入幾列資料
spark.sql("INSERT INTO some_people VALUES ('frank', 4, 'child')")
檢查表格內容以確認已插入列
spark.sql("select * from some_people").show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| heo| 13| teenager|
| sue| 32| adult|
| bob| 75| adult|
| li| 3| child|
| frank| 4| child|
+----------+---+----------+
執行查詢以傳回青少年
spark.sql("select * from some_people where life_stage='teenager'").show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| heo| 13| teenager|
+----------+---+----------+
Spark 讓您可以輕鬆註冊表格並使用純粹的 SQL 查詢表格。
Spark 也有結構化串流 API,讓您可以建立批次或即時串流應用程式。
讓我們看看如何使用 Spark 結構化串流從 Kafka 讀取資料,並每小時將其寫入 Parquet 表格。
假設您有一個 Kafka 串流,會持續填入下列資料
{"student_name":"someXXperson", "graduation_year":"2023", "major":"math"}
{"student_name":"liXXyao", "graduation_year":"2025", "major":"physics"}
以下是將 Kafka 來源讀取到 Spark DataFrame 的方式
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", subscribeTopic)
.load()
)
建立一個用於清除輸入資料的函數。
schema = StructType([
StructField("student_name", StringType()),
StructField("graduation_year", StringType()),
StructField("major", StringType()),
])
def with_normalized_names(df, schema):
parsed_df = (
df.withColumn("json_data", from_json(col("value").cast("string"), schema))
.withColumn("student_name", col("json_data.student_name"))
.withColumn("graduation_year", col("json_data.graduation_year"))
.withColumn("major", col("json_data.major"))
.drop(col("json_data"))
.drop(col("value"))
)
split_col = split(parsed_df["student_name"], "XX")
return (
parsed_df.withColumn("first_name", split_col.getItem(0))
.withColumn("last_name", split_col.getItem(1))
.drop("student_name")
)
現在,建立一個函數,用於在每次執行時讀取 Kafka 中所有新的資料。
def perform_available_now_update():
checkpointPath = "data/tmp_students_checkpoint/"
path = "data/tmp_students"
return df.transform(lambda df: with_normalized_names(df)).writeStream.trigger(
availableNow=True
).format("parquet").option("checkpointLocation", checkpointPath).start(path)
呼叫 perform_available_now_update()
函數,並查看 Parquet 表格的內容。
您可以設定一個 cron 工作,每小時執行一次 perform_available_now_update()
函數,讓您的 Parquet 表格定期更新。
Spark RDD API 適用於非結構化資料。
Spark DataFrame API 對於結構化資料來說較為容易且效能較佳。
假設您有一個名為 some_text.txt
的文字檔案,其中包含下列三行資料
these are words
these are more words
words in english
您想要計算文字檔案中每個字詞的數量。以下是使用 Spark RDD 執行此計算的方式
text_file = spark.sparkContext.textFile("some_words.txt")
counts = (
text_file.flatMap(lambda line: line.split(" "))
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b)
)
讓我們看看結果
counts.collect()
[('these', 2),
('are', 2),
('more', 1),
('in', 1),
('words', 3),
('english', 1)]
Spark 允許有效執行查詢,因為它將此計算平行化。許多其他查詢引擎無法平行化計算。
這些範例顯示了 Spark 如何提供友善的使用者 API,用於對小型資料集進行計算。Spark 可以將這些相同的程式碼範例擴充到分散式叢集中的大型資料集。Spark 能夠處理大型和小型資料集,真是太棒了。
與其他查詢引擎相比,Spark 也有廣泛的 API。Spark 讓您可以使用程式設計 API 執行 DataFrame 操作、撰寫 SQL、執行串流分析,並進行機器學習。Spark 讓您不必學習多個架構,也不必拼湊各種函式庫來執行分析。
Spark 提供許多其他範例