JDBC 連接其他資料庫

Spark SQL 也包含一個資料來源,可以使用 JDBC 從其他資料庫讀取資料。此功能應優先於使用 JdbcRDD。這是因為結果以 DataFrame 的形式傳回,而且可以在 Spark SQL 中輕鬆處理或與其他資料來源結合。JDBC 資料來源也更容易從 Java 或 Python 使用,因為它不需要使用者提供 ClassTag。(請注意,這與 Spark SQL JDBC 伺服器不同,後者允許其他應用程式使用 Spark SQL 執行查詢)。

要開始使用,您需要在 spark classpath 上包含特定資料庫的 JDBC 驅動程式。例如,要從 Spark Shell 連線到 postgres,您會執行下列指令

./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

資料來源選項

Spark 支援 JDBC 的下列不分大小寫選項。JDBC 的資料來源選項可透過下列方式設定

對於連線屬性,使用者可以在資料來源選項中指定 JDBC 連線屬性。userpassword 通常提供為連線屬性,以登入資料來源。

屬性名稱預設值意義範圍
url (無) 用於連線的 JDBC URL,格式為 jdbc:subprotocol:subname。來源特定的連線屬性可以在 URL 中指定。例如,jdbc:postgresql://127.0.0.1/test?user=fred&password=secret 讀取/寫入
dbtable (無) 應從中讀取或寫入的 JDBC 表格。請注意,在讀取路徑中使用它時,可以在 SQL 查詢的 FROM 子句中使用任何有效的內容。例如,除了完整表格之外,您還可以在括號中使用子查詢。不允許同時指定 dbtablequery 選項。 讀取/寫入
query (無) 用於將資料讀取到 Spark 的查詢。指定的查詢將加上括號,並作為 FROM 子句中的子查詢使用。Spark 也會將別名指定給子查詢子句。舉例來說,Spark 會對 JDBC 來源發出下列形式的查詢。

從 (<user_specified_query>) spark_gen_alias 選擇 <columns>

在使用此選項時,以下有幾個限制。
  1. 不允許同時指定 dbtablequery 選項。
  2. 不允許同時指定 querypartitionColumn 選項。當需要指定 partitionColumn 選項時,可以使用 dbtable 選項指定子查詢,而分割欄位可以使用 dbtable 中提供的子查詢別名進行限定。
    範例
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("query", "select c1, c2 from t1")
    .load()
讀取/寫入
prepareQuery (無) 前置詞會與 query 一起形成最終查詢。由於指定的 query 會在 FROM 子句中括號作為子查詢,而某些資料庫不支援子查詢中的所有子句,因此 prepareQuery 屬性提供了一種執行此類複雜查詢的方法。例如,Spark 會對 JDBC 來源發出以下形式的查詢。

<prepareQuery> 從 (<user_specified_query>) spark_gen_alias 選擇 <columns>

以下是一些範例。
  1. MSSQL Server 不接受子查詢中的 WITH 子句,但可以將此類查詢拆分為 prepareQueryquery
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("prepareQuery", "WITH t AS (SELECT x, y FROM tbl)")
    .option("query", "SELECT * FROM t WHERE x > 10")
    .load()
  2. MSSQL Server 不接受子查詢中的臨時表格子句,但可以將此類查詢拆分為 prepareQueryquery
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("prepareQuery", "(SELECT * INTO #TempTable FROM (SELECT * FROM tbl) t)")
    .option("query", "SELECT * FROM #TempTable")
    .load()
