CSV 檔案
Spark SQL 提供 spark.read().csv("file_name")
以將 CSV 格式的檔案或目錄讀入 Spark DataFrame,並提供 dataframe.write().csv("path")
以寫入 CSV 檔案。函數 option()
可用於自訂讀取或寫入的行為,例如控制標頭、分隔字元、字元集等行為。
# spark is from the previous example
sc = spark.sparkContext
# A CSV dataset is pointed to by path.
# The path can be either a single CSV file or a directory of CSV files
path = "examples/src/main/resources/people.csv"
df = spark.read.csv(path)
df.show()
# +------------------+
# | _c0|
# +------------------+
# | name;age;job|
# |Jorge;30;Developer|
# | Bob;32;Developer|
# +------------------+
# Read a csv with delimiter, the default delimiter is ","
df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
# +-----+---+---------+
# | _c0|_c1| _c2|
# +-----+---+---------+
# | name|age| job|
# |Jorge| 30|Developer|
# | Bob| 32|Developer|
# +-----+---+---------+
# Read a csv with delimiter and a header
df3 = spark.read.option("delimiter", ";").option("header", True).csv(path)
df3.show()
# +-----+---+---------+
# | name|age| job|
# +-----+---+---------+
# |Jorge| 30|Developer|
# | Bob| 32|Developer|
# +-----+---+---------+
# You can also use options() to use multiple options
df4 = spark.read.options(delimiter=";", header=True).csv(path)
# "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")
# Read all files in a folder, please make sure only CSV files should present in the folder.
folderPath = "examples/src/main/resources"
df5 = spark.read.csv(folderPath)
df5.show()
# Wrong schema because non-CSV files are read
# +-----------+
# | _c0|
# +-----------+
# |238val_238|
# | 86val_86|
# |311val_311|
# | 27val_27|
# |165val_165|
# +-----------+
請在 Spark 儲存庫中的「examples/src/main/python/sql/datasource.py」中尋找完整的範例程式碼。
// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
val path = "examples/src/main/resources/people.csv"
val df = spark.read.csv(path)
df.show()
// +------------------+
// | _c0|
// +------------------+
// | name;age;job|
// |Jorge;30;Developer|
// | Bob;32;Developer|
// +------------------+
// Read a csv with delimiter, the default delimiter is ","
val df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
// +-----+---+---------+
// | _c0|_c1| _c2|
// +-----+---+---------+
// | name|age| job|
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// Read a csv with delimiter and a header
val df3 = spark.read.option("delimiter", ";").option("header", "true").csv(path)
df3.show()
// +-----+---+---------+
// | name|age| job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// You can also use options() to use multiple options
val df4 = spark.read.options(Map("delimiter"->";", "header"->"true")).csv(path)
// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")
// Read all files in a folder, please make sure only CSV files should present in the folder.
val folderPath = "examples/src/main/resources";
val df5 = spark.read.csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// | _c0|
// +-----------+
// |238val_238|
// | 86val_86|
// |311val_311|
// | 27val_27|
// |165val_165|
// +-----------+
請在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」中尋找完整的範例程式碼。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
String path = "examples/src/main/resources/people.csv";
Dataset<Row> df = spark.read().csv(path);
df.show();
// +------------------+
// | _c0|
// +------------------+
// | name;age;job|
// |Jorge;30;Developer|
// | Bob;32;Developer|
// +------------------+
// Read a csv with delimiter, the default delimiter is ","
Dataset<Row> df2 = spark.read().option("delimiter", ";").csv(path);
df2.show();
// +-----+---+---------+
// | _c0|_c1| _c2|
// +-----+---+---------+
// | name|age| job|
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// Read a csv with delimiter and a header
Dataset<Row> df3 = spark.read().option("delimiter", ";").option("header", "true").csv(path);
df3.show();
// +-----+---+---------+
// | name|age| job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// You can also use options() to use multiple options
java.util.Map<String, String> optionsMap = new java.util.HashMap<String, String>();
optionsMap.put("delimiter",";");
optionsMap.put("header","true");
Dataset<Row> df4 = spark.read().options(optionsMap).csv(path);
// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write().csv("output");
// Read all files in a folder, please make sure only CSV files should present in the folder.
String folderPath = "examples/src/main/resources";
Dataset<Row> df5 = spark.read().csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// | _c0|
// +-----------+
// |238val_238|
// | 86val_86|
// |311val_311|
// | 27val_27|
// |165val_165|
// +-----------+
請在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」中尋找完整的範例程式碼。
資料來源選項
CSV 的資料來源選項可透過下列方式設定:
- DataFrameReader
DataFrameWriter
DataStreamReader
DataStreamWriter
的
.option
/.options
方法
- 以下內建函數:
from_csv
to_csv
schema_of_csv
OPTIONS
子句,請參閱 使用資料來源建立資料表
屬性名稱 | 預設值 | 意義 | 範圍 |
---|---|---|---|
sep |
, | 設定每個欄位和值的區隔符號。此區隔符號可以是一個或多個字元。 | 讀取/寫入 |
encoding |
UTF-8 | 對於讀取,使用指定的編碼類型對 CSV 檔案進行解碼。對於寫入,指定已儲存 CSV 檔案的編碼 (字元集)。CSV 內建函數會忽略此選項。 | 讀取/寫入 |
quote |
" | 設定用於跳脫引號值的單一字元,其中區隔符號可以是值的一部分。對於讀取,如果您想要關閉引號,您需要設定非 null 但為空字串。對於寫入,如果設定空字串,它會使用 u0000 (null 字元)。 |
讀取/寫入 |
quoteAll |
false | 一個旗標,表示是否應始終將所有值置於引號中。預設僅會跳脫包含引號字元的數值。 | 寫入 |
跳脫 |
\ | 設定用於跳脫已引號值中引號的單一字元。預設情況下,此功能已停用。 | 讀取/寫入 |
跳脫引號 |
true | 一個旗標,表示是否應始終將包含引號的數值置於引號中。預設會跳脫所有包含引號字元的數值。 | 寫入 |
註解 |
設定用於略過以這個字元開頭的行的單一字元。預設情況下,此功能已停用。 | 讀取 | |
標頭 |
false | 用於讀取時,將第一行當作欄位名稱。用於寫入時,將欄位名稱寫入為第一行。請注意,如果給定的路徑是字串的 RDD,則此標頭選項會移除所有與標頭相同的行(如果存在的話)。CSV 內建函式會忽略此選項。 | 讀取/寫入 |
推論架構 |
false | 自動從資料推論輸入架構。它需要額外執行一次資料傳遞。CSV 內建函式會忽略此選項。 | 讀取 |
偏好日期 |
true | 在架構推論期間 (inferSchema ),如果數值符合 dateFormat 選項或預設日期格式,則嘗試將包含日期的字串欄位推論為 Date 。對於包含日期和時間戳記混合的欄位,如果未指定時間戳記格式,則嘗試將它們推論為 TimestampType ;否則,將它們推論為 StringType 。 |
讀取 |
強制執行架構 |
true | 如果設定為 true ,指定的或推論的架構將強制套用至資料來源檔案,且會忽略 CSV 檔案中的標頭。如果將選項設定為 false ,則會根據 CSV 檔案中的所有標頭驗證架構(如果 header 選項設定為 true )。架構中的欄位名稱和 CSV 標頭中的欄位名稱會根據它們的位置(考量 spark.sql.caseSensitive )進行檢查。雖然預設值為 true,但建議停用 enforceSchema 選項,以避免產生不正確的結果。CSV 內建函式會忽略此選項。 |
讀取 |
忽略前導空白 |
false (用於讀取),true (用於寫入) |
一個旗標,表示是否應略過讀取/寫入值的開頭空白。 | 讀取/寫入 |
忽略尾隨空白 |
false (用於讀取),true (用於寫入) |
一個旗標,表示是否應略過讀取/寫入值的尾隨空白。 | 讀取/寫入 |
空值 |
設定空值的字串表示法。自 2.0.1 版起,此 nullValue 參數適用於所有受支援的類型,包括字串類型。 |
讀取/寫入 | |
非數字值 |
NaN | 設定非數字值的字串表示法。 | 讀取 |
positiveInf |
Inf | 設定正無限大值的字串表示。 | 讀取 |
negativeInf |
-Inf | 設定負無限大值的字串表示。 | 讀取 |
dateFormat |
yyyy-MM-dd | 設定表示日期格式的字串。自訂日期格式遵循 日期時間模式 中的格式。這適用於日期類型。 | 讀取/寫入 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] | 設定表示時間戳記格式的字串。自訂日期格式遵循 日期時間模式 中的格式。這適用於時間戳記類型。 | 讀取/寫入 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] | 設定表示不含時區的時間戳記格式的字串。自訂日期格式遵循 日期時間模式 中的格式。這適用於不含時區的時間戳記類型,請注意,在寫入或讀取此資料類型時,不支援時區偏移和時區元件。 | 讀取/寫入 |
enableDateTimeParsingFallback |
如果時間解析器原則有舊有設定,或未提供自訂日期或時間戳記模式,則啟用。 | 允許在值與設定模式不符時,採用與舊版相容(Spark 1.x 和 2.0)的日期和時間戳記解析行為。 | 讀取 |
maxColumns |
20480 | 定義記錄可包含的欄位數的上限。 | 讀取 |
maxCharsPerColumn |
-1 | 定義允許在讀取的任何特定值中出現的最大字元數。預設為 -1,表示長度不限 | 讀取 |
mode |
PERMISSIVE | 允許在解析期間處理損毀記錄的模式。它支援下列不分大小寫的模式。請注意,Spark 嘗試在欄位修剪下只解析 CSV 中的必要欄位。因此,損毀記錄可能根據必要的欄位組而有所不同。此行為可由 spark.sql.csv.parser.columnPruning.enabled (預設啟用)控制。
|
讀取 |
columnNameOfCorruptRecord |
(spark.sql.columnNameOfCorruptRecord 設定的值) |
允許重新命名由 PERMISSIVE 模式建立的,包含格式錯誤字串的新欄位。這會覆寫 spark.sql.columnNameOfCorruptRecord 。 |
讀取 |
multiLine |
false | 每個檔案解析一個記錄,該記錄可能跨越多行。CSV 內建函式會忽略此選項。 | 讀取 |
charToEscapeQuoteEscaping |
escape 或 \0 |
設定用於轉譯引號字元轉譯的單一字元。預設值為轉譯字元(當轉譯字元和引號字元不同時),否則為 \0 。 |
讀取/寫入 |
samplingRatio |
1.0 | 定義用於推論架構的行數分數。CSV 內建函式會忽略此選項。 | 讀取 |
emptyValue |
(用於讀取)、"" (用於寫入) |
設定空值的字串表示形式。 | 讀取/寫入 |
locale |
en-US | 設定 IETF BCP 47 格式的語言標籤作為地區設定。例如,這用於解析日期和時間戳記時。 | 讀取 |
lineSep |
\r 、\r\n 和 \n (用於讀取)、\n (用於寫入) |
定義用於解析/寫入的行分隔符號。最大長度為 1 個字元。CSV 內建函式會忽略此選項。 | 讀取/寫入 |
unescapedQuoteHandling |
STOP_AT_DELIMITER | 定義 CsvParser 如何處理未轉譯引號的值。
|
讀取 |
壓縮 |
(無) | 儲存到檔案時要使用的壓縮編解碼器。這可以是已知的非大小寫縮寫名稱之一(none 、bzip2 、gzip 、lz4 、snappy 和 deflate )。CSV 內建函式會忽略此選項。 |
寫入 |
其他一般選項可以在 一般檔案來源選項 中找到。