Protobuf 資料來源指南

自 Spark 3.4.0 版本起,Spark SQL 提供內建支援,用於讀取和寫入 Protobuf 資料。

部署

spark-protobuf 模組是外部的,預設情況下未包含在 spark-submitspark-shell 中。

與任何 Spark 應用程式一樣,spark-submit 用於啟動應用程式。可以使用 --packagesspark-protobuf_2.12 及其相依性直接新增至 spark-submit,例如:

./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.12:3.5.1 ...

對於在 spark-shell 上進行實驗,您也可以使用 --packages 直接新增 org.apache.spark:spark-protobuf_2.12 及其相依性,

./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.12:3.5.1 ...

請參閱應用程式提交指南,以進一步瞭解如何提交具有外部相依性的應用程式。

to_protobuf() 和 from_protobuf()

spark-protobuf 套件提供函式 to_protobuf,用於將欄位編碼為 Protobuf 格式的二進位,以及 from_protobuf(),用於將 Protobuf 二進位資料解碼成欄位。這兩個函式都會將一欄位轉換為另一欄位,而輸入/輸出 SQL 資料類型可以是複雜類型或原始類型。

在從 Kafka 等串流來源讀取或寫入資料時,使用 Protobuf 訊息作為欄位非常有用。每筆 Kafka 鍵值記錄都會新增一些元資料,例如輸入 Kafka 的時間戳記、Kafka 中的偏移量等。

Spark SQL 架構是根據傳遞給 from_protobufto_protobuf 的 protobuf 描述檔或 protobuf 類別產生的。指定的 protobuf 類別或 protobuf 描述檔必須與資料相符,否則行為未定義:可能會失敗或傳回任意結果。

此 div 僅用於讓 markdown 編輯器/檢視器正常運作,不會顯示在網頁上 ```python
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

# from_protobuf and to_protobuf provide two schema choices. Via Protobuf descriptor file,
# or via shaded Java class.
# give input .proto protobuf schema
# syntax = "proto3"
# message AppEvent {
#   string name = 1;
#   int64 id = 2;
#   string context = 3;
# }
df = spark
  .readStream
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

# 1. Decode the Protobuf data of schema `AppEvent` into a struct;
# 2. Filter by column `name`;
# 3. Encode the column `event` in Protobuf format.
# The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
output = df
  .select(from_protobuf("value", "AppEvent", descriptorFilePath).alias("event"))
  .where('event.name == "alice"')
  .select(to_protobuf("event", "AppEvent", descriptorFilePath).alias("event"))

# Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
# class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
# it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
# 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
# https://github.com/rangadi/shaded-protobuf-classes.
output = df
  .select(from_protobuf("value", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
  .where('event.name == "alice"')

output.printSchema()
# root
#  |--event: struct (nullable = true)
#  |   |-- name : string (nullable = true)
#  |   |-- id: long (nullable = true)
#  |   |-- context: string (nullable = true)

output = output
  .select(to_protobuf("event", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))

query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")
  .start()
```
此 div 僅用於讓 markdown 編輯器/檢視器正常運作,不會顯示在網頁上 ```scala
import org.apache.spark.sql.protobuf.functions._

// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
val output = df
  .select(from_protobuf($"value", "AppEvent", descriptorFilePath) as $"event")
  .where("event.name == \"alice\"")
  .select(to_protobuf($"user", "AppEvent", descriptorFilePath) as $"event")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
var output = df
  .select(from_protobuf($"value", "org.example.protos..AppEvent") as $"event")
  .where("event.name == \"alice\"")

output.printSchema()
// root
//  |--event: struct (nullable = true)
//  |    |-- name : string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- context: string (nullable = true)

output = output.select(to_protobuf($"event", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as $"event")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
```
此 div 僅用於讓 markdown 編輯器/檢視器正常運作,不會顯示在網頁上 ```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;

// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
Dataset<Row> output = df
  .select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
  .where("event.name == \"alice\"")
  .select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));

// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
Dataset<Row> output = df
  .select(
    from_protobuf(col("value"),
    "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
  .where("event.name == \"alice\"")

output.printSchema()
// root
//  |--event: struct (nullable = true)
//  |    |-- name : string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- context: string (nullable = true)

output = output.select(
  to_protobuf(col("event"),
  "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();
```

Protobuf -> Spark SQL 轉換支援的類型

目前 Spark 支援讀取 Protobuf 訊息中的 protobuf 標量類型列舉類型巢狀類型映射類型。除了這些類型之外,spark-protobuf 也支援 Protobuf OneOf 欄位。這讓您可以處理可能有多個可能的欄位組的訊息,但一次只能存在一個組。這對於您處理的資料格式並不總是相同,而且您需要能夠處理具有不同欄位組的訊息而不會遇到錯誤的情況時很有用。

Protobuf 類型Spark SQL 類型
布林 布林類型
整數 整數類型
長整數 長整數類型
浮點數 浮點數類型
雙精度浮點數 雙精度浮點數類型
字串 字串類型
列舉 字串類型
位元組 二進位類型
訊息 結構類型
重複 陣列類型
映射 映射類型
OneOf 結構

它也支援讀取以下 Protobuf 類型 時間戳記持續時間

Protobuf 邏輯類型Protobuf 架構Spark SQL 類型
持續時間 訊息類型{秒數:長整數,奈秒:整數} 日時間間隔類型
時間戳記 訊息類型{秒數:長整數,奈秒:整數} 時間戳記類型

Spark SQL -> Protobuf 轉換支援的類型

Spark 支援將所有 Spark SQL 類型寫入 Protobuf。對於大多數類型,從 Spark 類型到 Protobuf 類型的對應很簡單(例如,整數類型會轉換為整數);

Spark SQL 類型Protobuf 類型
布林類型 布林
整數類型 整數
長整數類型 長整數
浮點數類型 浮點數
雙精度浮點數類型 雙精度浮點數
字串類型 字串
字串類型 列舉
二進位類型 位元組
結構類型 訊息
陣列類型 重複
映射類型 映射

處理 Protobuf 欄位的循環參考

使用 Protobuf 資料時,可能會出現一個常見問題,就是存在循環參照。在 Protobuf 中,當欄位參照回它自己或參照回另一個參照回原始欄位的欄位時,就會發生循環參照。這可能會在剖析資料時造成問題,因為它可能會導致無限迴圈或其他意外行為。為了解決這個問題,spark-protobuf 的最新版本引進了一個新功能:透過欄位類型檢查循環參照的能力。這允許使用者使用 recursive.fields.max.depth 選項,以指定剖析架構時允許的遞迴層級數。預設情況下,spark-protobuf 會將 recursive.fields.max.depth 設為 -1,以禁止遞迴欄位。不過,您可以在需要時將此選項設為 0 到 10。

recursive.fields.max.depth 設為 0 會捨棄所有遞迴欄位,設為 1 會允許遞迴一次,設為 2 會允許遞迴兩次。不允許 recursive.fields.max.depth 值大於 10,因為它可能會導致效能問題,甚至堆疊溢位。

以下 protobuf 訊息的 SQL 架構會根據 recursive.fields.max.depth 的值而有所不同。

此 div 僅用於讓 markdown 編輯器/檢視器正常運作,不會顯示在網頁上 ```protobuf
syntax = "proto3"
message Person {
  string name = 1;
  Person bff = 2
}

// The protobuf schema defined above, would be converted into a Spark SQL columns with the following
// structure based on `recursive.fields.max.depth` value.

0: struct<name: string, bff: null>
1: struct<name string, bff: <name: string, bff: null>>
2: struct<name string, bff: <name: string, bff: struct<name: string, bff: null>>> ...
```