Apache Avro 資料來源指南

自 Spark 2.4 發行以來,Spark SQL 提供內建支援,可讀取和寫入 Apache Avro 資料。

部署

spark-avro 模組是外部的,預設不會包含在 spark-submitspark-shell 中。

與任何 Spark 應用程式一樣,spark-submit 用於啟動您的應用程式。 spark-avro_2.12 及其相依性可以使用 --packages 直接新增到 spark-submit,例如:

./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.5.1 ...

對於在 spark-shell 上進行實驗,您也可以使用 --packages 直接新增 org.apache.spark:spark-avro_2.12 及其相依性:

./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.5.1 ...

請參閱 應用程式提交指南,以取得有關提交具有外部相依性的應用程式的更多詳細資訊。

載入和儲存函數

由於 spark-avro 模組是外部的,因此 DataFrameReaderDataFrameWriter 中沒有 .avro API。

若要載入/儲存 Avro 格式的資料,您需要將資料來源選項 format 指定為 avro(或 org.apache.spark.sql.avro)。

df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");
df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")

to_avro() 和 from_avro()

Avro 套件提供函數 to_avro,以將欄位編碼為 Avro 格式的二進制,以及 from_avro(),以將 Avro 二進制資料解碼為欄位。這兩個函數都會將一個欄位轉換為另一個欄位,而輸入/輸出 SQL 資料類型可以是複雜類型或原始類型。

在從 Kafka 等串流來源讀取或寫入時,使用 Avro 記錄作為欄位非常有用。每個 Kafka 鍵值記錄都會增加一些元資料,例如,傳輸到 Kafka 的時間戳記、Kafka 中的偏移量等。

from pyspark.sql.avro.functions import from_avro, to_avro

# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()

df = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1")\
  .load()

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

query = output\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")\
  .start()
import org.apache.spark.sql.avro.functions._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro($"value", jsonFormatSchema) as $"user")
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as $"value")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.avro.functions.*;

