資料類型 - 基於 RDD 的 API

MLlib 支援儲存在單一機器上的本機向量與矩陣,以及由一個或多個 RDD 支援的分散式矩陣。本機向量與本機矩陣是作為公共介面的簡單資料模型。基礎線性代數運算由 Breeze 提供。監督式學習中使用的訓練範例在 MLlib 中稱為「標籤點」。

本機向量

本機向量具有整數型且從 0 開始的索引,以及儲存在單一機器上的雙精度型值。MLlib 支援兩種本機向量:稠密和稀疏。稠密向量由表示其輸入值的雙精度陣列支援,而稀疏向量由兩個平行陣列支援:索引和值。例如,向量 (1.0, 0.0, 3.0) 可表示為稠密格式 [1.0, 0.0, 3.0] 或稀疏格式 (3, [0, 2], [1.0, 3.0]),其中 3 是向量的長度。

MLlib 辨識下列類型為稠密向量

  • NumPy 的 array
  • Python 的清單,例如 [1, 2, 3]

以及下列為稀疏向量

我們建議使用 NumPy 陣列而非清單以提高效率,並使用在 Vectors 中實作的工廠方法來建立稀疏向量。

請參閱 Vectors Python 文件,以取得更多關於 API 的詳細資料。

import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors

# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1))

本地向量的基底類別為 Vector,我們提供了兩個實作:DenseVectorSparseVector。我們建議使用 Vectors 中實作的工廠方法來建立本地向量。

有關 API 的詳細資訊,請參閱 Vector Scala 文件Vectors Scala 文件

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Create a dense vector (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))

注意: Scala 預設會匯入 scala.collection.immutable.Vector,因此您必須明確匯入 org.apache.spark.mllib.linalg.Vector 才能使用 MLlib 的 Vector

本地向量的基底類別為 Vector,我們提供了兩個實作:DenseVectorSparseVector。我們建議使用 Vectors 中實作的工廠方法來建立本地向量。

有關 API 的詳細資訊,請參閱 Vector Java 文件Vectors Java 文件

import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Create a dense vector (1.0, 0.0, 3.0).
Vector dv = Vectors.dense(1.0, 0.0, 3.0);
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
Vector sv = Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0});

標籤點

標記點是一個本地向量,可以是稠密的或稀疏的,與標記/回應相關聯。在 MLlib 中,標記點用於監督式學習演算法。我們使用雙精度浮點數儲存標記,因此我們可以在迴歸和分類中使用標記點。對於二元分類,標記應為 0(負)或 1(正)。對於多類分類,標記應為從零開始的類別索引:0, 1, 2, ...

標記點由 LabeledPoint 表示。

有關 API 的更多詳細資訊,請參閱 LabeledPoint Python 文件

from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])

# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))

標記點由案例類別 LabeledPoint 表示。

有關 API 的詳細資訊,請參閱 LabeledPoint Scala 文件

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

標記點由 LabeledPoint 表示。

有關 API 的詳細資訊,請參閱 LabeledPoint Java 文件

import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

// Create a labeled point with a positive label and a dense feature vector.
LabeledPoint pos = new LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0));

// Create a labeled point with a negative label and a sparse feature vector.
LabeledPoint neg = new LabeledPoint(0.0, Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0}));

稀疏資料

在實務上,擁有稀疏訓練資料是很常見的。MLlib 支援讀取以 LIBSVM 格式儲存的訓練範例,這是 LIBSVMLIBLINEAR 使用的預設格式。這是一種文字格式,其中每一行都使用下列格式表示標記的稀疏特徵向量

label index1:value1 index2:value2 ...

其中索引是從 1 開始且依遞增順序排列。載入後,特徵索引會轉換為從 0 開始。

MLUtils.loadLibSVMFile 會讀取以 LIBSVM 格式儲存的訓練範例。

有關 API 的更多詳細資訊,請參閱 MLUtils Python 文件

from pyspark.mllib.util import MLUtils

examples = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

MLUtils.loadLibSVMFile 會讀取以 LIBSVM 格式儲存的訓練範例。

有關 API 的詳細資訊,請參閱 MLUtils Scala 文件

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

MLUtils.loadLibSVMFile 會讀取以 LIBSVM 格式儲存的訓練範例。

有關 API 的詳細資訊,請參閱 MLUtils Java 文件

import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.api.java.JavaRDD;

JavaRDD<LabeledPoint> examples = 
  MLUtils.loadLibSVMFile(jsc.sc(), "data/mllib/sample_libsvm_data.txt").toJavaRDD();

