JDBC 連接其他資料庫
Spark SQL 也包含一個資料來源,可以使用 JDBC 從其他資料庫讀取資料。此功能應優先於使用 JdbcRDD。這是因為結果以 DataFrame 的形式傳回,而且可以在 Spark SQL 中輕鬆處理或與其他資料來源結合。JDBC 資料來源也更容易從 Java 或 Python 使用,因為它不需要使用者提供 ClassTag。(請注意,這與 Spark SQL JDBC 伺服器不同,後者允許其他應用程式使用 Spark SQL 執行查詢)。
要開始使用,您需要在 spark classpath 上包含特定資料庫的 JDBC 驅動程式。例如,要從 Spark Shell 連線到 postgres,您會執行下列指令
./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
資料來源選項
Spark 支援 JDBC 的下列不分大小寫選項。JDBC 的資料來源選項可透過下列方式設定
- DataFrameReader
DataFrameWriter
的
.option
/.options
方法
- CREATE TABLE USING DATA_SOURCE 中的
OPTIONS
子句
對於連線屬性,使用者可以在資料來源選項中指定 JDBC 連線屬性。user
和 password
通常提供為連線屬性,以登入資料來源。
屬性名稱 | 預設值 | 意義 | 範圍 |
---|---|---|---|
url |
(無) |
用於連線的 JDBC URL,格式為 jdbc:subprotocol:subname 。來源特定的連線屬性可以在 URL 中指定。例如,jdbc:postgresql://127.0.0.1/test?user=fred&password=secret
|
讀取/寫入 |
dbtable |
(無) |
應從中讀取或寫入的 JDBC 表格。請注意,在讀取路徑中使用它時,可以在 SQL 查詢的 FROM 子句中使用任何有效的內容。例如,除了完整表格之外,您還可以在括號中使用子查詢。不允許同時指定 dbtable 和 query 選項。
|
讀取/寫入 |
query |
(無) |
用於將資料讀取到 Spark 的查詢。指定的查詢將加上括號,並作為 FROM 子句中的子查詢使用。Spark 也會將別名指定給子查詢子句。舉例來說,Spark 會對 JDBC 來源發出下列形式的查詢。 從 (<user_specified_query>) spark_gen_alias 選擇 <columns> 在使用此選項時,以下有幾個限制。
|
讀取/寫入 |
prepareQuery |
(無) |
前置詞會與 query 一起形成最終查詢。由於指定的 query 會在 FROM 子句中括號作為子查詢,而某些資料庫不支援子查詢中的所有子句,因此 prepareQuery 屬性提供了一種執行此類複雜查詢的方法。例如,Spark 會對 JDBC 來源發出以下形式的查詢。<prepareQuery> 從 (<user_specified_query>) spark_gen_alias 選擇 <columns> 以下是一些範例。
|
讀取/寫入 |
driver |
(無) | 用於連線到此 URL 的 JDBC 驅動程式的類別名稱。 | 讀取/寫入 |
partitionColumn, lowerBound, upperBound |
(無) |
如果指定其中任何一個選項,則必須全部指定這些選項。此外,必須指定 numPartitions 。它們描述在從多個工作人員並行讀取時如何分割表格。partitionColumn 必須是問題表格中的數字、日期或時間戳記欄位。請注意,lowerBound 和 upperBound 僅用於決定分割步幅,而不是用於過濾表格中的列。因此,表格中的所有列都將被分割並傳回。此選項僅適用於讀取。範例
spark.read.format("jdbc")
|
read |
numPartitions |
(無) |
表格讀寫並行處理中可使用的最大分割數。這也決定了最大同時 JDBC 連線數。如果要寫入的分割數超過此限制,我們會在寫入前呼叫 coalesce(numPartitions) 將其減少到此限制。
|
讀取/寫入 |
queryTimeout |
0 |
驅動程式將等待 Statement 物件執行的秒數,直到給定的秒數。零表示沒有限制。在寫入路徑中,此選項取決於 JDBC 驅動程式如何實作 API setQueryTimeout ,例如 h2 JDBC 驅動程式會檢查每個查詢的逾時,而不是整個 JDBC 批次。
|
讀取/寫入 |
fetchsize |
0 |
JDBC 擷取大小,決定每回合要擷取多少列。這可以提升預設擷取大小較低的 JDBC 驅動程式的效能(例如,Oracle 為 10 列)。 | read |
batchsize |
1000 |
JDBC 批次大小,決定每回合要插入多少列。這可以提升 JDBC 驅動程式的效能。此選項僅適用於寫入。 | write |
isolationLevel |
READ_UNCOMMITTED |
交易隔離層級,適用於目前的連線。它可以是 NONE 、READ_COMMITTED 、READ_UNCOMMITTED 、REPEATABLE_READ 或 SERIALIZABLE 之一,對應於 JDBC 的 Connection 物件定義的標準交易隔離層級,預設為 READ_UNCOMMITTED 。請參閱 java.sql.Connection 中的文件。
|
write |
sessionInitStatement |
(無) |
在每個資料庫工作階段開啟到遠端資料庫並在開始讀取資料之前,此選項會執行自訂 SQL 陳述式(或 PL/SQL 區塊)。使用此選項來實作工作階段初始化程式碼。範例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
|
read |
截斷 |
false |
這是 JDBC 寫入器相關選項。當啟用 SaveMode.Overwrite 時,此選項會導致 Spark 截斷現有表格,而不是刪除並重新建立它。這可能會更有效率,並防止表格的元資料(例如索引)被移除。不過,在某些情況下它無法運作,例如當新資料有不同的架構時。在發生故障時,使用者應該關閉 truncate 選項,以再次使用 DROP TABLE 。此外,由於不同 DBMS 的 TRUNCATE TABLE 行為不同,因此使用它並不總是安全的。MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect 和 OracleDialect 支援此選項,而 PostgresDialect 和預設的 JDBCDirect 則不支援。對於未知且不支援的 JDBCDirect,使用者選項 truncate 會被忽略。
| write |
cascadeTruncate |
所討論的 JDBC 資料庫預設的串接式截斷行為,在每個 JDBCDialect 中的 isCascadeTruncate 中指定 |
這是 JDBC 寫入器相關選項。如果啟用且 JDBC 資料庫支援(目前為 PostgreSQL 和 Oracle),此選項允許執行 TRUNCATE TABLE t CASCADE (在 PostgreSQL 的情況下,會執行 TRUNCATE TABLE ONLY t CASCADE 以防止意外截斷後代表格)。這將影響其他表格,因此應該小心使用。
|
write |
createTableOptions |
|
這是 JDBC 寫入器相關選項。如果指定,此選項允許在建立表格時設定特定於資料庫的表格和分割選項(例如 CREATE TABLE t (name string) ENGINE=InnoDB. )。
|
write |
createTableColumnTypes |
(無) |
在建立表格時,用來取代預設值的資料庫欄位資料類型。資料類型資訊應該以與 CREATE TABLE 欄位語法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)") )。指定的類型應該是有效的 Spark SQL 資料類型。
|
write |
customSchema |
(無) |
用於從 JDBC 連接器讀取資料的自訂架構。例如,"id DECIMAL(38, 0), name STRING" 。您也可以指定部分欄位,而其他欄位則使用預設的類型對應。例如,"id DECIMAL(38, 0)" 。欄位名稱應該與 JDBC 表格的對應欄位名稱相同。使用者可以指定 Spark SQL 的對應資料類型,而不是使用預設值。
|
read |
pushDownPredicate |
true |
啟用或停用 JDBC 資料來源中的謂詞下推選項。預設值為 true,在此情況下,Spark 會盡可能將篩選器下推至 JDBC 資料來源。否則,如果設定為 false,不會將任何篩選器下推至 JDBC 資料來源,因此所有篩選器都將由 Spark 處理。當 Spark 執行謂詞篩選的速度比 JDBC 資料來源快時,通常會關閉謂詞下推。 | read |
pushDownAggregate |
true |
啟用或停用 V2 JDBC 資料來源中的彙總下推選項。預設值為 true,在此情況下,Spark 會將彙總下推至 JDBC 資料來源。否則,如果設定為 false,不會將彙總下推至 JDBC 資料來源。當 Spark 執行彙總的速度比 JDBC 資料來源快時,通常會關閉彙總下推。請注意,僅當所有彙總函數和相關篩選器都可以下推時,才能下推彙總。如果 numPartitions 等於 1 或群組依據金鑰與 partitionColumn 相同,Spark 會將彙總完全下推至資料來源,且不會套用最終彙總至資料來源輸出。否則,Spark 會套用最終彙總至資料來源輸出。
|
read |
pushDownLimit |
true |
啟用或停用 V2 JDBC 資料來源中的 LIMIT 下推選項。LIMIT 下推也包含 LIMIT + SORT,又稱為 Top N 算子。預設值為 true,在此情況下,Spark 會將 LIMIT 或 LIMIT 與 SORT 下推至 JDBC 資料來源。否則,如果設定為 false,不會將 LIMIT 或 LIMIT 與 SORT 下推至 JDBC 資料來源。如果 numPartitions 大於 1,即使已下推 LIMIT 或 LIMIT 與 SORT,Spark 仍會對來自資料來源的結果套用 LIMIT 或 LIMIT 與 SORT。否則,如果已下推 LIMIT 或 LIMIT 與 SORT,且 numPartitions 等於 1,Spark 就不會對來自資料來源的結果套用 LIMIT 或 LIMIT 與 SORT。
|
read |
pushDownOffset |
true |
啟用或停用 V2 JDBC 資料來源中的 OFFSET 下推選項。預設值為 true,在此情況下,Spark 會將 OFFSET 下推至 JDBC 資料來源。否則,如果設定為 false,Spark 就不會嘗試將 OFFSET 下推至 JDBC 資料來源。如果 pushDownOffset 為 true,且 numPartitions 等於 1,OFFSET 會下推至 JDBC 資料來源。否則,不會下推 OFFSET,且 Spark 仍會對來自資料來源的結果套用 OFFSET。
|
read |
pushDownTableSample |
true |
啟用或停用 TABLESAMPLE 推入 V2 JDBC 資料來源的選項。預設值為 true,在這種情況下,Spark 會將 TABLESAMPLE 推入 JDBC 資料來源。否則,如果將值設定為 false,TABLESAMPLE 將不會推入 JDBC 資料來源。 | read |
keytab |
(無) |
JDBC 客户端的 kerberos keytab 檔案位置(必須透過 spark-submit 的 --files 選項或手動預先上傳到所有節點)。如果找到路徑資訊,Spark 會將 keytab 視為手動分發,否則假設為 --files 。如果同時定義 keytab 和 principal ,Spark 會嘗試執行 kerberos 驗證。
|
讀取/寫入 |
principal |
(無) |
指定 JDBC 客户端的 kerberos 主體名稱。如果同時定義 keytab 和 principal ,Spark 會嘗試執行 kerberos 驗證。
|
讀取/寫入 |
refreshKrb5Config |
false |
此選項控制是否在建立新連線之前更新 JDBC 客户端的 kerberos 組態。如果您要更新組態,請設定為 true,否則設定為 false。預設值為 false。請注意,如果您將此選項設定為 true 並嘗試建立多個連線,可能會發生競爭條件。一個可能的情況如下。
|
讀取/寫入 |
connectionProvider |
(無) |
用於連線到此 URL 的 JDBC 連線提供者的名稱,例如 db2 、mssql 。必須是使用 JDBC 資料來源載入的其中一個提供者。當多個提供者可以處理指定的驅動程式和選項時,用於消除歧義。選取的提供者不得被 spark.sql.sources.disabledJdbcConnProviderList 停用。
|
讀取/寫入 |
preferTimestampNTZ |
false |
當選項設定為 true 時,所有時間戳記都推論為 TIMESTAMP WITHOUT TIME ZONE。否則,時間戳記會讀取為帶有當地時區的 TIMESTAMP。
|
read |
請注意,JDBC 驅動程式並不總是支援使用 keytab 進行 kerberos 驗證。
在使用 keytab
和 principal
組態選項之前,請確保符合下列需求
- 包含的 JDBC 驅動程式版本支援使用 keytab 進行 kerberos 驗證。
- 有一個內建連線提供者支援使用的資料庫。
下列資料庫有內建連線提供者
- DB2
- MariaDB
- MS Sql
- Oracle
- PostgreSQL
如果需求未達成,請考慮使用 JdbcConnectionProvider
開發人員 API 來處理自訂驗證。
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.load()
jdbcDF2 = spark.read \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.option("customSchema", "id DECIMAL(38, 0), name STRING") \
.load()
# Saving data to a JDBC source
jdbcDF.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()
jdbcDF2.write \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Specifying create table column data types on write
jdbcDF.write \
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
在 Spark repo 的「examples/src/main/python/sql/datasource.py」中尋找完整的範例程式碼。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
在 Spark repo 的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」中尋找完整的範例程式碼。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load();
Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Saving data to a JDBC source
jdbcDF.write()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save();
jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Specifying create table column data types on write
jdbcDF.write()
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
在 Spark repo 的「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」中尋找完整的範例程式碼。
# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
在 Spark repo 的「examples/src/main/r/RSparkSQLExample.R」中尋找完整的範例程式碼。
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:postgresql:dbserver",
dbtable "schema.tablename",
user 'username',
password 'password'
)
INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable