機器學習管線

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

在本節中,我們將介紹 機器學習管線 的概念。機器學習管線提供一組統一的高階 API,建構在 資料框 之上,協助使用者建立和調整實用的機器學習管線。

目錄

管線中的主要概念

MLlib 標準化機器學習演算法的 API,讓使用者可以更輕鬆地將多個演算法結合到單一管線或工作流程中。本節涵蓋管線 API 引入的主要概念,其中管線概念主要受到 scikit-learn 專案的啟發。

資料框

機器學習可套用於各種資料類型,例如向量、文字、影像和結構化資料。此 API 採用 Spark SQL 中的 DataFrame 以支援各種資料類型。

DataFrame 支援許多基本和結構化類型;請參閱 Spark SQL 資料類型參考 以取得支援類型清單。除了 Spark SQL 指南中所列的類型外,DataFrame 還可以使用 ML Vector 類型。

可以從一般 RDD 隱式或明確建立 DataFrame。請參閱下列程式碼範例和 Spark SQL 程式設計指南 以取得範例。

DataFrame 中的欄位有名稱。下列程式碼範例使用「text」、「features」和「label」等名稱。

管線元件

轉換器

Transformer 是一個抽象概念,包含特徵轉換器和已學習模型。技術上來說,Transformer 會實作一個方法 transform(),將一個 DataFrame 轉換成另一個,通常是透過附加一個或多個欄位。例如

估計器

Estimator 抽象了學習演算法或任何針對資料進行擬合或訓練的演算法的概念。技術上來說,Estimator 會實作一個方法 fit(),接受一個 DataFrame,並產生一個 Model,這是一個 Transformer。例如,LogisticRegression 等學習演算法是一個 Estimator,而呼叫 fit() 會訓練一個 LogisticRegressionModel,這是一個 Model,因此也是一個 Transformer

管線元件的屬性

Transformer.transform()Estimator.fit() 都是無狀態的。未來,可能會透過其他概念來支援有狀態演算法。

每個 TransformerEstimator 的執行個體都有獨特的 ID,這在指定參數時很有用(如下所述)。

管線

在機器學習中,執行一系列演算法來處理資料並從中學習是很常見的。例如,一個簡單的文字文件處理工作流程可能包含幾個階段:

MLlib 將此類工作流程表示為 Pipeline,它包含一系列 PipelineStageTransformerEstimator),這些階段會以特定順序執行。我們將在本節中使用這個簡單的工作流程作為執行範例。

運作方式

Pipeline 被指定為一系列階段,每個階段都是 TransformerEstimator。這些階段會依序執行,輸入 DataFrame 會在通過每個階段時進行轉換。對於 Transformer 階段,會在 DataFrame 上呼叫 transform() 方法。對於 Estimator 階段,會呼叫 fit() 方法來產生 Transformer(它會成為 PipelineModel 或已擬合的 Pipeline 的一部分),並且會在 DataFrame 上呼叫該 Transformertransform() 方法。

我們以簡單的文字文件工作流程來說明這一點。下圖是 Pipeline訓練時間用法。

ML Pipeline Example

上方,最上面一列代表一個包含三個階段的 Pipeline。前兩個(TokenizerHashingTF)是 Transformer(藍色),而第三個(LogisticRegression)是 Estimator(紅色)。最下面一列代表資料流經管線,其中圓柱體表示 DataFrame。在原始 DataFrame(其中包含原始文字文件和標籤)上呼叫 Pipeline.fit() 方法。 Tokenizer.transform() 方法會將原始文字文件分割成單字,並將一個包含單字的新欄位新增到 DataFrameHashingTF.transform() 方法會將單字欄位轉換成特徵向量,並將一個包含這些向量的欄位新增到 DataFrame。現在,由於 LogisticRegressionEstimator,因此 Pipeline 會先呼叫 LogisticRegression.fit() 以產生 LogisticRegressionModel。如果 Pipeline 有更多 Estimator,它會在將 DataFrame 傳遞到下一個階段之前,在 DataFrame 上呼叫 LogisticRegressionModeltransform() 方法。

PipelineEstimator。因此,在 Pipelinefit() 方法執行後,它會產生 PipelineModel,而 PipelineModelTransformer。此 PipelineModel 會在測試時間使用;下方的圖示說明此用法。

ML PipelineModel Example

在上方圖示中,PipelineModel 的階段數目與原始 Pipeline 相同,但原始 Pipeline 中的所有 Estimator 都已變成 Transformer。當在測試資料集上呼叫 PipelineModeltransform() 方法時,資料會依序傳遞到已配適的管線中。每個階段的 transform() 方法會更新資料集,並將其傳遞到下一個階段。

PipelinePipelineModel 可協助確保訓練資料和測試資料會經歷相同的特徵處理步驟。

詳細資訊