本機矩陣

本地矩陣具有整數型列和欄索引以及雙精度型值,儲存在單一機器上。MLlib 支援稠密矩陣,其輸入值儲存在單一雙精度陣列中,順序為欄優先順序,以及稀疏矩陣,其非零輸入值儲存在壓縮稀疏欄 (CSC) 格式中,順序為欄優先順序。例如,下列稠密矩陣 \[ \begin{pmatrix} 1.0 & 2.0 \\ 3.0 & 4.0 \\ 5.0 & 6.0 \end{pmatrix} \] 儲存在一維陣列 [1.0, 3.0, 5.0, 2.0, 4.0, 6.0] 中,矩陣大小為 (3, 2)

本地矩陣的基底類別是 Matrix,我們提供兩個實作:DenseMatrixSparseMatrix。我們建議使用在 Matrices 中實作的工廠方法來建立本地矩陣。請記住,MLlib 中的本地矩陣儲存在欄優先順序中。

有關 API 的更多詳細資訊,請參閱 Matrix Python 文件Matrices Python 文件

from pyspark.mllib.linalg import Matrix, Matrices

# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6])

# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])

本地矩陣的基本類別為 Matrix,我們提供兩個實作:DenseMatrixSparseMatrix。我們建議使用在 Matrices 中實作的工廠方法來建立本地矩陣。請記住,MLlib 中的本地矩陣是以欄優先順序儲存的。

有關 API 的詳細資訊,請參閱 Matrix Scala 文件Matrices Scala 文件

import org.apache.spark.mllib.linalg.{Matrix, Matrices}

// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))

// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))

本地矩陣的基本類別為 Matrix,我們提供兩個實作:DenseMatrixSparseMatrix。我們建議使用在 Matrices 中實作的工廠方法來建立本地矩陣。請記住,MLlib 中的本地矩陣是以欄優先順序儲存的。

有關 API 的詳細資訊,請參閱 Matrix Java 文件Matrices Java 文件

import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Matrices;

// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
Matrix dm = Matrices.dense(3, 2, new double[] {1.0, 3.0, 5.0, 2.0, 4.0, 6.0});

// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
Matrix sm = Matrices.sparse(3, 2, new int[] {0, 1, 3}, new int[] {0, 2, 1}, new double[] {9, 6, 8});

分散式矩陣

分散式矩陣具有長型列和欄索引,以及雙精度型值,分散儲存在一個或多個 RDD 中。選擇正確的格式來儲存大型分散式矩陣非常重要。將分散式矩陣轉換為不同的格式可能需要進行全域性洗牌,這相當耗費資源。到目前為止,已經實作了四種類型的分散式矩陣。

基本類型稱為 RowMatrixRowMatrix 是沒有有意義列索引的列導向分散矩陣,例如特徵向量的集合。它由其列的 RDD 做為後盾,其中每一列都是一個區域向量。我們假設 RowMatrix 的欄位數不會太大,因此可以合理地將單一區域向量傳送到驅動程式,也可以使用單一節點儲存/操作它。 IndexedRowMatrix 類似於 RowMatrix,但有列索引,可供識別列和執行聯結。 CoordinateMatrix 是儲存在 座標清單 (COO) 格式的分散矩陣,由其條目的 RDD 做為後盾。 BlockMatrix 是由 MatrixBlock 的 RDD 做為後盾的分散矩陣,而 MatrixBlock(Int, Int, Matrix) 的組。

注意

分散矩陣的基礎 RDD 必須是確定性的,因為我們會快取矩陣大小。一般而言,使用非確定性 RDD 會導致錯誤。

RowMatrix

RowMatrix 是沒有有意義列索引的列導向分散矩陣,由其列的 RDD 做為後盾,其中每一列都是一個區域向量。由於每一列都由區域向量表示,因此欄位數會受到整數範圍的限制,但實際上應該小很多。

可以從向量的 RDD 建立 RowMatrix

請參閱 RowMatrix Python 文件,以取得有關 API 的更多詳細資料。

from pyspark.mllib.linalg.distributed import RowMatrix

# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])

# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)

# Get its size.
m = mat.numRows()  # 4
n = mat.numCols()  # 3

# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows

可以從 RDD[Vector] 執行個體建立 RowMatrix。然後,我們可以計算其欄位摘要統計資料和分解。 QR 分解 的形式為 A = QR,其中 Q 是正交矩陣,而 R 是上三角矩陣。有關 奇異值分解 (SVD)主成分分析 (PCA),請參閱 降維

