聚類 - 基於 RDD 的 API
聚類是一種非監督式學習問題,我們根據某種相似性概念,將實體子集合彼此分組。聚類通常用於探索性分析和/或作為階層式監督式學習管線的組成部分(其中為每個群集訓練不同的分類器或迴歸模型)。
spark.mllib
套件支援下列模型
k 均值
k 均值是最常用的聚類演算法之一,它將資料點聚類成預先定義的群集數。 spark.mllib
實作包含平行變異的 k 均值++ 方法,稱為 kmeans||。spark.mllib
中的實作具有下列參數
- k 是所需群集數。請注意,可能會回傳少於 k 個群集,例如,如果要聚類的相異點少於 k 個。
- maxIterations 是要執行的最大迭代次數。
- initializationMode 指定隨機初始化或透過 k 均值||初始化。
- runs 此參數自 Spark 2.0.0 起無效。
- initializationSteps 決定 k 均值||演算法中的步驟數。
- epsilon 決定距離閾值,我們在其中考慮 k 均值已收斂。
- initialModel 是用於初始化的群集中心選用集。如果提供此參數,只會執行一次執行。
範例
下列範例可以在 PySpark shell 中測試。
在下列範例中,載入和剖析資料後,我們使用 KMeans 物件將資料聚類成兩個群集。將所需群集數傳遞給演算法。然後我們計算集合內平方和誤差總和 (WSSSE)。您可以透過增加 k 來減少此誤差量測。事實上,最佳 k 通常是 WSSSE 圖形中出現「肘部」的地方。
請參閱 KMeans
Python 文件 和 KMeansModel
Python 文件,以取得更多關於 API 的詳細資訊。
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel
# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
# Save and load model
clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
下列程式碼片段可以在 spark-shell
中執行。
在以下範例中,載入並剖析資料後,我們使用 KMeans
物件將資料分群成兩個群集。將所需群集數目傳遞給演算法。然後計算群集內平方和誤差總和 (WSSSE)。您可以透過增加 k 來減少此誤差量測。事實上,最佳 k 通常是 WSSSE 圖形中出現「肘點」的 k。
請參閱 KMeans
Scala 文件 和 KMeansModel
Scala 文件 以取得 API 的詳細資料。
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println(s"Within Set Sum of Squared Errors = $WSSSE")
// Save and load model
clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
MLlib 的所有方法都使用 Java 友善類型,因此您可以像在 Scala 中一樣,在 Java 中匯入並呼叫它們。唯一的注意事項是,這些方法採用 Scala RDD 物件,而 Spark Java API 使用單獨的 JavaRDD
類別。您可以呼叫 .rdd()
對您的 JavaRDD
物件,將 Java RDD 轉換為 Scala RDD。以下是與 Scala 中提供的範例等效的自訂應用程式範例
請參閱 KMeans
Java 文件 和 KMeansModel
Java 文件 以取得 API 的詳細資料。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse data
String path = "data/mllib/kmeans_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
parsedData.cache();
// Cluster the data into two classes using KMeans
int numClusters = 2;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
System.out.println("Cluster centers:");
for (Vector center: clusters.clusterCenters()) {
System.out.println(" " + center);
}
double cost = clusters.computeCost(parsedData.rdd());
System.out.println("Cost: " + cost);
// Evaluate clustering by computing Within Set Sum of Squared Errors
double WSSSE = clusters.computeCost(parsedData.rdd());
System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
// Save and load model
clusters.save(jsc.sc(), "target/org/apache/spark/JavaKMeansExample/KMeansModel");
KMeansModel sameModel = KMeansModel.load(jsc.sc(),
"target/org/apache/spark/JavaKMeansExample/KMeansModel");
高斯混合
高斯混合模型 代表一個複合分配,其中點從 k 個高斯子分配中抽出,每個子分配都有自己的機率。spark.mllib
實作使用 期望最大化 演算法,根據一組樣本誘導出最大似然模型。實作具有下列參數
- k 是所需群集數目。
- convergenceTol 是我們認為已達到收斂時對數似然度的最大變動。
- maxIterations 是在未達到收斂前執行迭代的最大次數。
- initialModel 是 EM 演算法開始執行的選用起始點。如果省略此參數,將從資料中建構隨機起始點。
範例
在以下範例中,載入並剖析資料後,我們使用 GaussianMixture 物件將資料分群為兩個群集。將所需群集數目傳遞給演算法。然後,我們輸出混合模型的參數。
請參閱 GaussianMixture
Python 文件 和 GaussianMixtureModel
Python 文件 以取得更多有關 API 的詳細資料。
from numpy import array
from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel
# Load and parse the data
data = sc.textFile("data/mllib/gmm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))
# Build the model (cluster the data)
gmm = GaussianMixture.train(parsedData, 2)
# Save and load model
gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
sameModel = GaussianMixtureModel\
.load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
# output parameters of model
for i in range(2):
print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
"sigma = ", gmm.gaussians[i].sigma.toArray())
在以下範例中,載入並剖析資料後,我們使用 GaussianMixture 物件將資料分群為兩個群集。將所需群集數目傳遞給演算法。然後,我們輸出混合模型的參數。
請參閱 GaussianMixture
Scala 文件 和 GaussianMixtureModel
Scala 文件 以取得有關 API 的詳細資料。
import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using GaussianMixture
val gmm = new GaussianMixture().setK(2).run(parsedData)
// Save and load model
gmm.save(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
val sameModel = GaussianMixtureModel.load(sc,
"target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
// output parameters of max-likelihood model
for (i <- 0 until gmm.k) {
println("weight=%f\nmu=%s\nsigma=\n%s\n" format
(gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}
MLlib 的所有方法都使用 Java 友善類型,因此您可以像在 Scala 中一樣,在 Java 中匯入並呼叫它們。唯一的注意事項是,這些方法採用 Scala RDD 物件,而 Spark Java API 使用單獨的 JavaRDD
類別。您可以呼叫 .rdd()
對您的 JavaRDD
物件,將 Java RDD 轉換為 Scala RDD。以下是與 Scala 中提供的範例等效的自訂應用程式範例
請參閱 GaussianMixture
Java 文件 和 GaussianMixtureModel
Java 文件 以取得有關 API 的詳細資料。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse data
String path = "data/mllib/gmm_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
parsedData.cache();
// Cluster the data into two classes using GaussianMixture
GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());
// Save and load GaussianMixtureModel
gmm.save(jsc.sc(), "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");
GaussianMixtureModel sameModel = GaussianMixtureModel.load(jsc.sc(),
"target/org.apache.spark.JavaGaussianMixtureExample/GaussianMixtureModel");
// Output the parameters of the mixture model
for (int j = 0; j < gmm.k(); j++) {
System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
}
冪次迭代聚類 (PIC)
冪次迭代分群 (PIC) 是一種可擴充且有效率的演算法,用於分群圖形的頂點,給定邊緣屬性為成對相似性,如 Lin and Cohen, Power Iteration Clustering 中所述。它透過 冪次迭代 計算圖形正規化親和矩陣的偽特徵向量,並使用它來分群頂點。 spark.mllib
包含使用 GraphX 作為其後端的 PIC 實作。它採用 (srcId, dstId, similarity)
組合的 RDD
,並輸出具有分群指派的模型。相似性必須是非負值。PIC 假設相似性測量值是對稱的。一對 (srcId, dstId)
不論順序如何,在輸入資料中最多只會出現一次。如果輸入資料中缺少一對,則其相似性將視為零。 spark.mllib
的 PIC 實作採用下列 (超) 參數
k
:叢集數目maxIterations
:最大次數的冪次迭代initializationMode
:初始化模型。預設為「隨機」,使用隨機向量作為頂點屬性,或「程度」使用正規化總相似度。
範例
以下,我們展示程式碼片段,說明如何使用 spark.mllib
中的 PIC。
PowerIterationClustering
實作 PIC 演算法。它採用 RDD
,其中包含表示關聯矩陣的 (srcId: Long, dstId: Long, similarity: Double)
組合。呼叫 PowerIterationClustering.run
會傳回 PowerIterationClusteringModel
,其中包含計算出的叢集指派。
請參閱 PowerIterationClustering
Python 文件 和 PowerIterationClusteringModel
Python 文件,以取得更多 API 詳細資料。
from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel
# Load and parse the data
data = sc.textFile("data/mllib/pic_data.txt")
similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')]))
# Cluster the data into two classes using PowerIterationClustering
model = PowerIterationClustering.train(similarities, 2, 10)
model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster)))
# Save and load model
model.save(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
sameModel = PowerIterationClusteringModel\
.load(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
PowerIterationClustering
實作 PIC 演算法。它採用 RDD
,其中包含表示關聯矩陣的 (srcId: Long, dstId: Long, similarity: Double)
組合。呼叫 PowerIterationClustering.run
會傳回 PowerIterationClusteringModel
,其中包含計算出的叢集指派。
請參閱 PowerIterationClustering
Scala 文件 和 PowerIterationClusteringModel
Scala 文件,以取得 API 詳細資料。
import org.apache.spark.mllib.clustering.PowerIterationClustering
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setInitializationMode("degree")
.run(circlesRdd)
val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
.map { case (k, v) =>
s"$k -> ${v.sorted.mkString("[", ",", "]")}"
}.mkString(", ")
val sizesStr = assignments.map {
_._2.length
}.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
PowerIterationClustering
實作 PIC 演算法。它會取得一個 JavaRDD
,其中包含表示親和矩陣的 (srcId: Long, dstId: Long, similarity: Double)
組合。呼叫 PowerIterationClustering.run
會傳回一個 PowerIterationClusteringModel
,其中包含已計算的叢集指派。
請參閱 PowerIterationClustering
Java 文件和 PowerIterationClusteringModel
Java 文件,以取得 API 的詳細資訊。
import org.apache.spark.mllib.clustering.PowerIterationClustering;
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel;
JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Arrays.asList(
new Tuple3<>(0L, 1L, 0.9),
new Tuple3<>(1L, 2L, 0.9),
new Tuple3<>(2L, 3L, 0.9),
new Tuple3<>(3L, 4L, 0.1),
new Tuple3<>(4L, 5L, 0.9)));
PowerIterationClustering pic = new PowerIterationClustering()
.setK(2)
.setMaxIterations(10);
PowerIterationClusteringModel model = pic.run(similarities);
for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
System.out.println(a.id() + " -> " + a.cluster());
}
潛在狄利克雷分配 (LDA)
隱含狄利克雷配置 (LDA) 是一種主題模型,可以從文字文件集合推論出主題。LDA 可以視為一種叢集演算法,如下所示
- 主題對應到叢集中心,而文件對應到資料集中範例 (列)。
- 主題和文件都存在於特徵空間中,其中特徵向量是字數向量 (字詞袋)。
- LDA 不是使用傳統距離來估計叢集,而是使用一個函數,該函數根據文字文件產生的統計模型為基礎。
LDA 支援透過 setOptimizer
函數使用不同的推論演算法。 EMLDAOptimizer
使用對數似然函數上的 期望最大化 來學習叢集,並產生全面的結果,而 OnlineLDAOptimizer
則使用反覆最小批次抽樣進行 線上變異推論,而且通常對記憶體很友善。
LDA 會取得一個文件集合,其中包含字數向量和下列參數 (使用建造者模式設定)
k
:主題數目 (即叢集中心)optimizer
:用於學習 LDA 模型的最佳化器,可以是EMLDAOptimizer
或OnlineLDAOptimizer
docConcentration
:文件對主題分佈的先驗 Dirichlet 參數。較大的值會鼓勵更平滑的推論分佈。topicConcentration
:主題對詞彙 (字詞) 分佈的先驗 Dirichlet 參數。較大的值會鼓勵更平滑的推論分佈。maxIterations
:迭代次數的限制。checkpointInterval
:如果使用檢查點 (在 Spark 組態中設定),此參數會指定建立檢查點的頻率。如果maxIterations
很大的話,使用檢查點可以協助減少磁碟上的洗牌檔案大小,並協助復原錯誤。
所有 spark.mllib
的 LDA 模型都支援
describeTopics
:傳回主題作為最重要的詞彙和詞彙權重的陣列topicsMatrix
:傳回vocabSize
乘以k
的矩陣,其中每一欄都是一個主題
注意:LDA 仍是積極開發中的實驗性功能。因此,某些功能只會出現在最佳化器/最佳化器產生的兩個模型之一中。目前,分散式模型可以轉換成區域模型,但反之則不行。
以下討論會分別說明每個最佳化器/模型配對。
期望最大化
實作於 EMLDAOptimizer
和 DistributedLDAModel
中。
對於提供給 LDA
的參數
docConcentration
:只支援對稱先驗,因此提供的k
維向量中的所有值都必須相同。所有值也必須大於 $> 1.0$。提供Vector(-1)
會產生預設行為 (值為 $(50 / k) + 1$ 的均勻k
維向量)topicConcentration
:只支援對稱先驗。值必須大於 $> 1.0$。提供-1
會預設為 $0.1 + 1$ 的值。maxIterations
:EM 迭代的最大次數。
注意:執行足夠的迭代很重要。在早期迭代中,EM 通常會有無用的主題,但這些主題在更多次迭代後會大幅改善。根據您的資料集,使用至少 20 次,甚至可能是 50-100 次迭代通常是合理的。
EMLDAOptimizer
會產生 DistributedLDAModel
,它不僅會儲存推論的主題,還會儲存完整的訓練語料庫和訓練語料庫中每個文件的每個主題分佈。DistributedLDAModel
支援
topTopicsPerDocument
:訓練語料庫中每個文件的頂尖主題及其權重topDocumentsPerTopic
:每個主題的頂尖文件和主題在文件中的對應權重。logPrior
:在給定超參數docConcentration
和topicConcentration
的情況下,估計的主題和文件主題分佈的對數機率logLikelihood
:給定推論的主題和文件主題分佈,訓練語料庫的對數似然
在線變分貝氏
在 OnlineLDAOptimizer
和 LocalLDAModel
中實作。
對於提供給 LDA
的參數
docConcentration
:可透過傳入一個向量來使用非對稱先驗,其中向量的值等於k
個維度中每個維度的 Dirichlet 參數。值應 $>= 0$。提供Vector(-1)
會產生預設行為(值為 $(1.0 / k)$ 的均勻k
維度向量)topicConcentration
:僅支援對稱先驗。值必須 $>= 0$。提供-1
會預設為值 $(1.0 / k)$。maxIterations
:要提交的最小批次最大數目。
此外,OnlineLDAOptimizer
接受下列參數
miniBatchFraction
:每次反覆運算中取樣並使用的語料庫分數optimizeDocConcentration
:如果設為 true,會在每個最小批次後執行超參數docConcentration
(又稱alpha
)的最大似然估計,並在回傳的LocalLDAModel
中設定最佳化的docConcentration
tau0
和kappa
:用於學習率衰減,計算方式為 $(\tau_0 + iter)^{-\kappa}$,其中 $iter$ 是目前的反覆運算次數。
OnlineLDAOptimizer
會產生 LocalLDAModel
,它只會儲存推論的主題。 LocalLDAModel
支援
logLikelihood(documents)
:計算給定推論的主題時,提供的documents
的下界。logPerplexity(documents)
:計算給定推論的主題時,提供的documents
的上界。
範例
在下列範例中,我們會載入代表文件語料庫的字數向量。然後我們使用 LDA 從文件中推論出三個主題。已將所需叢集的數目傳遞給演算法。然後我們會輸出主題,表示為字詞的機率分佈。
請參閱 LDA
Python 文件 和 LDAModel
Python 文件,以取得更多 API 詳細資料。
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
+ " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
print("Topic " + str(topic) + ":")
for word in range(0, ldaModel.vocabSize()):
print(" " + str(topics[word][topic]))
# Save and load model
ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
sameModel = LDAModel\
.load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
請參閱 LDA
Scala 文件 和 DistributedLDAModel
Scala 文件,以取得 API 詳細資料。
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()
// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)
// Output topics. Each is a distribution over words (matching word count vectors)
println(s"Learned topics (as distributions over vocab of ${ldaModel.vocabSize} words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
print(s"Topic $topic :")
for (word <- Range(0, ldaModel.vocabSize)) {
print(s"${topics(word, topic)}")
}
println()
}
// Save and load model.
ldaModel.save(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
val sameModel = DistributedLDAModel.load(sc,
"target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
有關 API 的詳細資訊,請參閱 LDA
Java 文件 和 DistributedLDAModel
Java 文件。
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.clustering.LDAModel;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse the data
String path = "data/mllib/sample_lda_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
// Index documents with unique IDs
JavaPairRDD<Long, Vector> corpus =
JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(Tuple2::swap));
corpus.cache();
// Cluster the documents into three topics using LDA
LDAModel ldaModel = new LDA().setK(3).run(corpus);
// Output topics. Each is a distribution over words (matching word count vectors)
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
+ " words):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
System.out.print("Topic " + topic + ":");
for (int word = 0; word < ldaModel.vocabSize(); word++) {
System.out.print(" " + topics.apply(word, topic));
}
System.out.println();
}
ldaModel.save(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
DistributedLDAModel sameModel = DistributedLDAModel.load(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
二分 k 均值
兩分 K 平均值通常可以比一般 K 平均值快很多,但它通常會產生不同的分群。
兩分 K 平均值是一種 階層式分群。階層式分群是最常使用的分群分析方法之一,用於建立分群的階層。階層式分群的策略通常分為兩種
- 凝聚式:這是一種「由下而上」的方法:每個觀察值一開始都在自己的分群中,而當階層向上移動時,分群會成對合併。
- 分裂式:這是一種「由上而下」的方法:所有觀察值一開始都在一個分群中,而當階層向下移動時,會遞迴執行分割。
兩分 K 平均值演算法是一種分裂式演算法。MLlib 中的實作具有以下參數
- k:所需的葉子分群數目 (預設值:4)。如果沒有可分割的葉子分群,實際數目可能會更少。
- maxIterations:用於分割分群的 K 平均值最大反覆次數 (預設值:20)
- minDivisibleClusterSize:可分割分群的點數最小數目 (如果 >= 1.0) 或點數最小比例 (如果 < 1.0) (預設值:1)
- 種子:亂數種子 (預設值:類別名稱的雜湊值)
範例
有關 API 的更多詳細資訊,請參閱 BisectingKMeans
Python 文件 和 BisectingKMeansModel
Python 文件。
from numpy import array
from pyspark.mllib.clustering import BisectingKMeans
# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
model = BisectingKMeans.train(parsedData, 2, maxIterations=5)
# Evaluate clustering
cost = model.computeCost(parsedData)
print("Bisecting K-means Cost = " + str(cost))
有關 API 的詳細資訊,請參閱 BisectingKMeans
Scala 文件 和 BisectingKMeansModel
Scala 文件。
import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Loads and parses data
def parse(line: String): Vector = Vectors.dense(line.split(" ").map(_.toDouble))
val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()
// Clustering the data into 6 clusters by BisectingKMeans.
val bkm = new BisectingKMeans().setK(6)
val model = bkm.run(data)
// Show the compute cost and the cluster centers
println(s"Compute Cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
println(s"Cluster Center ${idx}: ${center}")
}
請參閱 BisectingKMeans
Java 文件 和 BisectingKMeansModel
Java 文件 以了解 API 的詳細資訊。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.BisectingKMeans;
import org.apache.spark.mllib.clustering.BisectingKMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
List<Vector> localData = Arrays.asList(
Vectors.dense(0.1, 0.1), Vectors.dense(0.3, 0.3),
Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3),
Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3),
Vectors.dense(30.1, 30.1), Vectors.dense(30.3, 30.3)
);
JavaRDD<Vector> data = sc.parallelize(localData, 2);
BisectingKMeans bkm = new BisectingKMeans()
.setK(4);
BisectingKMeansModel model = bkm.run(data);
System.out.println("Compute Cost: " + model.computeCost(data));
Vector[] clusterCenters = model.clusterCenters();
for (int i = 0; i < clusterCenters.length; i++) {
Vector clusterCenter = clusterCenters[i];
System.out.println("Cluster Center " + i + ": " + clusterCenter);
}
串流 k 均值
當資料以串流形式到達時,我們可能想要動態估計叢集,並在新的資料到達時更新它們。spark.mllib
提供對串流 k 平均叢集的支援,其中包含控制估計值的衰減(或「遺忘」)的參數。此演算法使用 mini 批次 k 平均更新規則的概括。對於每一批次資料,我們將所有點分配給它們最近的叢集,計算新的叢集中心,然後使用下列公式更新每個叢集:
\begin{equation} c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t} \end{equation}
\begin{equation} n_{t+1} = n_t + m_t \end{equation}
其中 $c_t$
是叢集的先前中心,$n_t$
是迄今已分配給叢集的點數,$x_t$
是來自目前批次的新的叢集中心,而 $m_t$
是在目前批次中新增到叢集的點數。衰減因子 $\alpha$
可用於忽略過去:使用 $\alpha$=1
時,將從一開始使用所有資料;使用 $\alpha$=0
時,將只使用最近的資料。這類似於指數加權移動平均。
衰減可以使用 halfLife
參數指定,它決定正確的衰減因子 a
,使得對於在時間 t
獲取的資料,其在時間 t + halfLife
的貢獻將下降至 0.5。時間單位可以指定為 batches
或 points
,更新規則將相應調整。
範例
此範例說明如何估計串流資料的叢集。
有關 API 的更多詳細資訊,請參閱 StreamingKMeans
Python 文件。有關 StreamingContext 的詳細資訊,請參閱 Spark 串流程式設計指南。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
# we make an input stream of vectors for training,
# as well as a stream of vectors for testing
def parse(lp):
label = float(lp[lp.find('(') + 1: lp.find(')')])
vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
return LabeledPoint(label, vec)
trainingData = sc.textFile("data/mllib/kmeans_data.txt")\
.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse)
trainingQueue = [trainingData]
testingQueue = [testingData]
trainingStream = ssc.queueStream(trainingQueue)
testingStream = ssc.queueStream(testingQueue)
# We create a model with random clusters and specify the number of clusters to find
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
# Now register the streams for training and testing and start the job,
# printing the predicted cluster assignments on new data points as they arrive.
model.trainOn(trainingStream)
result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
result.pprint()
ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)
有關 API 的更多詳細資訊,請參閱 StreamingKMeans
Scala 文件。有關 StreamingContext 的詳細資訊,請參閱 Spark 串流程式設計指南。
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("StreamingKMeansExample")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)
val model = new StreamingKMeans()
.setK(args(3).toInt)
.setDecayFactor(1.0)
.setRandomCenters(args(4).toInt, 0.0)
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()
當您新增包含資料的新文字檔案時,叢集中心將會更新。每個訓練點應格式化為 [x1, x2, x3]
,每個測試資料點應格式化為 (y, [x1, x2, x3])
,其中 y
是某個有用的標籤或識別碼(例如,真正的類別指派)。任何時候,只要文字檔案放置在 /training/data/dir
中,模型就會更新。任何時候,只要文字檔案放置在 /testing/data/dir
中,您就會看到預測。有了新資料,叢集中心就會改變!