降維 - 基於 RDD 的 API
降維是減少考慮變數數目的程序。它可以用於從原始且雜訊的變數中萃取潛在特徵,或在維持結構的同時壓縮資料。spark.mllib
提供對 RowMatrix 類別的降維支援。
奇異值分解 (SVD)
奇異值分解 (SVD) 將矩陣因式分解為三個矩陣:$U$、$\Sigma$ 和 $V$,使得
\[ A = U \Sigma V^T, \]
其中
- $U$ 是正交矩陣,其欄位稱為左奇異向量,
- $\Sigma$ 是對角線矩陣,其非負對角線按遞減順序排列,其對角線稱為奇異值,
- $V$ 是正交矩陣,其欄位稱為右奇異向量。
對於大型矩陣,我們通常不需要完整的因式分解,而只需要頂端的奇異值及其相關的奇異向量。這可以節省儲存空間、消除雜訊並還原矩陣的低階結構。
如果我們保留頂端的 $k$ 個奇異值,則產生的低階矩陣的維度將為
$U$
:$m \times k$
,$\Sigma$
:$k \times k$
,$V$
:$n \times k$
。
效能
我們假設 $n$ 小於 $m$。奇異值和右奇異向量來自於 Gramian 矩陣 $A^T A$ 的特徵值和特徵向量。儲存左奇異向量 $U$ 的矩陣,是透過矩陣乘法計算而得,為 $U = A (V S^{-1})$,如果使用者透過 computeU 參數要求的話。實際使用的方法會根據運算成本自動決定
- 如果 $n$ 很小 ($n < 100$) 或 $k$ 與 $n$ 相比很大 ($k > n / 2$),我們會先計算 Gramian 矩陣,然後在驅動程式上計算其最大的特徵值和特徵向量。這需要在每個執行器和驅動程式上進行一次傳遞,儲存量為 $O(n^2)$,在驅動程式上花費的時間為 $O(n^2 k)$。
- 否則,我們會以分配式的方式計算 $(A^T A) v$,並將其傳送至 ARPACK,在驅動程式節點上計算 $(A^T A)$ 的最大特徵值和特徵向量。這需要 $O(k)$ 次傳遞,在每個執行器上儲存量為 $O(n)$,在驅動程式上儲存量為 $O(n k)$。
SVD 範例
spark.mllib
提供 SVD 功能給行列向矩陣,在 RowMatrix 類別中提供。
有關 API 的詳細資訊,請參閱 SingularValueDecomposition
Python 文件。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
rows = sc.parallelize([
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
mat = RowMatrix(rows)
# Compute the top 5 singular values and corresponding singular vectors.
svd = mat.computeSVD(5, computeU=True)
U = svd.U # The U factor is a RowMatrix.
s = svd.s # The singular values are stored in a local dense vector.
V = svd.V # The V factor is a local dense matrix.
如果 U
定義為 IndexedRowMatrix
,則相同的程式碼適用於 IndexedRowMatrix
。
有關 API 的詳細資訊,請參閱 SingularValueDecomposition
Scala 文件。
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
val rows = sc.parallelize(data)
val mat: RowMatrix = new RowMatrix(rows)
// Compute the top 5 singular values and corresponding singular vectors.
val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(5, computeU = true)
val U: RowMatrix = svd.U // The U factor is a RowMatrix.
val s: Vector = svd.s // The singular values are stored in a local dense vector.
val V: Matrix = svd.V // The V factor is a local dense matrix.
如果 U
定義為 IndexedRowMatrix
,則相同的程式碼適用於 IndexedRowMatrix
。
有關 API 的詳細資訊,請參閱 SingularValueDecomposition
Java 文件。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.SingularValueDecomposition;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
List<Vector> data = Arrays.asList(
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);
JavaRDD<Vector> rows = jsc.parallelize(data);
// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());
// Compute the top 5 singular values and corresponding singular vectors.
SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(5, true, 1.0E-9d);
RowMatrix U = svd.U(); // The U factor is a RowMatrix.
Vector s = svd.s(); // The singular values are stored in a local dense vector.
Matrix V = svd.V(); // The V factor is a local dense matrix.
如果 U
定義為 IndexedRowMatrix
,則相同的程式碼適用於 IndexedRowMatrix
。
主成分分析 (PCA)
主成分分析 (PCA) 是一種統計方法,用於尋找一個旋轉,使得第一個座標具有最大的可能變異,而每個後續座標依序具有最大的可能變異。旋轉矩陣的欄稱為主成分。PCA 廣泛用於降維。
spark.mllib
支援儲存在列導向格式和任何向量的細長矩陣的 PCA。
以下程式碼示範如何計算 RowMatrix
的主成分,並使用它們將向量投影到低維度空間。
請參閱 RowMatrix
Python 文件 以取得 API 的詳細資料。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
rows = sc.parallelize([
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
mat = RowMatrix(rows)
# Compute the top 4 principal components.
# Principal components are stored in a local dense matrix.
pc = mat.computePrincipalComponents(4)
# Project the rows to the linear space spanned by the top 4 principal components.
projected = mat.multiply(pc)
以下程式碼示範如何計算 RowMatrix
的主成分,並使用它們將向量投影到低維度空間。
請參閱 RowMatrix
Scala 文件 以取得 API 的詳細資料。
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
val rows = sc.parallelize(data)
val mat: RowMatrix = new RowMatrix(rows)
// Compute the top 4 principal components.
// Principal components are stored in a local dense matrix.
val pc: Matrix = mat.computePrincipalComponents(4)
// Project the rows to the linear space spanned by the top 4 principal components.
val projected: RowMatrix = mat.multiply(pc)
以下程式碼示範如何計算原始向量的主成分,並使用它們將向量投影到低維度空間,同時保留關聯標籤
請參閱 PCA
Scala 文件 以取得 API 的詳細資料。
import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
val data: RDD[LabeledPoint] = sc.parallelize(Seq(
new LabeledPoint(0, Vectors.dense(1, 0, 0, 0, 1)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 1, 0)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 0, 0)),
new LabeledPoint(0, Vectors.dense(1, 0, 0, 0, 0)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 0, 0))))
// Compute the top 5 principal components.
val pca = new PCA(5).fit(data.map(_.features))
// Project vectors to the linear space spanned by the top 5 principal
// components, keeping the label
val projected = data.map(p => p.copy(features = pca.transform(p.features)))
以下程式碼示範如何計算 RowMatrix
的主成分,並使用它們將向量投影到低維度空間。
請參閱 RowMatrix
Java 文件 以取得 API 的詳細資料。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
List<Vector> data = Arrays.asList(
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);
JavaRDD<Vector> rows = jsc.parallelize(data);
// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());
// Compute the top 4 principal components.
// Principal components are stored in a local dense matrix.
Matrix pc = mat.computePrincipalComponents(4);
// Project the rows to the linear space spanned by the top 4 principal components.
RowMatrix projected = mat.multiply(pc);