ML Tuning: model selection and hyperparameter tuning


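This section describes how to use MLlib's tooling for tuning ML algorithms and Pipelines. Built-in cross-validation and other tooling allow users to optimize hyperparameters in algorithms and Pipelines.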


Model selection (a.k.a. hyperparameter tuning)

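An important task in ML is model selection, or using data to find the best model or parameters for a given task. This is also called tuning. Tuning may be done for individual Estimators such as LogisticRegression, or for entire Pipelines which include multiple algorithms, featurization, and other steps. Users can tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately.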

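MLlib supports model selection using tools such as CrossValidator and TrainValidationSplit. These tools require the following items: an Estimator (the algorithm or Pipeline to tune), a set of ParamMaps (the parameters to choose from, sometimes called a "parameter grid" to search over), and an Evaluator (the metric that measures how well a fitted Model does on held-out test data).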

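At a high level, these model selection tools work as follows. They split the input data into separate training and test datasets. For each (training, test) pair, they iterate through the set of ParamMaps: for each ParamMap, they fit the Estimator using those parameters, get the fitted Model, and evaluate the Model's performance using the Evaluator. They then select the Model produced by the best-performing set of parameters.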
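The selection loop that these tools run can be sketched in plain Python. This is a conceptual illustration only, not MLlib's implementation: `select_best`, `fit`, `evaluate`, and the toy "model" (a single predicted number) are all hypothetical names made up for this sketch.

```python
from statistics import mean


def select_best(param_maps, splits, fit, evaluate, larger_is_better=True):
    """Average the metric of each parameter setting over all (train, test)
    pairs and return the best setting together with all averaged metrics."""
    avg_metrics = []
    for params in param_maps:
        scores = [evaluate(fit(train, params), test) for train, test in splits]
        avg_metrics.append(mean(scores))
    pick = max if larger_is_better else min
    best_index = pick(range(len(param_maps)), key=lambda i: avg_metrics[i])
    return param_maps[best_index], avg_metrics


# Hypothetical toy estimator: the "model" is the training mean plus a shift.
def fit(train, params):
    return mean(train) + params["shift"]


# Hypothetical toy evaluator: mean absolute error (smaller is better).
def evaluate(model, test):
    return mean(abs(model - y) for y in test)


splits = [([1.0, 2.0, 3.0], [2.0]), ([2.0, 2.0], [2.0, 2.0])]
param_maps = [{"shift": 0.0}, {"shift": 1.0}]
best, avg_metrics = select_best(param_maps, splits, fit, evaluate,
                                larger_is_better=False)
print(best, avg_metrics)
```

Whether the largest or smallest metric wins depends on the metric itself (an error measure should be minimized, an area under a curve maximized), which is why the sketch takes an explicit `larger_is_better` flag.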

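The Evaluator can be a RegressionEvaluator for regression problems, a BinaryClassificationEvaluator for binary data, a MulticlassClassificationEvaluator for multiclass problems, a MultilabelClassificationEvaluator for multi-label classification, or a RankingEvaluator for ranking problems. The default metric used to choose the best ParamMap can be overridden by the setMetricName method of each of these evaluators.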

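To help construct the parameter grid, users can use the ParamGridBuilder utility. By default, sets of parameters from the parameter grid are evaluated in serial. Parameter evaluation can be done in parallel by setting parallelism to a value of 2 or more (a value of 1 is serial) before running model selection with CrossValidator or TrainValidationSplit. The value of parallelism should be chosen carefully to maximize parallelism without exceeding cluster resources; larger values do not always improve performance. Generally speaking, a value up to 10 should be sufficient for most clusters.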
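A parameter grid is the Cartesian product of the candidate values of each parameter. The sketch below shows that idea in plain Python with dictionaries; `build_param_grid` is a hypothetical helper written for illustration and is not how ParamGridBuilder is implemented.

```python
from itertools import product


def build_param_grid(grid):
    """Expand {name: [values, ...]} into a list of {name: value} settings."""
    names = list(grid)
    return [dict(zip(names, values))
            for values in product(*(grid[name] for name in names))]


param_grid = build_param_grid({
    "numFeatures": [10, 100, 1000],
    "regParam": [0.1, 0.01],
})
print(len(param_grid))
for setting in param_grid:
    print(setting)
```

Each additional parameter multiplies the number of settings, so the grid grows quickly as parameters are added.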

Cross-Validation

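CrossValidator begins by splitting the dataset into a set of folds, which are used as separate training and test datasets. For example, with $k=3$ folds, CrossValidator generates 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. To evaluate a particular ParamMap, CrossValidator computes the average evaluation metric over the 3 Models produced by fitting the Estimator on the 3 different (training, test) dataset pairs.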

After identifying the best ParamMap, CrossValidator finally re-fits the Estimator using the best ParamMap and the entire dataset.
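To make the fold arithmetic concrete, here is a minimal plain-Python sketch of k-fold splitting. It assumes exact, equally sized folds on an in-memory list; `k_fold_pairs` is a hypothetical helper, and Spark's own random fold assignment on a distributed dataset will generally yield only approximately equal folds.

```python
import random


def k_fold_pairs(rows, k, seed=0):
    """Shuffle rows, cut them into k folds, and return k (train, test) pairs
    in which each fold serves as the test set exactly once."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    folds = [shuffled[i::k] for i in range(k)]
    pairs = []
    for i in range(k):
        test = folds[i]
        train = [row for j, fold in enumerate(folds) if j != i for row in fold]
        pairs.append((train, test))
    return pairs


pairs = k_fold_pairs(range(12), k=3)
for train, test in pairs:
    print(len(train), len(test))
```

With 12 rows and `k=3`, every pair trains on 8 rows (2/3) and tests on 4 rows (1/3), and every row appears in exactly one test set.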

Example: model selection via cross-validation

The following example demonstrates using CrossValidator to select from a grid of parameters.

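Note that cross-validation over a grid of parameters is expensive. For example, in the example below, the parameter grid has 3 values for hashingTF.numFeatures and 2 values for lr.regParam, and CrossValidator uses 2 folds. This multiplies out to $(3 \times 2) \times 2 = 12$ different models being trained. In realistic settings, it is common to try many more parameters and use more folds ($k=3$ and $k=10$ are common). In other words, using CrossValidator can be very expensive. However, it is also a well-established method for choosing parameters which is more statistically sound than heuristic hand-tuning.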
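The cost estimate is a simple product, which the following sketch reproduces. `count_cv_fits` is a hypothetical helper for illustration; it counts only the fits performed during the search and leaves out the final re-fit on the entire dataset.

```python
from math import prod


def count_cv_fits(values_per_param, num_folds):
    """Number of models trained while searching: grid size times folds."""
    return prod(values_per_param) * num_folds


# 3 values for numFeatures, 2 values for regParam, 2 folds.
print(count_cv_fits([3, 2], num_folds=2))   # 12
# The same grid with more typical fold counts.
print(count_cv_fits([3, 2], num_folds=3))   # 18
print(count_cv_fits([3, 2], num_folds=10))  # 60
```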

Refer to the CrossValidator Python docs for more details on the API.

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Prepare training documents, which are labeled.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

# Prepare test documents, which are unlabeled.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    print(row)
Find full example code at "examples/src/main/python/ml/cross_validator.py" in the Spark repo.

Refer to the CrossValidator Scala docs for details on the API.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.Row

// Prepare training data from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0),
  (4L, "b spark who", 1.0),
  (5L, "g d a y", 0.0),
  (6L, "spark fly", 1.0),
  (7L, "was mapreduce", 0.0),
  (8L, "e spark program", 1.0),
  (9L, "a e c l", 0.0),
  (10L, "spark compile", 1.0),
  (11L, "hadoop software", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// We use a ParamGridBuilder to construct a grid of parameters to search over.
// With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
// this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()

// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
// This will allow us to jointly choose parameters for all Pipeline stages.
// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
// is areaUnderROC.
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)  // Use 3+ in practice
  .setParallelism(2)  // Evaluate up to 2 parameter settings in parallel

// Run cross-validation, and choose the best set of parameters.
val cvModel = cv.fit(training)

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaCrossValidationExample.scala" in the Spark repo.

Refer to the CrossValidator Java docs for details on the API.

import java.util.Arrays;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Prepare training documents, which are labeled.
Dataset<Row> training = spark.createDataFrame(Arrays.asList(
  new JavaLabeledDocument(0L, "a b c d e spark", 1.0),
  new JavaLabeledDocument(1L, "b d", 0.0),
  new JavaLabeledDocument(2L,"spark f g h", 1.0),
  new JavaLabeledDocument(3L, "hadoop mapreduce", 0.0),
  new JavaLabeledDocument(4L, "b spark who", 1.0),
  new JavaLabeledDocument(5L, "g d a y", 0.0),
  new JavaLabeledDocument(6L, "spark fly", 1.0),
  new JavaLabeledDocument(7L, "was mapreduce", 0.0),
  new JavaLabeledDocument(8L, "e spark program", 1.0),
  new JavaLabeledDocument(9L, "a e c l", 0.0),
  new JavaLabeledDocument(10L, "spark compile", 1.0),
  new JavaLabeledDocument(11L, "hadoop software", 0.0)
), JavaLabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words");
HashingTF hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol())
  .setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01);
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {tokenizer, hashingTF, lr});

// We use a ParamGridBuilder to construct a grid of parameters to search over.
// With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
// this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
ParamMap[] paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures(), new int[] {10, 100, 1000})
  .addGrid(lr.regParam(), new double[] {0.1, 0.01})
  .build();

// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
// This will allow us to jointly choose parameters for all Pipeline stages.
// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
// is areaUnderROC.
CrossValidator cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)  // Use 3+ in practice
  .setParallelism(2);  // Evaluate up to 2 parameter settings in parallel

// Run cross-validation, and choose the best set of parameters.
CrossValidatorModel cvModel = cv.fit(training);

// Prepare test documents, which are unlabeled.
Dataset<Row> test = spark.createDataFrame(Arrays.asList(
  new JavaDocument(4L, "spark i j k"),
  new JavaDocument(5L, "l m n"),
  new JavaDocument(6L, "mapreduce spark"),
  new JavaDocument(7L, "apache hadoop")
), JavaDocument.class);

// Make predictions on test documents. cvModel uses the best model found (lrModel).
Dataset<Row> predictions = cvModel.transform(test);
for (Row r : predictions.select("id", "text", "probability", "prediction").collectAsList()) {
  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
    + ", prediction=" + r.get(3));
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaCrossValidationExample.java" in the Spark repo.

Train-Validation Split

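In addition to CrossValidator, Spark also offers TrainValidationSplit for hyperparameter tuning. TrainValidationSplit evaluates each combination of parameters only once, as opposed to k times in the case of CrossValidator. It is therefore less expensive, but it will not produce reliable results when the training dataset is not sufficiently large.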

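Unlike CrossValidator, TrainValidationSplit creates a single (training, test) dataset pair. It splits the dataset into these two parts using the trainRatio parameter. For example, with $trainRatio=0.75$, TrainValidationSplit generates a training and test dataset pair where 75% of the data is used for training and 25% for validation.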

Like CrossValidator, TrainValidationSplit finally fits the Estimator using the best ParamMap and the entire dataset.
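The single split controlled by trainRatio can be illustrated with a short plain-Python sketch. It assumes an exact cut on an in-memory list; `train_validation_split` is a hypothetical helper, and a random split of a distributed dataset in Spark will typically match the ratio only approximately.

```python
import random


def train_validation_split(rows, train_ratio, seed=0):
    """Shuffle rows once and cut them into a single (train, validation) pair."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    cut = int(len(shuffled) * train_ratio)
    return shuffled[:cut], shuffled[cut:]


train, validation = train_validation_split(range(100), train_ratio=0.75)
print(len(train), len(validation))
```

Because there is only one pair, each parameter setting is fitted and evaluated once, which is where the cost saving over k-fold cross-validation comes from.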

Example: model selection via train validation split

Refer to the TrainValidationSplit Python docs for more details on the API.

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Prepare training and test data.
data = spark.read.format("libsvm")\
    .load("data/mllib/sample_linear_regression_data.txt")
train, test = data.randomSplit([0.9, 0.1], seed=12345)

lr = LinearRegression(maxIter=10)

# We use a ParamGridBuilder to construct a grid of parameters to search over.
# TrainValidationSplit will try all combinations of values and determine best model using
# the evaluator.
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

# In this case the estimator is simply the linear regression.
# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train)

# Make predictions on test data. model is the model with combination of parameters
# that performed best.
model.transform(test)\
    .select("features", "label", "prediction")\
    .show()
Find full example code at "examples/src/main/python/ml/train_validation_split.py" in the Spark repo.

Refer to the TrainValidationSplit Scala docs for details on the API.

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

// Prepare training and test data.
val data = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)

val lr = new LinearRegression()
    .setMaxIter(10)

// We use a ParamGridBuilder to construct a grid of parameters to search over.
// TrainValidationSplit will try all combinations of values and determine best model using
// the evaluator.
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .addGrid(lr.fitIntercept)
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

// In this case the estimator is simply the linear regression.
// A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator)
  .setEstimatorParamMaps(paramGrid)
  // 80% of the data will be used for training and the remaining 20% for validation.
  .setTrainRatio(0.8)
  // Evaluate up to 2 parameter settings in parallel
  .setParallelism(2)

// Run train validation split, and choose the best set of parameters.
val model = trainValidationSplit.fit(training)

// Make predictions on test data. model is the model with combination of parameters
// that performed best.
model.transform(test)
  .select("features", "label", "prediction")
  .show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaTrainValidationSplitExample.scala" in the Spark repo.

Refer to the TrainValidationSplit Java docs for details on the API.

import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.ml.tuning.TrainValidationSplit;
import org.apache.spark.ml.tuning.TrainValidationSplitModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> data = spark.read().format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt");

// Prepare training and test data.
Dataset<Row>[] splits = data.randomSplit(new double[] {0.9, 0.1}, 12345);
Dataset<Row> training = splits[0];
Dataset<Row> test = splits[1];

LinearRegression lr = new LinearRegression();

// We use a ParamGridBuilder to construct a grid of parameters to search over.
// TrainValidationSplit will try all combinations of values and determine best model using
// the evaluator.
ParamMap[] paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam(), new double[] {0.1, 0.01})
  .addGrid(lr.fitIntercept())
  .addGrid(lr.elasticNetParam(), new double[] {0.0, 0.5, 1.0})
  .build();

// In this case the estimator is simply the linear regression.
// A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8)  // 80% for training and the remaining 20% for validation
  .setParallelism(2);  // Evaluate up to 2 parameter settings in parallel

// Run train validation split, and choose the best set of parameters.
TrainValidationSplitModel model = trainValidationSplit.fit(training);

// Make predictions on test data. model is the model with combination of parameters
// that performed best.
model.transform(test)
  .select("features", "label", "prediction")
  .show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java" in the Spark repo.