Apache Avro 資料來源指南
- 部署
- 載入和儲存函數
- to_avro() 和 from_avro()
- 資料來源選項
- 設定
- 與 Databricks spark-avro 相容
- Avro -> Spark SQL 轉換支援的類型
- Spark SQL -> Avro 轉換支援的類型
自 Spark 2.4 發行以來,Spark SQL 提供內建支援,可讀取和寫入 Apache Avro 資料。
部署
spark-avro
模組是外部的,預設不會包含在 spark-submit
或 spark-shell
中。
與任何 Spark 應用程式一樣,spark-submit
用於啟動您的應用程式。 spark-avro_2.12
及其相依性可以使用 --packages
直接新增到 spark-submit
,例如:
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.5.1 ...
對於在 spark-shell
上進行實驗,您也可以使用 --packages
直接新增 org.apache.spark:spark-avro_2.12
及其相依性:
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.5.1 ...
請參閱 應用程式提交指南,以取得有關提交具有外部相依性的應用程式的更多詳細資訊。
載入和儲存函數
由於 spark-avro
模組是外部的,因此 DataFrameReader
或 DataFrameWriter
中沒有 .avro
API。
若要載入/儲存 Avro 格式的資料,您需要將資料來源選項 format
指定為 avro
(或 org.apache.spark.sql.avro
)。
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");
df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")
to_avro() 和 from_avro()
Avro 套件提供函數 to_avro
,以將欄位編碼為 Avro 格式的二進制,以及 from_avro()
,以將 Avro 二進制資料解碼為欄位。這兩個函數都會將一個欄位轉換為另一個欄位,而輸入/輸出 SQL 資料類型可以是複雜類型或原始類型。
在從 Kafka 等串流來源讀取或寫入時,使用 Avro 記錄作為欄位非常有用。每個 Kafka 鍵值記錄都會增加一些元資料,例如,傳輸到 Kafka 的時間戳記、Kafka 中的偏移量等。
- 如果包含資料的「值」欄位在 Avro 中,您可以使用
from_avro()
萃取資料、豐富資料、清除資料,然後再次將資料推送到 Kafka 或寫入檔案。 to_avro()
可用於將結構轉換為 Avro 記錄。當您想要在將資料寫入 Kafka 時將多個欄位重新編碼為一個欄位時,這個方法特別有用。
from pyspark.sql.avro.functions import from_avro, to_avro
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("subscribe", "topic1")\
.load()
# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
query = output\
.writeStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("topic", "topic2")\
.start()
import org.apache.spark.sql.avro.functions._
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
.select(from_avro($"value", jsonFormatSchema) as $"user")
.where("user.favorite_color == \"red\"")
.select(to_avro($"user.name") as $"value")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.avro.functions.*;
// `from_avro` requires Avro schema in JSON string format.
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
Dataset<Row> output = df
.select(from_avro(col("value"), jsonFormatSchema).as("user"))
.where("user.favorite_color == \"red\"")
.select(to_avro(col("user.name")).as("value"));
StreamingQuery query = output
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start();
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")
df <- read.stream(
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
subscribe = "topic1"
)
# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output <- select(
filter(
select(df, alias(from_avro("value", jsonFormatSchema), "user")),
column("user.favorite_color") == "red"
),
alias(to_avro("user.name"), "value")
)
write.stream(
output,
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
topic = "topic2"
)
資料來源選項
Avro 的資料來源選項可透過以下方式設定:
- 在
DataFrameReader
或DataFrameWriter
上的.option
方法。 - 函數
from_avro
中的options
參數。
屬性名稱 | 預設值 | 意義 | 範圍 | 版本 |
---|---|---|---|---|
avroSchema |
無 | 使用者提供的 JSON 格式選用架構。
|
讀取、寫入和函數 from_avro |
2.4.0 |
recordName |
topLevelRecord | 寫入結果中的頂層記錄名稱,這是 Avro 規格中所要求的。 | 寫入 | 2.4.0 |
recordNamespace |
"" | 寫入結果中的記錄命名空間。 | 寫入 | 2.4.0 |
ignoreExtension |
true | 此選項控制在讀取時忽略沒有 .avro 副檔名的檔案。如果啟用此選項,則會載入所有檔案(有和沒有 .avro 副檔名)。此選項已過時,且會在未來的版本中移除。請使用一般資料來源選項 pathGlobFilter 來過濾檔案名稱。 |
讀取 | 2.4.0 |
compression |
snappy | compression 選項允許指定寫入時使用的壓縮編解碼器。目前支援的編解碼器為 uncompressed 、snappy 、deflate 、bzip2 、xz 和 zstandard 。如果未設定選項,則會考量設定 spark.sql.avro.compression.codec 。 |
寫入 | 2.4.0 |
mode |
FAILFAST | mode 選項允許指定函數 from_avro 的剖析模式。目前支援的模式為
|
函式 from_avro |
2.4.0 |
datetimeRebaseMode |
(spark.sql.avro.datetimeRebaseModeInRead 設定的值) |
datetimeRebaseMode 選項可指定 date 、timestamp-micros 、timestamp-millis 邏輯類型值的重新設定基準模式,從儒略曆轉換為前儒略格里曆。目前支援的模式為
|
讀取和函式 from_avro |
3.2.0 |
positionalFieldMatching |
false | 這可與 `avroSchema` 選項搭配使用,調整提供的 Avro 架構中欄位與 SQL 架構中欄位配對的行為。預設情況下,配對會使用欄位名稱執行,忽略其位置。如果將此選項設定為「true」,配對將根據欄位的位置執行。 | 讀取和寫入 | 3.2.0 |
enableStableIdentifiersForUnionType |
false | 如果設定為 true,Avro 架構會反序列化為 Spark SQL 架構,而 Avro Union 類型會轉換為欄位名稱與其各自類型保持一致的結構。產生的欄位名稱會轉換為小寫,例如 member_int 或 member_string。如果兩個使用者定義的類型名稱或使用者定義的類型名稱和內建類型名稱相同(不分大小寫),系統會引發例外狀況。但在其他情況下,欄位名稱可以唯一識別。 | 讀取 | 3.5.0 |
設定
可以使用 SparkSession 上的 setConf
方法或使用 SQL 執行 SET key=value
指令來設定 Avro。
屬性名稱 | 預設值 | 意義 | 版本 |
---|---|---|---|
spark.sql.legacy.replaceDatabricksSparkAvro.enabled | true |
如果設定為 true,資料來源提供者 com.databricks.spark.avro 會對應到內建但外部的 Avro 資料來源模組,以維持向後相容性。
注意:SQL 設定已在 Spark 3.2 中棄用,未來可能會移除。 |
2.4.0 |
spark.sql.avro.compression.codec | snappy | 用於寫入 AVRO 檔案的壓縮編解碼器。支援的編解碼器:未壓縮、deflate、snappy、bzip2、xz 和 zstandard。預設編解碼器為 snappy。 | 2.4.0 |
spark.sql.avro.deflate.level | -1 | 用於寫入 AVRO 檔案的 deflate 編解碼器的壓縮層級。有效值必須在 1 到 9(含)或 -1 的範圍內。預設值為 -1,在目前的實作中對應到層級 6。 | 2.4.0 |
spark.sql.avro.datetimeRebaseModeInRead | 例外 |
date 、timestamp-micros 、timestamp-millis 邏輯類型值從儒略曆轉換為前儒略格里曆的重新定基模式
|
3.0.0 |
spark.sql.avro.datetimeRebaseModeInWrite | 例外 |
date 、timestamp-micros 、timestamp-millis 邏輯類型值從前儒略格里曆轉換為儒略曆的重新定基模式
|
3.0.0 |
spark.sql.avro.filterPushdown.enabled | true | 如果為 true,則啟用將篩選下推至 Avro 資料來源。 | 3.1.0 |
與 Databricks spark-avro 相容
此 Avro 資料來源模組最初來自且與 Databricks 的開放原始碼儲存庫 spark-avro 相容。
預設情況下,當 SQL 設定 spark.sql.legacy.replaceDatabricksSparkAvro.enabled
已啟用,資料來源提供者 com.databricks.spark.avro
會對應到此內建 Avro 模組。對於在目錄中繼資料儲存庫中使用 Provider
屬性為 com.databricks.spark.avro
建立的 Spark 表格,如果您使用此內建 Avro 模組,則對應是載入這些表格的必要條件。
請注意,在 Databricks 的 spark-avro 中,隱含類別 AvroDataFrameWriter
和 AvroDataFrameReader
是為捷徑函式 .avro()
建立的。在此內建但外部模組中,兩個隱含類別都已移除。請改用 DataFrameWriter
或 DataFrameReader
中的 .format("avro")
,這應該足夠且乾淨。
如果您偏好使用自己建立的 spark-avro
jar 檔案,您可以簡單停用設定 spark.sql.legacy.replaceDatabricksSparkAvro.enabled
,並在部署應用程式時使用選項 --jars
。請閱讀應用程式提交指南中的 進階相依性管理 章節以取得更多詳細資訊。
Avro -> Spark SQL 轉換支援的類型
目前 Spark 支援讀取 Avro 記錄中的所有 基本類型 和 複雜類型。
Avro 類型 | Spark SQL 類型 |
---|---|
布林 | 布林類型 |
整數 | 整數類型 |
長整數 | 長整數類型 |
浮點數 | 浮點數類型 |
雙精度浮點數 | 雙精度浮點數類型 |
字串 | 字串類型 |
列舉 | 字串類型 |
固定 | 二進位類型 |
位元組 | 二進位類型 |
記錄 | 結構類型 |
陣列 | 陣列類型 |
對應 | 對應類型 |
聯合 | 請參閱下方 |
除了上面列出的類型之外,它還支援讀取 union
類型。以下三種類型被視為基本 union
類型
union(int, long)
將會對應到 LongType。union(float, double)
將會對應到 DoubleType。union(something, null)
,其中 something 是任何受支援的 Avro 類型。這將會對應到與 something 相同的 Spark SQL 類型,其中 nullable 設定為 true。所有其他聯合類型都被視為複雜類型。它們將會對應到結構類型,其中欄位名稱為 member0、member1 等,依據聯合的成員而定。這與在 Avro 和 Parquet 之間進行轉換時的行為一致。
它也支援讀取以下 Avro 邏輯類型
Avro 邏輯類型 | Avro 類型 | Spark SQL 類型 |
---|---|---|
日期 | 整數 | 日期類型 |
時間戳記毫秒 | 長整數 | 時間戳記類型 |
時間戳記微秒 | 長整數 | 時間戳記類型 |
十進位數 | 固定 | 十進位數類型 |
十進位數 | 位元組 | 十進位數類型 |
目前,它會忽略 Avro 檔案中出現的說明文件、別名和其他屬性。
Spark SQL -> Avro 轉換支援的類型
Spark 支援將所有 Spark SQL 類型寫入 Avro。對於大多數類型,從 Spark 類型到 Avro 類型的對應很簡單(例如,整數類型會轉換為整數);但是,有幾個特殊情況列於下方
Spark SQL 類型 | Avro 類型 | Avro 邏輯類型 |
---|---|---|
位元組類型 | 整數 | |
短整數類型 | 整數 | |
二進位類型 | 位元組 | |
日期類型 | 整數 | 日期 |
時間戳記類型 | 長整數 | 時間戳記微秒 |
十進位數類型 | 固定 | 十進位數 |
您也可以使用選項 avroSchema
指定整個輸出 Avro 架構,以便將 Spark SQL 類型轉換為其他 Avro 類型。以下轉換不會預設套用,需要使用者指定的 Avro 架構
Spark SQL 類型 | Avro 類型 | Avro 邏輯類型 |
---|---|---|
二進位類型 | 固定 | |
字串類型 | 列舉 | |
時間戳記類型 | 長整數 | 時間戳記毫秒 |
十進位數類型 | 位元組 | 十進位數 |