通用載入/儲存函數

在最簡單的形式中,預設資料來源 (parquet 除非另行由 spark.sql.sources.default 設定) 將用於所有作業。

df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
在 Spark 回存區的「examples/src/main/python/sql/datasource.py」中尋找完整的範例程式碼。
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
在 Spark 回存區的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」中尋找完整的範例程式碼。
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
在 Spark 回存區的「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」中尋找完整的範例程式碼。
df <- read.df("examples/src/main/resources/users.parquet")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
在 Spark 回存區的「examples/src/main/r/RSparkSQLExample.R」中尋找完整的範例程式碼。

手動指定選項

您也可以手動指定將使用的資料來源,以及您想傳遞給資料來源的任何額外選項。資料來源由其完全限定名稱指定 (即 org.apache.spark.sql.parquet),但對於內建來源,您也可以使用其簡短名稱 (jsonparquetjdbcorclibsvmcsvtext)。從任何資料來源類型載入的資料框可以使用此語法轉換成其他類型。

請參閱 API 文件以取得內建來源的可用選項,例如 org.apache.spark.sql.DataFrameReaderorg.apache.spark.sql.DataFrameWriter。文件中記錄的選項也應適用於非 Scala Spark API (例如 PySpark)。對於其他格式,請參閱特定格式的 API 文件。

若要載入 JSON 檔案,您可以使用

df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
在 Spark 回存區的「examples/src/main/python/sql/datasource.py」中尋找完整的範例程式碼。
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
在 Spark 回存區的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」中尋找完整的範例程式碼。
Dataset<Row> peopleDF =
  spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
在 Spark 回存區的「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」中尋找完整的範例程式碼。
df <- read.df("examples/src/main/resources/people.json", "json")
namesAndAges <- select(df, "name", "age")
write.df(namesAndAges, "namesAndAges.parquet", "parquet")
在 Spark 回存區的「examples/src/main/r/RSparkSQLExample.R」中尋找完整的範例程式碼。

若要載入 CSV 檔案,您可以使用

df = spark.read.load("examples/src/main/resources/people.csv",
                     format="csv", sep=";", inferSchema="true", header="true")
在 Spark 回存區的「examples/src/main/python/sql/datasource.py」中尋找完整的範例程式碼。
val peopleDFCsv = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv")
在 Spark 回存區的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」中尋找完整的範例程式碼。
Dataset<Row> peopleDFCsv = spark.read().format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv");
在 Spark 回存區的「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」中尋找完整的範例程式碼。
df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferSchema = TRUE, header = TRUE)
namesAndAges <- select(df, "name", "age")
在 Spark 回存區的「examples/src/main/r/RSparkSQLExample.R」中尋找完整的範例程式碼。

額外的選項也會在寫入操作中使用。例如,您可以控制 ORC 資料來源的布隆過濾器和字典編碼。以下的 ORC 範例將建立布隆過濾器,並僅對 favorite_color 使用字典編碼。Parquet 也有 parquet.bloom.filter.enabledparquet.enable.dictionary。若要找出有關額外 ORC/Parquet 選項的詳細資訊,請瀏覽官方 Apache ORC / Parquet 網站。

ORC 資料來源

df = spark.read.orc("examples/src/main/resources/users.orc")
(df.write.format("orc")
    .option("orc.bloom.filter.columns", "favorite_color")
    .option("orc.dictionary.key.threshold", "1.0")
    .option("orc.column.encoding.direct", "name")
    .save("users_with_options.orc"))
在 Spark 回存區的「examples/src/main/python/sql/datasource.py」中尋找完整的範例程式碼。
usersDF.write.format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .option("orc.column.encoding.direct", "name")
  .save("users_with_options.orc")
在 Spark 回存區的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」中尋找完整的範例程式碼。
usersDF.write().format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .option("orc.column.encoding.direct", "name")
  .save("users_with_options.orc");
在 Spark 回存區的「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」中尋找完整的範例程式碼。
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
在 Spark 回存區的「examples/src/main/r/RSparkSQLExample.R」中尋找完整的範例程式碼。
CREATE TABLE users_with_options (
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING ORC
OPTIONS (
  orc.bloom.filter.columns 'favorite_color',
  orc.dictionary.key.threshold '1.0',
  orc.column.encoding.direct 'name'
)

Parquet 資料來源

df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df.write.format("parquet")
    .option("parquet.bloom.filter.enabled#favorite_color", "true")
    .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
    .option("parquet.enable.dictionary", "true")
    .option("parquet.page.write-checksum.enabled", "false")
    .save("users_with_options.parquet"))
在 Spark 回存區的「examples/src/main/python/sql/datasource.py」中尋找完整的範例程式碼。
usersDF.write.format("parquet")
  .option("parquet.bloom.filter.enabled#favorite_color", "true")
  .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
  .option("parquet.enable.dictionary", "true")
  .option("parquet.page.write-checksum.enabled", "false")
  .save("users_with_options.parquet")
在 Spark 回存區的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」中尋找完整的範例程式碼。
usersDF.write().format("parquet")
    .option("parquet.bloom.filter.enabled#favorite_color", "true")
    .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
    .option("parquet.enable.dictionary", "true")
    .option("parquet.page.write-checksum.enabled", "false")
    .save("users_with_options.parquet");
