迴歸 - 基於 RDD 的 API
等值迴歸
等值迴歸屬於迴歸演算法家族。正式來說,等值迴歸是一個問題,其中給定一組有限實數 $Y = {y_1, y_2, ..., y_n}$
,表示觀察到的回應,以及 $X = {x_1, x_2, ..., x_n}$
要擬合的未知回應值,找出一個函數來最小化
\begin{equation} f(x) = \sum_{i=1}^n w_i (y_i - x_i)^2 \end{equation}
關於受 $x_1\le x_2\le ...\le x_n$
約束的完整順序,其中 $w_i$
是正權重。產生的函數稱為等值迴歸,並且是唯一的。它可以視為在順序限制下的最小平方問題。基本上,等值迴歸是一個 單調函數,最適合原始資料點。
spark.mllib
支援 池鄰近違規演算法,它使用一種方法來 平行化等值迴歸。訓練輸入是一個 RDD,其中包含三個雙精度值的元組,它們依序表示標籤、特徵和權重。如果有多個元組具有相同特徵,則這些元組會聚合為一個元組,如下所示
- 聚合標籤是所有標籤的加權平均值。
- 聚合特徵是唯一的特徵值。
- 聚合權重是所有權重的總和。
此外,等值迴歸演算法有一個名為 $isotonic$ 的可選參數,預設為 true。此引數指定等值迴歸是等值(單調遞增)還是反等值(單調遞減)。
訓練會傳回一個等值迴歸模型,可用於預測已知和未知特徵的標籤。等值迴歸的結果被視為分段線性函數。因此,預測規則是
- 如果預測輸入與訓練特徵完全匹配,則傳回關聯預測。
- 如果預測輸入低於或高於所有訓練特徵,則分別傳回具有最低或最高特徵的預測。
- 如果預測輸入介於兩個訓練特徵之間,則預測會被視為分段線性函數,且會從兩個最接近特徵的預測中計算出內插值。
範例
資料會從檔案中讀取,其中每一行都有標籤和特徵的格式,例如 4710.28,500.00。資料會分割成訓練和測試集。模型會使用訓練集建立,且會從測試集中預測的標籤和真實標籤計算出平均平方誤差。
請參閱 IsotonicRegression
Python 文件 和 IsotonicRegressionModel
Python 文件,以取得更多 API 詳細資料。
import math
from pyspark.mllib.regression import IsotonicRegression, IsotonicRegressionModel
from pyspark.mllib.util import MLUtils
# Load and parse the data
def parsePoint(labeledData):
return (labeledData.label, labeledData.features[0], 1.0)
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_isotonic_regression_libsvm_data.txt")
# Create label, feature, weight tuples from input data with weight set to default value 1.0.
parsedData = data.map(parsePoint)
# Split data into training (60%) and test (40%) sets.
training, test = parsedData.randomSplit([0.6, 0.4], 11)
# Create isotonic regression model from training data.
# Isotonic parameter defaults to true so it is only shown for demonstration
model = IsotonicRegression.train(training)
# Create tuples of predicted and real labels.
predictionAndLabel = test.map(lambda p: (model.predict(p[1]), p[0]))
# Calculate mean squared error between predicted and real labels.
meanSquaredError = predictionAndLabel.map(lambda pl: math.pow((pl[0] - pl[1]), 2)).mean()
print("Mean Squared Error = " + str(meanSquaredError))
# Save and load model
model.save(sc, "target/tmp/myIsotonicRegressionModel")
sameModel = IsotonicRegressionModel.load(sc, "target/tmp/myIsotonicRegressionModel")
資料會從檔案中讀取,其中每一行都有標籤和特徵的格式,例如 4710.28,500.00。資料會分割成訓練和測試集。模型會使用訓練集建立,且會從測試集中預測的標籤和真實標籤計算出平均平方誤差。
請參閱 IsotonicRegression
Scala 文件 和 IsotonicRegressionModel
Scala 文件,以取得 API 詳細資料。
import org.apache.spark.mllib.regression.{IsotonicRegression, IsotonicRegressionModel}
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc,
"data/mllib/sample_isotonic_regression_libsvm_data.txt").cache()
// Create label, feature, weight tuples from input data with weight set to default value 1.0.
val parsedData = data.map { labeledPoint =>
(labeledPoint.label, labeledPoint.features(0), 1.0)
}
// Split data into training (60%) and test (40%) sets.
val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0)
val test = splits(1)
// Create isotonic regression model from training data.
// Isotonic parameter defaults to true so it is only shown for demonstration
val model = new IsotonicRegression().setIsotonic(true).run(training)
// Create tuples of predicted and real labels.
val predictionAndLabel = test.map { point =>
val predictedLabel = model.predict(point._2)
(predictedLabel, point._1)
}
// Calculate mean squared error between predicted and real labels.
val meanSquaredError = predictionAndLabel.map { case (p, l) => math.pow((p - l), 2) }.mean()
println(s"Mean Squared Error = $meanSquaredError")
// Save and load model
model.save(sc, "target/tmp/myIsotonicRegressionModel")
val sameModel = IsotonicRegressionModel.load(sc, "target/tmp/myIsotonicRegressionModel")
資料會從檔案中讀取,其中每一行都有標籤和特徵的格式,例如 4710.28,500.00。資料會分割成訓練和測試集。模型會使用訓練集建立,且會從測試集中預測的標籤和真實標籤計算出平均平方誤差。
請參閱 IsotonicRegression
Java 文件 和 IsotonicRegressionModel
Java 文件,以取得 API 詳細資料。
import scala.Tuple2;
import scala.Tuple3;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.regression.IsotonicRegression;
import org.apache.spark.mllib.regression.IsotonicRegressionModel;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(
jsc.sc(), "data/mllib/sample_isotonic_regression_libsvm_data.txt").toJavaRDD();
// Create label, feature, weight tuples from input data with weight set to default value 1.0.
JavaRDD<Tuple3<Double, Double, Double>> parsedData = data.map(point ->
new Tuple3<>(point.label(), point.features().apply(0), 1.0));
// Split data into training (60%) and test (40%) sets.
JavaRDD<Tuple3<Double, Double, Double>>[] splits =
parsedData.randomSplit(new double[]{0.6, 0.4}, 11L);
JavaRDD<Tuple3<Double, Double, Double>> training = splits[0];
JavaRDD<Tuple3<Double, Double, Double>> test = splits[1];
// Create isotonic regression model from training data.
// Isotonic parameter defaults to true so it is only shown for demonstration
IsotonicRegressionModel model = new IsotonicRegression().setIsotonic(true).run(training);
// Create tuples of predicted and real labels.
JavaPairRDD<Double, Double> predictionAndLabel = test.mapToPair(point ->
new Tuple2<>(model.predict(point._2()), point._1()));
// Calculate mean squared error between predicted and real labels.
double meanSquaredError = predictionAndLabel.mapToDouble(pl -> {
double diff = pl._1() - pl._2();
return diff * diff;
}).mean();
System.out.println("Mean Squared Error = " + meanSquaredError);
// Save and load model
model.save(jsc.sc(), "target/tmp/myIsotonicRegressionModel");
IsotonicRegressionModel sameModel =
IsotonicRegressionModel.load(jsc.sc(), "target/tmp/myIsotonicRegressionModel");