特徵萃取與轉換 - 基於 RDD 的 API
TF-IDF
注意建議使用基於資料框的 API,詳情請參閱 ML 使用者指南中的 TF-IDF。
詞頻-逆文件頻率 (TF-IDF) 是一種特徵向量化方法,廣泛用於文本探勘,用來反映術語對語料庫中文件的相關性。用 $t$
表示術語,用 $d$
表示文件,用 $D$
表示語料庫。術語頻率 $TF(t, d)$
是術語 $t$
在文件 $d$
中出現的次數,而文件頻率 $DF(t, D)$
是包含術語 $t$
的文件數。如果我們只使用術語頻率來衡量相關性,很容易過度強調出現次數很多但對文件資訊含量很少的術語,例如「a」、「the」和「of」。如果一個術語在語料庫中出現次數很多,表示它對特定文件沒有特殊資訊。逆文件頻率是一種數值測量,表示術語提供多少資訊:\[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \]
其中 $|D|$
是語料庫中的文件總數。由於使用了對數,如果一個術語出現在所有文件中,它的 IDF 值會變成 0。請注意,我們套用平滑項,以避免對語料庫外的術語除以零。TF-IDF 測量值只是 TF 和 IDF 的乘積:\[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \]
術語頻率和文件頻率的定義有幾個變體。在 spark.mllib
中,我們將 TF 和 IDF 分開,讓它們更靈活。
我們實作的術語頻率利用了 雜湊技巧。原始特徵會透過套用雜湊函數,對應到一個索引 (術語)。然後根據對應的索引計算術語頻率。這種方法避免了計算全域術語對索引對應的需要,這對於大型語料庫來說可能很耗費資源,但它會產生潛在的雜湊衝突,也就是不同的原始特徵在雜湊後可能會變成同一個術語。為了降低衝突的機率,我們可以增加目標特徵維度,也就是雜湊表的儲存區塊數。預設的特徵維度是 $2^{20} = 1,048,576$
。
注意: spark.mllib
未提供文字分詞的工具。我們建議使用者參考 Stanford NLP Group 和 scalanlp/chalk。
TF 和 IDF 已實作於 HashingTF 和 IDF 中。 HashingTF
將 RDD 的清單作為輸入。每個記錄可以是字串或其他類型的可迭代物件。
請參閱 HashingTF
Python 文件,以取得 API 的詳細資料。
from pyspark.mllib.feature import HashingTF, IDF
# Load documents (one per line).
documents = sc.textFile("data/mllib/kmeans_data.txt").map(lambda line: line.split(" "))
hashingTF = HashingTF()
tf = hashingTF.transform(documents)
# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
# spark.mllib's IDF implementation provides an option for ignoring terms
# which occur in less than a minimum number of documents.
# In such cases, the IDF for these terms is set to 0.
# This feature can be used by passing the minDocFreq value to the IDF constructor.
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)
TF 和 IDF 已實作於 HashingTF 和 IDF 中。 HashingTF
將 RDD[Iterable[_]]
作為輸入。每個記錄可以是字串或其他類型的可迭代物件。
請參閱 HashingTF
Scala 文件,以取得 API 的詳細資料。
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
// Load documents (one per line).
val documents: RDD[Seq[String]] = sc.textFile("data/mllib/kmeans_data.txt")
.map(_.split(" ").toSeq)
val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)
// While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
// First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
// spark.mllib IDF implementation provides an option for ignoring terms which occur in less than
// a minimum number of documents. In such cases, the IDF for these terms is set to 0.
// This feature can be used by passing the minDocFreq value to the IDF constructor.
val idfIgnore = new IDF(minDocFreq = 2).fit(tf)
val tfidfIgnore: RDD[Vector] = idfIgnore.transform(tf)
Word2Vec
Word2Vec 計算詞彙的分布式向量表示。分布式表示的主要優點在於,相似的詞彙在向量空間中彼此相近,這使得對新模式的概化更為容易,且模型估計也更為穩健。分布式向量表示已證實對許多自然語言處理應用程式很有用,例如命名實體辨識、歧義消解、剖析、標記和機器翻譯。
模型
在我們的 Word2Vec 實作中,我們使用了跳躍式 n-gram 模型。跳躍式 n-gram 的訓練目標是要學習字詞向量表示,以便於預測同一句子中的上下文。在數學上,給定一連串訓練字詞 $w_1, w_2, \dots, w_T$
,跳躍式 n-gram 模型的目標是要最大化平均對數似然 \[ \frac{1}{T} \sum_{t = 1}^{T}\sum_{j=-k}^{j=k} \log p(w_{t+j} | w_t) \]
其中 $k$ 是訓練視窗的大小。
在跳躍式 n-gram 模型中,每個字詞 $w$ 都與兩個向量 $u_w$ 和 $v_w$ 相關聯,它們分別是 $w$ 作為字詞和上下文的向量表示。正確預測字詞 $w_i$ 給定字詞 $w_j$ 的機率是由 softmax 模型決定的,也就是 \[ p(w_i | w_j ) = \frac{\exp(u_{w_i}^{\top}v_{w_j})}{\sum_{l=1}^{V} \exp(u_l^{\top}v_{w_j})} \]
其中 $V$ 是詞彙量大小。
帶有 softmax 的跳躍式 n-gram 模型很昂貴,因為計算 $\log p(w_i | w_j)$ 的成本與 $V$ 成正比,而 $V$ 很容易達到數百萬個數量級。為了加速 Word2Vec 的訓練,我們使用了階層式 softmax,它將計算 $\log p(w_i | w_j)$ 的複雜度降低到 $O(\log(V))$。
範例
以下範例示範如何載入文字檔案,將它解析成 Seq[String]
的 RDD,建構一個 Word2Vec
實例,然後使用輸入資料擬合一個 Word2VecModel
。最後,我們顯示指定字詞的前 40 個同義詞。要執行範例,請先下載 text8 資料並將它解壓縮到您偏好的目錄。這裡我們假設解壓縮後的檔案是 text8
,而且與您執行 spark shell 的目錄相同。
請參閱 Word2Vec
Python 文件,以取得更多有關 API 的詳細資料。
from pyspark.mllib.feature import Word2Vec
inp = sc.textFile("data/mllib/sample_lda_data.txt").map(lambda row: row.split(" "))
word2vec = Word2Vec()
model = word2vec.fit(inp)
synonyms = model.findSynonyms('1', 5)
for word, cosine_distance in synonyms:
print("{}: {}".format(word, cosine_distance))
請參閱 Word2Vec
Scala 文件,以取得有關 API 的詳細資料。
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
val input = sc.textFile("data/mllib/sample_lda_data.txt").map(line => line.split(" ").toSeq)
val word2vec = new Word2Vec()
val model = word2vec.fit(input)
val synonyms = model.findSynonyms("1", 5)
for((synonym, cosineSimilarity) <- synonyms) {
println(s"$synonym $cosineSimilarity")
}
// Save and load model
model.save(sc, "myModelPath")
val sameModel = Word2VecModel.load(sc, "myModelPath")
StandardScaler
透過使用訓練集中樣本的欄位摘要統計資料,將特徵標準化為單位變異,和/或移除平均值。這是一個非常常見的預處理步驟。
例如,支援向量機的 RBF 核,或 L1 和 L2 正則化的線性模型,通常在所有特徵具有單位變異和/或平均值為零時,會發揮更好的效能。
標準化可以改善最佳化過程中收斂的速率,而且也可以防止變異非常大的特徵在模型訓練期間施加過大的影響。
模型擬合
StandardScaler
在建構函式中具有下列參數
withMean
預設為 False。在縮放前使用平均值將資料置中。它會建立一個密集輸出,因此在套用至稀疏輸入時請小心。withStd
預設為 True。將資料縮放至單位標準差。
我們在 StandardScaler
中提供一個 fit
方法,它可以輸入 RDD[Vector]
,學習摘要統計資料,然後傳回一個模型,該模型可以將輸入資料集轉換為單位標準差和/或零平均值特徵,具體取決於我們如何設定 StandardScaler
。
此模型實作 VectorTransformer
,它可以對 Vector
套用標準化以產生轉換後的 Vector
,或對 RDD[Vector]
套用標準化以產生轉換後的 RDD[Vector]
。
請注意,如果特徵的變異數為零,它會在 Vector
中傳回該特徵的預設 0.0
值。
範例
以下範例示範如何載入 libsvm 格式的資料集,並標準化特徵,以便新特徵具有單位標準差和/或零平均值。
請參閱 StandardScaler
Python 文件,以取得更多有關 API 的詳細資料。
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
scaler1 = StandardScaler().fit(features)
scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)
# data1 will be unit variance.
data1 = label.zip(scaler1.transform(features))
# data2 will be unit variance and zero mean.
data2 = label.zip(scaler2.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
請參閱 StandardScaler
Scala 文件,以取得有關 API 的詳細資料。
import org.apache.spark.mllib.feature.{StandardScaler, StandardScalerModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val scaler1 = new StandardScaler().fit(data.map(x => x.features))
val scaler2 = new StandardScaler(withMean = true, withStd = true).fit(data.map(x => x.features))
// scaler3 is an identical model to scaler2, and will produce identical transformations
val scaler3 = new StandardScalerModel(scaler2.std, scaler2.mean)
// data1 will be unit variance.
val data1 = data.map(x => (x.label, scaler1.transform(x.features)))
// data2 will be unit variance and zero mean.
val data2 = data.map(x => (x.label, scaler2.transform(Vectors.dense(x.features.toArray))))
Normalizer
Normalizer 會將個別樣本縮放到單位 $L^p$ 範數。這是文字分類或分群的常見操作。例如,兩個 $L^2$ 正規化 TF-IDF 向量點積是向量的餘弦相似度。
Normalizer
在建構函式中具有下列參數
p
在 $L^p$ 空間中的正規化,預設為 $p = 2$。
Normalizer
實作了 VectorTransformer
,它可以將正規化套用於 Vector
以產生轉換後的 Vector
,或套用於 RDD[Vector]
以產生轉換後的 RDD[Vector]
。
請注意,如果輸入的範數為零,它會傳回輸入向量。
範例
以下範例示範如何載入 libsvm 格式的資料集,並使用 $L^2$ 範數和 $L^\infty$ 範數正規化特徵。
請參閱 Normalizer
Python 文件,以取得更多有關 API 的詳細資料。
from pyspark.mllib.feature import Normalizer
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
labels = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
normalizer1 = Normalizer()
normalizer2 = Normalizer(p=float("inf"))
# Each sample in data1 will be normalized using $L^2$ norm.
data1 = labels.zip(normalizer1.transform(features))
# Each sample in data2 will be normalized using $L^\infty$ norm.
data2 = labels.zip(normalizer2.transform(features))
請參閱 Normalizer
Scala 文件,以取得有關 API 的詳細資料。
import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val normalizer1 = new Normalizer()
val normalizer2 = new Normalizer(p = Double.PositiveInfinity)
// Each sample in data1 will be normalized using $L^2$ norm.
val data1 = data.map(x => (x.label, normalizer1.transform(x.features)))
// Each sample in data2 will be normalized using $L^\infty$ norm.
val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))
ChiSqSelector
特徵選取 嘗試找出相關特徵,以用於模型建構。它會縮小特徵空間的大小,這可以同時改善速度和統計學習行為。
ChiSqSelector
實作了卡方特徵選取。它對具有類別特徵的標籤資料進行運算。ChiSqSelector 使用 卡方獨立性檢定 來決定要選取哪些特徵。它支援五種選取方法:numTopFeatures
、percentile
、fpr
、fdr
、fwe
numTopFeatures
根據卡方檢定選取固定數量的頂尖特徵。這類似於產生具有最高預測能力的特徵。percentile
類似於numTopFeatures
,但選取所有特徵的一小部分,而不是固定數量。fpr
選取 p 值低於閾值的全部特徵,從而控制選取的假陽性率。fdr
使用 Benjamini-Hochberg 程序 選取假發現率低於閾值的全部特徵。fwe
選取所有 p 值低於閾值的特徵。閾值會縮放為 1/numFeatures,因此控制選擇的家族錯誤率。
預設情況下,選擇方法為 numTopFeatures
,預設的頂尖特徵數目設定為 50。使用者可以使用 setSelectorType
選擇選擇方法。
可以使用保留驗證集調整要選取的特徵數目。
模型擬合
fit
方法採用具有分類特徵的 RDD[LabeledPoint]
輸入,學習摘要統計資料,然後傳回一個 ChiSqSelectorModel
,它可以將輸入資料集轉換為縮減的特徵空間。 ChiSqSelectorModel
可以應用於 Vector
以產生縮減的 Vector
,或應用於 RDD[Vector]
以產生縮減的 RDD[Vector]
。
請注意,使用者也可以透過提供已選取特徵索引的陣列(必須按遞增順序排序)來手動建構 ChiSqSelectorModel
。
範例
以下範例顯示 ChiSqSelector 的基本用法。所使用的資料集具有特徵矩陣,其中包含每個特徵從 0 到 255 的灰階值。
請參閱 ChiSqSelector
Scala 文件,以取得 API 詳細資料。
import org.apache.spark.mllib.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
// Load some data in libsvm format
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Discretize data in 16 equal bins since ChiSqSelector requires categorical features
// Even though features are doubles, the ChiSqSelector treats each unique value as a category
val discretizedData = data.map { lp =>
LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => (x / 16).floor }))
}
// Create ChiSqSelector that will select top 50 of 692 features
val selector = new ChiSqSelector(50)
// Create ChiSqSelector model (selecting features)
val transformer = selector.fit(discretizedData)
// Filter the top 50 features from each feature vector
val filteredData = discretizedData.map { lp =>
LabeledPoint(lp.label, transformer.transform(lp.features))
}
請參閱 ChiSqSelector
Java 文件,以取得 API 詳細資料。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ChiSqSelector;
import org.apache.spark.mllib.feature.ChiSqSelectorModel;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
JavaRDD<LabeledPoint> points = MLUtils.loadLibSVMFile(jsc.sc(),
"data/mllib/sample_libsvm_data.txt").toJavaRDD().cache();
// Discretize data in 16 equal bins since ChiSqSelector requires categorical features
// Although features are doubles, the ChiSqSelector treats each unique value as a category
JavaRDD<LabeledPoint> discretizedData = points.map(lp -> {
double[] discretizedFeatures = new double[lp.features().size()];
for (int i = 0; i < lp.features().size(); ++i) {
discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16);
}
return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures));
});
// Create ChiSqSelector that will select top 50 of 692 features
ChiSqSelector selector = new ChiSqSelector(50);
// Create ChiSqSelector model (selecting features)
ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd());
// Filter the top 50 features from each feature vector
JavaRDD<LabeledPoint> filteredData = discretizedData.map(lp ->
new LabeledPoint(lp.label(), transformer.transform(lp.features())));
ElementwiseProduct
ElementwiseProduct
使用逐元素乘法,將每個輸入向量乘以提供的「權重」向量。換句話說,它會將資料集的每一欄縮放一個純量乘數。這表示輸入向量 v
與轉換向量 scalingVec
之間的 哈達瑪乘積,產生結果向量。
將 scalingVec
表示為「w
」,此轉換可以寫成
\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]
ElementwiseProduct
在建構函式中具有下列參數
scalingVec
:轉換向量。
ElementwiseProduct
實作 VectorTransformer
,它可以對 Vector
應用加權,以產生轉換後的 Vector
,或對 RDD[Vector]
應用加權,以產生轉換後的 RDD[Vector]
。
範例
以下範例說明如何使用轉換向量值轉換向量。
有關 API 的更多詳細資訊,請參閱 ElementwiseProduct
Python 文件。
from pyspark.mllib.feature import ElementwiseProduct
from pyspark.mllib.linalg import Vectors
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda x: [float(t) for t in x.split(" ")])
# Create weight vector.
transformingVector = Vectors.dense([0.0, 1.0, 2.0])
transformer = ElementwiseProduct(transformingVector)
# Batch transform
transformedData = transformer.transform(parsedData)
# Single-row transform
transformedData2 = transformer.transform(parsedData.first())
有關 API 的詳細資訊,請參閱 ElementwiseProduct
Scala 文件。
import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors
// Create some vector data; also works for sparse vectors
val data = sc.parallelize(Seq(Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)))
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct(transformingVector)
// Batch transform and per-row transform give the same results:
val transformedData = transformer.transform(data)
val transformedData2 = data.map(x => transformer.transform(x))
有關 API 的詳細資訊,請參閱 ElementwiseProduct
Java 文件。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ElementwiseProduct;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Create some vector data; also works for sparse vectors
JavaRDD<Vector> data = jsc.parallelize(Arrays.asList(
Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)));
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct(transformingVector);
// Batch transform and per-row transform give the same results:
JavaRDD<Vector> transformedData = transformer.transform(data);
JavaRDD<Vector> transformedData2 = data.map(transformer::transform);
PCA
一種特徵轉換器,它使用 PCA 將向量投影到低維度空間。詳細資訊請閱讀 降維。