DAG 管道管道的階段指定為已排序陣列。這裡提供的範例都是線性管道,亦即每個階段使用前一階段產生資料的管道。只要資料流程圖形成有向無環圖 (DAG),就可以建立非線性管道。目前此圖表是根據每個階段的輸入和輸出欄位名稱(通常指定為參數)隱含指定。如果管道形成 DAG,則必須依拓撲順序指定階段。

執行時間檢查:由於管道可以在具有不同類型的資料框上運作,因此無法使用編譯時間類型檢查。管道管道模型會在實際執行管道之前進行執行時間檢查。此類型檢查使用資料框架構執行,這是資料框中欄位資料類型的描述。

唯一的管道階段管道的階段應該是唯一的執行個體。例如,不應將同一個執行個體myHashingTF插入管道兩次,因為管道階段必須具有唯一的 ID。但是,不同的執行個體myHashingTF1myHashingTF2(類型均為HashingTF)可以放入同一個管道,因為不同的執行個體會以不同的 ID 建立。

參數

MLlib 估計器轉換器使用統一的 API 來指定參數。

參數是具有獨立說明文件的名稱參數。參數對應是一組(參數、值)配對。

有兩種主要方式可以將參數傳遞給演算法

  1. 設定執行個體的參數。例如,如果 lrLogisticRegression 的執行個體,可以呼叫 lr.setMaxIter(10),讓 lr.fit() 最多使用 10 次反覆運算。此 API 類似於 spark.mllib 套件中使用的 API。
  2. 傳遞 ParamMapfit()transform()。在 ParamMap 中的任何參數都會覆寫先前透過 setter 方法指定的參數。

參數屬於 EstimatorTransformer 的特定執行個體。例如,如果我們有兩個 LogisticRegression 執行個體 lr1lr2,那麼我們可以使用兩個 maxIter 參數建立 ParamMapParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)。如果 Pipeline 中有兩個演算法使用 maxIter 參數,這將很有用。

機器學習持久性:儲存和載入管線

通常值得將模型或管道儲存到磁碟,以供以後使用。在 Spark 1.6 中,將模型匯入/匯出功能新增到 Pipeline API。自 Spark 2.3 起,spark.mlpyspark.ml 中的 DataFrame-based API 已涵蓋所有內容。

ML 持久性適用於 Scala、Java 和 Python。但是,R 目前使用修改後的格式,因此在 R 中儲存的模型只能在 R 中載入回來;這應該會在未來修復,並在 SPARK-15572 中進行追蹤。

機器學習持久性的向下相容性

一般來說,MLlib 會維護 ML 持久性的向下相容性。也就是說,如果您在 Spark 的一個版本中儲存 ML 模型或 Pipeline,那麼您應該能夠在 Spark 的未來版本中載入並使用它。但是,有少數例外,如下所述。

模型持久性:在 Spark 版本 X 中使用 Apache Spark ML 持久性儲存的模型或 Pipeline 是否可以在 Spark 版本 Y 中載入?

模型行為:Spark 版本 X 中的模型或 Pipeline 是否在 Spark 版本 Y 中表現相同?

對於模型持久性和模型行為,任何次要版本或修補程式的重大變更都會在 Spark 版本的版本說明中報告。如果版本說明中未報告中斷,則應將其視為要修復的錯誤。

程式碼範例

本節提供程式碼範例,說明上面討論的功能。如需更多資訊,請參閱 API 文件 (ScalaJavaPython)。

範例:估計器、轉換器和參數

此範例涵蓋 EstimatorTransformerParam 的概念。

請參閱 Estimator Python 文件Transformer Python 文件Params Python 文件,以取得更多 API 詳細資料。

from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)

# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())

# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
# Specify multiple Params.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # type: ignore

# You can combine paramMaps, which are python dictionaries.
# Change output column name
paramMap2 = {lr.probabilityCol: "myProbability"}
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)  # type: ignore

# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())

# Prepare test data
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
    .collect()

for row in result:
    print("features=%s, label=%s -> prob=%s, prediction=%s"
          % (row.features, row.label, row.myProbability, row.prediction))
在 Spark 儲存庫中,於「examples/src/main/python/ml/estimator_transformer_param_example.py」中找到完整的範例程式碼。

請參閱 Estimator Scala 文件Transformer Scala 文件Params Scala 文件,以取得 API 詳細資料。

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println(s"Model 1 was fit using parameters: ${model1.parent.extractParamMap}")

// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
  .put(lr.maxIter, 30)  // Specify 1 Param. This overwrites the original maxIter.
  .put(lr.regParam -> 0.1, lr.threshold -> 0.55)  // Specify multiple Params.

// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")  // Change output column name.
val paramMapCombined = paramMap ++ paramMap2

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println(s"Model 2 was fit using parameters: ${model2.parent.extractParamMap}")

// Prepare test data.
val test = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")

// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
  .select("features", "label", "myProbability", "prediction")
  .collect()
  .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }
在 Spark 儲存庫中,於「examples/src/main/scala/org/apache/spark/examples/ml/EstimatorTransformerParamExample.scala」中找到完整的範例程式碼。

請參閱 Estimator Java 文件Transformer Java 文件Params Java 文件,以取得 API 詳細資料。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Prepare training data.
List<Row> dataTraining = Arrays.asList(
    RowFactory.create(1.0, Vectors.dense(0.0, 1.1, 0.1)),
    RowFactory.create(0.0, Vectors.dense(2.0, 1.0, -1.0)),
    RowFactory.create(0.0, Vectors.dense(2.0, 1.3, 1.0)),
    RowFactory.create(1.0, Vectors.dense(0.0, 1.2, -0.5))
);
StructType schema = new StructType(new StructField[]{
    new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(dataTraining, schema);

// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
// Print out the parameters, documentation, and any default values.
System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");

// We may set parameters using setter methods.
lr.setMaxIter(10).setRegParam(0.01);

// Learn a LogisticRegression model. This uses the parameters stored in lr.
LogisticRegressionModel model1 = lr.fit(training);
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());

// We may alternatively specify parameters using a ParamMap.
ParamMap paramMap = new ParamMap()
  .put(lr.maxIter().w(20))  // Specify 1 Param.
  .put(lr.maxIter(), 30)  // This overwrites the original maxIter.
  .put(lr.regParam().w(0.1), lr.threshold().w(0.55));  // Specify multiple Params.

// One can also combine ParamMaps.
ParamMap paramMap2 = new ParamMap()
  .put(lr.probabilityCol().w("myProbability"));  // Change output column name
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());

// Prepare test documents.
List<Row> dataTest = Arrays.asList(
    RowFactory.create(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
    RowFactory.create(0.0, Vectors.dense(3.0, 2.0, -0.1)),
    RowFactory.create(1.0, Vectors.dense(0.0, 2.2, -1.5))
);
Dataset<Row> test = spark.createDataFrame(dataTest, schema);

// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
Dataset<Row> results = model2.transform(test);
Dataset<Row> rows = results.select("features", "label", "myProbability", "prediction");
for (Row r: rows.collectAsList()) {
  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
    + ", prediction=" + r.get(3));
}
在 Spark 儲存庫中,於「examples/src/main/java/org/apache/spark/examples/ml/JavaEstimatorTransformerParamExample.java」中找到完整的範例程式碼。

範例:管線

此範例遵循上圖所示的簡單文字文件 Pipeline

請參閱 Pipeline Python 文件,以取得更多 API 詳細資料。

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print(
        "(%d, %s) --> prob=%s, prediction=%f" % (
            rid, text, str(prob), prediction   # type: ignore
        )
    )
在 Spark 儲存庫中,於「examples/src/main/python/ml/pipeline_example.py」中找到完整的範例程式碼。

請參閱 Pipeline Scala 文件,以取得 API 詳細資料。

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "spark hadoop spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
在 Spark 儲存庫中,於「examples/src/main/scala/org/apache/spark/examples/ml/PipelineExample.scala」中找到完整的範例程式碼。

請參閱 Pipeline Java 文件,以取得 API 詳細資料。

import java.util.Arrays;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Prepare training documents, which are labeled.
Dataset<Row> training = spark.createDataFrame(Arrays.asList(
  new JavaLabeledDocument(0L, "a b c d e spark", 1.0),
  new JavaLabeledDocument(1L, "b d", 0.0),
  new JavaLabeledDocument(2L, "spark f g h", 1.0),
  new JavaLabeledDocument(3L, "hadoop mapreduce", 0.0)
), JavaLabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words");
HashingTF hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol())
  .setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001);
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {tokenizer, hashingTF, lr});

// Fit the pipeline to training documents.
PipelineModel model = pipeline.fit(training);

// Prepare test documents, which are unlabeled.
Dataset<Row> test = spark.createDataFrame(Arrays.asList(
  new JavaDocument(4L, "spark i j k"),
  new JavaDocument(5L, "l m n"),
  new JavaDocument(6L, "spark hadoop spark"),
  new JavaDocument(7L, "apache hadoop")
), JavaDocument.class);

// Make predictions on test documents.
Dataset<Row> predictions = model.transform(test);
for (Row r : predictions.select("id", "text", "probability", "prediction").collectAsList()) {
  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
    + ", prediction=" + r.get(3));
}
在 Spark 儲存庫的「範例/src/main/java/org/apache/spark/examples/ml/JavaPipelineExample.java」中找到完整的範例程式碼。

模型選擇(超參數調整)

使用 ML 管線的一大好處是超參數最佳化。請參閱 ML 調整指南,以取得有關自動模型選擇的更多資訊。