頻繁模式探勘 - 基於 RDD 的 API
探勘頻繁項目、項目集、子序列或其他子結構通常是分析大型資料集的第一步,這幾年來一直是資料探勘中的熱門研究主題。我們建議使用者參考維基百科的 關聯規則學習 以取得更多資訊。 spark.mllib
提供 FP-growth 的平行實作,這是一種廣泛用於探勘頻繁項目集的演算法。
FP-growth
FP-growth 演算法說明於論文 Han et al., Mining frequent patterns without candidate generation 中,其中「FP」代表頻繁模式。給定一組交易資料集,FP-growth 的第一步是計算項目頻率並找出頻繁項目。與為相同目的而設計的 Apriori 類似 演算法不同,FP-growth 的第二步使用後綴樹 (FP 樹) 結構來編碼交易,而不會明確產生候選集,而產生候選集通常很花費成本。在第二步之後,可以從 FP 樹中萃取出頻繁項目集。在 spark.mllib
中,我們實作了 FP-growth 的平行版本,稱為 PFP,如 Li et al., PFP: Parallel FP-growth for query recommendation 中所述。PFP 會根據交易後綴來分配 FP 樹的成長工作,因此比單機實作更具擴充性。我們建議使用者參閱這些論文以取得更多詳細資訊。
spark.mllib
的 FP-growth 實作採用以下(超)參數
minSupport
:將項目集識別為頻繁的最低支援度。例如,如果一個項目出現在 5 個交易中的 3 個,則其支援度為 3/5=0.6。numPartitions
:用於分配工作的分割區數目。
範例
FPGrowth
實作 FP-growth 演算法。它會取得交易的 RDD
,其中每個交易都是一般類型項目的 List
。使用交易呼叫 FPGrowth.train
會傳回 FPGrowthModel
,其中儲存頻繁項目集及其頻率。
請參閱 FPGrowth
Python 文件,以取得更多 API 詳細資料。
from pyspark.mllib.fpm import FPGrowth
data = sc.textFile("data/mllib/sample_fpgrowth.txt")
transactions = data.map(lambda line: line.strip().split(' '))
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect()
for fi in result:
print(fi)
FPGrowth
實作 FP-growth 演算法。它會取得交易的 RDD
,其中每個交易都是一般類型項目的 Array
。使用交易呼叫 FPGrowth.run
會傳回 FPGrowthModel
,其中儲存頻繁項目集及其頻率。下列範例說明如何從 transactions
中找出頻繁項目集和關聯規則(請參閱 關聯規則 以取得詳細資料)。
請參閱 FPGrowth
Scala 文件,以取得 API 詳細資料。
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD
val data = sc.textFile("data/mllib/sample_fpgrowth.txt")
val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))
val fpg = new FPGrowth()
.setMinSupport(0.2)
.setNumPartitions(10)
val model = fpg.run(transactions)
model.freqItemsets.collect().foreach { itemset =>
println(s"${itemset.items.mkString("[", ",", "]")},${itemset.freq}")
}
val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
println(s"${rule.antecedent.mkString("[", ",", "]")}=> " +
s"${rule.consequent .mkString("[", ",", "]")},${rule.confidence}")
}
FPGrowth
實作 FP-growth 演算法。它會取得交易的 JavaRDD
,其中每個交易都是一般類型項目的 Iterable
。使用交易呼叫 FPGrowth.run
會傳回 FPGrowthModel
,其中儲存頻繁項目集及其頻率。下列範例說明如何從 transactions
中找出頻繁項目集和關聯規則(請參閱 關聯規則 以取得詳細資料)。
有關 API 的詳細資訊,請參閱 FPGrowth
Java 文件。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowthModel;
JavaRDD<String> data = sc.textFile("data/mllib/sample_fpgrowth.txt");
JavaRDD<List<String>> transactions = data.map(line -> Arrays.asList(line.split(" ")));
FPGrowth fpg = new FPGrowth()
.setMinSupport(0.2)
.setNumPartitions(10);
FPGrowthModel<String> model = fpg.run(transactions);
for (FPGrowth.FreqItemset<String> itemset: model.freqItemsets().toJavaRDD().collect()) {
System.out.println("[" + itemset.javaItems() + "], " + itemset.freq());
}
double minConfidence = 0.8;
for (AssociationRules.Rule<String> rule
: model.generateAssociationRules(minConfidence).toJavaRDD().collect()) {
System.out.println(
rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
關聯規則
AssociationRules 實作平行規則產生演算法,用於建構以單一項目為結果的規則。
有關 API 的詳細資訊,請參閱 AssociationRules
Scala 文件。
import org.apache.spark.mllib.fpm.AssociationRules
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset
val freqItemsets = sc.parallelize(Seq(
new FreqItemset(Array("a"), 15L),
new FreqItemset(Array("b"), 35L),
new FreqItemset(Array("a", "b"), 12L)
))
val ar = new AssociationRules()
.setMinConfidence(0.8)
val results = ar.run(freqItemsets)
results.collect().foreach { rule =>
println(s"[${rule.antecedent.mkString(",")}=>${rule.consequent.mkString(",")} ]" +
s" ${rule.confidence}")
}
AssociationRules 實作平行規則產生演算法,用於建構以單一項目為結果的規則。
有關 API 的詳細資訊,請參閱 AssociationRules
Java 文件。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset;
JavaRDD<FPGrowth.FreqItemset<String>> freqItemsets = sc.parallelize(Arrays.asList(
new FreqItemset<>(new String[] {"a"}, 15L),
new FreqItemset<>(new String[] {"b"}, 35L),
new FreqItemset<>(new String[] {"a", "b"}, 12L)
));
AssociationRules arules = new AssociationRules()
.setMinConfidence(0.8);
JavaRDD<AssociationRules.Rule<String>> results = arules.run(freqItemsets);
for (AssociationRules.Rule<String> rule : results.collect()) {
System.out.println(
rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
PrefixSpan
PrefixSpan 是一種循序模式探勘演算法,在 Pei 等人,透過模式成長進行循序模式探勘:PrefixSpan 方法 中有說明。我們建議讀者參閱所引用的論文,以了解循序模式探勘問題的正式說明。
spark.mllib
的 PrefixSpan 實作採用下列參數
minSupport
:視為頻繁循序模式所需的最小支援度。maxPatternLength
:頻繁循序模式的最大長度。任何超過此長度的頻繁模式都不會包含在結果中。maxLocalProjDBSize
:在開始對投影資料庫進行局部反覆處理之前,前置投影資料庫中允許的最大項目數。此參數應根據執行器的規模進行調整。
範例
下列範例說明 PrefixSpan 在循序中執行的運作(使用與 Pei 等人相同的符號)
<(12)3>
<1(32)(12)>
<(12)5>
<6>
PrefixSpan
實作 PrefixSpan 演算法。呼叫 PrefixSpan.run
會傳回 PrefixSpanModel
,其中儲存頻繁循序及其頻率。
有關 API 的詳細資訊,請參閱 PrefixSpan
Scala 文件 和 PrefixSpanModel
Scala 文件。
import org.apache.spark.mllib.fpm.PrefixSpan
val sequences = sc.parallelize(Seq(
Array(Array(1, 2), Array(3)),
Array(Array(1), Array(3, 2), Array(1, 2)),
Array(Array(1, 2), Array(5)),
Array(Array(6))
), 2).cache()
val prefixSpan = new PrefixSpan()
.setMinSupport(0.5)
.setMaxPatternLength(5)
val model = prefixSpan.run(sequences)
model.freqSequences.collect().foreach { freqSequence =>
println(
s"${freqSequence.sequence.map(_.mkString("[", ", ", "]")).mkString("[", ", ", "]")}," +
s" ${freqSequence.freq}")
}
PrefixSpan
實作 PrefixSpan 演算法。呼叫 PrefixSpan.run
會傳回 PrefixSpanModel
,其中儲存頻繁序列及其頻率。
請參閱 PrefixSpan
Java 文件和 PrefixSpanModel
Java 文件,以取得 API 詳細資料。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.mllib.fpm.PrefixSpan;
import org.apache.spark.mllib.fpm.PrefixSpanModel;
JavaRDD<List<List<Integer>>> sequences = sc.parallelize(Arrays.asList(
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)),
Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1, 2)),
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5)),
Arrays.asList(Arrays.asList(6))
), 2);
PrefixSpan prefixSpan = new PrefixSpan()
.setMinSupport(0.5)
.setMaxPatternLength(5);
PrefixSpanModel<Integer> model = prefixSpan.run(sequences);
for (PrefixSpan.FreqSequence<Integer> freqSeq: model.freqSequences().toJavaRDD().collect()) {
System.out.println(freqSeq.javaSequence() + ", " + freqSeq.freq());
}