聚類 - 基於 RDD 的 API

聚類是一種非監督式學習問題,我們根據某種相似性概念,將實體子集合彼此分組。聚類通常用於探索性分析和/或作為階層式監督式學習管線的組成部分(其中為每個群集訓練不同的分類器或迴歸模型)。

spark.mllib 套件支援下列模型

k 均值

k 均值是最常用的聚類演算法之一,它將資料點聚類成預先定義的群集數。 spark.mllib 實作包含平行變異的 k 均值++ 方法,稱為 kmeans||spark.mllib 中的實作具有下列參數

範例

下列範例可以在 PySpark shell 中測試。

在下列範例中,載入和剖析資料後,我們使用 KMeans 物件將資料聚類成兩個群集。將所需群集數傳遞給演算法。然後我們計算集合內平方和誤差總和 (WSSSE)。您可以透過增加 k 來減少此誤差量測。事實上,最佳 k 通常是 WSSSE 圖形中出現「肘部」的地方。

請參閱 KMeans Python 文件KMeansModel Python 文件,以取得更多關於 API 的詳細資訊。

from numpy import array
from math import sqrt

from pyspark.mllib.clustering import KMeans, KMeansModel

# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

# Save and load model
clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
在 Spark 回存區中「examples/src/main/python/mllib/k_means_example.py」中找到完整的範例程式碼。

下列程式碼片段可以在 spark-shell 中執行。

在以下範例中,載入並剖析資料後,我們使用 KMeans 物件將資料分群成兩個群集。將所需群集數目傳遞給演算法。然後計算群集內平方和誤差總和 (WSSSE)。您可以透過增加 k 來減少此誤差量測。事實上,最佳 k 通常是 WSSSE 圖形中出現「肘點」的 k

請參閱 KMeans Scala 文件KMeansModel Scala 文件 以取得 API 的詳細資料。

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)

// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println(s"Within Set Sum of Squared Errors = $WSSSE")

// Save and load model
clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala」中,尋找完整的範例程式碼。

MLlib 的所有方法都使用 Java 友善類型,因此您可以像在 Scala 中一樣,在 Java 中匯入並呼叫它們。唯一的注意事項是,這些方法採用 Scala RDD 物件,而 Spark Java API 使用單獨的 JavaRDD 類別。您可以呼叫 .rdd() 對您的 JavaRDD 物件,將 Java RDD 轉換為 Scala RDD。以下是與 Scala 中提供的範例等效的自訂應用程式範例

請參閱 KMeans Java 文件KMeansModel Java 文件 以取得 API 的詳細資料。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Load and parse data
String path = "data/mllib/kmeans_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
  String[] sarray = s.split(" ");
  double[] values = new double[sarray.length];
  for (int i = 0; i < sarray.length; i++) {
    values[i] = Double.parseDouble(sarray[i]);
  }
  return Vectors.dense(values);
});
parsedData.cache();

// Cluster the data into two classes using KMeans
int numClusters = 2;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);

System.out.println("Cluster centers:");
for (Vector center: clusters.clusterCenters()) {
  System.out.println(" " + center);
}
double cost = clusters.computeCost(parsedData.rdd());
System.out.println("Cost: " + cost);

// Evaluate clustering by computing Within Set Sum of Squared Errors
double WSSSE = clusters.computeCost(parsedData.rdd());
System.out.println("Within Set Sum of Squared Errors = " + WSSSE);

// Save and load model
clusters.save(jsc.sc(), "target/org/apache/spark/JavaKMeansExample/KMeansModel");
KMeansModel sameModel = KMeansModel.load(jsc.sc(),
  "target/org/apache/spark/JavaKMeansExample/KMeansModel");
在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java」中,尋找完整的範例程式碼。

高斯混合

高斯混合模型 代表一個複合分配,其中點從 k 個高斯子分配中抽出,每個子分配都有自己的機率。spark.mllib 實作使用 期望最大化 演算法,根據一組樣本誘導出最大似然模型。實作具有下列參數

範例

在以下範例中,載入並剖析資料後,我們使用 GaussianMixture 物件將資料分群為兩個群集。將所需群集數目傳遞給演算法。然後,我們輸出混合模型的參數。

請參閱 GaussianMixture Python 文件GaussianMixtureModel Python 文件 以取得更多有關 API 的詳細資料。

