Parquet 檔案

Parquet 是一種欄位格式,由許多其他資料處理系統支援。Spark SQL 提供支援,可讀取和寫入 Parquet 檔案,並自動保留原始資料的 Schema。在讀取 Parquet 檔案時,所有欄位會自動轉換為可為空值,以符合相容性考量。

以程式方式載入資料

使用上述範例中的資料

peopleDF = spark.read.json("examples/src/main/resources/people.json")

# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")

# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+
在 Spark 儲存庫中的「examples/src/main/python/sql/datasource.py」中,尋找完整的範例程式碼。
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」中,尋找完整的範例程式碼。
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");

// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」中,尋找完整的範例程式碼。
df <- read.df("examples/src/main/resources/people.json", "json")

# SparkDataFrame can be saved as Parquet files, maintaining the schema information.
write.parquet(df, "people.parquet")

# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile <- read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
createOrReplaceTempView(parquetFile, "parquetFile")
teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
head(teenagers)
##     name
## 1 Justin

# We can also run custom R-UDFs on Spark DataFrames. Here we prefix all the names with "Name:"
schema <- structType(structField("name", "string"))
teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema)
for (teenName in collect(teenNames)$name) {
  cat(teenName, "\n")
}
## Name: Michael
## Name: Andy
## Name: Justin
在 Spark 儲存庫中的「examples/src/main/r/RSparkSQLExample.R」中,尋找完整的範例程式碼。
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable

分割區偵測

資料表分割區是一種常見的最佳化方法,用於 Hive 等系統。在分割區資料表中,資料通常儲存在不同的目錄中,分割區欄位值編碼在每個分割區目錄的路徑中。所有內建檔案來源(包括文字/CSV/JSON/ORC/Parquet)都能自動偵測和推斷分割區資訊。例如,我們可以使用下列目錄結構,將先前使用的人口資料儲存在分割區資料表中,並使用兩個額外欄位,gendercountry 作為分割區欄位

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

path/to/table 傳遞給 SparkSession.read.parquetSparkSession.read.load,Spark SQL 會自動從路徑中擷取分割區資訊。現在,傳回 DataFrame 的 Schema 會變成

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

請注意,分割區欄位的資料類型會自動推斷。目前支援的資料類型包括數字資料類型、日期、時間戳記和字串類型。有時,使用者可能不希望自動推斷分割區欄位的資料類型。對於這些使用案例,可以透過 spark.sql.sources.partitionColumnTypeInference.enabled 設定自動類型推斷,其預設值為 true。當停用類型推斷時,分割區欄位將使用字串類型。

從 Spark 1.6.0 開始,分割區偵測預設只會尋找指定路徑下的分割區。對於上述範例,如果使用者將 path/to/table/gender=male 傳遞給 SparkSession.read.parquetSparkSession.read.loadgender 將不會被視為分割區欄位。如果使用者需要指定分割區偵測應從哪個基本路徑開始,他們可以在資料來源選項中設定 basePath。例如,當 path/to/table/gender=male 是資料的路徑,且使用者將 basePath 設定為 path/to/table/ 時,gender 將會是分割區欄位。

Schema 合併

與 Protocol Buffer、Avro 和 Thrift 類似,Parquet 也支援架構演進。使用者可以從一個簡單的架構開始,並視需要逐漸將更多欄位新增到架構中。如此一來,使用者可能會得到多個 Parquet 檔案,其架構不同,但彼此相容。Parquet 資料來源現在可以自動偵測這種情況,並合併所有這些檔案的架構。

由於架構合併是一個相對昂貴的操作,而且在大部分情況下並非必要,因此我們從 1.5.0 開始預設將其關閉。你可以透過下列方式啟用它

  1. 在讀取 Parquet 檔案時將資料來源選項 mergeSchema 設定為 true(如下面的範例所示),或
  2. 將全域 SQL 選項 spark.sql.parquet.mergeSchema 設定為 true
from pyspark.sql import Row

