Clustering
This page describes clustering algorithms in MLlib. The guide for clustering in the RDD-based API also has relevant information about these algorithms.
Table of Contents
- K-means
- Latent Dirichlet allocation (LDA)
- Bisecting k-means
- Gaussian Mixture Model (GMM)
- Power Iteration Clustering (PIC)
K-means
k-means is one of the most commonly used clustering algorithms; it groups the data points into a predefined number of clusters. The MLlib implementation includes a parallelized variant of the k-means++ method called kmeans||.
KMeans is implemented as an Estimator and generates a KMeansModel as the base model.
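The number of clusters, the initialization scheme (the kmeans|| scheme described above, or random centers), the distance measure, and the iteration budget are all exposed as estimator parameters. A minimal Python sketch follows; the values shown are purely illustrative, not tuned recommendations.
from pyspark.ml.clustering import KMeans

# Configure k-means explicitly: "k-means||" (the default) is the parallel
# k-means++ variant; "random" would pick random initial centers instead.
kmeans = KMeans(featuresCol="features", predictionCol="prediction") \
    .setK(3) \
    .setInitMode("k-means||") \
    .setInitSteps(2) \
    .setDistanceMeasure("euclidean") \
    .setMaxIter(20) \
    .setSeed(1)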
Input Columns
Param name | Type(s) | Default | Description |
---|---|---|---|
featuresCol | Vector | "features" | Feature vector |
Output Columns
Param name | Type(s) | Default | Description |
---|---|---|---|
predictionCol | Int | "prediction" | Predicted cluster center |
Examples
Refer to the Python API docs for more details.
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)
# Make predictions
predictions = model.transform(dataset)
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
Refer to the Scala API docs for more details.
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// Trains a k-means model.
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)
// Make predictions
val predictions = model.transform(dataset)
// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")
// Shows the result.
println("Cluster Centers: ")
model.clusterCenters.foreach(println)
Refer to the Java API docs for more details.
import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");
// Trains a k-means model.
KMeans kmeans = new KMeans().setK(2).setSeed(1L);
KMeansModel model = kmeans.fit(dataset);
// Make predictions
Dataset<Row> predictions = model.transform(dataset);
// Evaluate clustering by computing Silhouette score
ClusteringEvaluator evaluator = new ClusteringEvaluator();
double silhouette = evaluator.evaluate(predictions);
System.out.println("Silhouette with squared euclidean distance = " + silhouette);
// Shows the result.
Vector[] centers = model.clusterCenters();
System.out.println("Cluster Centers: ");
for (Vector center: centers) {
System.out.println(center);
}
Refer to the R API docs for more details.
# Fit a k-means model with spark.kmeans
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
df_list <- randomSplit(training, c(7,3), 2)
kmeansDF <- df_list[[1]]
kmeansTestDF <- df_list[[2]]
kmeansModel <- spark.kmeans(kmeansDF, ~ Class + Sex + Age + Freq,
k = 3)
# Model summary
summary(kmeansModel)
# Get fitted result from the k-means model
head(fitted(kmeansModel))
# Prediction
kmeansPredictions <- predict(kmeansModel, kmeansTestDF)
head(kmeansPredictions)
Latent Dirichlet allocation (LDA)
LDA is implemented as an Estimator that supports both EMLDAOptimizer and OnlineLDAOptimizer, and generates a LDAModel as the base model. Expert users may cast a LDAModel generated by EMLDAOptimizer to a DistributedLDAModel if needed.
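For example, in Python the optimizer is selected through the optimizer parameter ("online", the default, or "em"), and a model fitted with "em" is distributed and can be converted to a local model. A minimal sketch, reusing the sample LIBSVM dataset from the examples below and assuming an active SparkSession named spark, as in those examples:
from pyspark.ml.clustering import LDA, DistributedLDAModel

dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")

# "em" corresponds to the EM optimizer and produces a distributed model;
# the default "online" optimizer produces a local model instead.
lda = LDA(k=10, maxIter=10, optimizer="em")
model = lda.fit(dataset)

print(model.isDistributed())  # True when the EM optimizer was used
if isinstance(model, DistributedLDAModel):
    localModel = model.toLocal()  # convert to a local model if needed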
Examples
Refer to the Python API docs for more details.
from pyspark.ml.clustering import LDA
# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")
# Trains a LDA model.
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)
ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))
# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)
# Shows the result
transformed = model.transform(dataset)
transformed.show(truncate=False)
Refer to the Scala API docs for more details.
import org.apache.spark.ml.clustering.LDA
// Loads data.
val dataset = spark.read.format("libsvm")
.load("data/mllib/sample_lda_libsvm_data.txt")
// Trains a LDA model.
val lda = new LDA().setK(10).setMaxIter(10)
val model = lda.fit(dataset)
val ll = model.logLikelihood(dataset)
val lp = model.logPerplexity(dataset)
println(s"The lower bound on the log likelihood of the entire corpus: $ll")
println(s"The upper bound on perplexity: $lp")
// Describe topics.
val topics = model.describeTopics(3)
println("The topics described by their top-weighted terms:")
topics.show(false)
// Shows the result.
val transformed = model.transform(dataset)
transformed.show(false)
Refer to the Java API docs for more details.
import org.apache.spark.ml.clustering.LDA;
import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm")
.load("data/mllib/sample_lda_libsvm_data.txt");
// Trains a LDA model.
LDA lda = new LDA().setK(10).setMaxIter(10);
LDAModel model = lda.fit(dataset);
double ll = model.logLikelihood(dataset);
double lp = model.logPerplexity(dataset);
System.out.println("The lower bound on the log likelihood of the entire corpus: " + ll);
System.out.println("The upper bound on perplexity: " + lp);
// Describe topics.
Dataset<Row> topics = model.describeTopics(3);
System.out.println("The topics described by their top-weighted terms:");
topics.show(false);
// Shows the result.
Dataset<Row> transformed = model.transform(dataset);
transformed.show(false);
Refer to the R API docs for more details.
# Load training data
df <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
training <- df
test <- df
# Fit a latent dirichlet allocation model with spark.lda
model <- spark.lda(training, k = 10, maxIter = 10)
# Model summary
summary(model)
# Posterior probabilities
posterior <- spark.posterior(model, test)
head(posterior)
# The log perplexity of the LDA model
logPerplexity <- spark.perplexity(model, test)
print(paste0("The upper bound on perplexity: ", logPerplexity))
Bisecting k-means
Bisecting k-means is a kind of hierarchical clustering that uses a divisive (or "top-down") approach: all observations start in one cluster, and splits are performed recursively as one moves down the hierarchy.
Bisecting k-means will often be much faster than regular k-means, but it will generally produce a different clustering.
BisectingKMeans is implemented as an Estimator and generates a BisectingKMeansModel as the base model.
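Because the splits are performed top-down, the minDivisibleClusterSize parameter controls which clusters may still be split: a minimum number of points when it is >= 1.0, or a minimum proportion of the points when it is < 1.0. A minimal Python sketch with illustrative values:
from pyspark.ml.clustering import BisectingKMeans

# Clusters containing fewer than 4 points are no longer divisible.
bkm = BisectingKMeans(k=4, maxIter=20, seed=1, minDivisibleClusterSize=4.0)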
Examples
Refer to the Python API docs for more details.
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
# Trains a bisecting k-means model.
bkm = BisectingKMeans().setK(2).setSeed(1)
model = bkm.fit(dataset)
# Make predictions
predictions = model.transform(dataset)
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Shows the result.
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
    print(center)
Refer to the Scala API docs for more details.
import org.apache.spark.ml.clustering.BisectingKMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// Trains a bisecting k-means model.
val bkm = new BisectingKMeans().setK(2).setSeed(1)
val model = bkm.fit(dataset)
// Make predictions
val predictions = model.transform(dataset)
// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")
// Shows the result.
println("Cluster Centers: ")
val centers = model.clusterCenters
centers.foreach(println)
Refer to the Java API docs for more details.
import org.apache.spark.ml.clustering.BisectingKMeans;
import org.apache.spark.ml.clustering.BisectingKMeansModel;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");
// Trains a bisecting k-means model.
BisectingKMeans bkm = new BisectingKMeans().setK(2).setSeed(1);
BisectingKMeansModel model = bkm.fit(dataset);
// Make predictions
Dataset<Row> predictions = model.transform(dataset);
// Evaluate clustering by computing Silhouette score
ClusteringEvaluator evaluator = new ClusteringEvaluator();
double silhouette = evaluator.evaluate(predictions);
System.out.println("Silhouette with squared euclidean distance = " + silhouette);
// Shows the result.
System.out.println("Cluster Centers: ");
Vector[] centers = model.clusterCenters();
for (Vector center : centers) {
System.out.println(center);
}
Refer to the R API docs for more details.
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# Fit bisecting k-means model with four centers
model <- spark.bisectingKmeans(training, Class ~ Survived, k = 4)
# get fitted result from a bisecting k-means model
fitted.model <- fitted(model, "centers")
# Model summary
head(summary(fitted.model))
# fitted values on training data
fitted <- predict(model, training)
head(select(fitted, "Class", "prediction"))
Gaussian Mixture Model (GMM)
A Gaussian Mixture Model represents a composite distribution whereby points are drawn from one of k Gaussian sub-distributions, each with its own probability. The spark.ml implementation uses the expectation-maximization algorithm to induce the maximum-likelihood model given a set of samples.
GaussianMixture is implemented as an Estimator and generates a GaussianMixtureModel as the base model.
Input Columns
Param name | Type(s) | Default | Description |
---|---|---|---|
featuresCol | Vector | "features" | Feature vector |
Output Columns
Param name | Type(s) | Default | Description |
---|---|---|---|
predictionCol | Int | "prediction" | Predicted cluster center |
probabilityCol | Vector | "probability" | Probability of each cluster |
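Since every point belongs to each of the k components with some probability, the transformed output carries both a hard assignment (predictionCol) and the per-component membership probabilities (probabilityCol). A minimal Python sketch on the same sample data as the examples below, assuming an active SparkSession named spark as in those examples:
from pyspark.ml.clustering import GaussianMixture

dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
model = GaussianMixture(k=2, seed=538009335).fit(dataset)

# Hard assignments alongside the per-component membership probabilities.
model.transform(dataset).select("prediction", "probability").show(truncate=False)

# Mixing weights of the k Gaussian components (they sum to 1).
print(model.weights)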
Examples
Refer to the Python API docs for more details.
from pyspark.ml.clustering import GaussianMixture
# loads data
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
gmm = GaussianMixture().setK(2).setSeed(538009335)
model = gmm.fit(dataset)
print("Gaussians shown as a DataFrame: ")
model.gaussiansDF.show(truncate=False)
Refer to the Scala API docs for more details.
import org.apache.spark.ml.clustering.GaussianMixture
// Loads data
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// Trains Gaussian Mixture Model
val gmm = new GaussianMixture()
.setK(2)
val model = gmm.fit(dataset)
// Output the parameters of the mixture model
for (i <- 0 until model.getK) {
println(s"Gaussian $i:\nweight=${model.weights(i)}\n" +
s"mu=${model.gaussians(i).mean}\nsigma=\n${model.gaussians(i).cov}\n")
}
Refer to the Java API docs for more details.
import org.apache.spark.ml.clustering.GaussianMixture;
import org.apache.spark.ml.clustering.GaussianMixtureModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Loads data
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");
// Trains a GaussianMixture model
GaussianMixture gmm = new GaussianMixture()
.setK(2);
GaussianMixtureModel model = gmm.fit(dataset);
// Output the parameters of the mixture model
for (int i = 0; i < model.getK(); i++) {
System.out.printf("Gaussian %d:\nweight=%f\nmu=%s\nsigma=\n%s\n\n",
i, model.weights()[i], model.gaussians()[i].mean(), model.gaussians()[i].cov());
}
Refer to the R API docs for more details.
# Load training data
df <- read.df("data/mllib/sample_kmeans_data.txt", source = "libsvm")
training <- df
test <- df
# Fit a gaussian mixture clustering model with spark.gaussianMixture
model <- spark.gaussianMixture(training, ~ features, k = 2)
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
head(predictions)
Power Iteration Clustering (PIC)
Power Iteration Clustering (PIC) is a scalable graph clustering algorithm developed by Lin and Cohen. From the abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power iteration on a normalized pairwise similarity matrix of the data.
The PowerIterationClustering implementation in spark.ml takes the following parameters:
- k: the number of clusters to create
- initMode: the initialization algorithm
- maxIter: the maximum number of iterations
- srcCol: the name of the input column for source vertex IDs
- dstCol: the name of the input column for destination vertex IDs
- weightCol: the name of the weight column
Examples
Refer to the Python API docs for more details.
from pyspark.ml.clustering import PowerIterationClustering
df = spark.createDataFrame([
(0, 1, 1.0),
(0, 2, 1.0),
(1, 2, 1.0),
(3, 4, 1.0),
(4, 0, 0.1)
], ["src", "dst", "weight"])
pic = PowerIterationClustering(k=2, maxIter=20, initMode="degree", weightCol="weight")
# Shows the cluster assignment
pic.assignClusters(df).show()
Refer to the Scala API docs for more details.
import org.apache.spark.ml.clustering.PowerIterationClustering
val dataset = spark.createDataFrame(Seq(
(0L, 1L, 1.0),
(0L, 2L, 1.0),
(1L, 2L, 1.0),
(3L, 4L, 1.0),
(4L, 0L, 0.1)
)).toDF("src", "dst", "weight")
val model = new PowerIterationClustering().
setK(2).
setMaxIter(20).
setInitMode("degree").
setWeightCol("weight")
val prediction = model.assignClusters(dataset).select("id", "cluster")
// Shows the cluster assignment
prediction.show(false)
Refer to the Java API docs for more details.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.clustering.PowerIterationClustering;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0L, 1L, 1.0),
RowFactory.create(0L, 2L, 1.0),
RowFactory.create(1L, 2L, 1.0),
RowFactory.create(3L, 4L, 1.0),
RowFactory.create(4L, 0L, 0.1)
);
StructType schema = new StructType(new StructField[]{
new StructField("src", DataTypes.LongType, false, Metadata.empty()),
new StructField("dst", DataTypes.LongType, false, Metadata.empty()),
new StructField("weight", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
PowerIterationClustering model = new PowerIterationClustering()
.setK(2)
.setMaxIter(10)
.setInitMode("degree")
.setWeightCol("weight");
Dataset<Row> result = model.assignClusters(df);
result.show(false);
Refer to the R API docs for more details.
df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
list(1L, 2L, 1.0), list(3L, 4L, 1.0),
list(4L, 0L, 0.1)),
schema = c("src", "dst", "weight"))
# assign clusters
clusters <- spark.assignClusters(df, k = 2L, maxIter = 20L,
initMode = "degree", weightCol = "weight")
showDF(arrange(clusters, clusters$id))