在 Spark 回存區的「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」中尋找完整的範例程式碼。
df <- read.df("examples/src/main/resources/users.parquet", "parquet")
write.parquet(df, "users_with_options.parquet", parquet.bloom.filter.enabled#favorite_color = true, parquet.bloom.filter.expected.ndv#favorite_color = 1000000, parquet.enable.dictionary = true, parquet.page.write-checksum.enabled = false)
在 Spark 回存區的「examples/src/main/r/RSparkSQLExample.R」中尋找完整的範例程式碼。
CREATE TABLE users_with_options (
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
OPTIONS (
  `parquet.bloom.filter.enabled#favorite_color` true,
  `parquet.bloom.filter.expected.ndv#favorite_color` 1000000,
  parquet.enable.dictionary true,
  parquet.page.write-checksum.enabled true
)

直接對檔案執行 SQL

除了使用讀取 API 將檔案載入 DataFrame 並查詢它之外,您也可以使用 SQL 直接查詢該檔案。

df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
在 Spark 回存區的「examples/src/main/python/sql/datasource.py」中尋找完整的範例程式碼。
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
在 Spark 回存區的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」中尋找完整的範例程式碼。
Dataset<Row> sqlDF =
  spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
在 Spark 回存區的「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」中尋找完整的範例程式碼。
df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
在 Spark 回存區的「examples/src/main/r/RSparkSQLExample.R」中尋找完整的範例程式碼。

儲存模式

儲存操作可以選擇採用 SaveMode,它會指定如何處理現有的資料(如果有的話)。了解這些儲存模式不使用任何鎖定且不是原子性的非常重要。此外,在執行 Overwrite 時,資料會在寫出新資料之前刪除。

Scala/Java任何語言意義
SaveMode.ErrorIfExists(預設) "error" 或 "errorifexists"(預設) 將 DataFrame 儲存到資料來源時,如果資料已存在,則預期會擲回例外。
SaveMode.Append "append" 將 DataFrame 儲存到資料來源時,如果資料/表格已存在,則預期 DataFrame 的內容會附加到現有資料。
SaveMode.Overwrite "overwrite" 覆寫模式表示將 DataFrame 儲存到資料來源時,如果資料/表格已存在,則預期現有資料會被 DataFrame 的內容覆寫。
SaveMode.Ignore "ignore" 忽略模式表示將 DataFrame 儲存到資料來源時,如果資料已存在,則預期儲存操作不會儲存 DataFrame 的內容,也不會變更現有資料。這類似於 SQL 中的 CREATE TABLE IF NOT EXISTS

儲存至持續性資料表

DataFrames 也可以使用 saveAsTable 指令儲存為 Hive Metastore 中的持續性表格。請注意,不需要現有的 Hive 部署來使用此功能。Spark 會為您建立預設的本機 Hive Metastore(使用 Derby)。與 createOrReplaceTempView 指令不同,saveAsTable 會實體化 DataFrame 的內容,並在 Hive Metastore 中建立指向資料的指標。只要您維持與同一個 Metastore 的連線,持續性表格即使在 Spark 程式重新啟動後也會繼續存在。持續性表格的 DataFrame 可以透過在 SparkSession 上呼叫 table 方法,並提供表格名稱來建立。

對於基於檔案的資料來源,例如文字、parquet、json 等,您可以透過 path 選項指定自訂表格路徑,例如 df.write.option("path", "/some/path").saveAsTable("t")。當表格刪除時,自訂表格路徑不會被移除,而表格資料仍然存在。如果未指定自訂表格路徑,Spark 會將資料寫入倉庫目錄下的預設表格路徑。當表格刪除時,預設表格路徑也會被移除。

從 Spark 2.1 開始,持續性資料來源表格會將每個分區的元資料儲存在 Hive 元資料儲存庫中。這帶來許多好處

請注意,在建立外部資料來源表格(那些具有 path 選項的表格)時,預設不會收集分區資訊。若要同步元資料儲存庫中的分區資訊,您可以呼叫 MSCK REPAIR TABLE

分桶、排序和分割

對於基於檔案的資料來源,也可以對輸出進行分桶、排序或分區。分桶和排序僅適用於持續性表格

df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
在 Spark 回存區的「examples/src/main/python/sql/datasource.py」中尋找完整的範例程式碼。
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
在 Spark 回存區的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」中尋找完整的範例程式碼。
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
在 Spark 回存區的「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」中尋找完整的範例程式碼。
CREATE TABLE users_bucketed_by_name(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
CLUSTERED BY(name) INTO 42 BUCKETS;

當使用資料集 API 時,分區可以用於 savesaveAsTable

df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
在 Spark 回存區的「examples/src/main/python/sql/datasource.py」中尋找完整的範例程式碼。
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
在 Spark 回存區的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」中尋找完整的範例程式碼。
usersDF
  .write()
  .partitionBy("favorite_color")
  .format("parquet")
  .save("namesPartByColor.parquet");
在 Spark 回存區的「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」中尋找完整的範例程式碼。
CREATE TABLE users_by_favorite_color(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING csv PARTITIONED BY(favorite_color);

可以對單一表格同時使用分區和分桶

df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
    .write
    .partitionBy("favorite_color")
    .bucketBy(42, "name")
    .saveAsTable("users_partitioned_bucketed"))
在 Spark 回存區的「examples/src/main/python/sql/datasource.py」中尋找完整的範例程式碼。
usersDF
  .write
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed")
在 Spark 回存區的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」中尋找完整的範例程式碼。
usersDF
  .write()
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed");
在 Spark 回存區的「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」中尋找完整的範例程式碼。
CREATE TABLE users_bucketed_and_partitioned(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
PARTITIONED BY (favorite_color)
CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS;

partitionBy 會建立目錄結構,如 分區發現 區段所述。因此,它對具有高基數的欄位適用性有限。相較之下,bucketBy 會將資料分佈在固定數量的分桶中,且可用於唯一值數量無限時。