# spark is from the previous example.
# Create a simple DataFrame, stored into a partition directory
sc = spark.sparkContext

squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
                                  .map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.parquet("data/test_table/key=1")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
                                .map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.parquet("data/test_table/key=2")

# Read the partitioned table
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths.
# root
#  |-- double: long (nullable = true)
#  |-- single: long (nullable = true)
#  |-- triple: long (nullable = true)
#  |-- key: integer (nullable = true)
在 Spark 儲存庫中的「examples/src/main/python/sql/datasource.py」中,尋找完整的範例程式碼。
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」中,尋找完整的範例程式碼。
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public static class Square implements Serializable {
  private int value;
  private int square;

  // Getters and setters...

}

public static class Cube implements Serializable {
  private int value;
  private int cube;

  // Getters and setters...

}

List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
  Square square = new Square();
  square.setValue(value);
  square.setSquare(value * value);
  squares.add(square);
}

// Create a simple DataFrame, store into a partition directory
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");

List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
  Cube cube = new Cube();
  cube.setValue(value);
  cube.setCube(value * value * value);
  cubes.add(cube);
}

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");

// Read the partitioned table
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)
在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」中,尋找完整的範例程式碼。
df1 <- createDataFrame(data.frame(single=c(12, 29), double=c(19, 23)))
df2 <- createDataFrame(data.frame(double=c(19, 23), triple=c(23, 18)))

# Create a simple DataFrame, stored into a partition directory
write.df(df1, "data/test_table/key=1", "parquet", "overwrite")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
write.df(df2, "data/test_table/key=2", "parquet", "overwrite")

# Read the partitioned table
df3 <- read.df("data/test_table", "parquet", mergeSchema = "true")
printSchema(df3)
# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths
## root
##  |-- single: double (nullable = true)
##  |-- double: double (nullable = true)
##  |-- triple: double (nullable = true)
##  |-- key: integer (nullable = true)
在 Spark 儲存庫中的「examples/src/main/r/RSparkSQLExample.R」中,尋找完整的範例程式碼。

Hive metastore Parquet 資料表轉換

在從 Hive Metastore Parquet 表格讀取並寫入非分割 Hive Metastore Parquet 表格時,Spark SQL 會嘗試使用自己的 Parquet 支援,而不是 Hive SerDe,以獲得更好的效能。此行為由 spark.sql.hive.convertMetastoreParquet 設定控制,並預設開啟。

Hive/Parquet Schema 調解

從表格架構處理的角度來看,Hive 和 Parquet 之間有兩個主要差異。

  1. Hive 不區分大小寫,而 Parquet 區分大小寫
  2. Hive 認為所有欄位都是可為 Null,而 Parquet 中的可為 Null 很重要

由於這個原因,我們必須在將 Hive Metastore Parquet 表格轉換為 Spark SQL Parquet 表格時,調和 Hive Metastore 架構和 Parquet 架構。調和規則如下

  1. 在兩個架構中具有相同名稱的欄位,無論是否可為 Null,都必須具有相同的資料類型。調和後的欄位應具有 Parquet 端的資料類型,以便尊重可為 Null。

  2. 調和後的架構僅包含在 Hive Metastore 架構中定義的那些欄位。

    • 僅出現在 Parquet 架構中的任何欄位都會在調和後的架構中刪除。
    • 僅出現在 Hive Metastore 架構中的任何欄位都會在調和後的架構中新增為可為 Null 的欄位。

更新元資料

Spark SQL 會快取 Parquet 元資料以獲得更好的效能。當啟用 Hive Metastore Parquet 表格轉換時,也會快取那些已轉換表格的元資料。如果這些表格是由 Hive 或其他外部工具更新,你需要手動更新它們,以確保元資料一致。

# spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");
refreshTable("my_table")
REFRESH TABLE my_table;

欄位加密

自 Spark 3.2 開始,Parquet 表格支援使用 Apache Parquet 1.12+ 的欄位加密。

