Clustering

This page describes clustering algorithms in MLlib. The guide for clustering in the RDD-based API also has relevant information about these algorithms.

K-means

K-means is one of the most commonly used clustering algorithms, grouping data points into a predefined number of clusters. The MLlib implementation includes a parallelized variant of the k-means++ method called kmeans||.
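
A minimal Python sketch (hypothetical values) of how this initialization is exposed: the initMode param selects between kmeans|| ("k-means||", the default) and "random", and initSteps controls the number of kmeans|| initialization steps.

from pyspark.ml.clustering import KMeans

# Sketch only: configure the kmeans|| initialization explicitly.
# initMode="k-means||" is the default; "random" is the alternative.
kmeans = KMeans(k=2, initMode="k-means||", initSteps=2, seed=1)
print(kmeans.getInitMode())  # k-means||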

KMeans is implemented as an Estimator and generates a KMeansModel as the base model.

Input Columns

Param name | Type(s) | Default | Description
featuresCol | Vector | "features" | Feature vector

Output Columns

Param name | Type(s) | Default | Description
predictionCol | Int | "prediction" | Predicted cluster center

Examples

Refer to the Python API docs for more details.

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
Find full example code at "examples/src/main/python/ml/kmeans_example.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains a k-means model.
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)

// Make predictions
val predictions = model.transform(dataset)

// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Shows the result.
println("Cluster Centers: ")
model.clusterCenters.foreach(println)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/KMeansExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a k-means model.
KMeans kmeans = new KMeans().setK(2).setSeed(1L);
KMeansModel model = kmeans.fit(dataset);

// Make predictions
Dataset<Row> predictions = model.transform(dataset);

// Evaluate clustering by computing Silhouette score
ClusteringEvaluator evaluator = new ClusteringEvaluator();

double silhouette = evaluator.evaluate(predictions);
System.out.println("Silhouette with squared euclidean distance = " + silhouette);

// Shows the result.
Vector[] centers = model.clusterCenters();
System.out.println("Cluster Centers: ");
for (Vector center: centers) {
  System.out.println(center);
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java" in the Spark repo.

Refer to the R API docs for more details.

# Fit a k-means model with spark.kmeans
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
df_list <- randomSplit(training, c(7,3), 2)
kmeansDF <- df_list[[1]]
kmeansTestDF <- df_list[[2]]
kmeansModel <- spark.kmeans(kmeansDF, ~ Class + Sex + Age + Freq,
                            k = 3)

# Model summary
summary(kmeansModel)

# Get fitted result from the k-means model
head(fitted(kmeansModel))

# Prediction
kmeansPredictions <- predict(kmeansModel, kmeansTestDF)
head(kmeansPredictions)
Find full example code at "examples/src/main/r/ml/kmeans.R" in the Spark repo.

Latent Dirichlet allocation (LDA)

LDA is implemented as an Estimator that supports both EMLDAOptimizer and OnlineLDAOptimizer, and generates an LDAModel as the base model. Expert users may cast an LDAModel generated by EMLDAOptimizer to a DistributedLDAModel if needed.
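
A minimal Python sketch (assuming a dataset like the one loaded in the examples below) of how the optimizer is chosen; with the "em" optimizer the fitted model is distributed, while the default "online" optimizer yields a local model:

from pyspark.ml.clustering import LDA

# Sketch only: select the EM optimizer instead of the default online one.
lda = LDA(k=10, maxIter=10, optimizer="em")
model = lda.fit(dataset)      # `dataset` as in the examples below
print(model.isDistributed())  # True for the EM optimizer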

Examples

Refer to the Python API docs for more details.

from pyspark.ml.clustering import LDA

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")

# Trains a LDA model.
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)

ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(dataset)
transformed.show(truncate=False)
Find full example code at "examples/src/main/python/ml/lda_example.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.clustering.LDA

// Loads data.
val dataset = spark.read.format("libsvm")
  .load("data/mllib/sample_lda_libsvm_data.txt")

// Trains a LDA model.
val lda = new LDA().setK(10).setMaxIter(10)
val model = lda.fit(dataset)

val ll = model.logLikelihood(dataset)
val lp = model.logPerplexity(dataset)
println(s"The lower bound on the log likelihood of the entire corpus: $ll")
println(s"The upper bound on perplexity: $lp")

// Describe topics.
val topics = model.describeTopics(3)
println("The topics described by their top-weighted terms:")
topics.show(false)

// Shows the result.
val transformed = model.transform(dataset)
transformed.show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import org.apache.spark.ml.clustering.LDA;
import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm")
  .load("data/mllib/sample_lda_libsvm_data.txt");

// Trains a LDA model.
LDA lda = new LDA().setK(10).setMaxIter(10);
LDAModel model = lda.fit(dataset);

double ll = model.logLikelihood(dataset);
double lp = model.logPerplexity(dataset);
System.out.println("The lower bound on the log likelihood of the entire corpus: " + ll);
System.out.println("The upper bound on perplexity: " + lp);

// Describe topics.
Dataset<Row> topics = model.describeTopics(3);
System.out.println("The topics described by their top-weighted terms:");
topics.show(false);

// Shows the result.
Dataset<Row> transformed = model.transform(dataset);
transformed.show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java" in the Spark repo.

Refer to the R API docs for more details.

# Load training data
df <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a latent dirichlet allocation model with spark.lda
model <- spark.lda(training, k = 10, maxIter = 10)

# Model summary
summary(model)

# Posterior probabilities
posterior <- spark.posterior(model, test)
head(posterior)

# The log perplexity of the LDA model
logPerplexity <- spark.perplexity(model, test)
print(paste0("The upper bound bound on perplexity: ", logPerplexity))
Find full example code at "examples/src/main/r/ml/lda.R" in the Spark repo.

Bisecting k-means

Bisecting k-means is a kind of hierarchical clustering using a divisive (or "top-down") approach: all observations start in one cluster, and splits are performed recursively as one moves down the hierarchy.

Bisecting k-means will often be much faster than regular k-means, but it will generally produce a different clustering.

BisectingKMeans is implemented as an Estimator and generates a BisectingKMeansModel as the base model.
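
Beyond k, the splitting behaviour can be tuned. A minimal Python sketch (hypothetical values) using the minDivisibleClusterSize param, which sets the minimum number of points (or the minimum proportion, if less than 1.0) a cluster must contain to be divisible:

from pyspark.ml.clustering import BisectingKMeans

# Sketch only: only clusters with at least 4 points may be split further.
bkm = BisectingKMeans(k=4, minDivisibleClusterSize=4.0, seed=1)
# model = bkm.fit(dataset)  # `dataset` as in the examples below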

Examples

Refer to the Python API docs for more details.

from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# Trains a bisecting k-means model.
bkm = BisectingKMeans().setK(2).setSeed(1)
model = bkm.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
    print(center)
Find full example code at "examples/src/main/python/ml/bisecting_k_means_example.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.clustering.BisectingKMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains a bisecting k-means model.
val bkm = new BisectingKMeans().setK(2).setSeed(1)
val model = bkm.fit(dataset)

// Make predictions
val predictions = model.transform(dataset)

// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Shows the result.
println("Cluster Centers: ")
val centers = model.clusterCenters
centers.foreach(println)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/BisectingKMeansExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import org.apache.spark.ml.clustering.BisectingKMeans;
import org.apache.spark.ml.clustering.BisectingKMeansModel;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a bisecting k-means model.
BisectingKMeans bkm = new BisectingKMeans().setK(2).setSeed(1);
BisectingKMeansModel model = bkm.fit(dataset);

// Make predictions
Dataset<Row> predictions = model.transform(dataset);

// Evaluate clustering by computing Silhouette score
ClusteringEvaluator evaluator = new ClusteringEvaluator();

double silhouette = evaluator.evaluate(predictions);
System.out.println("Silhouette with squared euclidean distance = " + silhouette);

// Shows the result.
System.out.println("Cluster Centers: ");
Vector[] centers = model.clusterCenters();
for (Vector center : centers) {
  System.out.println(center);
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaBisectingKMeansExample.java" in the Spark repo.

Refer to the R API docs for more details.

t <- as.data.frame(Titanic)
training <- createDataFrame(t)

# Fit bisecting k-means model with four centers
model <- spark.bisectingKmeans(training, Class ~ Survived, k = 4)

# get fitted result from a bisecting k-means model
fitted.model <- fitted(model, "centers")

# Model summary
head(summary(fitted.model))

# fitted values on training data
fitted <- predict(model, training)
head(select(fitted, "Class", "prediction"))
Find full example code at "examples/src/main/r/ml/bisectingKmeans.R" in the Spark repo.

Gaussian Mixture Model (GMM)

A Gaussian Mixture Model represents a composite distribution whereby points are drawn from one of k Gaussian sub-distributions, each with its own probability. The spark.ml implementation uses the expectation-maximization algorithm to induce the maximum-likelihood model given a set of samples.

GaussianMixture is implemented as an Estimator and generates a GaussianMixtureModel as the base model.

Input Columns

Param name | Type(s) | Default | Description
featuresCol | Vector | "features" | Feature vector

Output Columns

Param name | Type(s) | Default | Description
predictionCol | Int | "prediction" | Predicted cluster center
probabilityCol | Vector | "probability" | Probability of each cluster
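
Both output columns appear on the DataFrame returned by transform(). A minimal Python sketch, assuming the model and dataset from the examples below:

# Sketch only: the transformed DataFrame carries the cluster assignment and
# the per-cluster membership probabilities named in the table above.
model.transform(dataset).select("prediction", "probability").show(truncate=False)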

Examples

Refer to the Python API docs for more details.

from pyspark.ml.clustering import GaussianMixture

# loads data
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

gmm = GaussianMixture().setK(2).setSeed(538009335)
model = gmm.fit(dataset)

print("Gaussians shown as a DataFrame: ")
model.gaussiansDF.show(truncate=False)
Find full example code at "examples/src/main/python/ml/gaussian_mixture_example.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.clustering.GaussianMixture

// Loads data
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains Gaussian Mixture Model
val gmm = new GaussianMixture()
  .setK(2)
val model = gmm.fit(dataset)

// output parameters of the mixture model
for (i <- 0 until model.getK) {
  println(s"Gaussian $i:\nweight=${model.weights(i)}\n" +
      s"mu=${model.gaussians(i).mean}\nsigma=\n${model.gaussians(i).cov}\n")
}
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import org.apache.spark.ml.clustering.GaussianMixture;
import org.apache.spark.ml.clustering.GaussianMixtureModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a GaussianMixture model
GaussianMixture gmm = new GaussianMixture()
  .setK(2);
GaussianMixtureModel model = gmm.fit(dataset);

// Output the parameters of the mixture model
for (int i = 0; i < model.getK(); i++) {
  System.out.printf("Gaussian %d:\nweight=%f\nmu=%s\nsigma=\n%s\n\n",
          i, model.weights()[i], model.gaussians()[i].mean(), model.gaussians()[i].cov());
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java" in the Spark repo.

Refer to the R API docs for more details.

# Load training data
df <- read.df("data/mllib/sample_kmeans_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a gaussian mixture clustering model with spark.gaussianMixture
model <- spark.gaussianMixture(training, ~ features, k = 2)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/gaussianMixture.R" in the Spark repo.

Power Iteration Clustering (PIC)

Power Iteration Clustering (PIC) is a scalable graph clustering algorithm developed by Lin and Cohen. From the abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise similarity matrix of the data.

spark.ml's PowerIterationClustering implementation takes the following parameters:

- k: the number of clusters to create
- initMode: param for the initialization algorithm ("random" or "degree")
- maxIter: param for the maximum number of iterations
- srcCol: param for the name of the input column for source vertex IDs
- dstCol: param for the name of the input column for destination vertex IDs
- weightCol: param for the name of the optional input column for edge weights
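
Note that, unlike the algorithms above, PowerIterationClustering is not an Estimator/Model pair: there is no fit()/transform(), and it cannot assign clusters to unseen data. A minimal Python sketch, assuming an edge DataFrame shaped like the examples below:

from pyspark.ml.clustering import PowerIterationClustering

# Sketch only: assignClusters() consumes a (src, dst, weight) edge DataFrame
# directly and returns an (id, cluster) DataFrame.
pic = PowerIterationClustering(k=2, maxIter=20, initMode="degree", weightCol="weight")
# assignments = pic.assignClusters(edges)  # `edges` as in the examples below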

Examples

Refer to the Python API docs for more details.

from pyspark.ml.clustering import PowerIterationClustering

df = spark.createDataFrame([
    (0, 1, 1.0),
    (0, 2, 1.0),
    (1, 2, 1.0),
    (3, 4, 1.0),
    (4, 0, 0.1)
], ["src", "dst", "weight"])

pic = PowerIterationClustering(k=2, maxIter=20, initMode="degree", weightCol="weight")

# Shows the cluster assignment
pic.assignClusters(df).show()
Find full example code at "examples/src/main/python/ml/power_iteration_clustering_example.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.clustering.PowerIterationClustering

val dataset = spark.createDataFrame(Seq(
  (0L, 1L, 1.0),
  (0L, 2L, 1.0),
  (1L, 2L, 1.0),
  (3L, 4L, 1.0),
  (4L, 0L, 0.1)
)).toDF("src", "dst", "weight")

val model = new PowerIterationClustering().
  setK(2).
  setMaxIter(20).
  setInitMode("degree").
  setWeightCol("weight")

val prediction = model.assignClusters(dataset).select("id", "cluster")

//  Shows the cluster assignment
prediction.show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/PowerIterationClusteringExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.clustering.PowerIterationClustering;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0L, 1L, 1.0),
  RowFactory.create(0L, 2L, 1.0),
  RowFactory.create(1L, 2L, 1.0),
  RowFactory.create(3L, 4L, 1.0),
  RowFactory.create(4L, 0L, 0.1)
);

StructType schema = new StructType(new StructField[]{
  new StructField("src", DataTypes.LongType, false, Metadata.empty()),
  new StructField("dst", DataTypes.LongType, false, Metadata.empty()),
  new StructField("weight", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

PowerIterationClustering model = new PowerIterationClustering()
  .setK(2)
  .setMaxIter(10)
  .setInitMode("degree")
  .setWeightCol("weight");

Dataset<Row> result = model.assignClusters(df);
result.show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaPowerIterationClusteringExample.java" in the Spark repo.

Refer to the R API docs for more details.

df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
                           list(1L, 2L, 1.0), list(3L, 4L, 1.0),
                           list(4L, 0L, 0.1)),
                      schema = c("src", "dst", "weight"))
# assign clusters
clusters <- spark.assignClusters(df, k = 2L, maxIter = 20L,
                                 initMode = "degree", weightCol = "weight")

showDF(arrange(clusters, clusters$id))
Find full example code at "examples/src/main/r/ml/powerIterationClustering.R" in the Spark repo.