Generic File Source Options
These generic options/configurations are effective only when using file-based sources: parquet, orc, avro, json, csv, text.
Please note that the directory hierarchy used in the examples below is:
dir1/
├── dir2/
│   └── file2.parquet (schema: <file: string>, content: "file2.parquet")
├── file1.parquet (schema: <file: string>, content: "file1.parquet")
└── file3.json (schema: <file: string>, content: "{'file':'corrupt.json'}")
Ignore Corrupt Files
Spark allows you to use the configuration spark.sql.files.ignoreCorruptFiles or the data source option ignoreCorruptFiles to ignore corrupt files while reading data from files. When set to true, Spark jobs will continue to run when encountering corrupt files, and the contents that have been read will still be returned.
To ignore corrupt files while reading data files, you can use:
# enable ignore corrupt files via the data source option
# dir1/file3.json is corrupt from parquet's view
test_corrupt_df0 = spark.read.option("ignoreCorruptFiles", "true")\
.parquet("examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
test_corrupt_df0.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+
# enable ignore corrupt files via the configuration
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json is corrupt from parquet's view
test_corrupt_df1 = spark.read.parquet("examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
test_corrupt_df1.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+
// enable ignore corrupt files via the data source option
// dir1/file3.json is corrupt from parquet's view
val testCorruptDF0 = spark.read.option("ignoreCorruptFiles", "true").parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
testCorruptDF0.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
// enable ignore corrupt files via the configuration
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
// dir1/file3.json is corrupt from parquet's view
val testCorruptDF1 = spark.read.parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
testCorruptDF1.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
// enable ignore corrupt files via the data source option
// dir1/file3.json is corrupt from parquet's view
Dataset<Row> testCorruptDF0 = spark.read().option("ignoreCorruptFiles", "true").parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/");
testCorruptDF0.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
// enable ignore corrupt files via the configuration
spark.sql("set spark.sql.files.ignoreCorruptFiles=true");
// dir1/file3.json is corrupt from parquet's view
Dataset<Row> testCorruptDF1 = spark.read().parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/");
testCorruptDF1.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
# enable ignore corrupt files via the data source option
# dir1/file3.json is corrupt from parquet's view
testCorruptDF0 <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"), ignoreCorruptFiles = "true")
head(testCorruptDF0)
# file
# 1 file1.parquet
# 2 file2.parquet
# enable ignore corrupt files via the configuration
sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json is corrupt from parquet's view
testCorruptDF1 <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"))
head(testCorruptDF1)
# file
# 1 file1.parquet
# 2 file2.parquet
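For contrast, here is a minimal Python sketch of the default behavior; it is not part of the original example set and assumes the same example paths as above. With ignoreCorruptFiles left at its default of false, the same read fails once Parquet encounters dir1/file3.json:
# disable ignore corrupt files (the default); the read now raises an error
# (a sketch assuming the same example directory layout as above)
spark.sql("set spark.sql.files.ignoreCorruptFiles=false")
try:
    spark.read.parquet("examples/src/main/resources/dir1/",
                       "examples/src/main/resources/dir1/dir2/").show()
except Exception as e:
    print("read failed as expected:", type(e).__name__)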
Ignore Missing Files
Spark allows you to use the configuration spark.sql.files.ignoreMissingFiles or the data source option ignoreMissingFiles to ignore missing files while reading data from files. Here, a missing file really means a file that was deleted under the directory after you constructed the DataFrame. When set to true, Spark jobs will continue to run when encountering missing files, and the contents that have been read will still be returned.
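For example, a minimal Python sketch of enabling this via the data source option; it mirrors the corrupt-file examples above rather than coming from the original example set:
# enable ignore missing files via the data source option
test_missing_df = spark.read.option("ignoreMissingFiles", "true") \
    .parquet("examples/src/main/resources/dir1/")
# if files under dir1/ are deleted at this point, the action below still
# succeeds and returns the contents that could be read
test_missing_df.show()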
Path Glob Filter
pathGlobFilter is used to include only files whose file names match the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. It does not change the behavior of partition discovery.
To load files with paths matching a given glob pattern while keeping the behavior of partition discovery, you can use:
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", pathGlobFilter="*.parquet")
df.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# +-------------+
val testGlobFilterDF = spark.read.format("parquet")
.option("pathGlobFilter", "*.parquet") // json file should be filtered out
.load("examples/src/main/resources/dir1")
testGlobFilterDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
Dataset<Row> testGlobFilterDF = spark.read().format("parquet")
.option("pathGlobFilter", "*.parquet") // json file should be filtered out
.load("examples/src/main/resources/dir1");
testGlobFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
df <- read.df("examples/src/main/resources/dir1", "parquet", pathGlobFilter = "*.parquet")
head(df)
# file
# 1 file1.parquet
Recursive File Lookup
recursiveFileLookup is used to recursively load files, and it disables partition inferring. Its default value is false. If the data source explicitly specifies the partitionSpec while recursiveFileLookup is true, an exception will be thrown.
To load all files recursively, you can use:
recursive_loaded_df = spark.read.format("parquet")\
.option("recursiveFileLookup", "true")\
.load("examples/src/main/resources/dir1")
recursive_loaded_df.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+
val recursiveLoadedDF = spark.read.format("parquet")
.option("recursiveFileLookup", "true")
.load("examples/src/main/resources/dir1")
recursiveLoadedDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
Dataset<Row> recursiveLoadedDF = spark.read().format("parquet")
.option("recursiveFileLookup", "true")
.load("examples/src/main/resources/dir1");
recursiveLoadedDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
recursiveLoadedDF <- read.df("examples/src/main/resources/dir1", "parquet", recursiveFileLookup = "true")
head(recursiveLoadedDF)
# file
# 1 file1.parquet
# 2 file2.parquet
Modification Time Path Filters
modifiedBefore and modifiedAfter are options that can be applied together or separately in order to achieve greater granularity over which files may load during a Spark batch query. (Note that Structured Streaming file sources don't support these options.)
- modifiedBefore: an optional timestamp to only include files with modification times occurring before the specified time. The provided timestamp must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
- modifiedAfter: an optional timestamp to only include files with modification times occurring after the specified time. The provided timestamp must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
When a timezone option is not provided, the timestamps will be interpreted according to the Spark session timezone (spark.sql.session.timeZone).
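For instance, a minimal Python sketch of relying on the session time zone instead of a per-read "timeZone" option; the "Asia/Taipei" value is an arbitrary assumption, not from the original docs:
# timestamps passed to modifiedBefore/modifiedAfter are now interpreted
# in the session time zone ("Asia/Taipei" is an arbitrary example value)
spark.conf.set("spark.sql.session.timeZone", "Asia/Taipei")
df = spark.read.load("examples/src/main/resources/dir1",
                     format="parquet", modifiedBefore="2050-07-01T08:30:00")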
To load files with paths matching a given modified time range, you can use:
# Only load files modified before 07/01/2050 @ 08:30:00
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedBefore="2050-07-01T08:30:00")
df.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# +-------------+
# Only load files modified after 06/01/2050 @ 08:30:00
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedAfter="2050-06-01T08:30:00")
df.show()
# +-------------+
# | file|
# +-------------+
# +-------------+
val beforeFilterDF = spark.read.format("parquet")
// Files modified before 07/01/2020 at 05:30 are allowed
.option("modifiedBefore", "2020-07-01T05:30:00")
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
val afterFilterDF = spark.read.format("parquet")
// Files modified after 06/01/2020 at 05:30 are allowed
.option("modifiedAfter", "2020-06-01T05:30:00")
.load("examples/src/main/resources/dir1");
afterFilterDF.show();
// +-------------+
// | file|
// +-------------+
// +-------------+
Dataset<Row> beforeFilterDF = spark.read().format("parquet")
// Only load files modified before 7/1/2020 at 05:30
.option("modifiedBefore", "2020-07-01T05:30:00")
// Only load files modified after 6/1/2020 at 05:30
.option("modifiedAfter", "2020-06-01T05:30:00")
// Interpret both times above relative to CST timezone
.option("timeZone", "CST")
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
beforeDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedBefore = "2020-07-01T05:30:00")
head(beforeDF)
# file
# 1 file1.parquet
afterDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedAfter = "2020-06-01T05:30:00")
head(afterDF)
# file