Parquet 使用信封加密實務,其中檔案部分會使用「資料加密金鑰」(DEK) 加密,而 DEK 則使用「主加密金鑰」(MEK) 加密。Parquet 會為每個加密檔案/欄位隨機產生 DEK。MEK 會在使用者選擇的密鑰管理服務 (KMS) 中產生、儲存和管理。Parquet Maven 儲存庫 有個 jar 檔,其中包含一個模擬 KMS 實作,讓使用者只要使用 spark-shell,就能執行欄位加密和解密,而不需要部署 KMS 伺服器(下載 parquet-hadoop-tests.jar 檔案,並將其放在 Spark jars 資料夾中)

# Set hadoop configuration properties, e.g. using configuration properties of
# the Spark job:
# --conf spark.hadoop.parquet.encryption.kms.client.class=\
#           "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS"\
# --conf spark.hadoop.parquet.encryption.key.list=\
#           "keyA:AAECAwQFBgcICQoLDA0ODw== ,  keyB:AAECAAECAAECAAECAAECAA=="\
# --conf spark.hadoop.parquet.crypto.factory.class=\
#           "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory"

# Write encrypted dataframe files.
# Column "square" will be protected with master key "keyA".
# Parquet file footers will be protected with master key "keyB"
squaresDF.write\
   .option("parquet.encryption.column.keys" , "keyA:square")\
   .option("parquet.encryption.footer.key" , "keyB")\
   .parquet("/path/to/table.parquet.encrypted")

# Read encrypted dataframe files
df2 = spark.read.parquet("/path/to/table.parquet.encrypted")
sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" ,
                           "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")

// Explicit master keys (base64 encoded) - required only for mock InMemoryKMS
sc.hadoopConfiguration.set("parquet.encryption.key.list" ,
                   "keyA:AAECAwQFBgcICQoLDA0ODw== ,  keyB:AAECAAECAAECAAECAAECAA==")

// Activate Parquet encryption, driven by Hadoop properties
sc.hadoopConfiguration.set("parquet.crypto.factory.class" ,
                   "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")

// Write encrypted dataframe files.
// Column "square" will be protected with master key "keyA".
// Parquet file footers will be protected with master key "keyB"
squaresDF.write.
   option("parquet.encryption.column.keys" , "keyA:square").
   option("parquet.encryption.footer.key" , "keyB").
parquet("/path/to/table.parquet.encrypted")

// Read encrypted dataframe files
val df2 = spark.read.parquet("/path/to/table.parquet.encrypted")
sc.hadoopConfiguration().set("parquet.encryption.kms.client.class" ,
   "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS");

// Explicit master keys (base64 encoded) - required only for mock InMemoryKMS
sc.hadoopConfiguration().set("parquet.encryption.key.list" ,
   "keyA:AAECAwQFBgcICQoLDA0ODw== ,  keyB:AAECAAECAAECAAECAAECAA==");

// Activate Parquet encryption, driven by Hadoop properties
sc.hadoopConfiguration().set("parquet.crypto.factory.class" ,
   "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory");

// Write encrypted dataframe files.
// Column "square" will be protected with master key "keyA".
// Parquet file footers will be protected with master key "keyB"
squaresDF.write().
   option("parquet.encryption.column.keys" , "keyA:square").
   option("parquet.encryption.footer.key" , "keyB").
   parquet("/path/to/table.parquet.encrypted");

// Read encrypted dataframe files
Dataset<Row> df2 = spark.read().parquet("/path/to/table.parquet.encrypted");

KMS 客戶端

InMemoryKMS 類別僅供說明和簡單示範 Parquet 加密功能之用。不應在實際部署中使用。主加密金鑰必須保留並管理在使用者組織中部署的生產級 KMS 系統中。要推出具有 Parquet 加密的 Spark,需要為 KMS 伺服器實作一個用戶端類別。Parquet 提供一個用於開發此類類別的 介面

public interface KmsClient {
  // Wraps a key - encrypts it with the master key.
  public String wrapKey(byte[] keyBytes, String masterKeyIdentifier);