請參閱 RowMatrix Scala 文件,以取得有關 API 的詳細資料。

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows: RDD[Vector] = ... // an RDD of local vectors
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// QR decomposition 
val qrResult = mat.tallSkinnyQR(true)

可以從 JavaRDD<Vector> 執行個體建立 RowMatrix。然後,我們可以計算其欄位摘要統計資料。

請參閱 RowMatrix Java 文件,以取得有關 API 的詳細資料。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;

JavaRDD<Vector> rows = ... // a JavaRDD of local vectors
// Create a RowMatrix from a JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());

// Get its size.
long m = mat.numRows();
long n = mat.numCols();

// QR decomposition 
QRDecomposition<RowMatrix, Matrix> result = mat.tallSkinnyQR(true);

IndexedRowMatrix

一個 IndexedRowMatrix 類似於 RowMatrix,但具有有意義的行索引。它由一個索引行 RDD 所支援,因此每一行都由其索引(長整型)和一個本地向量表示。

一個 IndexedRowMatrix 可以從一個 RDDIndexedRow 建立,其中 IndexedRow(long, vector) 的包裝器。一個 IndexedRowMatrix 可以透過捨棄其行索引轉換為 RowMatrix

請參閱 IndexedRowMatrix Python 文件 以取得更多有關 API 的詳細資訊。

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# Create an RDD of indexed rows.
#   - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
                              IndexedRow(1, [4, 5, 6]),
                              IndexedRow(2, [7, 8, 9]),
                              IndexedRow(3, [10, 11, 12])])
#   - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
                              (2, [7, 8, 9]), (3, [10, 11, 12])])

# Create an IndexedRowMatrix from an RDD of IndexedRows.
mat = IndexedRowMatrix(indexedRows)

# Get its size.
m = mat.numRows()  # 4
n = mat.numCols()  # 3

# Get the rows as an RDD of IndexedRows.
rowsRDD = mat.rows

# Convert to a RowMatrix by dropping the row indices.
rowMat = mat.toRowMatrix()

一個 IndexedRowMatrix 可以從一個 RDD[IndexedRow] 執行個體建立,其中 IndexedRow(Long, Vector) 的包裝器。一個 IndexedRowMatrix 可以透過捨棄其行索引轉換為 RowMatrix

請參閱 IndexedRowMatrix Scala 文件 以取得有關 API 的詳細資訊。

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()

一個 IndexedRowMatrix 可以從一個 JavaRDD<IndexedRow> 執行個體建立,其中 IndexedRow(long, Vector) 的包裝器。一個 IndexedRowMatrix 可以透過捨棄其行索引轉換為 RowMatrix

請參閱 IndexedRowMatrix Java 文件 以取得有關 API 的詳細資訊。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.IndexedRow;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;

JavaRDD<IndexedRow> rows = ... // a JavaRDD of indexed rows
// Create an IndexedRowMatrix from a JavaRDD<IndexedRow>.
IndexedRowMatrix mat = new IndexedRowMatrix(rows.rdd());

// Get its size.
long m = mat.numRows();
long n = mat.numCols();

// Drop its row indices.
RowMatrix rowMat = mat.toRowMatrix();

CoordinateMatrix

一個 CoordinateMatrix 是由其條目 RDD 所支援的分布式矩陣。每個條目都是 (i: Long, j: Long, value: Double) 的元組,其中 i 是行索引,j 是列索引,而 value 是條目值。僅當矩陣的兩個維度都很巨大且矩陣非常稀疏時,才應使用 CoordinateMatrix

一個 CoordinateMatrix 可以從一個 RDDMatrixEntry 條目建立,其中 MatrixEntry(long, long, float) 的包裝器。一個 CoordinateMatrix 可以透過呼叫 toRowMatrix 轉換為 RowMatrix,或透過呼叫 toIndexedRowMatrix 轉換為具有稀疏行的 IndexedRowMatrix

請參閱 CoordinateMatrix Python 文件 以取得更多有關 API 的詳細資訊。

from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

# Create an RDD of coordinate entries.
#   - This can be done explicitly with the MatrixEntry class:
entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(2, 1, 3.7)])
#   - or using (long, long, float) tuples:
entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)])

# Create a CoordinateMatrix from an RDD of MatrixEntries.
mat = CoordinateMatrix(entries)