from numpy import array

from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel

# Load and parse the data
data = sc.textFile("data/mllib/gmm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))

# Build the model (cluster the data)
gmm = GaussianMixture.train(parsedData, 2)

# Save and load model
gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
sameModel = GaussianMixtureModel\
    .load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")

# output parameters of model
for i in range(2):
    print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
          "sigma = ", gmm.gaussians[i].sigma.toArray())
在 Spark 儲存庫中的「examples/src/main/python/mllib/gaussian_mixture_example.py」中尋找完整的範例程式碼。

在以下範例中,載入並剖析資料後,我們使用 GaussianMixture 物件將資料分群為兩個群集。將所需群集數目傳遞給演算法。然後,我們輸出混合模型的參數。

請參閱 GaussianMixture Scala 文件GaussianMixtureModel Scala 文件 以取得有關 API 的詳細資料。

import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()

// Cluster the data into two classes using GaussianMixture
val gmm = new GaussianMixture().setK(2).run(parsedData)

// Save and load model
gmm.save(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
val sameModel = GaussianMixtureModel.load(sc,
  "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")

// output parameters of max-likelihood model
for (i <- 0 until gmm.k) {
  println("weight=%f\nmu=%s\nsigma=\n%s\n" format
    (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala」中尋找完整的範例程式碼。

MLlib 的所有方法都使用 Java 友善類型,因此您可以像在 Scala 中一樣,在 Java 中匯入並呼叫它們。唯一的注意事項是,這些方法採用 Scala RDD 物件,而 Spark Java API 使用單獨的 JavaRDD 類別。您可以呼叫 .rdd() 對您的 JavaRDD 物件,將 Java RDD 轉換為 Scala RDD。以下是與 Scala 中提供的範例等效的自訂應用程式範例

請參閱 GaussianMixture Java 文件GaussianMixtureModel Java 文件 以取得有關 API 的詳細資料。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Load and parse data
String path = "data/mllib/gmm_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
  String[] sarray = s.trim().split(" ");
  double[] values = new double[sarray.length];
  for (int i = 0; i < sarray.length; i++) {
    values[i] = Double.parseDouble(sarray[i]);
  }
  return Vectors.dense(values);
});
parsedData.cache();

// Cluster the data into two classes using GaussianMixture
GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());

// Save and load GaussianMixtureModel
gmm.save(jsc.sc(), "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");
GaussianMixtureModel sameModel = GaussianMixtureModel.load(jsc.sc(),
  "target/org.apache.spark.JavaGaussianMixtureExample/GaussianMixtureModel");

// Output the parameters of the mixture model
for (int j = 0; j < gmm.k(); j++) {
  System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
    gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
}
在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java」中尋找完整的範例程式碼。

冪次迭代聚類 (PIC)

冪次迭代分群 (PIC) 是一種可擴充且有效率的演算法,用於分群圖形的頂點,給定邊緣屬性為成對相似性,如 Lin and Cohen, Power Iteration Clustering 中所述。它透過 冪次迭代 計算圖形正規化親和矩陣的偽特徵向量,並使用它來分群頂點。 spark.mllib 包含使用 GraphX 作為其後端的 PIC 實作。它採用 (srcId, dstId, similarity) 組合的 RDD,並輸出具有分群指派的模型。相似性必須是非負值。PIC 假設相似性測量值是對稱的。一對 (srcId, dstId) 不論順序如何,在輸入資料中最多只會出現一次。如果輸入資料中缺少一對,則其相似性將視為零。 spark.mllib 的 PIC 實作採用下列 (超) 參數

範例

以下,我們展示程式碼片段,說明如何使用 spark.mllib 中的 PIC。

PowerIterationClustering 實作 PIC 演算法。它採用 RDD,其中包含表示關聯矩陣的 (srcId: Long, dstId: Long, similarity: Double) 組合。呼叫 PowerIterationClustering.run 會傳回 PowerIterationClusteringModel,其中包含計算出的叢集指派。

請參閱 PowerIterationClustering Python 文件PowerIterationClusteringModel Python 文件,以取得更多 API 詳細資料。

from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel

# Load and parse the data
data = sc.textFile("data/mllib/pic_data.txt")
similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')]))

# Cluster the data into two classes using PowerIterationClustering
model = PowerIterationClustering.train(similarities, 2, 10)

model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster)))