  // Decrypts (unwraps) a key with the master key.
  public byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier);

  // Use of initialization parameters is optional.
  public void initialize(Configuration configuration, String kmsInstanceID,
                         String kmsInstanceURL, String accessToken);
}

parquet-mr 儲存庫中有一個 範例,說明如何針對開源 KMS 建立此類類別。生產 KMS 用戶端應與組織的安全管理員合作設計,並由具有存取控制管理經驗的開發人員建置。建立此類類別後,便可透過 parquet.encryption.kms.client.class 參數將其傳遞給應用程式,並讓一般 Spark 使用者利用,如上方的加密資料框寫入/讀取範例所示。

注意:預設情況下,Parquet 會實作「雙信封加密」模式,將 Spark 執行器與 KMS 伺服器的互動降至最低。在此模式中,DEK 會使用「金鑰加密金鑰」(KEK,由 Parquet 隨機產生) 加密。KEK 會在 KMS 中使用 MEK 加密;結果和 KEK 本身會快取在 Spark 執行器記憶體中。有興趣使用一般信封加密的使用者,可以將 parquet.encryption.double.wrapping 參數設定為 false 來切換。如需有關 Parquet 加密參數的更多詳細資訊,請參閱 parquet-hadoop 設定 頁面

資料來源選項

Parquet 的資料來源選項可透過下列方式設定:

屬性名稱預設意義範圍
datetimeRebaseMode spark.sql.parquet.datetimeRebaseModeInRead 組態的值) datetimeRebaseMode 選項允許指定 DATETIMESTAMP_MILLISTIMESTAMP_MICROS 邏輯類型值從儒略曆到公曆的換算模式。
目前支援的模式為
  • EXCEPTION:在讀取兩個曆法之間模稜兩可的古代日期/時間戳記時失敗。
  • CORRECTED:載入日期/時間戳記時不換算。
  • LEGACY:執行從儒略曆到公曆的古代日期/時間戳記換算。
讀取
int96RebaseMode spark.sql.parquet.int96RebaseModeInRead 組態的值) int96RebaseMode 選項允許指定從儒略曆到公曆的 INT96 時間戳記換算模式。
目前支援的模式為
  • EXCEPTION:在讀取兩個曆法之間模稜兩可的古代 INT96 時間戳記時失敗。
  • CORRECTED:載入 INT96 時間戳記時不換算。
  • LEGACY:執行從儒略曆到公曆的古代時間戳記換算。
讀取
mergeSchema spark.sql.parquet.mergeSchema 組態的值) 設定是否應合併從所有 Parquet 部分檔案收集的架構。這將覆寫 spark.sql.parquet.mergeSchema 讀取
壓縮 snappy 儲存到檔案時使用的壓縮編解碼器。這可以是已知的區分大小寫的簡短名稱之一(none、uncompressed、snappy、gzip、lzo、brotli、lz4 和 zstd)。這將覆寫 spark.sql.parquet.compression.codec 寫入

其他一般選項可在 一般檔案來源選項 中找到

設定

Parquet 的組態可以使用 SparkSession 上的 setConf 方法或使用 SQL 執行 SET key=value 指令來完成。