# Get its size.
m = mat.numRows()  # 3
n = mat.numCols()  # 2

# Get the entries as an RDD of MatrixEntries.
entriesRDD = mat.entries

# Convert to a RowMatrix.
rowMat = mat.toRowMatrix()

# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()

# Convert to a BlockMatrix.
blockMat = mat.toBlockMatrix()

可以從 RDD[MatrixEntry] 執行個體建立 CoordinateMatrix,其中 MatrixEntry(Long, Long, Double) 的包裝器。可以透過呼叫 toIndexedRowMatrixCoordinateMatrix 轉換成具有稀疏列的 IndexedRowMatrix。目前不支援 CoordinateMatrix 的其他運算。

請參閱 CoordinateMatrix Scala 文件以取得 API 詳細資料。

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Convert it to an IndexRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()

可以從 JavaRDD<MatrixEntry> 執行個體建立 CoordinateMatrix,其中 MatrixEntry(long, long, double) 的包裝器。可以透過呼叫 toIndexedRowMatrixCoordinateMatrix 轉換成具有稀疏列的 IndexedRowMatrix。目前不支援 CoordinateMatrix 的其他運算。

請參閱 CoordinateMatrix Java 文件以取得 API 詳細資料。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.MatrixEntry;

JavaRDD<MatrixEntry> entries = ... // a JavaRDD of matrix entries
// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
CoordinateMatrix mat = new CoordinateMatrix(entries.rdd());

// Get its size.
long m = mat.numRows();
long n = mat.numCols();

// Convert it to an IndexRowMatrix whose rows are sparse vectors.
IndexedRowMatrix indexedRowMatrix = mat.toIndexedRowMatrix();

BlockMatrix

BlockMatrix 是由 MatrixBlock 的 RDD 支援的分布式矩陣,其中 MatrixBlock((Int, Int), Matrix) 的組,其中 (Int, Int) 是區塊的索引,而 Matrix 是給定索引處的子矩陣,大小為 rowsPerBlock x colsPerBlockBlockMatrix 支援方法,例如 addmultiply,並搭配另一個 BlockMatrixBlockMatrix 也有輔助函式 validate,可用於檢查 BlockMatrix 是否設定正確。

可以從子矩陣區塊的 RDD 建立 BlockMatrix,其中子矩陣區塊是 ((blockRowIndex, blockColIndex), sub-matrix) 組。

請參閱 BlockMatrix Python 文件以取得 API 的更多詳細資料。

from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix

# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
                         ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])

# Create a BlockMatrix from an RDD of sub-matrix blocks.
mat = BlockMatrix(blocks, 3, 2)

# Get its size.
m = mat.numRows()  # 6
n = mat.numCols()  # 2

# Get the blocks as an RDD of sub-matrix blocks.
blocksRDD = mat.blocks

# Convert to a LocalMatrix.
localMat = mat.toLocalMatrix()

# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()

# Convert to a CoordinateMatrix.
coordinateMat = mat.toCoordinateMatrix()

最簡單的方法是從 IndexedRowMatrixCoordinateMatrix 呼叫 toBlockMatrix 來建立 BlockMatrix。預設情況下,toBlockMatrix 會建立大小為 1024 x 1024 的區塊。使用者可以透過 toBlockMatrix(rowsPerBlock, colsPerBlock) 提供值來變更區塊大小。

有關 API 的詳細資訊,請參閱 BlockMatrix Scala 文件

import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// Transform the CoordinateMatrix to a BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()

// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate()

// Calculate A^T A.
val ata = matA.transpose.multiply(matA)

最簡單的方法是從 IndexedRowMatrixCoordinateMatrix 呼叫 toBlockMatrix 來建立 BlockMatrix。預設情況下,toBlockMatrix 會建立大小為 1024 x 1024 的區塊。使用者可以透過 toBlockMatrix(rowsPerBlock, colsPerBlock) 提供值來變更區塊大小。

有關 API 的詳細資訊,請參閱 BlockMatrix Java 文件

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.BlockMatrix;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;

JavaRDD<MatrixEntry> entries = ... // a JavaRDD of (i, j, v) Matrix Entries
// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
CoordinateMatrix coordMat = new CoordinateMatrix(entries.rdd());
// Transform the CoordinateMatrix to a BlockMatrix
BlockMatrix matA = coordMat.toBlockMatrix().cache();

// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate();

// Calculate A^T A.
BlockMatrix ata = matA.transpose().multiply(matA);