# Save and load model
model.save(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
sameModel = PowerIterationClusteringModel\
    .load(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
在 Spark 回應程式中,於「examples/src/main/python/mllib/power_iteration_clustering_example.py」中尋找完整的範例程式碼。

PowerIterationClustering 實作 PIC 演算法。它採用 RDD,其中包含表示關聯矩陣的 (srcId: Long, dstId: Long, similarity: Double) 組合。呼叫 PowerIterationClustering.run 會傳回 PowerIterationClusteringModel,其中包含計算出的叢集指派。

請參閱 PowerIterationClustering Scala 文件PowerIterationClusteringModel Scala 文件,以取得 API 詳細資料。

import org.apache.spark.mllib.clustering.PowerIterationClustering

val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
  .setK(params.k)
  .setMaxIterations(params.maxIterations)
  .setInitializationMode("degree")
  .run(circlesRdd)

val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
  .map { case (k, v) =>
    s"$k -> ${v.sorted.mkString("[", ",", "]")}"
  }.mkString(", ")
val sizesStr = assignments.map {
  _._2.length
}.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
在 Spark 回應程式中,於「examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala」中尋找完整的範例程式碼。

PowerIterationClustering 實作 PIC 演算法。它會取得一個 JavaRDD,其中包含表示親和矩陣的 (srcId: Long, dstId: Long, similarity: Double) 組合。呼叫 PowerIterationClustering.run 會傳回一個 PowerIterationClusteringModel,其中包含已計算的叢集指派。

請參閱 PowerIterationClustering Java 文件和 PowerIterationClusteringModel Java 文件,以取得 API 的詳細資訊。

import org.apache.spark.mllib.clustering.PowerIterationClustering;
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel;

JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Arrays.asList(
  new Tuple3<>(0L, 1L, 0.9),
  new Tuple3<>(1L, 2L, 0.9),
  new Tuple3<>(2L, 3L, 0.9),
  new Tuple3<>(3L, 4L, 0.1),
  new Tuple3<>(4L, 5L, 0.9)));

PowerIterationClustering pic = new PowerIterationClustering()
  .setK(2)
  .setMaxIterations(10);
PowerIterationClusteringModel model = pic.run(similarities);

for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
  System.out.println(a.id() + " -> " + a.cluster());
}
請在 Spark 回應程式中找到完整的範例程式碼「examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java」。

潛在狄利克雷分配 (LDA)

隱含狄利克雷配置 (LDA) 是一種主題模型,可以從文字文件集合推論出主題。LDA 可以視為一種叢集演算法,如下所示

LDA 支援透過 setOptimizer 函數使用不同的推論演算法。 EMLDAOptimizer 使用對數似然函數上的 期望最大化 來學習叢集,並產生全面的結果,而 OnlineLDAOptimizer 則使用反覆最小批次抽樣進行 線上變異推論,而且通常對記憶體很友善。

LDA 會取得一個文件集合,其中包含字數向量和下列參數 (使用建造者模式設定)

所有 spark.mllib 的 LDA 模型都支援

注意:LDA 仍是積極開發中的實驗性功能。因此,某些功能只會出現在最佳化器/最佳化器產生的兩個模型之一中。目前,分散式模型可以轉換成區域模型,但反之則不行。

以下討論會分別說明每個最佳化器/模型配對。

期望最大化

實作於 EMLDAOptimizerDistributedLDAModel 中。

對於提供給 LDA 的參數

注意:執行足夠的迭代很重要。在早期迭代中,EM 通常會有無用的主題,但這些主題在更多次迭代後會大幅改善。根據您的資料集,使用至少 20 次,甚至可能是 50-100 次迭代通常是合理的。

EMLDAOptimizer 會產生 DistributedLDAModel,它不僅會儲存推論的主題,還會儲存完整的訓練語料庫和訓練語料庫中每個文件的每個主題分佈。DistributedLDAModel 支援

在線變分貝氏

OnlineLDAOptimizerLocalLDAModel 中實作。

對於提供給 LDA 的參數

此外,OnlineLDAOptimizer 接受下列參數

OnlineLDAOptimizer 會產生 LocalLDAModel,它只會儲存推論的主題。 LocalLDAModel 支援