屬性名稱預設意義版本
spark.sql.parquet.binaryAsString false 一些其他產生 Parquet 的系統,特別是 Impala、Hive 和舊版本的 Spark SQL,在寫出 Parquet 架構時不會區分二進位資料和字串。此旗標指示 Spark SQL 將二進位資料解釋為字串,以與這些系統相容。 1.1.1
spark.sql.parquet.int96AsTimestamp true 一些產生 Parquet 的系統,特別是 Impala 和 Hive,會將時間戳記儲存在 INT96 中。此旗標指示 Spark SQL 將 INT96 資料解釋為時間戳記,以與這些系統相容。 1.3.0
spark.sql.parquet.int96TimestampConversion false 此設定控制是否在將 Impala 寫入的資料轉換為時間戳記時,套用時間戳記調整。這是必要的,因為 Impala 儲存 INT96 資料的方式,與 Hive 和 Spark 的時區偏移不同。 2.3.0
spark.sql.parquet.outputTimestampType INT96 設定 Spark 將資料寫入 Parquet 檔案時,要使用的 Parquet 時間戳記類型。INT96 是 Parquet 中的非標準,但常用的時間戳記類型。TIMESTAMP_MICROS 是 Parquet 中的標準時間戳記類型,用於儲存自 Unix 紀元開始的微秒數。TIMESTAMP_MILLIS 也是標準的,但精度為毫秒,這表示 Spark 必須截斷其時間戳記值的微秒部分。 2.3.0
spark.sql.parquet.compression.codec snappy 設定在寫入 Parquet 檔案時,所使用的壓縮編解碼器。如果在特定於表格的選項/屬性中指定了 compressionparquet.compression,則優先順序為 compressionparquet.compressionspark.sql.parquet.compression.codec。可接受的值包括:none、uncompressed、snappy、gzip、lzo、brotli、lz4、zstd。請注意,brotli 需要安裝 BrotliCodec 1.1.1
spark.sql.parquet.filterPushdown true 設定為 true 時,會啟用 Parquet 篩選下推最佳化。 1.2.0
spark.sql.parquet.aggregatePushdown false 如果為 true,則會將聚合下推到 Parquet 以進行最佳化。支援 MIN、MAX 和 COUNT 作為聚合表達式。對於 MIN/MAX,支援布林、整數、浮點數和日期類型。對於 COUNT,支援所有資料類型。如果任何 Parquet 檔案尾端缺少統計資料,則會擲回例外。 3.3.0
spark.sql.hive.convertMetastoreParquet true 設定為 false 時,Spark SQL 會使用 Hive SerDe 處理 Parquet 表格,而不是使用內建支援。 1.1.1
spark.sql.parquet.mergeSchema false

如果為 true,則 Parquet 資料來源會合併從所有資料檔案收集的架構,否則會從摘要檔案或隨機資料檔案中選取架構(如果沒有摘要檔案)。

1.5.0
spark.sql.parquet.respectSummaryFiles false 如果為 true,則我們假設 Parquet 的所有部分檔案都與摘要檔案一致,而且我們會在合併架構時略過它們。否則,如果為 false(預設值),則我們會合併所有部分檔案。這應視為僅供專家使用的選項,而且不應在完全了解其含義之前啟用。 1.5.0
spark.sql.parquet.writeLegacyFormat false 如果為 true,則資料會以 Spark 1.4 及更早版本的格式寫入。例如,十進位值會以 Apache Parquet 的定長位元組陣列格式寫入,而其他系統(例如 Apache Hive 和 Apache Impala)會使用此格式。如果為 false,則會使用 Parquet 中較新的格式。例如,十進位值會以基於整數的格式寫入。如果 Parquet 輸出要與不支援此較新格式的系統一起使用,請設定為 true。 1.6.0
spark.sql.parquet.enableVectorizedReader true 啟用向量化 Parquet 解碼。 2.0.0
spark.sql.parquet.enableNestedColumnVectorizedReader true 啟用巢狀欄位的向量化 Parquet 解碼(例如結構、清單、地圖)。需要啟用 spark.sql.parquet.enableVectorizedReader 3.3.0
spark.sql.parquet.recordLevelFilter.enabled false 如果為 true,將使用推入式篩選器啟用 Parquet 的原生記錄層級篩選。此組態僅在啟用 spark.sql.parquet.filterPushdown 且未使用的向量化讀取器時有效。您可以將 spark.sql.parquet.enableVectorizedReader 設為 false 來確保未使用的向量化讀取器。 2.3.0
spark.sql.parquet.columnarReaderBatchSize 4096 要包含在 Parquet 向量化讀取器批次中的列數。應仔細選擇此數字,以將開銷降至最低並避免在讀取資料時發生 OOM。 2.4.0
spark.sql.parquet.fieldId.write.enabled true 欄位 ID 是 Parquet schema 規範的原生欄位。啟用後,Parquet 寫入器會將 Spark schema 中的欄位 ID 元資料(如果存在)填入 Parquet schema。 3.3.0
spark.sql.parquet.fieldId.read.enabled false 欄位 ID 是 Parquet schema 規範的原生欄位。啟用後,Parquet 讀取器會使用請求的 Spark schema 中的欄位 ID(如果存在)來查詢 Parquet 欄位,而不是使用欄位名稱。 3.3.0
spark.sql.parquet.fieldId.read.ignoreMissing false 當 Parquet 檔案沒有任何欄位 ID,但 Spark 讀取 schema 使用欄位 ID 進行讀取時,如果啟用此旗標,我們會靜默傳回 null,否則會傳回錯誤。 3.3.0
spark.sql.parquet.timestampNTZ.enabled true 啟用 Parquet 讀取和寫入的 TIMESTAMP_NTZ 支援。啟用後,TIMESTAMP_NTZ 值會寫入為 Parquet timestamp 欄位,其註解 isAdjustedToUTC = false,並以類似方式推斷。停用後,此類值會讀取為 TIMESTAMP_LTZ,且必須轉換為 TIMESTAMP_LTZ 才能寫入。 3.4.0
spark.sql.parquet.datetimeRebaseModeInRead EXCEPTION DATETIMESTAMP_MILLISTIMESTAMP_MICROS 邏輯類型值從儒略曆重新設定為前儒略曆的重新設定模式
  • EXCEPTION:如果 Spark 看到兩個日曆之間模稜兩可的古代日期/時間戳,它將會讀取失敗。
  • CORRECTED:Spark 將不會重新設定,並按原樣讀取日期/時間戳。
  • LEGACY:Spark 會在讀取 Parquet 檔案時,將日期/時間戳從舊式混合(儒略曆 + 公曆)日曆重新設定為前儒略曆。
