Hive 表格
Spark SQL 也支援讀取和寫入儲存在 Apache Hive 中的資料。然而,由於 Hive 有大量的相依性,因此這些相依性未包含在預設的 Spark 發行版中。如果可以在類別路徑中找到 Hive 相依性,Spark 會自動載入它們。請注意,這些 Hive 相依性也必須存在於所有工作節點中,因為它們需要存取 Hive 序列化和反序列化函式庫 (SerDes) 才能存取儲存在 Hive 中的資料。
Hive 的組態是透過將 hive-site.xml
、core-site.xml
(用於安全性組態) 和 hdfs-site.xml
(用於 HDFS 組態) 檔案放置在 conf/
中來完成的。
在使用 Hive 時,必須實例化支援 Hive 的 SparkSession
,包括連線至持續性 Hive metastore、支援 Hive serdes 和 Hive 使用者定義函式。尚未部署 Hive 的使用者仍然可以啟用 Hive 支援。當未由 hive-site.xml
組態時,內容會自動在目前目錄中建立 metastore_db
,並建立由 spark.sql.warehouse.dir
組態的目錄,預設為 Spark 應用程式啟動時目前目錄中的 spark-warehouse
目錄。請注意,自 Spark 2.0.0 起,hive-site.xml
中的 hive.metastore.warehouse.dir
屬性已過時。請改用 spark.sql.warehouse.dir
來指定資料庫在倉庫中的預設位置。您可能需要授予啟動 Spark 應用程式的使用者寫入權限。
from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')
spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.enableHiveSupport() \
.getOrCreate()
# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key| value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...
# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# | 500 |
# +--------+
# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...
# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")
# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# | 2| val_2| 2| val_2|
# | 4| val_4| 4| val_4|
# | 5| val_5| 5| val_5|
# ...
import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
case class Record(key: Int, value: String)
// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...
// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// | 0|
// | 1|
// | 2|
// ... Order may vary, as spark processes the partitions in parallel.
// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// | value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...
spark.stop()
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public static class Record implements Serializable {
private int key;
private String value;
public int getKey() {
return key;
}
public void setKey(int key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate();
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
// The items in DataFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
(MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
Encoders.STRING());
stringsDS.show();
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
Record record = new Record();
record.setKey(key);
record.setValue("val_" + key);
records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");
// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// ...
在使用 Hive 時,必須實例化支援 Hive 的 SparkSession
。這會新增支援在 MetaStore 中尋找資料表,以及使用 HiveQL 編寫查詢。
# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- collect(sql("FROM src SELECT key, value"))
指定 Hive 資料表的儲存格式
當您建立 Hive 資料表時,您需要定義此資料表如何從檔案系統讀取/寫入資料,也就是「輸入格式」和「輸出格式」。您也需要定義此資料表如何將資料反序列化成列,或將列序列化成資料,也就是「serde」。下列選項可用於指定儲存格式(「serde」、「輸入格式」、「輸出格式」),例如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')
。預設情況下,我們會將資料表檔案讀取為純文字。請注意,建立資料表時尚未支援 Hive 儲存處理常式,您可以在 Hive 端使用儲存處理常式建立資料表,並使用 Spark SQL 讀取它。
屬性名稱 | 意義 |
---|---|
fileFormat |
fileFormat 是一種儲存格式規格套件,包含「serde」、「輸入格式」和「輸出格式」。目前我們支援 6 種 fileFormat:「sequencefile」、「rcfile」、「orc」、「parquet」、「textfile」和「avro」。 |
inputFormat, outputFormat |
這 2 個選項指定對應的 InputFormat 和 OutputFormat 類別的名稱,為字串文字,例如 org.apache.hadoop.hive.ql.io.orc.OrcInputFormat 。這 2 個選項必須成對出現,如果您已指定 fileFormat 選項,則無法指定它們。
|
serde |
此選項指定 serde 類別的名稱。當指定 fileFormat 選項時,如果指定的 fileFormat 已包含 serde 的資訊,則不要指定此選項。目前「sequencefile」、「textfile」和「rcfile」不包含 serde 資訊,您可以將此選項與這 3 種 fileFormat 搭配使用。
|
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim |
這些選項只能與「textfile」fileFormat 搭配使用。它們定義如何將分隔檔案讀取成列。 |
使用 OPTIONS
定義的所有其他屬性都將視為 Hive serde 屬性。
與不同版本的 Hive Metastore 互動
Spark SQL 對 Hive 支援最重要的部分之一就是與 Hive metastore 的互動,這讓 Spark SQL 能夠存取 Hive 資料表的元資料。從 Spark 1.4.0 開始,單一 Spark SQL 的二進位組建可以用來查詢不同版本的 Hive metastore,使用以下所述的組態。請注意,無論用來與 metastore 通訊的 Hive 版本為何,Spark SQL 內部會針對內建的 Hive 編譯,並使用這些類別進行內部執行(serdes、UDF、UDAF 等)。
下列選項可用於組態用來擷取元資料的 Hive 版本
屬性名稱 | 預設 | 意義 | 自版本 |
---|---|---|---|
spark.sql.hive.metastore.version |
2.3.9 |
Hive metastore 的版本。可用的選項從 0.12.0 到 2.3.9 ,以及從 3.0.0 到 3.1.3 。
|
1.4.0 |
spark.sql.hive.metastore.jars |
內建 |
用來實例化 HiveMetastoreClient 的 jar 的位置。此屬性可以是下列四個選項之一
-Phive 時,會與 Spark 組件捆綁在一起。當選擇此選項時,spark.sql.hive.metastore.version 必須是 2.3.9 或未定義。
spark.sql.hive.metastore.jars.path 組態的 Hive jar。支援本機或遠端路徑。提供的 jar 應該與 spark.sql.hive.metastore.version 相同版本。
|
1.4.0 |
spark.sql.hive.metastore.jars.path |
(空白) |
用來實例化 HiveMetastoreClient 的 jar 的逗號分隔路徑。此組態僅在 spark.sql.hive.metastore.jars 設定為 path 時才有用。
路徑可以是下列任何格式
|
3.1.0 |
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc, |
一個以逗號分隔的類別前綴清單,應使用 Spark SQL 與特定版本的 Hive 共用的類別載入器載入。需要共用的類別範例是與儲存庫對話所需的 JDBC 驅動程式。其他需要共用的類別是與已共用的類別互動的類別。例如,log4j 使用的自訂附加程式。 |
1.4.0 |
spark.sql.hive.metastore.barrierPrefixes |
(空白) |
一個以逗號分隔的類別前綴清單,應針對 Spark SQL 進行通訊的每個 Hive 版本明確重新載入。例如,在通常會共用的前綴中宣告的 Hive UDF(例如 |
1.4.0 |