入門
起點:SparkSession
Spark 中所有功能的進入點是 SparkSession
類別。若要建立基本 SparkSession
,只要使用 SparkSession.builder
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
Spark 中所有功能的進入點是 SparkSession
類別。若要建立基本 SparkSession
,只要使用 SparkSession.builder()
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
Spark 中所有功能的進入點是 SparkSession
類別。若要建立基本 SparkSession
,只要使用 SparkSession.builder()
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
Spark 中所有功能的進入點是 SparkSession
類別。若要初始化基本 SparkSession
,只要呼叫 sparkR.session()
sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))
請注意,sparkR.session()
在第一次呼叫時,會初始化全域 SparkSession
單例執行個體,並在後續呼叫時,總是傳回這個執行個體的參考。如此一來,使用者只需初始化 SparkSession
一次,然後 read.df
等 SparkR 函數就能隱式存取這個全域執行個體,而使用者不需要傳遞 SparkSession
執行個體。
Spark 2.0 中的 SparkSession
提供內建支援,支援 Hive 功能,包括使用 HiveQL 編寫查詢、存取 Hive UDF,以及從 Hive 表格讀取資料的能力。若要使用這些功能,您不需要現有的 Hive 設定。
建立資料框
有了 SparkSession
,應用程式可以從 現有的 RDD
、Hive 表格或 Spark 資料來源建立資料框。
例如,以下會根據 JSON 檔案的內容建立 DataFrame
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
有了 SparkSession
,應用程式可以從 現有的 RDD
、Hive 表格或 Spark 資料來源建立資料框。
例如,以下會根據 JSON 檔案的內容建立 DataFrame
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
有了 SparkSession
,應用程式可以從 現有的 RDD
、Hive 表格或 Spark 資料來源建立資料框。
例如,以下會根據 JSON 檔案的內容建立 DataFrame
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
使用 SparkSession
,應用程式可以從本機 R data.frame、Hive 表格或 Spark 資料來源 建立 DataFrame。
例如,以下會根據 JSON 檔案的內容建立 DataFrame
df <- read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame
head(df)
## age name
## 1 NA Michael
## 2 30 Andy
## 3 19 Justin
# Another method to print the first few rows and optionally truncate the printing of long values
showDF(df)
## +----+-------+
## | age| name|
## +----+-------+
## |null|Michael|
## | 30| Andy|
## | 19| Justin|
## +----+-------+
未輸入資料集操作(又稱為資料框操作)
DataFrame 提供特定於網域的語言,用於在 Scala、Java、Python 和 R 中執行結構化資料處理。
如上所述,在 Spark 2.0 中,DataFrame 只是 Scala 和 Java API 中 Row
的 Dataset。這些作業也稱為「非類型轉換」,與具有強類型 Scala/Java Dataset 的「類型轉換」相反。
以下包含使用 Dataset 執行一些結構化資料處理的基本範例
在 Python 中,可以透過屬性 (df.age
) 或索引 (df['age']
) 來存取 DataFrame 的欄位。雖然前者方便於互動式資料探索,但強烈建議使用者使用後者,因為後者具有未來性,而且不會因欄位名稱也是 DataFrame 類別上的屬性而中斷。
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Select only the "name" column
df.select("name").show()
# +-------+
# | name|
# +-------+
# |Michael|
# | Andy|
# | Justin|
# +-------+
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# | name|(age + 1)|
# +-------+---------+
# |Michael| null|
# | Andy| 31|
# | Justin| 20|
# +-------+---------+
# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+
# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# | 19| 1|
# |null| 1|
# | 30| 1|
# +----+-----+
如需可對 DataFrame 執行的作業類型完整清單,請參閱 API 文件。
除了簡單的欄位參考和運算式之外,DataFrame 還具有豐富的函式庫,包括字串處理、日期算術、常見數學運算等等。完整清單可在 DataFrame 函式參考 中取得。
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
如需可對 Dataset 執行的作業類型完整清單,請參閱 API 文件。
除了簡單的欄位參考和運算式之外,Dataset 還具有豐富的函式庫,包括字串處理、日期算術、常見數學運算等等。完整清單可在 DataFrame 函式參考 中取得。
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;
// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show();
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
如需可對 Dataset 執行的作業類型完整清單,請參閱 API 文件。
除了簡單的欄位參考和運算式之外,Dataset 還具有豐富的函式庫,包括字串處理、日期算術、常見數學運算等等。完整清單可在 DataFrame 函式參考 中取得。
# Create the DataFrame
df <- read.json("examples/src/main/resources/people.json")
# Show the content of the DataFrame
head(df)
## age name
## 1 NA Michael
## 2 30 Andy
## 3 19 Justin
# Print the schema in a tree format
printSchema(df)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)
# Select only the "name" column
head(select(df, "name"))
## name
## 1 Michael
## 2 Andy
## 3 Justin
# Select everybody, but increment the age by 1
head(select(df, df$name, df$age + 1))
## name (age + 1.0)
## 1 Michael NA
## 2 Andy 31
## 3 Justin 20
# Select people older than 21
head(where(df, df$age > 21))
## age name
## 1 30 Andy
# Count people by age
head(count(groupBy(df, "age")))
## age count
## 1 19 1
## 2 NA 1
## 3 30 1
有關可以在 DataFrame 上執行的作業類型完整清單,請參閱 API 文件。
除了簡單的欄位參照和運算式外,DataFrame 還有豐富的函式庫,包括字串處理、日期運算、常見數學運算等。完整清單可在 DataFrame 函式參考 中取得。
以程式方式執行 SQL 查詢
SparkSession
上的 sql
函式讓應用程式能夠以程式化方式執行 SQL 查詢,並將結果傳回為 DataFrame
。
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
SparkSession
上的 sql
函式讓應用程式能夠以程式化方式執行 SQL 查詢,並將結果傳回為 DataFrame
。
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
SparkSession
上的 sql
函式讓應用程式能夠以程式化方式執行 SQL 查詢,並將結果傳回為 Dataset<Row>
。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
sql
函式讓應用程式能夠以程式化方式執行 SQL 查詢,並將結果傳回為 SparkDataFrame
。
df <- sql("SELECT * FROM table")
全域暫時檢視
Spark SQL 中的暫時檢視是會話範圍,如果建立它的會話終止,它們就會消失。如果您想要一個在所有會話之間共用且在 Spark 應用程式終止之前持續存在的暫時檢視,您可以建立一個全域暫時檢視。全域暫時檢視繫結到系統保留的資料庫 global_temp
,我們必須使用限定名稱來參照它,例如 SELECT * FROM global_temp.view1
。
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
CREATE GLOBAL TEMPORARY VIEW temp_view AS SELECT a + 1, b * 2 FROM tbl
SELECT * FROM global_temp.temp_view
建立資料集
Dataset 類似於 RDD,但是,它們不會使用 Java 序列化或 Kryo,而是使用專門的 編碼器 來序列化物件以進行處理或透過網路傳輸。雖然編碼器和標準序列化都負責將物件轉換為位元組,但編碼器會動態產生程式碼,並使用一種格式,讓 Spark 能夠執行許多作業,例如過濾、排序和雜湊,而無需將位元組反序列化回物件。
case class Person(name: String, age: Long)
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
public static class Person implements Serializable {
private String name;
private long age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getAge() {
return age;
}
public void setAge(long age) {
this.age = age;
}
}
// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);
// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
Collections.singletonList(person),
personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+
// Encoders for most common types are provided in class Encoders
Encoder<Long> longEncoder = Encoders.LONG();
Dataset<Long> primitiveDS = spark.createDataset(Arrays.asList(1L, 2L, 3L), longEncoder);
Dataset<Long> transformedDS = primitiveDS.map(
(MapFunction<Long, Long>) value -> value + 1L,
longEncoder);
transformedDS.collect(); // Returns [2, 3, 4]
// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
與 RDD 互用
Spark SQL 支援兩種不同的方法來將現有的 RDD 轉換為 Dataset。第一種方法使用反射來推斷包含特定物件類型的 RDD 架構。這種基於反射的方法會產生更簡潔的程式碼,並且在撰寫 Spark 應用程式時已經知道架構時非常有用。
建立 Dataset 的第二種方法是透過程式化介面,讓您可以建構架構,然後將其套用至現有的 RDD。雖然這種方法較為冗長,但它允許您在執行階段之前不知道欄位及其類型時建構 Dataset。
使用反射推論架構
Spark SQL 可以將 Row 物件的 RDD 轉換為 DataFrame,並推斷資料類型。Rows 是透過將關鍵字/值對清單傳遞給 Row 類別的 kwargs 來建構的。此清單的鍵定義了表格的欄位名稱,而類型則透過抽取整個資料集來推斷,類似於對 JSON 檔案執行的推斷。
from pyspark.sql import Row
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
print(name)
# Name: Justin
Spark SQL 的 Scala 介面支援自動將包含案例類別的 RDD 轉換為 DataFrame。案例類別定義了表格的結構。案例類別的引數名稱會使用反射來讀取,並成為欄位名稱。案例類別也可以是巢狀的,或包含複雜類型,例如 Seq
或 Array
。此 RDD 可以隱式轉換為 DataFrame,然後註冊為表格。表格可以在後續的 SQL 陳述式中使用。
// For implicit conversions from RDDs to DataFrames
import spark.implicits._
// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
Spark SQL 支援自動將 JavaBeans 的 RDD 轉換為 DataFrame。使用反射取得的 BeanInfo
定義了表格的結構。目前,Spark SQL 不支援包含 Map
欄位的 JavaBeans。不過,巢狀 JavaBeans 和 List
或 Array
欄位則獲得支援。您可以透過建立一個實作 Serializable 並為其所有欄位提供 getter 和 setter 的類別來建立 JavaBean。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
.textFile("examples/src/main/resources/people.txt")
.javaRDD()
.map(line -> {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge(Integer.parseInt(parts[1].trim()));
return person;
});
// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
以程式方式指定架構
如果無法預先定義 kwargs 的字典(例如,記錄的結構編碼在字串中,或將剖析文字資料集,並且欄位會針對不同的使用者以不同的方式投影),則可以透過三個步驟以程式設計方式建立 DataFrame
。
- 從原始 RDD 建立元組或清單的 RDD;
- 建立由
StructType
表示的結構,其與步驟 1 中建立的 RDD 中的元組或清單結構相符。 - 透過
SparkSession
提供的createDataFrame
方法將結構套用至 RDD。
例如
# Import data types
from pyspark.sql.types import StringType, StructType, StructField
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)
# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")
results.show()
# +-------+
# | name|
# +-------+
# |Michael|
# | Andy|
# | Justin|
# +-------+
當案例類別無法事先定義(例如,記錄的結構編碼在字串中,或文字資料集將被剖析,且欄位將針對不同使用者以不同的方式投影),則可以使用三個步驟以程式設計方式建立 DataFrame
。
- 從原始 RDD 建立
Row
的 RDD; - 建立
StructType
所表示的架構,以符合步驟 1 中建立的 RDD 中Row
的結構。 - 透過
SparkSession
所提供的createDataFrame
方法,將架構套用至Row
的 RDD。
例如
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
當 JavaBean 類別無法事先定義(例如,記錄的結構編碼在字串中,或文字資料集將被剖析,且欄位將針對不同使用者以不同的方式投影),則可以使用三個步驟以程式設計方式建立 Dataset<Row>
。
- 從原始 RDD 建立
Row
的 RDD; - 建立
StructType
所表示的架構,以符合步驟 1 中建立的 RDD 中Row
的結構。 - 透過
SparkSession
所提供的createDataFrame
方法,將架構套用至Row
的 RDD。
例如
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
.textFile("examples/src/main/resources/people.txt", 1)
.toJavaRDD();
// The schema is encoded in a string
String schemaString = "name age";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(",");
return RowFactory.create(attributes[0], attributes[1].trim());
});
// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");
// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
純量函數
純量函數是每列傳回單一值的函數,與聚合函數相反,聚合函數會傳回群組列的值。Spark SQL 支援各種 內建純量函數。它也支援 使用者定義純量函數。
聚合函數
聚合函數是針對群組列傳回單一值的函數。內建聚合函數 提供常見的聚合,例如 count()
、count_distinct()
、avg()
、max()
、min()
等。使用者不受限於預先定義的聚合函數,且可以建立自己的函數。如需使用者定義聚合函數的更多詳細資料,請參閱 使用者定義聚合函數 的文件。