// `from_avro` requires Avro schema in JSON string format.
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
Dataset<Row> output = df
  .select(from_avro(col("value"), jsonFormatSchema).as("user"))
  .where("user.favorite_color == \"red\"")
  .select(to_avro(col("user.name")).as("value"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")

df <- read.stream(
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  subscribe = "topic1"
)

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.

output <- select(
  filter(
    select(df, alias(from_avro("value", jsonFormatSchema), "user")),
    column("user.favorite_color") == "red"
  ),
  alias(to_avro("user.name"), "value")
)

write.stream(
  output,
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  topic = "topic2"
)

資料來源選項

Avro 的資料來源選項可透過以下方式設定:

屬性名稱預設值意義範圍版本
avroSchema 使用者提供的 JSON 格式選用架構。
  • 在讀取 Avro 檔案或呼叫函數 from_avro 時,可以將此選項設定為已演化的架構,它與實際的 Avro 架構相容,但不同。反序列化架構將與已演化的架構一致。例如,如果我們設定一個包含一個額外欄位(具有預設值)的已演化的架構,則 Spark 中的讀取結果也會包含新的欄位。請注意,當將此選項與 from_avro 搭配使用時,您仍需要將實際的 Avro 架構傳遞為函數的參數。
  • 在寫入 Avro 時,如果預期的輸出 Avro 架構與 Spark 轉換的架構不符,則可以設定此選項。例如,預期的一個欄位的架構為「列舉」類型,而不是預設轉換架構中的「字串」類型。
讀取、寫入和函數 from_avro 2.4.0
recordName topLevelRecord 寫入結果中的頂層記錄名稱,這是 Avro 規格中所要求的。 寫入 2.4.0
recordNamespace "" 寫入結果中的記錄命名空間。 寫入 2.4.0
ignoreExtension true 此選項控制在讀取時忽略沒有 .avro 副檔名的檔案。
如果啟用此選項,則會載入所有檔案(有和沒有 .avro 副檔名)。
此選項已過時,且會在未來的版本中移除。請使用一般資料來源選項 pathGlobFilter 來過濾檔案名稱。
讀取 2.4.0
compression snappy compression 選項允許指定寫入時使用的壓縮編解碼器。
目前支援的編解碼器為 uncompressedsnappydeflatebzip2xzzstandard
如果未設定選項,則會考量設定 spark.sql.avro.compression.codec
寫入 2.4.0
mode FAILFAST mode 選項允許指定函數 from_avro 的剖析模式。
目前支援的模式為
  • FAILFAST:在處理損毀記錄時擲回例外狀況。
  • PERMISSIVE:損毀的記錄會以空值結果處理。因此,資料架構會強制設定為完全可為空值,這可能與使用者提供的不同。
函式 from_avro 2.4.0
datetimeRebaseMode spark.sql.avro.datetimeRebaseModeInRead 設定的值) datetimeRebaseMode 選項可指定 datetimestamp-microstimestamp-millis 邏輯類型值的重新設定基準模式,從儒略曆轉換為前儒略格里曆。
目前支援的模式為
  • EXCEPTION:讀取兩個曆法中含糊不清的古代日期/時間戳記時會失敗。
  • CORRECTED:載入日期/時間戳記,不重新設定基準。
  • LEGACY:執行從儒略曆到前儒略格里曆的古代日期/時間戳記重新設定基準。
讀取和函式 from_avro 3.2.0
positionalFieldMatching false 這可與 `avroSchema` 選項搭配使用,調整提供的 Avro 架構中欄位與 SQL 架構中欄位配對的行為。預設情況下,配對會使用欄位名稱執行,忽略其位置。如果將此選項設定為「true」,配對將根據欄位的位置執行。 讀取和寫入 3.2.0
enableStableIdentifiersForUnionType false 如果設定為 true,Avro 架構會反序列化為 Spark SQL 架構,而 Avro Union 類型會轉換為欄位名稱與其各自類型保持一致的結構。產生的欄位名稱會轉換為小寫,例如 member_int 或 member_string。如果兩個使用者定義的類型名稱或使用者定義的類型名稱和內建類型名稱相同(不分大小寫),系統會引發例外狀況。但在其他情況下,欄位名稱可以唯一識別。 讀取 3.5.0

設定

可以使用 SparkSession 上的 setConf 方法或使用 SQL 執行 SET key=value 指令來設定 Avro。

屬性名稱預設值意義版本
spark.sql.legacy.replaceDatabricksSparkAvro.enabled true 如果設定為 true,資料來源提供者 com.databricks.spark.avro 會對應到內建但外部的 Avro 資料來源模組,以維持向後相容性。
注意:SQL 設定已在 Spark 3.2 中棄用,未來可能會移除。
2.4.0
spark.sql.avro.compression.codec snappy 用於寫入 AVRO 檔案的壓縮編解碼器。支援的編解碼器:未壓縮、deflate、snappy、bzip2、xz 和 zstandard。預設編解碼器為 snappy。 2.4.0
spark.sql.avro.deflate.level -1 用於寫入 AVRO 檔案的 deflate 編解碼器的壓縮層級。有效值必須在 1 到 9(含)或 -1 的範圍內。預設值為 -1,在目前的實作中對應到層級 6。 2.4.0
spark.sql.avro.datetimeRebaseModeInRead 例外 datetimestamp-microstimestamp-millis 邏輯類型值從儒略曆轉換為前儒略格里曆的重新定基模式
  • 例外:如果 Spark 看到兩個日曆之間含糊不清的古代日期/時間戳記,它將無法讀取。
  • 已更正:Spark 將不會重新定基,而是按原樣讀取日期/時間戳記。
  • 舊版:讀取 Avro 檔案時,Spark 會將日期/時間戳記從舊版混合(儒略曆 + 格里曆)日曆重新定基到前儒略格里曆。
僅當 Avro 檔案的寫入資訊(如 Spark、Hive)未知時,此設定才會生效。
3.0.0
spark.sql.avro.datetimeRebaseModeInWrite 例外 datetimestamp-microstimestamp-millis 邏輯類型值從前儒略格里曆轉換為儒略曆的重新定基模式
  • 例外:如果 Spark 看到兩個日曆之間含糊不清的古代日期/時間戳記,它將無法寫入。
  • 已更正:Spark 將不會重新定基,而是按原樣寫入日期/時間戳記。
  • 舊版:寫入 Avro 檔案時,Spark 會將日期/時間戳記從前儒略格里曆重新定基到舊版混合(儒略曆 + 格里曆)日曆。
3.0.0
spark.sql.avro.filterPushdown.enabled true 如果為 true,則啟用將篩選下推至 Avro 資料來源。 3.1.0

與 Databricks spark-avro 相容

此 Avro 資料來源模組最初來自且與 Databricks 的開放原始碼儲存庫 spark-avro 相容。

預設情況下,當 SQL 設定 spark.sql.legacy.replaceDatabricksSparkAvro.enabled 已啟用,資料來源提供者 com.databricks.spark.avro 會對應到此內建 Avro 模組。對於在目錄中繼資料儲存庫中使用 Provider 屬性為 com.databricks.spark.avro 建立的 Spark 表格,如果您使用此內建 Avro 模組,則對應是載入這些表格的必要條件。

請注意,在 Databricks 的 spark-avro 中,隱含類別 AvroDataFrameWriterAvroDataFrameReader 是為捷徑函式 .avro() 建立的。在此內建但外部模組中,兩個隱含類別都已移除。請改用 DataFrameWriterDataFrameReader 中的 .format("avro"),這應該足夠且乾淨。

如果您偏好使用自己建立的 spark-avro jar 檔案,您可以簡單停用設定 spark.sql.legacy.replaceDatabricksSparkAvro.enabled,並在部署應用程式時使用選項 --jars。請閱讀應用程式提交指南中的 進階相依性管理 章節以取得更多詳細資訊。

Avro -> Spark SQL 轉換支援的類型

目前 Spark 支援讀取 Avro 記錄中的所有 基本類型複雜類型

Avro 類型Spark SQL 類型
布林 布林類型
整數 整數類型
長整數 長整數類型
浮點數 浮點數類型
雙精度浮點數 雙精度浮點數類型
字串 字串類型
列舉 字串類型
固定 二進位類型
位元組 二進位類型
記錄 結構類型
陣列 陣列類型
對應 對應類型
聯合 請參閱下方

除了上面列出的類型之外,它還支援讀取 union 類型。以下三種類型被視為基本 union 類型

  1. union(int, long) 將會對應到 LongType。
  2. union(float, double) 將會對應到 DoubleType。
  3. union(something, null),其中 something 是任何受支援的 Avro 類型。這將會對應到與 something 相同的 Spark SQL 類型,其中 nullable 設定為 true。所有其他聯合類型都被視為複雜類型。它們將會對應到結構類型,其中欄位名稱為 member0、member1 等,依據聯合的成員而定。這與在 Avro 和 Parquet 之間進行轉換時的行為一致。

它也支援讀取以下 Avro 邏輯類型

Avro 邏輯類型Avro 類型Spark SQL 類型
日期 整數 日期類型
時間戳記毫秒 長整數 時間戳記類型
時間戳記微秒 長整數 時間戳記類型
十進位數 固定 十進位數類型
十進位數 位元組 十進位數類型

目前,它會忽略 Avro 檔案中出現的說明文件、別名和其他屬性。

Spark SQL -> Avro 轉換支援的類型

Spark 支援將所有 Spark SQL 類型寫入 Avro。對於大多數類型,從 Spark 類型到 Avro 類型的對應很簡單(例如,整數類型會轉換為整數);但是,有幾個特殊情況列於下方

Spark SQL 類型Avro 類型Avro 邏輯類型
位元組類型 整數
短整數類型 整數
二進位類型 位元組
日期類型 整數 日期
時間戳記類型 長整數 時間戳記微秒
十進位數類型 固定 十進位數

您也可以使用選項 avroSchema 指定整個輸出 Avro 架構,以便將 Spark SQL 類型轉換為其他 Avro 類型。以下轉換不會預設套用,需要使用者指定的 Avro 架構

Spark SQL 類型Avro 類型Avro 邏輯類型
二進位類型 固定
字串類型 列舉
時間戳記類型 長整數 時間戳記毫秒
十進位數類型 位元組 十進位數