Parquet 檔案
Parquet 是一種欄位格式,由許多其他資料處理系統支援。Spark SQL 提供支援,可讀取和寫入 Parquet 檔案,並自動保留原始資料的 Schema。在讀取 Parquet 檔案時,所有欄位會自動轉換為可為空值,以符合相容性考量。
以程式方式載入資料
使用上述範例中的資料
peopleDF = spark.read.json("examples/src/main/resources/people.json")
# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")
# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")
# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
# +------+
# | name|
# +------+
# |Justin|
# +------+
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");
// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
df <- read.df("examples/src/main/resources/people.json", "json")
# SparkDataFrame can be saved as Parquet files, maintaining the schema information.
write.parquet(df, "people.parquet")
# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile <- read.parquet("people.parquet")
# Parquet files can also be used to create a temporary view and then used in SQL statements.
createOrReplaceTempView(parquetFile, "parquetFile")
teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
## 1 Justin
# We can also run custom R-UDFs on Spark DataFrames. Here we prefix all the names with "Name:"
schema <- structType(structField("name", "string"))
teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema)
for (teenName in collect(teenNames)$name) {
cat(teenName, "\n")
}
## Name: Michael
## Name: Andy
## Name: Justin
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
path "examples/src/main/resources/people.parquet"
)
SELECT * FROM parquetTable
分割區偵測
資料表分割區是一種常見的最佳化方法,用於 Hive 等系統。在分割區資料表中,資料通常儲存在不同的目錄中,分割區欄位值編碼在每個分割區目錄的路徑中。所有內建檔案來源(包括文字/CSV/JSON/ORC/Parquet)都能自動偵測和推斷分割區資訊。例如,我們可以使用下列目錄結構,將先前使用的人口資料儲存在分割區資料表中,並使用兩個額外欄位,gender
和 country
作為分割區欄位
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
將 path/to/table
傳遞給 SparkSession.read.parquet
或 SparkSession.read.load
,Spark SQL 會自動從路徑中擷取分割區資訊。現在,傳回 DataFrame 的 Schema 會變成
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
請注意,分割區欄位的資料類型會自動推斷。目前支援的資料類型包括數字資料類型、日期、時間戳記和字串類型。有時,使用者可能不希望自動推斷分割區欄位的資料類型。對於這些使用案例,可以透過 spark.sql.sources.partitionColumnTypeInference.enabled
設定自動類型推斷,其預設值為 true
。當停用類型推斷時,分割區欄位將使用字串類型。
從 Spark 1.6.0 開始,分割區偵測預設只會尋找指定路徑下的分割區。對於上述範例,如果使用者將 path/to/table/gender=male
傳遞給 SparkSession.read.parquet
或 SparkSession.read.load
,gender
將不會被視為分割區欄位。如果使用者需要指定分割區偵測應從哪個基本路徑開始,他們可以在資料來源選項中設定 basePath
。例如,當 path/to/table/gender=male
是資料的路徑,且使用者將 basePath
設定為 path/to/table/
時,gender
將會是分割區欄位。
Schema 合併
與 Protocol Buffer、Avro 和 Thrift 類似,Parquet 也支援架構演進。使用者可以從一個簡單的架構開始,並視需要逐漸將更多欄位新增到架構中。如此一來,使用者可能會得到多個 Parquet 檔案,其架構不同,但彼此相容。Parquet 資料來源現在可以自動偵測這種情況,並合併所有這些檔案的架構。
由於架構合併是一個相對昂貴的操作,而且在大部分情況下並非必要,因此我們從 1.5.0 開始預設將其關閉。你可以透過下列方式啟用它
- 在讀取 Parquet 檔案時將資料來源選項
mergeSchema
設定為true
(如下面的範例所示),或 - 將全域 SQL 選項
spark.sql.parquet.mergeSchema
設定為true
。
from pyspark.sql import Row
# spark is from the previous example.
# Create a simple DataFrame, stored into a partition directory
sc = spark.sparkContext
squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
.map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.parquet("data/test_table/key=1")
# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
.map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.parquet("data/test_table/key=2")
# Read the partitioned table
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths.
# root
# |-- double: long (nullable = true)
# |-- single: long (nullable = true)
# |-- triple: long (nullable = true)
# |-- key: integer (nullable = true)
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._
// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")
// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public static class Square implements Serializable {
private int value;
private int square;
// Getters and setters...
}
public static class Cube implements Serializable {
private int value;
private int cube;
// Getters and setters...
}
List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
Square square = new Square();
square.setValue(value);
square.setSquare(value * value);
squares.add(square);
}
// Create a simple DataFrame, store into a partition directory
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");
List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
Cube cube = new Cube();
cube.setValue(value);
cube.setCube(value * value * value);
cubes.add(cube);
}
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");
// Read the partitioned table
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
df1 <- createDataFrame(data.frame(single=c(12, 29), double=c(19, 23)))
df2 <- createDataFrame(data.frame(double=c(19, 23), triple=c(23, 18)))
# Create a simple DataFrame, stored into a partition directory
write.df(df1, "data/test_table/key=1", "parquet", "overwrite")
# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
write.df(df2, "data/test_table/key=2", "parquet", "overwrite")
# Read the partitioned table
df3 <- read.df("data/test_table", "parquet", mergeSchema = "true")
printSchema(df3)
# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths
## root
## |-- single: double (nullable = true)
## |-- double: double (nullable = true)
## |-- triple: double (nullable = true)
## |-- key: integer (nullable = true)
Hive metastore Parquet 資料表轉換
在從 Hive Metastore Parquet 表格讀取並寫入非分割 Hive Metastore Parquet 表格時,Spark SQL 會嘗試使用自己的 Parquet 支援,而不是 Hive SerDe,以獲得更好的效能。此行為由 spark.sql.hive.convertMetastoreParquet
設定控制,並預設開啟。
Hive/Parquet Schema 調解
從表格架構處理的角度來看,Hive 和 Parquet 之間有兩個主要差異。
- Hive 不區分大小寫,而 Parquet 區分大小寫
- Hive 認為所有欄位都是可為 Null,而 Parquet 中的可為 Null 很重要
由於這個原因,我們必須在將 Hive Metastore Parquet 表格轉換為 Spark SQL Parquet 表格時,調和 Hive Metastore 架構和 Parquet 架構。調和規則如下
-
在兩個架構中具有相同名稱的欄位,無論是否可為 Null,都必須具有相同的資料類型。調和後的欄位應具有 Parquet 端的資料類型,以便尊重可為 Null。
-
調和後的架構僅包含在 Hive Metastore 架構中定義的那些欄位。
- 僅出現在 Parquet 架構中的任何欄位都會在調和後的架構中刪除。
- 僅出現在 Hive Metastore 架構中的任何欄位都會在調和後的架構中新增為可為 Null 的欄位。
更新元資料
Spark SQL 會快取 Parquet 元資料以獲得更好的效能。當啟用 Hive Metastore Parquet 表格轉換時,也會快取那些已轉換表格的元資料。如果這些表格是由 Hive 或其他外部工具更新,你需要手動更新它們,以確保元資料一致。
# spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");
refreshTable("my_table")
REFRESH TABLE my_table;
欄位加密
自 Spark 3.2 開始,Parquet 表格支援使用 Apache Parquet 1.12+ 的欄位加密。
Parquet 使用信封加密實務,其中檔案部分會使用「資料加密金鑰」(DEK) 加密,而 DEK 則使用「主加密金鑰」(MEK) 加密。Parquet 會為每個加密檔案/欄位隨機產生 DEK。MEK 會在使用者選擇的密鑰管理服務 (KMS) 中產生、儲存和管理。Parquet Maven 儲存庫 有個 jar 檔,其中包含一個模擬 KMS 實作,讓使用者只要使用 spark-shell,就能執行欄位加密和解密,而不需要部署 KMS 伺服器(下載 parquet-hadoop-tests.jar
檔案,並將其放在 Spark jars
資料夾中)
# Set hadoop configuration properties, e.g. using configuration properties of
# the Spark job:
# --conf spark.hadoop.parquet.encryption.kms.client.class=\
# "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS"\
# --conf spark.hadoop.parquet.encryption.key.list=\
# "keyA:AAECAwQFBgcICQoLDA0ODw== , keyB:AAECAAECAAECAAECAAECAA=="\
# --conf spark.hadoop.parquet.crypto.factory.class=\
# "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory"
# Write encrypted dataframe files.
# Column "square" will be protected with master key "keyA".
# Parquet file footers will be protected with master key "keyB"
squaresDF.write\
.option("parquet.encryption.column.keys" , "keyA:square")\
.option("parquet.encryption.footer.key" , "keyB")\
.parquet("/path/to/table.parquet.encrypted")
# Read encrypted dataframe files
df2 = spark.read.parquet("/path/to/table.parquet.encrypted")
sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" ,
"org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")
// Explicit master keys (base64 encoded) - required only for mock InMemoryKMS
sc.hadoopConfiguration.set("parquet.encryption.key.list" ,
"keyA:AAECAwQFBgcICQoLDA0ODw== , keyB:AAECAAECAAECAAECAAECAA==")
// Activate Parquet encryption, driven by Hadoop properties
sc.hadoopConfiguration.set("parquet.crypto.factory.class" ,
"org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
// Write encrypted dataframe files.
// Column "square" will be protected with master key "keyA".
// Parquet file footers will be protected with master key "keyB"
squaresDF.write.
option("parquet.encryption.column.keys" , "keyA:square").
option("parquet.encryption.footer.key" , "keyB").
parquet("/path/to/table.parquet.encrypted")
// Read encrypted dataframe files
val df2 = spark.read.parquet("/path/to/table.parquet.encrypted")
sc.hadoopConfiguration().set("parquet.encryption.kms.client.class" ,
"org.apache.parquet.crypto.keytools.mocks.InMemoryKMS");
// Explicit master keys (base64 encoded) - required only for mock InMemoryKMS
sc.hadoopConfiguration().set("parquet.encryption.key.list" ,
"keyA:AAECAwQFBgcICQoLDA0ODw== , keyB:AAECAAECAAECAAECAAECAA==");
// Activate Parquet encryption, driven by Hadoop properties
sc.hadoopConfiguration().set("parquet.crypto.factory.class" ,
"org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory");
// Write encrypted dataframe files.
// Column "square" will be protected with master key "keyA".
// Parquet file footers will be protected with master key "keyB"
squaresDF.write().
option("parquet.encryption.column.keys" , "keyA:square").
option("parquet.encryption.footer.key" , "keyB").
parquet("/path/to/table.parquet.encrypted");
// Read encrypted dataframe files
Dataset<Row> df2 = spark.read().parquet("/path/to/table.parquet.encrypted");
KMS 客戶端
InMemoryKMS 類別僅供說明和簡單示範 Parquet 加密功能之用。不應在實際部署中使用。主加密金鑰必須保留並管理在使用者組織中部署的生產級 KMS 系統中。要推出具有 Parquet 加密的 Spark,需要為 KMS 伺服器實作一個用戶端類別。Parquet 提供一個用於開發此類類別的 介面,
public interface KmsClient {
// Wraps a key - encrypts it with the master key.
public String wrapKey(byte[] keyBytes, String masterKeyIdentifier);
// Decrypts (unwraps) a key with the master key.
public byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier);
// Use of initialization parameters is optional.
public void initialize(Configuration configuration, String kmsInstanceID,
String kmsInstanceURL, String accessToken);
}
parquet-mr 儲存庫中有一個 範例,說明如何針對開源 KMS 建立此類類別。生產 KMS 用戶端應與組織的安全管理員合作設計,並由具有存取控制管理經驗的開發人員建置。建立此類類別後,便可透過 parquet.encryption.kms.client.class
參數將其傳遞給應用程式,並讓一般 Spark 使用者利用,如上方的加密資料框寫入/讀取範例所示。
注意:預設情況下,Parquet 會實作「雙信封加密」模式,將 Spark 執行器與 KMS 伺服器的互動降至最低。在此模式中,DEK 會使用「金鑰加密金鑰」(KEK,由 Parquet 隨機產生) 加密。KEK 會在 KMS 中使用 MEK 加密;結果和 KEK 本身會快取在 Spark 執行器記憶體中。有興趣使用一般信封加密的使用者,可以將 parquet.encryption.double.wrapping
參數設定為 false
來切換。如需有關 Parquet 加密參數的更多詳細資訊,請參閱 parquet-hadoop 設定 頁面。
資料來源選項
Parquet 的資料來源選項可透過下列方式設定:
.option
/.options
方法DataFrameReader
DataFrameWriter
DataStreamReader
DataStreamWriter
OPTIONS
子句,請參閱 使用資料來源建立資料表
屬性名稱 | 預設 | 意義 | 範圍 |
---|---|---|---|
datetimeRebaseMode |
(spark.sql.parquet.datetimeRebaseModeInRead 組態的值) |
datetimeRebaseMode 選項允許指定 DATE 、TIMESTAMP_MILLIS 、TIMESTAMP_MICROS 邏輯類型值從儒略曆到公曆的換算模式。目前支援的模式為
|
讀取 |
int96RebaseMode |
(spark.sql.parquet.int96RebaseModeInRead 組態的值) |
int96RebaseMode 選項允許指定從儒略曆到公曆的 INT96 時間戳記換算模式。目前支援的模式為
|
讀取 |
mergeSchema |
(spark.sql.parquet.mergeSchema 組態的值) |
設定是否應合併從所有 Parquet 部分檔案收集的架構。這將覆寫 spark.sql.parquet.mergeSchema 。 |
讀取 |
壓縮 |
snappy |
儲存到檔案時使用的壓縮編解碼器。這可以是已知的區分大小寫的簡短名稱之一(none、uncompressed、snappy、gzip、lzo、brotli、lz4 和 zstd)。這將覆寫 spark.sql.parquet.compression.codec 。 |
寫入 |
其他一般選項可在 一般檔案來源選項 中找到
設定
Parquet 的組態可以使用 SparkSession
上的 setConf
方法或使用 SQL 執行 SET key=value
指令來完成。
屬性名稱 | 預設 | 意義 | 版本 |
---|---|---|---|
spark.sql.parquet.binaryAsString |
false | 一些其他產生 Parquet 的系統,特別是 Impala、Hive 和舊版本的 Spark SQL,在寫出 Parquet 架構時不會區分二進位資料和字串。此旗標指示 Spark SQL 將二進位資料解釋為字串,以與這些系統相容。 | 1.1.1 |
spark.sql.parquet.int96AsTimestamp |
true | 一些產生 Parquet 的系統,特別是 Impala 和 Hive,會將時間戳記儲存在 INT96 中。此旗標指示 Spark SQL 將 INT96 資料解釋為時間戳記,以與這些系統相容。 | 1.3.0 |
spark.sql.parquet.int96TimestampConversion |
false | 此設定控制是否在將 Impala 寫入的資料轉換為時間戳記時,套用時間戳記調整。這是必要的,因為 Impala 儲存 INT96 資料的方式,與 Hive 和 Spark 的時區偏移不同。 | 2.3.0 |
spark.sql.parquet.outputTimestampType |
INT96 | 設定 Spark 將資料寫入 Parquet 檔案時,要使用的 Parquet 時間戳記類型。INT96 是 Parquet 中的非標準,但常用的時間戳記類型。TIMESTAMP_MICROS 是 Parquet 中的標準時間戳記類型,用於儲存自 Unix 紀元開始的微秒數。TIMESTAMP_MILLIS 也是標準的,但精度為毫秒,這表示 Spark 必須截斷其時間戳記值的微秒部分。 | 2.3.0 |
spark.sql.parquet.compression.codec |
snappy |
設定在寫入 Parquet 檔案時,所使用的壓縮編解碼器。如果在特定於表格的選項/屬性中指定了 compression 或 parquet.compression ,則優先順序為 compression 、parquet.compression 、spark.sql.parquet.compression.codec 。可接受的值包括:none、uncompressed、snappy、gzip、lzo、brotli、lz4、zstd。請注意,brotli 需要安裝 BrotliCodec 。
|
1.1.1 |
spark.sql.parquet.filterPushdown |
true | 設定為 true 時,會啟用 Parquet 篩選下推最佳化。 | 1.2.0 |
spark.sql.parquet.aggregatePushdown |
false | 如果為 true,則會將聚合下推到 Parquet 以進行最佳化。支援 MIN、MAX 和 COUNT 作為聚合表達式。對於 MIN/MAX,支援布林、整數、浮點數和日期類型。對於 COUNT,支援所有資料類型。如果任何 Parquet 檔案尾端缺少統計資料,則會擲回例外。 | 3.3.0 |
spark.sql.hive.convertMetastoreParquet |
true | 設定為 false 時,Spark SQL 會使用 Hive SerDe 處理 Parquet 表格,而不是使用內建支援。 | 1.1.1 |
spark.sql.parquet.mergeSchema |
false |
如果為 true,則 Parquet 資料來源會合併從所有資料檔案收集的架構,否則會從摘要檔案或隨機資料檔案中選取架構(如果沒有摘要檔案)。 |
1.5.0 |
spark.sql.parquet.respectSummaryFiles |
false | 如果為 true,則我們假設 Parquet 的所有部分檔案都與摘要檔案一致,而且我們會在合併架構時略過它們。否則,如果為 false(預設值),則我們會合併所有部分檔案。這應視為僅供專家使用的選項,而且不應在完全了解其含義之前啟用。 | 1.5.0 |
spark.sql.parquet.writeLegacyFormat |
false | 如果為 true,則資料會以 Spark 1.4 及更早版本的格式寫入。例如,十進位值會以 Apache Parquet 的定長位元組陣列格式寫入,而其他系統(例如 Apache Hive 和 Apache Impala)會使用此格式。如果為 false,則會使用 Parquet 中較新的格式。例如,十進位值會以基於整數的格式寫入。如果 Parquet 輸出要與不支援此較新格式的系統一起使用,請設定為 true。 | 1.6.0 |
spark.sql.parquet.enableVectorizedReader |
true | 啟用向量化 Parquet 解碼。 | 2.0.0 |
spark.sql.parquet.enableNestedColumnVectorizedReader |
true |
啟用巢狀欄位的向量化 Parquet 解碼(例如結構、清單、地圖)。需要啟用 spark.sql.parquet.enableVectorizedReader 。
|
3.3.0 |
spark.sql.parquet.recordLevelFilter.enabled |
false |
如果為 true,將使用推入式篩選器啟用 Parquet 的原生記錄層級篩選。此組態僅在啟用 spark.sql.parquet.filterPushdown 且未使用的向量化讀取器時有效。您可以將 spark.sql.parquet.enableVectorizedReader 設為 false 來確保未使用的向量化讀取器。
|
2.3.0 |
spark.sql.parquet.columnarReaderBatchSize |
4096 | 要包含在 Parquet 向量化讀取器批次中的列數。應仔細選擇此數字,以將開銷降至最低並避免在讀取資料時發生 OOM。 | 2.4.0 |
spark.sql.parquet.fieldId.write.enabled |
true | 欄位 ID 是 Parquet schema 規範的原生欄位。啟用後,Parquet 寫入器會將 Spark schema 中的欄位 ID 元資料(如果存在)填入 Parquet schema。 | 3.3.0 |
spark.sql.parquet.fieldId.read.enabled |
false | 欄位 ID 是 Parquet schema 規範的原生欄位。啟用後,Parquet 讀取器會使用請求的 Spark schema 中的欄位 ID(如果存在)來查詢 Parquet 欄位,而不是使用欄位名稱。 | 3.3.0 |
spark.sql.parquet.fieldId.read.ignoreMissing |
false | 當 Parquet 檔案沒有任何欄位 ID,但 Spark 讀取 schema 使用欄位 ID 進行讀取時,如果啟用此旗標,我們會靜默傳回 null,否則會傳回錯誤。 | 3.3.0 |
spark.sql.parquet.timestampNTZ.enabled |
true |
啟用 Parquet 讀取和寫入的 TIMESTAMP_NTZ 支援。啟用後,TIMESTAMP_NTZ 值會寫入為 Parquet timestamp 欄位,其註解 isAdjustedToUTC = false,並以類似方式推斷。停用後,此類值會讀取為 TIMESTAMP_LTZ ,且必須轉換為 TIMESTAMP_LTZ 才能寫入。
|
3.4.0 |
spark.sql.parquet.datetimeRebaseModeInRead | EXCEPTION |
DATE 、TIMESTAMP_MILLIS 、TIMESTAMP_MICROS 邏輯類型值從儒略曆重新設定為前儒略曆的重新設定模式
|
3.0.0 |
spark.sql.parquet.datetimeRebaseModeInWrite | EXCEPTION |
DATE 、TIMESTAMP_MILLIS 、TIMESTAMP_MICROS 邏輯型態值從前儒略曆轉換為儒略曆的重新定基模式
|
3.0.0 |
spark.sql.parquet.int96RebaseModeInRead | EXCEPTION |
INT96 時間戳記型態值從儒略曆轉換為前儒略曆的重新定基模式
|
3.1.0 |
spark.sql.parquet.int96RebaseModeInWrite | EXCEPTION |
INT96 時間戳記型態值從前儒略曆轉換為儒略曆的重新定基模式
|
3.1.0 |