範例

在下列範例中,我們會載入代表文件語料庫的字數向量。然後我們使用 LDA 從文件中推論出三個主題。已將所需叢集的數目傳遞給演算法。然後我們會輸出主題,表示為字詞的機率分佈。

請參閱 LDA Python 文件LDAModel Python 文件,以取得更多 API 詳細資料。

from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()

# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)

# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
      + " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
    print("Topic " + str(topic) + ":")
    for word in range(0, ldaModel.vocabSize()):
        print(" " + str(topics[word][topic]))

# Save and load model
ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
sameModel = LDAModel\
    .load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
在 Spark 儲存庫中的「examples/src/main/python/mllib/latent_dirichlet_allocation_example.py」中,找到完整的範例程式碼。

請參閱 LDA Scala 文件DistributedLDAModel Scala 文件,以取得 API 詳細資料。

import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()

// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)

// Output topics. Each is a distribution over words (matching word count vectors)
println(s"Learned topics (as distributions over vocab of ${ldaModel.vocabSize} words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
  print(s"Topic $topic :")
  for (word <- Range(0, ldaModel.vocabSize)) {
    print(s"${topics(word, topic)}")
  }
  println()
}

// Save and load model.
ldaModel.save(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
val sameModel = DistributedLDAModel.load(sc,
  "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala」中,找到完整的範例程式碼。

有關 API 的詳細資訊,請參閱 LDA Java 文件DistributedLDAModel Java 文件

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.clustering.LDAModel;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Load and parse the data
String path = "data/mllib/sample_lda_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
  String[] sarray = s.trim().split(" ");
  double[] values = new double[sarray.length];
  for (int i = 0; i < sarray.length; i++) {
    values[i] = Double.parseDouble(sarray[i]);
  }
  return Vectors.dense(values);
});
// Index documents with unique IDs
JavaPairRDD<Long, Vector> corpus =
  JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(Tuple2::swap));
corpus.cache();

// Cluster the documents into three topics using LDA
LDAModel ldaModel = new LDA().setK(3).run(corpus);

// Output topics. Each is a distribution over words (matching word count vectors)
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
  + " words):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
  System.out.print("Topic " + topic + ":");
  for (int word = 0; word < ldaModel.vocabSize(); word++) {
    System.out.print(" " + topics.apply(word, topic));
  }
  System.out.println();
}

ldaModel.save(jsc.sc(),
  "target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
DistributedLDAModel sameModel = DistributedLDAModel.load(jsc.sc(),
  "target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
在 Spark 儲存庫中,於「examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java」中尋找完整的範例程式碼。

二分 k 均值

兩分 K 平均值通常可以比一般 K 平均值快很多,但它通常會產生不同的分群。

兩分 K 平均值是一種 階層式分群。階層式分群是最常使用的分群分析方法之一,用於建立分群的階層。階層式分群的策略通常分為兩種

兩分 K 平均值演算法是一種分裂式演算法。MLlib 中的實作具有以下參數

範例

有關 API 的更多詳細資訊,請參閱 BisectingKMeans Python 文件BisectingKMeansModel Python 文件

from numpy import array

from pyspark.mllib.clustering import BisectingKMeans

# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
model = BisectingKMeans.train(parsedData, 2, maxIterations=5)

# Evaluate clustering
cost = model.computeCost(parsedData)
print("Bisecting K-means Cost = " + str(cost))
在 Spark 儲存庫中,於「examples/src/main/python/mllib/bisecting_k_means_example.py」中尋找完整的範例程式碼。

有關 API 的詳細資訊,請參閱 BisectingKMeans Scala 文件BisectingKMeansModel Scala 文件

import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Loads and parses data
def parse(line: String): Vector = Vectors.dense(line.split(" ").map(_.toDouble))
val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()

// Clustering the data into 6 clusters by BisectingKMeans.
val bkm = new BisectingKMeans().setK(6)
val model = bkm.run(data)

// Show the compute cost and the cluster centers
println(s"Compute Cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
  println(s"Cluster Center ${idx}: ${center}")
}
在 Spark 儲存庫中,於「examples/src/main/scala/org/apache/spark/examples/mllib/BisectingKMeansExample.scala」中尋找完整的範例程式碼。

請參閱 BisectingKMeans Java 文件BisectingKMeansModel Java 文件 以了解 API 的詳細資訊。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.BisectingKMeans;
import org.apache.spark.mllib.clustering.BisectingKMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

List<Vector> localData = Arrays.asList(
  Vectors.dense(0.1, 0.1),   Vectors.dense(0.3, 0.3),
  Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3),
  Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3),
  Vectors.dense(30.1, 30.1), Vectors.dense(30.3, 30.3)
);
JavaRDD<Vector> data = sc.parallelize(localData, 2);

