機器學習管線
\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]
在本節中,我們將介紹 機器學習管線 的概念。機器學習管線提供一組統一的高階 API,建構在 資料框 之上,協助使用者建立和調整實用的機器學習管線。
目錄
管線中的主要概念
MLlib 標準化機器學習演算法的 API,讓使用者可以更輕鬆地將多個演算法結合到單一管線或工作流程中。本節涵蓋管線 API 引入的主要概念,其中管線概念主要受到 scikit-learn 專案的啟發。
-
資料框
:此機器學習 API 使用 Spark SQL 中的資料框
作為機器學習資料集,其中可以儲存各種資料類型。例如,資料框
可以有不同的欄位,儲存文字、特徵向量、真實標籤和預測。 -
轉換器
:轉換器
是一種演算法,可以將一個資料框
轉換成另一個資料框
。例如,機器學習模型是一種轉換器
,可以將具有特徵的資料框
轉換成具有預測的資料框
。 -
估計器
:估計器
是一種演算法,可以根據資料框
進行擬合,以產生轉換器
。例如,學習演算法是一種估計器
,可以根據資料框
進行訓練並產生模型。 -
管線
:管線
將多個轉換器
和估計器
串連在一起,以指定機器學習工作流程。 -
參數
:所有Transformer
和Estimator
現在共用一個用於指定參數的通用 API。
資料框
機器學習可套用於各種資料類型,例如向量、文字、影像和結構化資料。此 API 採用 Spark SQL 中的 DataFrame
以支援各種資料類型。
DataFrame
支援許多基本和結構化類型;請參閱 Spark SQL 資料類型參考 以取得支援類型清單。除了 Spark SQL 指南中所列的類型外,DataFrame
還可以使用 ML Vector
類型。
可以從一般 RDD
隱式或明確建立 DataFrame
。請參閱下列程式碼範例和 Spark SQL 程式設計指南 以取得範例。
DataFrame
中的欄位有名稱。下列程式碼範例使用「text」、「features」和「label」等名稱。
管線元件
轉換器
Transformer
是一個抽象概念,包含特徵轉換器和已學習模型。技術上來說,Transformer
會實作一個方法 transform()
,將一個 DataFrame
轉換成另一個,通常是透過附加一個或多個欄位。例如
- 特徵轉換器可能會採用一個
DataFrame
,讀取一個欄位(例如文字),將其對應到一個新欄位(例如特徵向量),並輸出一個附有對應欄位的新DataFrame
。 - 學習模型可能會採用一個
DataFrame
,讀取包含特徵向量的欄位,預測每個特徵向量的標籤,並輸出一個附有預測標籤的新DataFrame
。
估計器
Estimator
抽象了學習演算法或任何針對資料進行擬合或訓練的演算法的概念。技術上來說,Estimator
會實作一個方法 fit()
,接受一個 DataFrame
,並產生一個 Model
,這是一個 Transformer
。例如,LogisticRegression
等學習演算法是一個 Estimator
,而呼叫 fit()
會訓練一個 LogisticRegressionModel
,這是一個 Model
,因此也是一個 Transformer
。
管線元件的屬性
Transformer.transform()
和 Estimator.fit()
都是無狀態的。未來,可能會透過其他概念來支援有狀態演算法。
每個 Transformer
或 Estimator
的執行個體都有獨特的 ID,這在指定參數時很有用(如下所述)。
管線
在機器學習中,執行一系列演算法來處理資料並從中學習是很常見的。例如,一個簡單的文字文件處理工作流程可能包含幾個階段:
- 將每個文件的文字分割成單字。
- 將每個文件的單字轉換成數值特徵向量。
- 使用特徵向量和標籤學習預測模型。
MLlib 將此類工作流程表示為 Pipeline
,它包含一系列 PipelineStage
(Transformer
和 Estimator
),這些階段會以特定順序執行。我們將在本節中使用這個簡單的工作流程作為執行範例。
運作方式
Pipeline
被指定為一系列階段,每個階段都是 Transformer
或 Estimator
。這些階段會依序執行,輸入 DataFrame
會在通過每個階段時進行轉換。對於 Transformer
階段,會在 DataFrame
上呼叫 transform()
方法。對於 Estimator
階段,會呼叫 fit()
方法來產生 Transformer
(它會成為 PipelineModel
或已擬合的 Pipeline
的一部分),並且會在 DataFrame
上呼叫該 Transformer
的 transform()
方法。
我們以簡單的文字文件工作流程來說明這一點。下圖是 Pipeline
的訓練時間用法。
上方,最上面一列代表一個包含三個階段的 Pipeline
。前兩個(Tokenizer
和 HashingTF
)是 Transformer
(藍色),而第三個(LogisticRegression
)是 Estimator
(紅色)。最下面一列代表資料流經管線,其中圓柱體表示 DataFrame
。在原始 DataFrame
(其中包含原始文字文件和標籤)上呼叫 Pipeline.fit()
方法。 Tokenizer.transform()
方法會將原始文字文件分割成單字,並將一個包含單字的新欄位新增到 DataFrame
。 HashingTF.transform()
方法會將單字欄位轉換成特徵向量,並將一個包含這些向量的欄位新增到 DataFrame
。現在,由於 LogisticRegression
是 Estimator
,因此 Pipeline
會先呼叫 LogisticRegression.fit()
以產生 LogisticRegressionModel
。如果 Pipeline
有更多 Estimator
,它會在將 DataFrame
傳遞到下一個階段之前,在 DataFrame
上呼叫 LogisticRegressionModel
的 transform()
方法。
Pipeline
是 Estimator
。因此,在 Pipeline
的 fit()
方法執行後,它會產生 PipelineModel
,而 PipelineModel
是 Transformer
。此 PipelineModel
會在測試時間使用;下方的圖示說明此用法。
在上方圖示中,PipelineModel
的階段數目與原始 Pipeline
相同,但原始 Pipeline
中的所有 Estimator
都已變成 Transformer
。當在測試資料集上呼叫 PipelineModel
的 transform()
方法時,資料會依序傳遞到已配適的管線中。每個階段的 transform()
方法會更新資料集,並將其傳遞到下一個階段。
Pipeline
和 PipelineModel
可協助確保訓練資料和測試資料會經歷相同的特徵處理步驟。
詳細資訊
DAG 管道
:管道
的階段指定為已排序陣列。這裡提供的範例都是線性管道
,亦即每個階段使用前一階段產生資料的管道
。只要資料流程圖形成有向無環圖 (DAG),就可以建立非線性管道
。目前此圖表是根據每個階段的輸入和輸出欄位名稱(通常指定為參數)隱含指定。如果管道
形成 DAG,則必須依拓撲順序指定階段。
執行時間檢查:由於管道
可以在具有不同類型的資料框
上運作,因此無法使用編譯時間類型檢查。管道
和管道模型
會在實際執行管道
之前進行執行時間檢查。此類型檢查使用資料框
架構執行,這是資料框
中欄位資料類型的描述。
唯一的管道階段:管道
的階段應該是唯一的執行個體。例如,不應將同一個執行個體myHashingTF
插入管道
兩次,因為管道
階段必須具有唯一的 ID。但是,不同的執行個體myHashingTF1
和myHashingTF2
(類型均為HashingTF
)可以放入同一個管道
,因為不同的執行個體會以不同的 ID 建立。
參數
MLlib 估計器
和轉換器
使用統一的 API 來指定參數。
參數
是具有獨立說明文件的名稱參數。參數對應
是一組(參數、值)配對。
有兩種主要方式可以將參數傳遞給演算法
- 設定執行個體的參數。例如,如果
lr
是LogisticRegression
的執行個體,可以呼叫lr.setMaxIter(10)
,讓lr.fit()
最多使用 10 次反覆運算。此 API 類似於spark.mllib
套件中使用的 API。 - 傳遞
ParamMap
給fit()
或transform()
。在ParamMap
中的任何參數都會覆寫先前透過 setter 方法指定的參數。
參數屬於 Estimator
和 Transformer
的特定執行個體。例如,如果我們有兩個 LogisticRegression
執行個體 lr1
和 lr2
,那麼我們可以使用兩個 maxIter
參數建立 ParamMap
:ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)
。如果 Pipeline
中有兩個演算法使用 maxIter
參數,這將很有用。
機器學習持久性:儲存和載入管線
通常值得將模型或管道儲存到磁碟,以供以後使用。在 Spark 1.6 中,將模型匯入/匯出功能新增到 Pipeline API。自 Spark 2.3 起,spark.ml
和 pyspark.ml
中的 DataFrame-based API 已涵蓋所有內容。
ML 持久性適用於 Scala、Java 和 Python。但是,R 目前使用修改後的格式,因此在 R 中儲存的模型只能在 R 中載入回來;這應該會在未來修復,並在 SPARK-15572 中進行追蹤。
機器學習持久性的向下相容性
一般來說,MLlib 會維護 ML 持久性的向下相容性。也就是說,如果您在 Spark 的一個版本中儲存 ML 模型或 Pipeline,那麼您應該能夠在 Spark 的未來版本中載入並使用它。但是,有少數例外,如下所述。
模型持久性:在 Spark 版本 X 中使用 Apache Spark ML 持久性儲存的模型或 Pipeline 是否可以在 Spark 版本 Y 中載入?
- 主要版本:沒有保證,但會盡力而為。
- 次要版本和修補程式版本:可以;這些版本向下相容。
- 關於格式的注意事項:沒有保證可以提供穩定的持久性格式,但模型載入本身設計為向下相容。
模型行為:Spark 版本 X 中的模型或 Pipeline 是否在 Spark 版本 Y 中表現相同?
- 主要版本:沒有保證,但會盡力而為。
- 次要版本和修補程式版本:除了錯誤修正之外,行為相同。
對於模型持久性和模型行為,任何次要版本或修補程式的重大變更都會在 Spark 版本的版本說明中報告。如果版本說明中未報告中斷,則應將其視為要修復的錯誤。
程式碼範例
本節提供程式碼範例,說明上面討論的功能。如需更多資訊,請參閱 API 文件 (Scala、Java 和 Python)。
範例:估計器、轉換器和參數
此範例涵蓋 Estimator
、Transformer
和 Param
的概念。
請參閱 Estimator
Python 文件、Transformer
Python 文件 和 Params
Python 文件,以取得更多 API 詳細資料。
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
(1.0, Vectors.dense([0.0, 1.1, 0.1])),
(0.0, Vectors.dense([2.0, 1.0, -1.0])),
(0.0, Vectors.dense([2.0, 1.3, 1.0])),
(1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])
# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)
# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())
# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30 # Specify 1 Param, overwriting the original maxIter.
# Specify multiple Params.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # type: ignore
# You can combine paramMaps, which are python dictionaries.
# Change output column name
paramMap2 = {lr.probabilityCol: "myProbability"}
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2) # type: ignore
# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())
# Prepare test data
test = spark.createDataFrame([
(1.0, Vectors.dense([-1.0, 1.5, 1.3])),
(0.0, Vectors.dense([3.0, 2.0, -0.1])),
(1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])
# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
.collect()
for row in result:
print("features=%s, label=%s -> prob=%s, prediction=%s"
% (row.features, row.label, row.myProbability, row.prediction))
請參閱 Estimator
Scala 文件、Transformer
Scala 文件 和 Params
Scala 文件,以取得 API 詳細資料。
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")
// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")
// We may set parameters using setter methods.
lr.setMaxIter(10)
.setRegParam(0.01)
// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println(s"Model 1 was fit using parameters: ${model1.parent.extractParamMap}")
// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
.put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter.
.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.
// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name.
val paramMapCombined = paramMap ++ paramMap2
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println(s"Model 2 was fit using parameters: ${model2.parent.extractParamMap}")
// Prepare test data.
val test = spark.createDataFrame(Seq(
(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
(0.0, Vectors.dense(3.0, 2.0, -0.1)),
(1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")
// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}
請參閱 Estimator
Java 文件、Transformer
Java 文件 和 Params
Java 文件,以取得 API 詳細資料。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Prepare training data.
List<Row> dataTraining = Arrays.asList(
RowFactory.create(1.0, Vectors.dense(0.0, 1.1, 0.1)),
RowFactory.create(0.0, Vectors.dense(2.0, 1.0, -1.0)),
RowFactory.create(0.0, Vectors.dense(2.0, 1.3, 1.0)),
RowFactory.create(1.0, Vectors.dense(0.0, 1.2, -0.5))
);
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(dataTraining, schema);
// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
// Print out the parameters, documentation, and any default values.
System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");
// We may set parameters using setter methods.
lr.setMaxIter(10).setRegParam(0.01);
// Learn a LogisticRegression model. This uses the parameters stored in lr.
LogisticRegressionModel model1 = lr.fit(training);
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());
// We may alternatively specify parameters using a ParamMap.
ParamMap paramMap = new ParamMap()
.put(lr.maxIter().w(20)) // Specify 1 Param.
.put(lr.maxIter(), 30) // This overwrites the original maxIter.
.put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params.
// One can also combine ParamMaps.
ParamMap paramMap2 = new ParamMap()
.put(lr.probabilityCol().w("myProbability")); // Change output column name
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());
// Prepare test documents.
List<Row> dataTest = Arrays.asList(
RowFactory.create(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
RowFactory.create(0.0, Vectors.dense(3.0, 2.0, -0.1)),
RowFactory.create(1.0, Vectors.dense(0.0, 2.2, -1.5))
);
Dataset<Row> test = spark.createDataFrame(dataTest, schema);
// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
Dataset<Row> results = model2.transform(test);
Dataset<Row> rows = results.select("features", "label", "myProbability", "prediction");
for (Row r: rows.collectAsList()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
範例:管線
此範例遵循上圖所示的簡單文字文件 Pipeline
。
請參閱 Pipeline
Python 文件,以取得更多 API 詳細資料。
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Fit the pipeline to training documents.
model = pipeline.fit(training)
# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "spark hadoop spark"),
(7, "apache hadoop")
], ["id", "text"])
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
rid, text, prob, prediction = row
print(
"(%d, %s) --> prob=%s, prediction=%f" % (
rid, text, str(prob), prediction # type: ignore
)
)
請參閱 Pipeline
Scala 文件,以取得 API 詳細資料。
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// Fit the pipeline to training documents.
val model = pipeline.fit(training)
// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")
// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// Make predictions on test documents.
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
請參閱 Pipeline
Java 文件,以取得 API 詳細資料。
import java.util.Arrays;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Prepare training documents, which are labeled.
Dataset<Row> training = spark.createDataFrame(Arrays.asList(
new JavaLabeledDocument(0L, "a b c d e spark", 1.0),
new JavaLabeledDocument(1L, "b d", 0.0),
new JavaLabeledDocument(2L, "spark f g h", 1.0),
new JavaLabeledDocument(3L, "hadoop mapreduce", 0.0)
), JavaLabeledDocument.class);
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words");
HashingTF hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol())
.setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001);
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
// Fit the pipeline to training documents.
PipelineModel model = pipeline.fit(training);
// Prepare test documents, which are unlabeled.
Dataset<Row> test = spark.createDataFrame(Arrays.asList(
new JavaDocument(4L, "spark i j k"),
new JavaDocument(5L, "l m n"),
new JavaDocument(6L, "spark hadoop spark"),
new JavaDocument(7L, "apache hadoop")
), JavaDocument.class);
// Make predictions on test documents.
Dataset<Row> predictions = model.transform(test);
for (Row r : predictions.select("id", "text", "probability", "prediction").collectAsList()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
模型選擇(超參數調整)
使用 ML 管線的一大好處是超參數最佳化。請參閱 ML 調整指南,以取得有關自動模型選擇的更多資訊。