Protobuf 資料來源指南
- 部署
- to_protobuf() 和 from_protobuf()
- Protobuf -> Spark SQL 轉換支援的類型
- Spark SQL -> Protobuf 轉換支援的類型
- 處理 Protobuf 欄位的循環參考
自 Spark 3.4.0 版本起,Spark SQL 提供內建支援,用於讀取和寫入 Protobuf 資料。
部署
spark-protobuf
模組是外部的,預設情況下未包含在 spark-submit
或 spark-shell
中。
與任何 Spark 應用程式一樣,spark-submit
用於啟動應用程式。可以使用 --packages
將 spark-protobuf_2.12
及其相依性直接新增至 spark-submit
,例如:
./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.12:3.5.1 ...
對於在 spark-shell
上進行實驗,您也可以使用 --packages
直接新增 org.apache.spark:spark-protobuf_2.12
及其相依性,
./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.12:3.5.1 ...
請參閱應用程式提交指南,以進一步瞭解如何提交具有外部相依性的應用程式。
to_protobuf() 和 from_protobuf()
spark-protobuf 套件提供函式 to_protobuf
,用於將欄位編碼為 Protobuf 格式的二進位,以及 from_protobuf()
,用於將 Protobuf 二進位資料解碼成欄位。這兩個函式都會將一欄位轉換為另一欄位,而輸入/輸出 SQL 資料類型可以是複雜類型或原始類型。
在從 Kafka 等串流來源讀取或寫入資料時,使用 Protobuf 訊息作為欄位非常有用。每筆 Kafka 鍵值記錄都會新增一些元資料,例如輸入 Kafka 的時間戳記、Kafka 中的偏移量等。
- 如果包含您資料的「值」欄位在 Protobuf 中,您可以使用
from_protobuf()
萃取資料、豐富資料、清除資料,然後再次將資料推送到 Kafka 或寫入不同的接收器。 to_protobuf()
可用於將結構轉換為 Protobuf 訊息。當您想在將資料寫入 Kafka 時將多個欄位重新編碼為單一欄位時,此方法特別有用。
Spark SQL 架構是根據傳遞給 from_protobuf
和 to_protobuf
的 protobuf 描述檔或 protobuf 類別產生的。指定的 protobuf 類別或 protobuf 描述檔必須與資料相符,否則行為未定義:可能會失敗或傳回任意結果。
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
# from_protobuf and to_protobuf provide two schema choices. Via Protobuf descriptor file,
# or via shaded Java class.
# give input .proto protobuf schema
# syntax = "proto3"
# message AppEvent {
# string name = 1;
# int64 id = 2;
# string context = 3;
# }
df = spark
.readStream
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
# 1. Decode the Protobuf data of schema `AppEvent` into a struct;
# 2. Filter by column `name`;
# 3. Encode the column `event` in Protobuf format.
# The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
output = df
.select(from_protobuf("value", "AppEvent", descriptorFilePath).alias("event"))
.where('event.name == "alice"')
.select(to_protobuf("event", "AppEvent", descriptorFilePath).alias("event"))
# Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
# class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
# it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
# 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
# https://github.com/rangadi/shaded-protobuf-classes.
output = df
.select(from_protobuf("value", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
.where('event.name == "alice"')
output.printSchema()
# root
# |--event: struct (nullable = true)
# | |-- name : string (nullable = true)
# | |-- id: long (nullable = true)
# | |-- context: string (nullable = true)
output = output
.select(to_protobuf("event", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("topic", "topic2")
.start()
import org.apache.spark.sql.protobuf.functions._
// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
// string name = 1;
// int64 id = 2;
// string context = 3;
// }
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
val output = df
.select(from_protobuf($"value", "AppEvent", descriptorFilePath) as $"event")
.where("event.name == \"alice\"")
.select(to_protobuf($"user", "AppEvent", descriptorFilePath) as $"event")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
var output = df
.select(from_protobuf($"value", "org.example.protos..AppEvent") as $"event")
.where("event.name == \"alice\"")
output.printSchema()
// root
// |--event: struct (nullable = true)
// | |-- name : string (nullable = true)
// | |-- id: long (nullable = true)
// | |-- context: string (nullable = true)
output = output.select(to_protobuf($"event", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as $"event")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;
// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
// string name = 1;
// int64 id = 2;
// string context = 3;
// }
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
Dataset<Row> output = df
.select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
.where("event.name == \"alice\"")
.select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));
// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
Dataset<Row> output = df
.select(
from_protobuf(col("value"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
.where("event.name == \"alice\"")
output.printSchema()
// root
// |--event: struct (nullable = true)
// | |-- name : string (nullable = true)
// | |-- id: long (nullable = true)
// | |-- context: string (nullable = true)
output = output.select(
to_protobuf(col("event"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));
StreamingQuery query = output
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start();
Protobuf -> Spark SQL 轉換支援的類型
目前 Spark 支援讀取 Protobuf 訊息中的 protobuf 標量類型、列舉類型、巢狀類型 和 映射類型。除了這些類型之外,spark-protobuf
也支援 Protobuf OneOf
欄位。這讓您可以處理可能有多個可能的欄位組的訊息,但一次只能存在一個組。這對於您處理的資料格式並不總是相同,而且您需要能夠處理具有不同欄位組的訊息而不會遇到錯誤的情況時很有用。
Protobuf 類型 | Spark SQL 類型 |
---|---|
布林 | 布林類型 |
整數 | 整數類型 |
長整數 | 長整數類型 |
浮點數 | 浮點數類型 |
雙精度浮點數 | 雙精度浮點數類型 |
字串 | 字串類型 |
列舉 | 字串類型 |
位元組 | 二進位類型 |
訊息 | 結構類型 |
重複 | 陣列類型 |
映射 | 映射類型 |
OneOf | 結構 |
它也支援讀取以下 Protobuf 類型 時間戳記 和 持續時間
Protobuf 邏輯類型 | Protobuf 架構 | Spark SQL 類型 |
---|---|---|
持續時間 | 訊息類型{秒數:長整數,奈秒:整數} | 日時間間隔類型 |
時間戳記 | 訊息類型{秒數:長整數,奈秒:整數} | 時間戳記類型 |
Spark SQL -> Protobuf 轉換支援的類型
Spark 支援將所有 Spark SQL 類型寫入 Protobuf。對於大多數類型,從 Spark 類型到 Protobuf 類型的對應很簡單(例如,整數類型會轉換為整數);
Spark SQL 類型 | Protobuf 類型 |
---|---|
布林類型 | 布林 |
整數類型 | 整數 |
長整數類型 | 長整數 |
浮點數類型 | 浮點數 |
雙精度浮點數類型 | 雙精度浮點數 |
字串類型 | 字串 |
字串類型 | 列舉 |
二進位類型 | 位元組 |
結構類型 | 訊息 |
陣列類型 | 重複 |
映射類型 | 映射 |
處理 Protobuf 欄位的循環參考
使用 Protobuf 資料時,可能會出現一個常見問題,就是存在循環參照。在 Protobuf 中,當欄位參照回它自己或參照回另一個參照回原始欄位的欄位時,就會發生循環參照。這可能會在剖析資料時造成問題,因為它可能會導致無限迴圈或其他意外行為。為了解決這個問題,spark-protobuf 的最新版本引進了一個新功能:透過欄位類型檢查循環參照的能力。這允許使用者使用 recursive.fields.max.depth
選項,以指定剖析架構時允許的遞迴層級數。預設情況下,spark-protobuf
會將 recursive.fields.max.depth
設為 -1,以禁止遞迴欄位。不過,您可以在需要時將此選項設為 0 到 10。
將 recursive.fields.max.depth
設為 0 會捨棄所有遞迴欄位,設為 1 會允許遞迴一次,設為 2 會允許遞迴兩次。不允許 recursive.fields.max.depth
值大於 10,因為它可能會導致效能問題,甚至堆疊溢位。
以下 protobuf 訊息的 SQL 架構會根據 recursive.fields.max.depth
的值而有所不同。
syntax = "proto3"
message Person {
string name = 1;
Person bff = 2
}
// The protobuf schema defined above, would be converted into a Spark SQL columns with the following
// structure based on `recursive.fields.max.depth` value.
0: struct<name: string, bff: null>
1: struct<name string, bff: <name: string, bff: null>>
2: struct<name string, bff: <name: string, bff: struct<name: string, bff: null>>> ...