BisectingKMeans bkm = new BisectingKMeans()
  .setK(4);
BisectingKMeansModel model = bkm.run(data);

System.out.println("Compute Cost: " + model.computeCost(data));

Vector[] clusterCenters = model.clusterCenters();
for (int i = 0; i < clusterCenters.length; i++) {
  Vector clusterCenter = clusterCenters[i];
  System.out.println("Cluster Center " + i + ": " + clusterCenter);
}
在 Spark 回應程式中,找到「examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java」的完整範例程式碼。

串流 k 均值

當資料以串流形式到達時,我們可能想要動態估計叢集,並在新的資料到達時更新它們。spark.mllib 提供對串流 k 平均叢集的支援,其中包含控制估計值的衰減(或「遺忘」)的參數。此演算法使用 mini 批次 k 平均更新規則的概括。對於每一批次資料,我們將所有點分配給它們最近的叢集,計算新的叢集中心,然後使用下列公式更新每個叢集:

\begin{equation} c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t} \end{equation} \begin{equation} n_{t+1} = n_t + m_t \end{equation}

其中 $c_t$ 是叢集的先前中心,$n_t$ 是迄今已分配給叢集的點數,$x_t$ 是來自目前批次的新的叢集中心,而 $m_t$ 是在目前批次中新增到叢集的點數。衰減因子 $\alpha$ 可用於忽略過去:使用 $\alpha$=1 時,將從一開始使用所有資料;使用 $\alpha$=0 時,將只使用最近的資料。這類似於指數加權移動平均。

衰減可以使用 halfLife 參數指定,它決定正確的衰減因子 a,使得對於在時間 t 獲取的資料,其在時間 t + halfLife 的貢獻將下降至 0.5。時間單位可以指定為 batchespoints,更新規則將相應調整。

範例

此範例說明如何估計串流資料的叢集。

有關 API 的更多詳細資訊,請參閱 StreamingKMeans Python 文件。有關 StreamingContext 的詳細資訊,請參閱 Spark 串流程式設計指南

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans

# we make an input stream of vectors for training,
# as well as a stream of vectors for testing
def parse(lp):
    label = float(lp[lp.find('(') + 1: lp.find(')')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))

    return LabeledPoint(label, vec)

trainingData = sc.textFile("data/mllib/kmeans_data.txt")\
    .map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))

testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse)

trainingQueue = [trainingData]
testingQueue = [testingData]

trainingStream = ssc.queueStream(trainingQueue)
testingStream = ssc.queueStream(testingQueue)

# We create a model with random clusters and specify the number of clusters to find
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)

# Now register the streams for training and testing and start the job,
# printing the predicted cluster assignments on new data points as they arrive.
model.trainOn(trainingStream)

result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
result.pprint()

ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)
在 Spark 儲存庫中的「examples/src/main/python/mllib/streaming_k_means_example.py」中尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 StreamingKMeans Scala 文件。有關 StreamingContext 的詳細資訊,請參閱 Spark 串流程式設計指南

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StreamingKMeansExample")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))

val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)

val model = new StreamingKMeans()
  .setK(args(3).toInt)
  .setDecayFactor(1.0)
  .setRandomCenters(args(4).toInt, 0.0)

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala」中尋找完整的範例程式碼。

當您新增包含資料的新文字檔案時,叢集中心將會更新。每個訓練點應格式化為 [x1, x2, x3],每個測試資料點應格式化為 (y, [x1, x2, x3]),其中 y 是某個有用的標籤或識別碼(例如,真正的類別指派)。任何時候,只要文字檔案放置在 /training/data/dir 中,模型就會更新。任何時候,只要文字檔案放置在 /testing/data/dir 中,您就會看到預測。有了新資料,叢集中心就會改變!