讀取/寫入
driver (無) 用於連線到此 URL 的 JDBC 驅動程式的類別名稱。 讀取/寫入
partitionColumn, lowerBound, upperBound (無) 如果指定其中任何一個選項,則必須全部指定這些選項。此外,必須指定 numPartitions。它們描述在從多個工作人員並行讀取時如何分割表格。partitionColumn 必須是問題表格中的數字、日期或時間戳記欄位。請注意,lowerBoundupperBound 僅用於決定分割步幅,而不是用於過濾表格中的列。因此,表格中的所有列都將被分割並傳回。此選項僅適用於讀取。
範例
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "(select c1, c2 from t1) as subq")
.option("partitionColumn", "c1")
.option("lowerBound", "1")
.option("upperBound", "100")
.option("numPartitions", "3")
.load()
read
numPartitions (無) 表格讀寫並行處理中可使用的最大分割數。這也決定了最大同時 JDBC 連線數。如果要寫入的分割數超過此限制,我們會在寫入前呼叫 coalesce(numPartitions) 將其減少到此限制。 讀取/寫入
queryTimeout 0 驅動程式將等待 Statement 物件執行的秒數,直到給定的秒數。零表示沒有限制。在寫入路徑中,此選項取決於 JDBC 驅動程式如何實作 API setQueryTimeout,例如 h2 JDBC 驅動程式會檢查每個查詢的逾時,而不是整個 JDBC 批次。 讀取/寫入
fetchsize 0 JDBC 擷取大小,決定每回合要擷取多少列。這可以提升預設擷取大小較低的 JDBC 驅動程式的效能(例如,Oracle 為 10 列)。 read
batchsize 1000 JDBC 批次大小,決定每回合要插入多少列。這可以提升 JDBC 驅動程式的效能。此選項僅適用於寫入。 write
isolationLevel READ_UNCOMMITTED 交易隔離層級,適用於目前的連線。它可以是 NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READSERIALIZABLE 之一,對應於 JDBC 的 Connection 物件定義的標準交易隔離層級,預設為 READ_UNCOMMITTED。請參閱 java.sql.Connection 中的文件。 write
sessionInitStatement (無) 在每個資料庫工作階段開啟到遠端資料庫並在開始讀取資料之前,此選項會執行自訂 SQL 陳述式(或 PL/SQL 區塊)。使用此選項來實作工作階段初始化程式碼。範例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""") read
截斷 false 這是 JDBC 寫入器相關選項。當啟用 SaveMode.Overwrite 時,此選項會導致 Spark 截斷現有表格,而不是刪除並重新建立它。這可能會更有效率,並防止表格的元資料(例如索引)被移除。不過,在某些情況下它無法運作,例如當新資料有不同的架構時。在發生故障時,使用者應該關閉 truncate 選項,以再次使用 DROP TABLE。此外,由於不同 DBMS 的 TRUNCATE TABLE 行為不同,因此使用它並不總是安全的。MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect 和 OracleDialect 支援此選項,而 PostgresDialect 和預設的 JDBCDirect 則不支援。對於未知且不支援的 JDBCDirect,使用者選項 truncate 會被忽略。 write
cascadeTruncate 所討論的 JDBC 資料庫預設的串接式截斷行為,在每個 JDBCDialect 中的 isCascadeTruncate 中指定 這是 JDBC 寫入器相關選項。如果啟用且 JDBC 資料庫支援(目前為 PostgreSQL 和 Oracle),此選項允許執行 TRUNCATE TABLE t CASCADE(在 PostgreSQL 的情況下,會執行 TRUNCATE TABLE ONLY t CASCADE 以防止意外截斷後代表格)。這將影響其他表格,因此應該小心使用。 write
createTableOptions 這是 JDBC 寫入器相關選項。如果指定,此選項允許在建立表格時設定特定於資料庫的表格和分割選項(例如 CREATE TABLE t (name string) ENGINE=InnoDB.)。 write
createTableColumnTypes (無) 在建立表格時,用來取代預設值的資料庫欄位資料類型。資料類型資訊應該以與 CREATE TABLE 欄位語法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)"))。指定的類型應該是有效的 Spark SQL 資料類型。 write
customSchema (無) 用於從 JDBC 連接器讀取資料的自訂架構。例如,"id DECIMAL(38, 0), name STRING"。您也可以指定部分欄位,而其他欄位則使用預設的類型對應。例如,"id DECIMAL(38, 0)"。欄位名稱應該與 JDBC 表格的對應欄位名稱相同。使用者可以指定 Spark SQL 的對應資料類型,而不是使用預設值。 read
pushDownPredicate true 啟用或停用 JDBC 資料來源中的謂詞下推選項。預設值為 true,在此情況下,Spark 會盡可能將篩選器下推至 JDBC 資料來源。否則,如果設定為 false,不會將任何篩選器下推至 JDBC 資料來源,因此所有篩選器都將由 Spark 處理。當 Spark 執行謂詞篩選的速度比 JDBC 資料來源快時,通常會關閉謂詞下推。 read
pushDownAggregate true 啟用或停用 V2 JDBC 資料來源中的彙總下推選項。預設值為 true,在此情況下,Spark 會將彙總下推至 JDBC 資料來源。否則,如果設定為 false,不會將彙總下推至 JDBC 資料來源。當 Spark 執行彙總的速度比 JDBC 資料來源快時,通常會關閉彙總下推。請注意,僅當所有彙總函數和相關篩選器都可以下推時,才能下推彙總。如果 numPartitions 等於 1 或群組依據金鑰與 partitionColumn 相同,Spark 會將彙總完全下推至資料來源,且不會套用最終彙總至資料來源輸出。否則,Spark 會套用最終彙總至資料來源輸出。 read
pushDownLimit true 啟用或停用 V2 JDBC 資料來源中的 LIMIT 下推選項。LIMIT 下推也包含 LIMIT + SORT,又稱為 Top N 算子。預設值為 true,在此情況下,Spark 會將 LIMIT 或 LIMIT 與 SORT 下推至 JDBC 資料來源。否則,如果設定為 false,不會將 LIMIT 或 LIMIT 與 SORT 下推至 JDBC 資料來源。如果 numPartitions 大於 1,即使已下推 LIMIT 或 LIMIT 與 SORT,Spark 仍會對來自資料來源的結果套用 LIMIT 或 LIMIT 與 SORT。否則,如果已下推 LIMIT 或 LIMIT 與 SORT,且 numPartitions 等於 1,Spark 就不會對來自資料來源的結果套用 LIMIT 或 LIMIT 與 SORT。 read
pushDownOffset true 啟用或停用 V2 JDBC 資料來源中的 OFFSET 下推選項。預設值為 true,在此情況下,Spark 會將 OFFSET 下推至 JDBC 資料來源。否則,如果設定為 false,Spark 就不會嘗試將 OFFSET 下推至 JDBC 資料來源。如果 pushDownOffset 為 true,且 numPartitions 等於 1,OFFSET 會下推至 JDBC 資料來源。否則,不會下推 OFFSET,且 Spark 仍會對來自資料來源的結果套用 OFFSET。 read
pushDownTableSample true 啟用或停用 TABLESAMPLE 推入 V2 JDBC 資料來源的選項。預設值為 true,在這種情況下,Spark 會將 TABLESAMPLE 推入 JDBC 資料來源。否則,如果將值設定為 false,TABLESAMPLE 將不會推入 JDBC 資料來源。 read
keytab (無) JDBC 客户端的 kerberos keytab 檔案位置(必須透過 spark-submit 的 --files 選項或手動預先上傳到所有節點)。如果找到路徑資訊,Spark 會將 keytab 視為手動分發,否則假設為 --files。如果同時定義 keytabprincipal,Spark 會嘗試執行 kerberos 驗證。 讀取/寫入
principal (無) 指定 JDBC 客户端的 kerberos 主體名稱。如果同時定義 keytabprincipal,Spark 會嘗試執行 kerberos 驗證。 讀取/寫入
refreshKrb5Config false 此選項控制是否在建立新連線之前更新 JDBC 客户端的 kerberos 組態。如果您要更新組態,請設定為 true,否則設定為 false。預設值為 false。請注意,如果您將此選項設定為 true 並嘗試建立多個連線,可能會發生競爭條件。一個可能的情況如下。
  1. refreshKrb5Config 旗標已設定為安全性內容 1
  2. JDBC 連線提供者用於對應的 DBMS
  3. krb5.conf 已修改,但 JVM 尚未意識到必須重新載入它
  4. Spark 已針對安全性內容 1 成功驗證
  5. JVM 從已修改的 krb5.conf 載入安全性內容 2
  6. Spark 還原先前儲存的安全性內容 1
  7. 已修改的 krb5.conf 內容已消失
讀取/寫入
connectionProvider (無) 用於連線到此 URL 的 JDBC 連線提供者的名稱,例如 db2mssql。必須是使用 JDBC 資料來源載入的其中一個提供者。當多個提供者可以處理指定的驅動程式和選項時,用於消除歧義。選取的提供者不得被 spark.sql.sources.disabledJdbcConnProviderList 停用。 讀取/寫入
preferTimestampNTZ false 當選項設定為 true 時,所有時間戳記都推論為 TIMESTAMP WITHOUT TIME ZONE。否則,時間戳記會讀取為帶有當地時區的 TIMESTAMP。 read

請注意,JDBC 驅動程式並不總是支援使用 keytab 進行 kerberos 驗證。
在使用 keytabprincipal 組態選項之前,請確保符合下列需求

下列資料庫有內建連線提供者

如果需求未達成,請考慮使用 JdbcConnectionProvider 開發人員 API 來處理自訂驗證。

# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("customSchema", "id DECIMAL(38, 0), name STRING") \
    .load()

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
在 Spark repo 的「examples/src/main/python/sql/datasource.py」中尋找完整的範例程式碼。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
在 Spark repo 的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」中尋找完整的範例程式碼。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Specifying create table column data types on write
jdbcDF.write()
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
在 Spark repo 的「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」中尋找完整的範例程式碼。
# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")

# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
在 Spark repo 的「examples/src/main/r/RSparkSQLExample.R」中尋找完整的範例程式碼。
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename",
  user 'username',
  password 'password'
)

INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable