線性方法 - 基於 RDD 的 API

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

數學公式

許多標準的機器學習方法可以表述為凸最佳化問題,也就是說,找出凸函數 $f$ 的最小值,該函數取決於變數向量 $\wv$(在程式碼中稱為 權重),其中有 $d$ 個項目。正式來說,我們可以將其寫成最佳化問題 $\min_{\wv \in\R^d} \; f(\wv)$,其中目標函數的形式為 \begin{equation} f(\wv) := \lambda\, R(\wv) + \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i) \label{eq:regPrimal} \ . \end{equation} 在這裡,向量 $\x_i\in\R^d$ 是訓練資料範例,其中 $1\le i\le n$,而 $y_i\in\R$ 是其對應的標籤,我們想要預測這些標籤。如果 $L(\wv; \x, y)$ 可以表示為 $\wv^T x$ 和 $y$ 的函數,我們就稱這種方法為線性spark.mllib 的多種分類和迴歸演算法都屬於此類別,並在此加以探討。

目標函數 $f$ 有兩個部分:控制模型複雜度的正則化項,以及衡量模型在訓練資料上的誤差的損失。損失函數 $L(\wv;.)$ 通常是 $\wv$ 中的凸函數。固定的正則化參數 $\lambda \ge 0$(程式碼中的 regParam)定義了最小化損失(也就是訓練誤差)和最小化模型複雜度(也就是避免過度擬合)這兩個目標之間的權衡。

損失函數

下表總結了 spark.mllib 支援的方法的損失函數及其梯度或次梯度

損失函數 $L(\wv; \x, y)$梯度或次梯度
鉸鏈損失$\max \{0, 1-y \wv^T \x \}, \quad y \in \{-1, +1\}$ $\begin{cases}-y \cdot \x & \text{如果 $y \wv^T \x <1$}, \\ 0 & \text{否則}.\end{cases}$
邏輯損失$\log(1+\exp( -y \wv^T \x)), \quad y \in \{-1, +1\}$ $-y \left(1-\frac1{1+\exp(-y \wv^T \x)} \right) \cdot \x$
平方損失$\frac{1}{2} (\wv^T \x - y)^2, \quad y \in \R$ $(\wv^T \x - y) \cdot \x$

請注意,在上述數學公式中,二元標籤 $y$ 表示為 $+1$(正)或 $-1$(負),這對於公式而言較為方便。然而,在 spark.mllib 中,負標籤以 $0$ 表示,而非 $-1$,以符合多類標籤。

正則化項

正則化項 的目的是鼓勵簡單的模型並避免過度擬合。我們在 spark.mllib 中支援下列正則化項

正則化項 $R(\wv)$梯度或次梯度
零(未正則化)0$\0$
L2$\frac{1}{2}\|\wv\|_2^2$$\wv$
L1$\|\wv\|_1$$\mathrm{sign}(\wv)$
彈性網路$\alpha \|\wv\|_1 + (1-\alpha)\frac{1}{2}\|\wv\|_2^2$$\alpha \mathrm{sign}(\wv) + (1-\alpha) \wv$

在此,$\mathrm{sign}(\wv)$ 是向量,包含 $\wv$ 中所有條目的符號($\pm1$)。

由於平滑,L2 正則化問題通常比 L1 正則化問題容易解決。然而,L1 正則化有助於提升權重的稀疏性,進而產生較小且較易解讀的模型,後者對於特徵選取很有用。彈性網路 結合了 L1 和 L2 正則化。不建議訓練沒有任何正則化的模型,特別是在訓練範例數量很少時。

最佳化

線性方法在幕後使用凸優化方法來最佳化目標函數。spark.mllib 使用兩種方法,SGD 和 L-BFGS,說明請參閱 最佳化區段。目前,大多數演算法 API 都支援隨機梯度下降法 (SGD),而少數支援 L-BFGS。請參閱 此最佳化區段,以取得有關在最佳化方法之間進行選擇的指南。

分類

分類旨在將項目分為類別。最常見的分類類型是二元分類,其中有兩個類別,通常稱為正類和負類。如果類別多於兩個,則稱為多類分類spark.mllib支援兩種用於分類的線性方法:線性支持向量機 (SVM) 和邏輯迴歸。線性 SVM 僅支援二元分類,而邏輯迴歸支援二元和多類分類問題。對於這兩種方法,spark.mllib支援 L1 和 L2 正則化變體。訓練資料集由 MLlib 中的標記點的 RDD 表示,其中標籤是從零開始的類別索引:$0, 1, 2, \ldots$。

線性支援向量機 (SVM)

線性 SVM是大規模分類任務的標準方法。它是一種線性方法,如上文等式$\eqref{eq:regPrimal}$中所述,其中損失函數在公式中由鉸鏈損失給出

\[ L(\wv;\x,y) := \max \{0, 1-y \wv^T \x \}. \]預設情況下,線性 SVM 使用 L2 正則化進行訓練。我們還支援替代的 L1 正則化。在這種情況下,問題變為線性規劃

線性 SVM 演算法會輸出一個 SVM 模型。對於一個新的資料點,表示為 $\x$,模型會根據 $\wv^T \x$ 的值進行預測。預設情況下,如果 $\wv^T \x \geq 0$,則結果為正類,否則為負類。

範例

以下範例顯示如何載入範例資料集、建立 SVM 模型,並使用產生的模型進行預測以計算訓練誤差。

請參閱SVMWithSGD Python 文件SVMModel Python 文件以取得有關 API 的更多詳細資訊。

from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = SVMWithSGD.train(parsedData, iterations=100)

# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
model.save(sc, "target/tmp/pythonSVMWithSGDModel")
sameModel = SVMModel.load(sc, "target/tmp/pythonSVMWithSGDModel")
在 Spark 回應程式中的「範例/src/main/python/mllib/svm_with_sgd_example.py」中尋找完整的範例程式碼。

以下程式碼片段說明如何載入範例資料集,使用演算法物件中的靜態方法對此訓練資料執行訓練演算法,並使用產生的模型進行預測以計算訓練誤差。

請參閱 SVMWithSGD Scala 文件SVMModel Scala 文件 以取得 API 詳細資料。

import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.util.MLUtils

// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Run training algorithm to build the model
val numIterations = 100
val model = SVMWithSGD.train(training, numIterations)

// Clear the default threshold.
model.clearThreshold()

// Compute raw scores on the test set.
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println(s"Area under ROC = $auROC")

// Save and load model
model.save(sc, "target/tmp/scalaSVMWithSGDModel")
val sameModel = SVMModel.load(sc, "target/tmp/scalaSVMWithSGDModel")
在 Spark 儲存庫中,請在「examples/src/main/scala/org/apache/spark/examples/mllib/SVMWithSGDExample.scala」中尋找完整的範例程式碼。

預設情況下,SVMWithSGD.train() 方法執行 L2 正規化,正規化參數設定為 1.0。如果我們想要設定此演算法,我們可以透過直接建立新物件並呼叫設定方法,進一步自訂 SVMWithSGD。所有其他 spark.mllib 演算法也支援此方式的自訂。例如,以下程式碼產生 L1 正規化的 SVM 變體,正規化參數設定為 0.1,並執行訓練演算法 200 次反覆運算。

import org.apache.spark.mllib.optimization.L1Updater

val svmAlg = new SVMWithSGD()
svmAlg.optimizer
  .setNumIterations(200)
  .setRegParam(0.1)
  .setUpdater(new L1Updater)
val modelL1 = svmAlg.run(training)

MLlib 的所有方法都使用 Java 友善類型,因此您可以像在 Scala 中一樣匯入並呼叫它們。唯一的注意事項是方法採用 Scala RDD 物件,而 Spark Java API 使用單獨的 JavaRDD 類別。您可以透過在 JavaRDD 物件上呼叫 .rdd(),將 Java RDD 轉換為 Scala RDD。以下是與 Scala 中提供的範例等效的自給應用程式範例

請參閱 SVMWithSGD Java 文件SVMModel Java 文件 以取得 API 詳細資料。

import scala.Tuple2;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.classification.SVMModel;
import org.apache.spark.mllib.classification.SVMWithSGD;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;

String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();

// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint> training = data.sample(false, 0.6, 11L);
training.cache();
JavaRDD<LabeledPoint> test = data.subtract(training);

// Run training algorithm to build the model.
int numIterations = 100;
SVMModel model = SVMWithSGD.train(training.rdd(), numIterations);

// Clear the default threshold.
model.clearThreshold();

// Compute raw scores on the test set.
JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(p ->
  new Tuple2<>(model.predict(p.features()), p.label()));

// Get evaluation metrics.
BinaryClassificationMetrics metrics =
  new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
double auROC = metrics.areaUnderROC();

System.out.println("Area under ROC = " + auROC);

// Save and load model
model.save(sc, "target/tmp/javaSVMWithSGDModel");
SVMModel sameModel = SVMModel.load(sc, "target/tmp/javaSVMWithSGDModel");
在 Spark 儲存庫中,請在「examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java」中尋找完整的範例程式碼。

預設情況下,SVMWithSGD.train() 方法執行 L2 正規化,正規化參數設定為 1.0。如果我們想要設定此演算法,我們可以透過直接建立新物件並呼叫設定方法,進一步自訂 SVMWithSGD。所有其他 spark.mllib 演算法也支援此方式的自訂。例如,以下程式碼產生 L1 正規化的 SVM 變體,正規化參數設定為 0.1,並執行訓練演算法 200 次反覆運算。

import org.apache.spark.mllib.optimization.L1Updater;

SVMWithSGD svmAlg = new SVMWithSGD();
svmAlg.optimizer()
  .setNumIterations(200)
  .setRegParam(0.1)
  .setUpdater(new L1Updater());
SVMModel modelL1 = svmAlg.run(training.rdd());

若要執行上述應用程式,請遵循 Spark 快速入門指南的 自給應用程式 部分中提供的說明。請務必將 spark-mllib 也包含在您的建置檔案中,作為相依性。

邏輯迴歸

邏輯迴歸 廣泛用於預測二元回應。它是一種線性方法,如上文等式 $\eqref{eq:regPrimal}$ 中所述,損失函數在公式中由邏輯損失給出:\[ L(\wv;\x,y) := \log(1+\exp( -y \wv^T \x)). \]

對於二元分類問題,演算法會輸出一個二元邏輯迴歸模型。給定一個新的資料點,表示為 $\x$,此模型會透過套用邏輯函數 \[ \mathrm{f}(z) = \frac{1}{1 + e^{-z}} \] 來進行預測,其中 $z = \wv^T \x$。預設情況下,如果 $\mathrm{f}(\wv^T x) > 0.5$,結果為正,否則為負,儘管與線性 SVM 不同,邏輯迴歸模型的原始輸出 $\mathrm{f}(z)$ 具有機率詮釋(即 $\x$ 為正的機率)。

二元邏輯迴歸可以廣義化為 多項邏輯迴歸,以訓練和預測多類別分類問題。例如,對於 $K$ 個可能的結果,其中一個結果可以選為「樞紐」,而其他 $K - 1$ 個結果可以分別對應樞紐結果進行迴歸。在 spark.mllib 中,第一類別 $0$ 選為「樞紐」類別。請參閱 統計學習基礎 的第 4.4 節以取得相關參考。以下是 詳細的數學推導

對於多類別分類問題,演算法會輸出一個多項邏輯迴歸模型,其中包含 $K - 1$ 個對應第一類別進行迴歸的二元邏輯迴歸模型。給定一個新的資料點,將執行 $K - 1$ 個模型,並選取機率最大的類別作為預測類別。

我們實作了兩個演算法來解決邏輯迴歸問題:迷你批次梯度下降和 L-BFGS。我們建議使用 L-BFGS,而不是迷你批次梯度下降,以加快收斂速度。

範例

以下範例說明如何載入範例資料集、建立邏輯迴歸模型,以及使用產生的模型進行預測,以計算訓練誤差。

請注意,Python API 目前尚不支援多類別分類和模型儲存/載入,但未來將會支援。

請參閱 LogisticRegressionWithLBFGS Python 文件LogisticRegressionModel Python 文件 以取得更多關於 API 的詳細資訊。

from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = LogisticRegressionWithLBFGS.train(parsedData)

# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
model.save(sc, "target/tmp/pythonLogisticRegressionWithLBFGSModel")
sameModel = LogisticRegressionModel.load(sc,
                                         "target/tmp/pythonLogisticRegressionWithLBFGSModel")
在 Spark 儲存庫中的「examples/src/main/python/mllib/logistic_regression_with_lbfgs_example.py」中,尋找完整的範例程式碼。

以下程式碼說明如何載入範例多類別資料集、將其分割成訓練和測試,並使用 LogisticRegressionWithLBFGS 來擬合邏輯迴歸模型。然後,會針對測試資料集評估模型並將其儲存至磁碟。

請參閱 LogisticRegressionWithLBFGS Scala 文件LogisticRegressionModel Scala 文件,以取得 API 詳細資料。

import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils

// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Run training algorithm to build the model
val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(10)
  .run(training)

// Compute raw scores on the test set.
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
  val prediction = model.predict(features)
  (prediction, label)
}

// Get evaluation metrics.
val metrics = new MulticlassMetrics(predictionAndLabels)
val accuracy = metrics.accuracy
println(s"Accuracy = $accuracy")

// Save and load model
model.save(sc, "target/tmp/scalaLogisticRegressionWithLBFGSModel")
val sameModel = LogisticRegressionModel.load(sc,
  "target/tmp/scalaLogisticRegressionWithLBFGSModel")
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/mllib/LogisticRegressionWithLBFGSExample.scala」中,尋找完整的範例程式碼。

以下程式碼說明如何載入範例多類別資料集、將其分割成訓練和測試,並使用 LogisticRegressionWithLBFGS 來擬合邏輯迴歸模型。然後,會針對測試資料集評估模型並將其儲存至磁碟。

請參閱 LogisticRegressionWithLBFGS Java 文件LogisticRegressionModel Java 文件,以取得 API 詳細資料。

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;

String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();

// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[] {0.6, 0.4}, 11L);
JavaRDD<LabeledPoint> training = splits[0].cache();
JavaRDD<LabeledPoint> test = splits[1];

// Run training algorithm to build the model.
LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
  .setNumClasses(10)
  .run(training.rdd());

// Compute raw scores on the test set.
JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
  new Tuple2<>(model.predict(p.features()), p.label()));

// Get evaluation metrics.
MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
double accuracy = metrics.accuracy();
System.out.println("Accuracy = " + accuracy);

// Save and load model
model.save(sc, "target/tmp/javaLogisticRegressionWithLBFGSModel");
LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc,
  "target/tmp/javaLogisticRegressionWithLBFGSModel");
在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java」中,尋找完整的範例程式碼。

迴歸

線性最小平方、Lasso 和 Ridge 迴歸

線性最小平方法是迴歸問題最常見的公式。它是一種線性方法,如上文公式 $\eqref{eq:regPrimal}$ 中所述,公式中的損失函數由平方損失給出:\[ L(\wv;\x,y) := \frac{1}{2} (\wv^T \x - y)^2. \]

透過使用不同類型的正則化,可以推導出各種相關的迴歸方法:普通最小平方法線性最小平方法 不使用正則化;脊迴歸 使用 L2 正則化;套索 使用 L1 正則化。對於所有這些模型,平均損失或訓練誤差,$\frac{1}{n} \sum_{i=1}^n (\wv^T x_i - y_i)^2$,稱為 均方誤差

串流線性迴歸

當資料以串流方式傳送時,使用線上擬合迴歸模型很有用,因為當新資料傳送時,可以更新模型參數。spark.mllib 目前使用普通最小平方法支援串流線性迴歸。擬合類似於離線執行的擬合,但擬合會在每個資料批次中發生,因此模型會持續更新以反映串流中的資料。

範例

下列範例說明如何從兩個不同的文字檔輸入串流載入訓練和測試資料,將串流解析為標籤點,將線性回歸模型線上擬合到第一個串流,並對第二個串流進行預測。

首先,我們匯入必要的類別來解析輸入資料和建立模型。

然後,我們為訓練和測試資料建立輸入串流。我們假設已建立 StreamingContext ssc,請參閱 Spark 串流程式設計指南 以取得更多資訊。在此範例中,我們在訓練和測試串流中使用標籤點,但在實際應用中,您可能希望對測試資料使用未標籤的向量。

我們透過將權重初始化為 0 來建立模型。

現在,我們註冊訓練和測試串流,然後開始執行工作。

我們現在可以將包含資料的文字檔儲存到訓練或測試資料夾。每一行都應該是格式化為 (y,[x1,x2,x3]) 的資料點,其中 y 是標籤,而 x1,x2,x3 是特徵。只要將文字檔放置在 sys.argv[1] 中,模型就會更新。只要將文字檔放置在 sys.argv[2] 中,您就會看到預測。當您提供更多資料給訓練目錄時,預測就會變得更好!

以下是完整的範例

import sys

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import StreamingLinearRegressionWithSGD

def parse(lp):
    label = float(lp[lp.find('(') + 1: lp.find(',')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

trainingData = ssc.textFileStream(sys.argv[1]).map(parse).cache()
testData = ssc.textFileStream(sys.argv[2]).map(parse)

numFeatures = 3
model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0, 0.0, 0.0])

model.trainOn(trainingData)
print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))))

ssc.start()
ssc.awaitTermination()
在 Spark 儲存庫的「examples/src/main/python/mllib/streaming_linear_regression_example.py」中找到完整的範例程式碼。

首先,我們匯入必要的類別來解析輸入資料和建立模型。

然後,我們為訓練和測試資料建立輸入串流。我們假設已建立 StreamingContext ssc,請參閱 Spark 串流程式設計指南 以取得更多資訊。在此範例中,我們在訓練和測試串流中使用標籤點,但在實際應用中,您可能希望對測試資料使用未標籤的向量。

我們透過將權重初始化為 0 來建立模型,並註冊訓練和測試串流,然後開始執行工作。將預測與真實標籤列印出來,讓我們可以輕鬆看到結果。

最後,我們可以將包含資料的文字檔儲存到訓練或測試資料夾。每一行都應該是格式化為 (y,[x1,x2,x3]) 的資料點,其中 y 是標籤,而 x1,x2,x3 是特徵。只要將文字檔放置在 args(0) 中,模型就會更新。只要將文字檔放置在 args(1) 中,您就會看到預測。當您提供更多資料給訓練目錄時,預測就會變得更好!

以下是完整的範例

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse).cache()
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)

val numFeatures = 3
val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()
在 Spark 儲存庫的「examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegressionExample.scala」中找到完整的範例程式碼。

實作(開發人員)

在幕後,spark.mllib 實作了隨機梯度下降 (SGD) 的簡單分散式版本,建立在基礎的梯度下降原語上(如 最佳化 區段所述)。所有提供的演算法都將正規化參數 (regParam) 作為輸入,以及與隨機梯度下降相關的各種參數 (stepSizenumIterationsminiBatchFraction)。對於其中每一個,我們支援所有三種可能的正規化(無、L1 或 L2)。

對於邏輯迴歸,L-BFGS 版本實作在 LogisticRegressionWithLBFGS 之下,而且此版本支援二元和多項邏輯迴歸,而 SGD 版本只支援二元邏輯迴歸。然而,L-BFGS 版本不支援 L1 正規化,但 SGD 版本支援 L1 正規化。當不需要 L1 正規化時,強烈建議使用 L-BFGS 版本,因為它使用準牛頓法近似逆海森矩陣,因此收斂得更快、更準確。

演算法都以 Scala 實作