此組態僅在 Parquet 檔案的寫入器資訊(例如 Spark、Hive)未知時才會有效。
3.0.0
spark.sql.parquet.datetimeRebaseModeInWrite EXCEPTION DATETIMESTAMP_MILLISTIMESTAMP_MICROS 邏輯型態值從前儒略曆轉換為儒略曆的重新定基模式
  • EXCEPTION:如果 Spark 看見在兩個日曆之間模稜兩可的古代日期/時間戳記,它將會寫入失敗。
  • CORRECTED:Spark 將不會重新定基,並照原樣寫入日期/時間戳記。
  • LEGACY:在寫入 Parquet 檔案時,Spark 會將日期/時間戳記從前儒略曆重新定基到舊式混合(儒略曆 + 公曆)日曆。
3.0.0
spark.sql.parquet.int96RebaseModeInRead EXCEPTION INT96 時間戳記型態值從儒略曆轉換為前儒略曆的重新定基模式
  • EXCEPTION:如果 Spark 看見在兩個日曆之間模稜兩可的古代 INT96 時間戳記,它將會讀取失敗。
  • CORRECTED:Spark 將不會重新設定,並按原樣讀取日期/時間戳。
  • LEGACY:在讀取 Parquet 檔案時,Spark 會將 INT96 時間戳記從舊式混合(儒略曆 + 公曆)日曆重新定基到前儒略曆。
此組態僅在 Parquet 檔案的寫入器資訊(例如 Spark、Hive)未知時才會有效。
3.1.0
spark.sql.parquet.int96RebaseModeInWrite EXCEPTION INT96 時間戳記型態值從前儒略曆轉換為儒略曆的重新定基模式
  • EXCEPTION:如果 Spark 看見在兩個日曆之間模稜兩可的古代時間戳記,它將會寫入失敗。
  • CORRECTED:Spark 將不會重新定基,並照原樣寫入日期/時間戳記。
  • LEGACY:在寫入 Parquet 檔案時,Spark 會將 INT96 時間戳記從前儒略曆重新定基到舊式混合(儒略曆 + 公曆)日曆。
3.1.0