CSV 檔案

Spark SQL 提供 spark.read().csv("file_name") 以將 CSV 格式的檔案或目錄讀入 Spark DataFrame,並提供 dataframe.write().csv("path") 以寫入 CSV 檔案。函數 option() 可用於自訂讀取或寫入的行為,例如控制標頭、分隔字元、字元集等行為。

# spark is from the previous example
sc = spark.sparkContext

# A CSV dataset is pointed to by path.
# The path can be either a single CSV file or a directory of CSV files
path = "examples/src/main/resources/people.csv"

df = spark.read.csv(path)
df.show()
# +------------------+
# |               _c0|
# +------------------+
# |      name;age;job|
# |Jorge;30;Developer|
# |  Bob;32;Developer|
# +------------------+

# Read a csv with delimiter, the default delimiter is ","
df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
# +-----+---+---------+
# |  _c0|_c1|      _c2|
# +-----+---+---------+
# | name|age|      job|
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# Read a csv with delimiter and a header
df3 = spark.read.option("delimiter", ";").option("header", True).csv(path)
df3.show()
# +-----+---+---------+
# | name|age|      job|
# +-----+---+---------+
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# You can also use options() to use multiple options
df4 = spark.read.options(delimiter=";", header=True).csv(path)

# "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")

# Read all files in a folder, please make sure only CSV files should present in the folder.
folderPath = "examples/src/main/resources"
df5 = spark.read.csv(folderPath)
df5.show()
# Wrong schema because non-CSV files are read
# +-----------+
# |        _c0|
# +-----------+
# |238val_238|
# |  86val_86|
# |311val_311|
# |  27val_27|
# |165val_165|
# +-----------+
請在 Spark 儲存庫中的「examples/src/main/python/sql/datasource.py」中尋找完整的範例程式碼。
// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
val path = "examples/src/main/resources/people.csv"

val df = spark.read.csv(path)
df.show()
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+

// Read a csv with delimiter, the default delimiter is ","
val df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// Read a csv with delimiter and a header
val df3 = spark.read.option("delimiter", ";").option("header", "true").csv(path)
df3.show()
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// You can also use options() to use multiple options
val df4 = spark.read.options(Map("delimiter"->";", "header"->"true")).csv(path)

// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")

// Read all files in a folder, please make sure only CSV files should present in the folder.
val folderPath = "examples/src/main/resources";
val df5 = spark.read.csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
請在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」中尋找完整的範例程式碼。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
String path = "examples/src/main/resources/people.csv";

Dataset<Row> df = spark.read().csv(path);
df.show();
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+

// Read a csv with delimiter, the default delimiter is ","
Dataset<Row> df2 = spark.read().option("delimiter", ";").csv(path);
df2.show();
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// Read a csv with delimiter and a header
Dataset<Row> df3 = spark.read().option("delimiter", ";").option("header", "true").csv(path);
df3.show();
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// You can also use options() to use multiple options
java.util.Map<String, String> optionsMap = new java.util.HashMap<String, String>();
optionsMap.put("delimiter",";");
optionsMap.put("header","true");
Dataset<Row> df4 = spark.read().options(optionsMap).csv(path);

// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write().csv("output");

// Read all files in a folder, please make sure only CSV files should present in the folder.
String folderPath = "examples/src/main/resources";
Dataset<Row> df5 = spark.read().csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
請在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」中尋找完整的範例程式碼。

資料來源選項

CSV 的資料來源選項可透過下列方式設定:

屬性名稱預設值意義範圍
sep , 設定每個欄位和值的區隔符號。此區隔符號可以是一個或多個字元。 讀取/寫入
encoding UTF-8 對於讀取,使用指定的編碼類型對 CSV 檔案進行解碼。對於寫入,指定已儲存 CSV 檔案的編碼 (字元集)。CSV 內建函數會忽略此選項。 讀取/寫入
quote " 設定用於跳脫引號值的單一字元,其中區隔符號可以是值的一部分。對於讀取,如果您想要關閉引號,您需要設定非 null 但為空字串。對於寫入,如果設定空字串,它會使用 u0000 (null 字元)。 讀取/寫入
quoteAll false 一個旗標,表示是否應始終將所有值置於引號中。預設僅會跳脫包含引號字元的數值。 寫入
跳脫 \ 設定用於跳脫已引號值中引號的單一字元。預設情況下,此功能已停用。 讀取/寫入
跳脫引號 true 一個旗標,表示是否應始終將包含引號的數值置於引號中。預設會跳脫所有包含引號字元的數值。 寫入
註解 設定用於略過以這個字元開頭的行的單一字元。預設情況下,此功能已停用。 讀取
標頭 false 用於讀取時,將第一行當作欄位名稱。用於寫入時,將欄位名稱寫入為第一行。請注意,如果給定的路徑是字串的 RDD,則此標頭選項會移除所有與標頭相同的行(如果存在的話)。CSV 內建函式會忽略此選項。 讀取/寫入
推論架構 false 自動從資料推論輸入架構。它需要額外執行一次資料傳遞。CSV 內建函式會忽略此選項。 讀取
偏好日期 true 在架構推論期間 (inferSchema),如果數值符合 dateFormat 選項或預設日期格式,則嘗試將包含日期的字串欄位推論為 Date。對於包含日期和時間戳記混合的欄位,如果未指定時間戳記格式,則嘗試將它們推論為 TimestampType;否則,將它們推論為 StringType 讀取
強制執行架構 true 如果設定為 true,指定的或推論的架構將強制套用至資料來源檔案,且會忽略 CSV 檔案中的標頭。如果將選項設定為 false,則會根據 CSV 檔案中的所有標頭驗證架構(如果 header 選項設定為 true)。架構中的欄位名稱和 CSV 標頭中的欄位名稱會根據它們的位置(考量 spark.sql.caseSensitive)進行檢查。雖然預設值為 true,但建議停用 enforceSchema 選項,以避免產生不正確的結果。CSV 內建函式會忽略此選項。 讀取
忽略前導空白 false(用於讀取),true(用於寫入) 一個旗標,表示是否應略過讀取/寫入值的開頭空白。 讀取/寫入
忽略尾隨空白 false(用於讀取),true(用於寫入) 一個旗標,表示是否應略過讀取/寫入值的尾隨空白。 讀取/寫入
空值 設定空值的字串表示法。自 2.0.1 版起,此 nullValue 參數適用於所有受支援的類型,包括字串類型。 讀取/寫入
非數字值 NaN 設定非數字值的字串表示法。 讀取
positiveInf Inf 設定正無限大值的字串表示。 讀取
negativeInf -Inf 設定負無限大值的字串表示。 讀取
dateFormat yyyy-MM-dd 設定表示日期格式的字串。自訂日期格式遵循 日期時間模式 中的格式。這適用於日期類型。 讀取/寫入
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] 設定表示時間戳記格式的字串。自訂日期格式遵循 日期時間模式 中的格式。這適用於時間戳記類型。 讀取/寫入
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] 設定表示不含時區的時間戳記格式的字串。自訂日期格式遵循 日期時間模式 中的格式。這適用於不含時區的時間戳記類型,請注意,在寫入或讀取此資料類型時,不支援時區偏移和時區元件。 讀取/寫入
enableDateTimeParsingFallback 如果時間解析器原則有舊有設定,或未提供自訂日期或時間戳記模式,則啟用。 允許在值與設定模式不符時,採用與舊版相容(Spark 1.x 和 2.0)的日期和時間戳記解析行為。 讀取
maxColumns 20480 定義記錄可包含的欄位數的上限。 讀取
maxCharsPerColumn -1 定義允許在讀取的任何特定值中出現的最大字元數。預設為 -1,表示長度不限 讀取
mode PERMISSIVE 允許在解析期間處理損毀記錄的模式。它支援下列不分大小寫的模式。請注意,Spark 嘗試在欄位修剪下只解析 CSV 中的必要欄位。因此,損毀記錄可能根據必要的欄位組而有所不同。此行為可由 spark.sql.csv.parser.columnPruning.enabled(預設啟用)控制。
  • PERMISSIVE:當遇到損毀記錄時,會將格式錯誤的字串放入由 columnNameOfCorruptRecord 設定的欄位,並將格式錯誤的欄位設定為 null。若要保留損毀記錄,使用者可以在使用者定義的架構中設定名為 columnNameOfCorruptRecord 的字串類型欄位。如果架構沒有該欄位,它會在解析期間捨棄損毀記錄。對於 CSV 來說,令牌少於/多於架構的記錄並非損毀記錄。當它遇到令牌少於架構長度的記錄時,會將 null 設定為額外的欄位。當記錄的令牌多於架構長度時,它會捨棄額外的令牌。
  • DROPMALFORMED:忽略整個損毀記錄。CSV 內建函式不支援此模式。
  • FAILFAST:當遇到損毀記錄時,會擲回例外狀況。
讀取
columnNameOfCorruptRecord spark.sql.columnNameOfCorruptRecord 設定的值) 允許重新命名由 PERMISSIVE 模式建立的,包含格式錯誤字串的新欄位。這會覆寫 spark.sql.columnNameOfCorruptRecord 讀取
multiLine false 每個檔案解析一個記錄,該記錄可能跨越多行。CSV 內建函式會忽略此選項。 讀取
charToEscapeQuoteEscaping escape\0 設定用於轉譯引號字元轉譯的單一字元。預設值為轉譯字元(當轉譯字元和引號字元不同時),否則為 \0 讀取/寫入
samplingRatio 1.0 定義用於推論架構的行數分數。CSV 內建函式會忽略此選項。 讀取
emptyValue (用於讀取)、""(用於寫入) 設定空值的字串表示形式。 讀取/寫入
locale en-US 設定 IETF BCP 47 格式的語言標籤作為地區設定。例如,這用於解析日期和時間戳記時。 讀取
lineSep \r\r\n\n(用於讀取)、\n(用於寫入) 定義用於解析/寫入的行分隔符號。最大長度為 1 個字元。CSV 內建函式會忽略此選項。 讀取/寫入
unescapedQuoteHandling STOP_AT_DELIMITER 定義 CsvParser 如何處理未轉譯引號的值。
  • STOP_AT_CLOSING_QUOTE:如果在輸入中找到未轉譯引號,請累積引號字元,並繼續將值解析為引號值,直到找到結束引號。
  • BACK_TO_DELIMITER:如果在輸入中找到未轉譯引號,請將值視為未引號值。這將使解析器累積目前已解析值的全部字元,直到找到分隔符號。如果在值中找不到分隔符號,解析器將繼續累積輸入中的字元,直到找到分隔符號或行尾。
  • STOP_AT_DELIMITER:如果在輸入中找到未轉譯引號,請將值視為未引號值。這將使解析器累積全部字元,直到在輸入中找到分隔符號或行尾。
  • SKIP_VALUE:如果在輸入中找到未跳脫的引號,將會略過為指定值解析的內容,並產生設定在 nullValue 中的值。
  • RAISE_ERROR:如果在輸入中找到未跳脫的引號,將會擲回 TextParsingException。
讀取
壓縮 (無) 儲存到檔案時要使用的壓縮編解碼器。這可以是已知的非大小寫縮寫名稱之一(nonebzip2gziplz4snappydeflate)。CSV 內建函式會忽略此選項。 寫入

其他一般選項可以在 一般檔案來源選項 中找到。