最佳化 - 基於 RDD 的 API

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

數學描述

梯度下降

解決 $\min_{\wv \in\R^d} \; f(\wv)$ 形式最佳化問題的最簡單方法是 梯度下降。此類一階最佳化方法(包括梯度下降及其隨機變體)非常適合大規模和分布式運算。

梯度下降方法旨在透過反覆朝向最陡下降方向採取步驟,找出函數的局部最小值,而最陡下降方向是函數在當前點(即當前參數值)的導數(稱為 梯度)的負值。如果目標函數 $f$ 在所有參數中不可微分,但仍為凸函數,則次梯度是梯度的自然概化,並假設為步驟方向的角色。無論如何,計算 $f$ 的梯度或次梯度都是昂貴的,它需要完整遍歷整個資料集,才能計算所有損失項的貢獻。

隨機梯度下降(SGD)

目標函數 $f$ 寫成總和的最佳化問題特別適合使用隨機梯度下降(SGD)來解決。在我們的案例中,對於 監督式機器學習 中常用的最佳化公式,\begin{equation} f(\wv) := \lambda\, R(\wv) + \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i) \label{eq:regPrimal} \ . \end{equation} 這是特別自然的,因為損失寫成來自每個資料點的個別損失的平均值。

隨機次梯度是向量的隨機選擇,如此一來,在期望中,我們取得原始目標函數的真實次梯度。均勻隨機挑選一個資料點 $i\in[1..n]$,我們取得 $\eqref{eq:regPrimal}$ 的隨機次梯度,相對於 $\wv$ 如下:\[ f'_{\wv,i} := L'_{\wv,i} + \lambda\, R'_\wv \ , \] 其中 $L'_{\wv,i} \in \R^d$ 是由第 $i$ 個資料點所決定的損失函數部分的次梯度,也就是 $L'_{\wv,i} \in \frac{\partial}{\partial \wv} L(\wv;\x_i,y_i)$。此外,$R'_\wv$ 是正則化項 $R(\wv)$ 的次梯度,亦即 $R'_\wv \in \frac{\partial}{\partial \wv} R(\wv)$。項目 $R'_\wv$ 不依賴於挑選哪一個隨機資料點。顯然地,在隨機選擇 $i\in[1..n]$ 的期望中,我們有 $f'_{\wv,i}$ 是原始目標 $f$ 的次梯度,表示 $\E\left[f'_{\wv,i}\right] \in \frac{\partial}{\partial \wv} f(\wv)$

現在執行 SGD 只會變成朝著負隨機次梯度 $f'_{\wv,i}$ 的方向前進,也就是 \begin{equation}\label{eq:SGDupdate} \wv^{(t+1)} := \wv^{(t)} - \gamma \; f'_{\wv,i} \ . \end{equation} 步長。 參數 $\gamma$ 是步長,在預設實作中,選擇隨著反覆運算計數器的平方根遞減,也就是 $\gamma := \frac{s}{\sqrt{t}}$ 在第 $t$ 次反覆運算中,輸入參數 $s=$ stepSize。請注意,在實務上,為 SGD 方法選擇最佳步長通常很微妙,而且是積極研究的主題。

梯度。spark.mllib 中實作的機器學習方法的(次)梯度表格,可在 分類與回歸 區段取得。

鄰近更新。作為僅在步驟方向中使用正則化項的次梯度 $R'(\wv)$ 的替代方案,在某些情況下,可以使用鄰近算子來獲得改進的更新。對於 L1 正則化項,鄰近算子由軟閾值化給出,如在 L1Updater 中實作。

分布式 SGD 的更新方案

GradientDescent 中的 SGD 實作使用資料範例的簡單(分布式)抽樣。我們回顧一下最佳化問題 $\eqref{eq:regPrimal}$ 的損失部分為 $\frac1n \sum_{i=1}^n L(\wv;\x_i,y_i)$,因此 $\frac1n \sum_{i=1}^n L'_{\wv,i}$ 將會是真正的(次)梯度。由於這需要存取完整的資料集,因此參數 miniBatchFraction 指定要使用完整資料的哪個部分。此子集上梯度的平均值,即 \[ \frac1{|S|} \sum_{i\in S} L'_{\wv,i} \ , \] 是隨機梯度。其中 $S$ 是大小為 $|S|=$ miniBatchFraction $\cdot n$ 的抽樣子集。

在每個反覆運算中,分布式資料集 (RDD) 上的抽樣,以及來自每個工作機器的部分結果的總和的計算是由標準 Spark 常式執行的。

如果點 miniBatchFraction 的部分設定為 1(預設值),則每個反覆運算中產生的步驟將是精確的(次)梯度下降。在這種情況下,沒有隨機性,也沒有所使用步驟方向中的變異。在另一個極端情況下,如果 miniBatchFraction 選擇得很小,以至於只抽樣一個點,即 $|S|=$ miniBatchFraction $\cdot n = 1$,則該演算法等同於標準 SGD。在這種情況下,步驟方向取決於點的均勻隨機抽樣。

有限記憶 BFGS(L-BFGS)

L-BFGS 是一種最佳化演算法,屬於擬牛頓法,用於解決下列形式的最佳化問題 $\min_{\wv \in\R^d} \; f(\wv)$。L-BFGS 方法在局部將目標函數近似為二次函數,而不會評估目標函數的二階偏微分,以建構海森矩陣。海森矩陣會以先前的梯度評估值進行近似,因此在牛頓法中明確計算海森矩陣時,不會有垂直擴充性的問題(訓練特徵數)。因此,與其他一階最佳化方法相比,L-BFGS 通常能更快速收斂。

選擇最佳化方法

線性方法 在內部使用最佳化,而 spark.mllib 中的某些線性方法支援 SGD 和 L-BFGS。不同的最佳化方法可能會有不同的收斂保證,具體取決於目標函數的屬性,我們在此無法涵蓋所有文獻。一般而言,當 L-BFGS 可用時,我們建議使用它,而不是 SGD,因為 L-BFGS 往往能更快收斂(反覆運算次數較少)。

在 MLlib 中實作

梯度下降與隨機梯度下降

梯度下降法,包括隨機次梯度下降法 (SGD) 作為 MLlib 中的低階基本函數,各種 ML 演算法都是在此基礎上開發的,例如請參閱線性方法部分。

SGD 類別 GradientDescent 設定下列參數

L-BFGS

L-BFGS 目前僅為 MLlib 中的低階最佳化基本函數。如果您想在各種 ML 演算法中使用 L-BFGS,例如線性迴歸和邏輯迴歸,您必須傳遞目標函數的梯度和更新器到最佳化器,而不是使用 LogisticRegressionWithSGD 等訓練 API。請參閱以下範例。此問題將在下次版本中解決。

使用 L1Updater 進行的 L1 正則化無法運作,因為 L1Updater 中的軟閾值邏輯是針對梯度下降而設計的。請參閱開發人員注意事項。

L-BFGS 方法 LBFGS.runLBFGS 具有下列參數

return 是包含兩個元素的元組。第一個元素是包含每個特徵權重的列矩陣,第二個元素是包含每個反覆運算所計算損失的陣列。

以下是使用 L-BFGS 最佳化器訓練二元邏輯迴歸與 L2 正規化的範例。

請參閱 LBFGS Scala 文件SquaredL2Updater Scala 文件 以取得 API 詳細資料。

import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val numFeatures = data.take(1)(0).features.size

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)

// Append 1 into the training data as intercept.
val training = splits(0).map(x => (x.label, MLUtils.appendBias(x.features))).cache()

val test = splits(1)

// Run training algorithm to build the model
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))

val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
  training,
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept)

val model = new LogisticRegressionModel(
  Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
  weightsWithIntercept(weightsWithIntercept.size - 1))

// Clear the default threshold.
model.clearThreshold()

// Compute raw scores on the test set.
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println("Loss of each step in training process")
loss.foreach(println)
println(s"Area under ROC = $auROC")
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/mllib/LBFGSExample.scala」中尋找完整的範例程式碼。

請參閱 LBFGS Java 文件SquaredL2Updater Java 文件 以取得 API 詳細資料。

import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.optimization.*;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();
int numFeatures = data.take(1).get(0).features().size();

// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint> trainingInit = data.sample(false, 0.6, 11L);
JavaRDD<LabeledPoint> test = data.subtract(trainingInit);

// Append 1 into the training data as intercept.
JavaPairRDD<Object, Vector> training = data.mapToPair(p ->
  new Tuple2<>(p.label(), MLUtils.appendBias(p.features())));
training.cache();

// Run training algorithm to build the model.
int numCorrections = 10;
double convergenceTol = 1e-4;
int maxNumIterations = 20;
double regParam = 0.1;
Vector initialWeightsWithIntercept = Vectors.dense(new double[numFeatures + 1]);

Tuple2<Vector, double[]> result = LBFGS.runLBFGS(
  training.rdd(),
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept);
Vector weightsWithIntercept = result._1();
double[] loss = result._2();

LogisticRegressionModel model = new LogisticRegressionModel(
  Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)),
  (weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]);

// Clear the default threshold.
model.clearThreshold();

// Compute raw scores on the test set.
JavaPairRDD<Object, Object> scoreAndLabels = test.mapToPair(p ->
  new Tuple2<>(model.predict(p.features()), p.label()));

// Get evaluation metrics.
BinaryClassificationMetrics metrics =
  new BinaryClassificationMetrics(scoreAndLabels.rdd());
double auROC = metrics.areaUnderROC();

System.out.println("Loss of each step in training process");
for (double l : loss) {
  System.out.println(l);
}
System.out.println("Area under ROC = " + auROC);
在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java」中尋找完整的範例程式碼。

開發人員備忘

由於 Hessian 是從先前的梯度評估中近似建構的,因此目標函數無法在最佳化過程中變更。因此,隨機 L-BFGS 無法僅透過使用 miniBatch 天真地運作;因此,我們在有更深入的了解之前不會提供此功能。

Updater 是最初為梯度下降設計的類別,用於計算實際的梯度下降步驟。不過,我們能夠透過忽略僅適用於梯度下降的邏輯部分(例如自適應步長大小等),取得 L-BFGS 的目標函數梯度和損失。稍後,我們會將此重構為正規化器,以取代 updater,並將正規化和步驟更新的邏輯分開。