特徵萃取、轉換和選擇

此區段涵蓋使用特徵的演算法,大致可分為下列群組

目錄

特徵萃取器

TF-IDF

詞頻-逆文件頻率 (TF-IDF) 是一種特徵向量化方法,廣泛用於文本探勘,以反映術語對語料庫中文件的重要性。用 $t$ 表示術語,用 $d$ 表示文件,用 $D$ 表示語料庫。術語頻率 $TF(t, d)$ 是術語 $t$ 出現在文件 $d$ 中的次數,而文件頻率 $DF(t, D)$ 是包含術語 $t$ 的文件數。如果我們只使用術語頻率來衡量重要性,那麼很容易過度強調出現次數很多但對文件資訊量很小的術語,例如「a」、「the」和「of」。如果一個術語在整個語料庫中出現次數很多,這表示它對特定文件沒有特殊資訊。逆文件頻率是一個數值衡量,表示一個術語提供了多少資訊:\[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \] 其中 $|D|$ 是語料庫中的文件總數。由於使用了對數,如果一個術語出現在所有文件中,則其 IDF 值變為 0。請注意,為了避免對語料庫外的術語進行零除,套用了平滑項。TF-IDF 測量值只是 TF 和 IDF 的乘積:\[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \] 術語頻率和文件頻率的定義有幾個變體。在 MLlib 中,我們將 TF 和 IDF 分開,讓它們更靈活。

TFHashingTFCountVectorizer 都可以用來產生術語頻率向量。

HashingTF 是一個 Transformer,它接收術語集合,並將這些集合轉換為固定長度的特徵向量。在文本處理中,「術語集合」可能是詞袋。 HashingTF 利用了 雜湊技巧。原始特徵會透過套用雜湊函數來對應到一個索引(術語)。這裡使用的雜湊函數是 MurmurHash 3。然後根據對應的索引計算術語頻率。這種方法避免了計算全局術語對應索引映射的需要,而這對於大型語料庫來說可能是很昂貴的,但它會產生潛在的雜湊衝突,其中不同的原始特徵在雜湊後可能會變成同一個術語。為了降低衝突的機率,我們可以增加目標特徵維度,也就是雜湊表的儲存區數。由於使用雜湊值的簡單模數來決定向量索引,因此建議使用 2 的次方作為特徵維度,否則特徵不會均勻對應到向量索引。預設的特徵維度是 $2^{18} = 262,144$。一個可選的二進制開關參數控制術語頻率計數。當設定為 true 時,所有非零頻率計數都會設定為 1。這對於建模二進制而非整數計數的離散概率模型特別有用。

CountVectorizer 將文本文件轉換為術語計數向量。請參閱 CountVectorizer 以取得更多詳細資訊。

IDF: IDF 是一個 Estimator,它會針對資料集進行擬合並產生 IDFModelIDFModel 會採用特徵向量(通常由 HashingTFCountVectorizer 產生),並調整每個特徵的權重。直覺上,它會降低語料中出現頻率高的特徵權重。

注意: spark.ml 沒有提供文字分詞的工具。我們建議使用者參考 Stanford NLP Groupscalanlp/chalk

範例

在以下程式碼片段中,我們從一組句子開始。我們使用 Tokenizer 將每個句子拆分成單字。對於每個句子(字詞袋),我們使用 HashingTF 將句子雜湊成特徵向量。我們使用 IDF 重新調整特徵向量的權重;這通常會在使用文字作為特徵時提升效能。我們的特徵向量接著可以傳遞給學習演算法。

請參閱 HashingTF Python 文件IDF Python 文件 以取得更多 API 詳細資料。

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()
在 Spark 儲存庫中的「examples/src/main/python/ml/tf_idf_example.py」中尋找完整的範例程式碼。

請參閱 HashingTF Scala 文件IDF Scala 文件 以取得更多 API 詳細資料。

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

val sentenceData = spark.createDataFrame(Seq(
  (0.0, "Hi I heard about Spark"),
  (0.0, "I wish Java could use case classes"),
  (1.0, "Logistic regression models are neat")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)

val hashingTF = new HashingTF()
  .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)

val featurizedData = hashingTF.transform(wordsData)
// alternatively, CountVectorizer can also be used to get term frequency vectors

val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)

val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/ml/TfIdfExample.scala」中尋找完整的範例程式碼。

請參閱 HashingTF Java 文件IDF Java 文件 以取得更多 API 詳細資料。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0.0, "Hi I heard about Spark"),
  RowFactory.create(0.0, "I wish Java could use case classes"),
  RowFactory.create(1.0, "Logistic regression models are neat")
);
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceData = spark.createDataFrame(data, schema);

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
Dataset<Row> wordsData = tokenizer.transform(sentenceData);

int numFeatures = 20;
HashingTF hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("rawFeatures")
  .setNumFeatures(numFeatures);

Dataset<Row> featurizedData = hashingTF.transform(wordsData);
// alternatively, CountVectorizer can also be used to get term frequency vectors

IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);

Dataset<Row> rescaledData = idfModel.transform(featurizedData);
rescaledData.select("label", "features").show();
在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/ml/JavaTfIdfExample.java」中尋找完整的範例程式碼。

Word2Vec

Word2Vec 是一個 Estimator,它會擷取表示文件的字詞序列,並訓練 Word2VecModel。此模型會將每個字詞對應到一個獨特的固定大小向量。Word2VecModel 會使用文件中所有字詞的平均值將每個文件轉換成一個向量;此向量之後可以用作預測、文件相似度計算等功能。請參閱 MLlib 使用者指南中的 Word2Vec 以取得更多詳細資料。

範例

在以下程式碼片段中,我們從一組文件開始,每個文件都表示為一個字詞序列。對於每個文件,我們會將其轉換成一個特徵向量。此特徵向量之後可以傳遞給學習演算法。

請參閱 Word2Vec Python 文件 以取得更多有關 API 的詳細資料。

from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
在 Spark 儲存庫中,於「examples/src/main/python/ml/word2vec_example.py」中尋找完整的範例程式碼。

請參閱 Word2Vec Scala 文件 以取得更多有關 API 的詳細資料。

import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Input data: Each row is a bag of words from a sentence or document.
val documentDF = spark.createDataFrame(Seq(
  "Hi I heard about Spark".split(" "),
  "I wish Java could use case classes".split(" "),
  "Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")

// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0)
val model = word2Vec.fit(documentDF)

val result = model.transform(documentDF)
result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
  println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }
在 Spark 儲存庫中,於「examples/src/main/scala/org/apache/spark/examples/ml/Word2VecExample.scala」中尋找完整的範例程式碼。

請參閱 Word2Vec Java 文件 以取得更多有關 API 的詳細資料。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))),
  RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))),
  RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" ")))
);
StructType schema = new StructType(new StructField[]{
  new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> documentDF = spark.createDataFrame(data, schema);

// Learn a mapping from words to Vectors.
Word2Vec word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0);

Word2VecModel model = word2Vec.fit(documentDF);
Dataset<Row> result = model.transform(documentDF);

for (Row row : result.collectAsList()) {
  List<String> text = row.getList(0);
  Vector vector = (Vector) row.get(1);
  System.out.println("Text: " + text + " => \nVector: " + vector + "\n");
}
在 Spark 儲存庫中,於「examples/src/main/java/org/apache/spark/examples/ml/JavaWord2VecExample.java」中尋找完整的範例程式碼。

CountVectorizer

CountVectorizerCountVectorizerModel 旨在協助將文本文件集合轉換成代幣計數向量。當沒有先驗字典時,CountVectorizer 可用作 Estimator 來擷取詞彙,並產生 CountVectorizerModel。此模型會針對詞彙產生文件的稀疏表示,之後可以傳遞給其他演算法,例如 LDA。

在擬合過程中,CountVectorizer 會選取語料庫中依詞頻排序的前 vocabSize 個字詞。選用參數 minDF 也會影響擬合過程,方法是指定一個詞彙必須出現在文件中最少次數(或小於 1.0 時的比例)才能包含在詞彙中。另一個選用的二進制切換參數會控制輸出向量。如果設為 true,所有非零計數都會設為 1。這對於會對二進制(而非整數)計數建模的離散機率模型特別有用。

範例

假設我們有以下 DataFrame,其欄位為 idtexts

 id | texts
----|----------
 0  | Array("a", "b", "c")
 1  | Array("a", "b", "b", "c", "a")

texts 中的每一列都是 Array[String] 類型的文件。呼叫 CountVectorizer 的 fit 會產生一個 CountVectorizerModel,其詞彙為 (a, b, c)。然後,轉換後的輸出欄位 “vector” 會包含

 id | texts                           | vector
----|---------------------------------|---------------
 0  | Array("a", "b", "c")            | (3,[0,1,2],[1.0,1.0,1.0])
 1  | Array("a", "b", "b", "c", "a")  | (3,[0,1,2],[2.0,2.0,1.0])

每個向量都代表文件在詞彙中的計數。

請參閱 CountVectorizer Python 文件CountVectorizerModel Python 文件 以取得更多 API 詳細資料。

from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)
在 Spark 儲存庫中,請在「examples/src/main/python/ml/count_vectorizer_example.py」中找到完整的範例程式碼。

請參閱 CountVectorizer Scala 文件CountVectorizerModel Scala 文件 以取得更多 API 詳細資料。

import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}

val df = spark.createDataFrame(Seq(
  (0, Array("a", "b", "c")),
  (1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")

// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df)

// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
  .setInputCol("words")
  .setOutputCol("features")

cvModel.transform(df).show(false)
在 Spark 儲存庫中,請在「examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala」中找到完整的範例程式碼。

請參閱 CountVectorizer Java 文件CountVectorizerModel Java 文件 以取得更多 API 詳細資料。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("a", "b", "c")),
  RowFactory.create(Arrays.asList("a", "b", "b", "c", "a"))
);
StructType schema = new StructType(new StructField [] {
  new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

// fit a CountVectorizerModel from the corpus
CountVectorizerModel cvModel = new CountVectorizer()
  .setInputCol("text")
  .setOutputCol("feature")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df);

// alternatively, define CountVectorizerModel with a-priori vocabulary
CountVectorizerModel cvm = new CountVectorizerModel(new String[]{"a", "b", "c"})
  .setInputCol("text")
  .setOutputCol("feature");

cvModel.transform(df).show(false);
在 Spark 儲存庫中,請在「examples/src/main/java/org/apache/spark/examples/ml/JavaCountVectorizerExample.java」中找到完整的範例程式碼。

FeatureHasher

特徵雜湊會將一組分類或數值特徵投影到指定維度的特徵向量中(通常遠小於原始特徵空間)。這是使用 雜湊技巧 將特徵對應到特徵向量中的索引。

FeatureHasher 轉換器會在多個欄位上運作。每個欄位都可能包含數值或分類特徵。欄位資料類型的行為和處理方式如下

會忽略 Null(遺失)值(在結果特徵向量中隱含為零)。

在此使用的雜湊函數也是 MurmurHash 3,用於 HashingTF。由於會使用雜湊值的簡單模數來決定向量索引,建議將 numFeatures 參數設為 2 的次方;否則,特徵不會均勻對應到向量索引。

範例

假設我們有一個包含 4 個輸入欄位的 DataFrame realboolstringNumstring。這些不同的資料類型作為輸入將說明轉換的行為,以產生特徵向量的欄位。

real| bool|stringNum|string
----|-----|---------|------
 2.2| true|        1|   foo
 3.3|false|        2|   bar
 4.4|false|        3|   baz
 5.5|false|        4|   foo

然後,FeatureHasher.transform 在這個 DataFrame 上的輸出為

real|bool |stringNum|string|features
----|-----|---------|------|-------------------------------------------------------
2.2 |true |1        |foo   |(262144,[51871, 63643,174475,253195],[1.0,1.0,2.2,1.0])
3.3 |false|2        |bar   |(262144,[6031,  80619,140467,174475],[1.0,1.0,1.0,3.3])
4.4 |false|3        |baz   |(262144,[24279,140467,174475,196810],[1.0,1.0,4.4,1.0])
5.5 |false|4        |foo   |(262144,[63643,140467,168512,174475],[1.0,1.0,1.0,5.5])

然後可以將結果特徵向量傳遞給學習演算法。

請參閱 FeatureHasher Python 文件,以取得有關 API 的更多詳細資訊。

from pyspark.ml.feature import FeatureHasher

dataset = spark.createDataFrame([
    (2.2, True, "1", "foo"),
    (3.3, False, "2", "bar"),
    (4.4, False, "3", "baz"),
    (5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])

hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
                       outputCol="features")

featurized = hasher.transform(dataset)
featurized.show(truncate=False)
在 Spark 儲存庫中,於「examples/src/main/python/ml/feature_hasher_example.py」中尋找完整的範例程式碼。

請參閱 FeatureHasher Scala 文件,以取得有關 API 的更多詳細資訊。

import org.apache.spark.ml.feature.FeatureHasher

val dataset = spark.createDataFrame(Seq(
  (2.2, true, "1", "foo"),
  (3.3, false, "2", "bar"),
  (4.4, false, "3", "baz"),
  (5.5, false, "4", "foo")
)).toDF("real", "bool", "stringNum", "string")

val hasher = new FeatureHasher()
  .setInputCols("real", "bool", "stringNum", "string")
  .setOutputCol("features")

val featurized = hasher.transform(dataset)
featurized.show(false)
在 Spark 儲存庫中,於「examples/src/main/scala/org/apache/spark/examples/ml/FeatureHasherExample.scala」中尋找完整的範例程式碼。

請參閱 FeatureHasher Java 文件,以取得有關 API 的更多詳細資訊。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.FeatureHasher;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(2.2, true, "1", "foo"),
  RowFactory.create(3.3, false, "2", "bar"),
  RowFactory.create(4.4, false, "3", "baz"),
  RowFactory.create(5.5, false, "4", "foo")
);
StructType schema = new StructType(new StructField[]{
  new StructField("real", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("bool", DataTypes.BooleanType, false, Metadata.empty()),
  new StructField("stringNum", DataTypes.StringType, false, Metadata.empty()),
  new StructField("string", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);

FeatureHasher hasher = new FeatureHasher()
  .setInputCols(new String[]{"real", "bool", "stringNum", "string"})
  .setOutputCol("features");

Dataset<Row> featurized = hasher.transform(dataset);

featurized.show(false);
在 Spark 儲存庫中,於「examples/src/main/java/org/apache/spark/examples/ml/JavaFeatureHasherExample.java」中尋找完整的範例程式碼。

特徵轉換器

Tokenizer

詞彙分析是將文字(例如句子)取出並將其分解為個別詞彙(通常是字詞)的過程。一個簡單的 Tokenizer 類別提供此功能。以下範例顯示如何將句子拆分為字詞序列。

RegexTokenizer 允許根據正規表示式 (regex) 比對進行更進階的詞彙分析。預設情況下,參數「pattern」(regex,預設:"\\s+")用作分隔符號來拆分輸入文字。或者,使用者可以將參數「gaps」設定為 false,表示 regex「pattern」表示「token」,而不是拆分間隙,並將所有比對到的項目作為詞彙分析結果。

範例

請參閱 Tokenizer Python 文件RegexTokenizer Python 文件,以取得有關 API 的更多詳細資訊。

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
在 Spark 儲存庫中「examples/src/main/python/ml/tokenizer_example.py」中尋找完整的範例程式碼。

參閱 Tokenizer Scala 文件RegexTokenizer Scala 文件,以取得有關 API 的更多詳細資料。

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val sentenceDataFrame = spark.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic,regression,models,are,neat")
)).toDF("id", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)

val countTokens = udf { (words: Seq[String]) => words.length }

val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words"))).show(false)

val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words"))).show(false)
在 Spark 儲存庫中「examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala」中尋找完整的範例程式碼。

參閱 Tokenizer Java 文件RegexTokenizer Java 文件,以取得有關 API 的更多詳細資料。

import java.util.Arrays;
import java.util.List;

import scala.collection.mutable.Seq;

import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.call_udf;
import static org.apache.spark.sql.functions.col;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "Hi I heard about Spark"),
  RowFactory.create(1, "I wish Java could use case classes"),
  RowFactory.create(2, "Logistic,regression,models,are,neat")
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});

Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");

RegexTokenizer regexTokenizer = new RegexTokenizer()
    .setInputCol("sentence")
    .setOutputCol("words")
    .setPattern("\\W");  // alternatively .setPattern("\\w+").setGaps(false);

spark.udf().register(
  "countTokens", (Seq<?> words) -> words.size(), DataTypes.IntegerType);

Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
tokenized.select("sentence", "words")
    .withColumn("tokens", call_udf("countTokens", col("words")))
    .show(false);

Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
regexTokenized.select("sentence", "words")
    .withColumn("tokens", call_udf("countTokens", col("words")))
    .show(false);
在 Spark 儲存庫中「examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java」中尋找完整的範例程式碼。

StopWordsRemover

停止詞 是應該從輸入中排除的詞彙,通常是因為這些詞彙出現頻率高,且沒有太多意義。

StopWordsRemover 以字串序列為輸入(例如 Tokenizer 的輸出),並從輸入序列中移除所有停止詞。停止詞清單由 stopWords 參數指定。某些語言的預設停止詞可透過呼叫 StopWordsRemover.loadDefaultStopWords(language) 存取,可用的選項包括「丹麥文」、「荷蘭文」、「英文」、「芬蘭文」、「法文」、「德文」、「匈牙利文」、「義大利文」、「挪威文」、「葡萄牙文」、「俄文」、「西班牙文」、「瑞典文」和「土耳其文」。布林參數 caseSensitive 指出比對是否應區分大小寫(預設為 false)。

範例

假設我們有以下 DataFrame,其中包含欄位 idraw

 id | raw
----|----------
 0  | [I, saw, the, red, balloon]
 1  | [Mary, had, a, little, lamb]

使用 raw 作為輸入欄位,並使用 filtered 作為輸出欄位,套用 StopWordsRemover,我們應該會得到以下結果

 id | raw                         | filtered
----|-----------------------------|--------------------
 0  | [I, saw, the, red, balloon]  |  [saw, red, balloon]
 1  | [Mary, had, a, little, lamb]|[Mary, little, lamb]

filtered 中,停止詞「I」、「the」、「had」和「a」已過濾掉。

參閱 StopWordsRemover Python 文件,以取得有關 API 的更多詳細資料。

from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
在 Spark 儲存庫中「examples/src/main/python/ml/stopwords_remover_example.py」中尋找完整的範例程式碼。

參閱 StopWordsRemover Scala 文件,以取得有關 API 的更多詳細資料。

import org.apache.spark.ml.feature.StopWordsRemover

val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")

val dataSet = spark.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "balloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

remover.transform(dataSet).show(false)
在 Spark 儲存庫中「examples/src/main/scala/org/apache/spark/examples/ml/StopWordsRemoverExample.scala」中尋找完整的範例程式碼。

參閱 StopWordsRemover Java 文件,以取得有關 API 的更多詳細資料。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StopWordsRemover remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered");

List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("I", "saw", "the", "red", "balloon")),
  RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
);

StructType schema = new StructType(new StructField[]{
  new StructField(
    "raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> dataset = spark.createDataFrame(data, schema);
remover.transform(dataset).show(false);
在 Spark 儲存庫中「examples/src/main/java/org/apache/spark/examples/ml/JavaStopWordsRemoverExample.java」中尋找完整的範例程式碼。

$n$-gram

對於某個整數 $n$,n-gram 是 $n$ 個記號(通常是字詞)的序列。NGram 類別可用於將輸入特徵轉換為 $n$-gram。

NGram 輸入字串序列(例如 Tokenizer 的輸出)。參數 n 用於決定每個 $n$-gram 中的詞彙數。輸出將包含 $n$-gram 序列,其中每個 $n$-gram 由 $n$ 個連續字詞的空白分隔字串表示。如果輸入序列包含少於 n 個字串,則不會產生輸出。

範例

請參閱 NGram Python 文件 以取得更多 API 詳細資料。

from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)
在 Spark 儲存庫中的「examples/src/main/python/ml/n_gram_example.py」中尋找完整的範例程式碼。

請參閱 NGram Scala 文件 以取得更多 API 詳細資料。

import org.apache.spark.ml.feature.NGram

val wordDataFrame = spark.createDataFrame(Seq(
  (0, Array("Hi", "I", "heard", "about", "Spark")),
  (1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
  (2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("id", "words")

val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")

val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(false)
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/ml/NGramExample.scala」中尋找完整的範例程式碼。

請參閱 NGram Java 文件 以取得更多 API 詳細資料。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
  RowFactory.create(1, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
  RowFactory.create(2, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField(
    "words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);

NGram ngramTransformer = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams");

Dataset<Row> ngramDataFrame = ngramTransformer.transform(wordDataFrame);
ngramDataFrame.select("ngrams").show(false);
在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/ml/JavaNGramExample.java」中尋找完整的範例程式碼。

Binarizer

二元化是將數值特徵閾值化為二元 (0/1) 特徵的程序。

Binarizer 採用常見參數 inputColoutputCol,以及二元化的 threshold。大於閾值的特徵值會二元化為 1.0;小於或等於閾值的數值會二元化為 0.0。Vector 和 Double 類型都支援 inputCol

範例

請參閱 Binarizer Python 文件 以取得更多 API 詳細資料。

from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
在 Spark 儲存庫中的「examples/src/main/python/ml/binarizer_example.py」中尋找完整的範例程式碼。

請參閱 Binarizer Scala 文件 以取得更多 API 詳細資料。

import org.apache.spark.ml.feature.Binarizer

val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")

val binarizer: Binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)

println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/ml/BinarizerExample.scala」中尋找完整的範例程式碼。

請參閱 Binarizer Java 文件 以取得更多 API 詳細資料。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 0.1),
  RowFactory.create(1, 0.8),
  RowFactory.create(2, 0.2)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> continuousDataFrame = spark.createDataFrame(data, schema);

Binarizer binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5);

Dataset<Row> binarizedDataFrame = binarizer.transform(continuousDataFrame);

System.out.println("Binarizer output with Threshold = " + binarizer.getThreshold());
binarizedDataFrame.show();
在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/ml/JavaBinarizerExample.java」中尋找完整的範例程式碼。

PCA

PCA 是一種統計程序,使用正交轉換將一組可能相關變數的觀測值轉換為一組稱為主成分的線性非相關變數值。PCA 類別訓練模型以使用 PCA 將向量投影到低維度空間。以下範例顯示如何將 5 維特徵向量投影到 3 維主成分。

範例

有關 API 的詳細資訊,請參閱 PCA Python 文件

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
在 Spark 儲存庫中,於「examples/src/main/python/ml/pca_example.py」中尋找完整的範例程式碼。

有關 API 的詳細資訊,請參閱 PCA Scala 文件

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df)

val result = pca.transform(df).select("pcaFeatures")
result.show(false)
在 Spark 儲存庫中,於「examples/src/main/scala/org/apache/spark/examples/ml/PCAExample.scala」中尋找完整的範例程式碼。

有關 API 的詳細資訊,請參閱 PCA Java 文件

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
  RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
  RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

Dataset<Row> df = spark.createDataFrame(data, schema);

PCAModel pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df);

Dataset<Row> result = pca.transform(df).select("pcaFeatures");
result.show(false);
在 Spark 儲存庫中,於「examples/src/main/java/org/apache/spark/examples/ml/JavaPCAExample.java」中尋找完整的範例程式碼。

PolynomialExpansion

多項式展開 是將特徵展開成多項式空間的程序,這是由原始維度的 n 次方組合所形成。 PolynomialExpansion 類別提供此功能。以下範例顯示如何將特徵展開成 3 次方多項式空間。

範例

有關 API 的詳細資訊,請參閱 PolynomialExpansion Python 文件

from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)
在 Spark 儲存庫中,於「examples/src/main/python/ml/polynomial_expansion_example.py」中尋找完整的範例程式碼。

有關 API 的詳細資訊,請參閱 PolynomialExpansion Scala 文件

import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.dense(2.0, 1.0),
  Vectors.dense(0.0, 0.0),
  Vectors.dense(3.0, -1.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3)

val polyDF = polyExpansion.transform(df)
polyDF.show(false)
在 Spark 儲存庫中,於「examples/src/main/scala/org/apache/spark/examples/ml/PolynomialExpansionExample.scala」中尋找完整的範例程式碼。

有關 API 的詳細資訊,請參閱 PolynomialExpansion Java 文件

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

PolynomialExpansion polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3);

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(2.0, 1.0)),
  RowFactory.create(Vectors.dense(0.0, 0.0)),
  RowFactory.create(Vectors.dense(3.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);

Dataset<Row> polyDF = polyExpansion.transform(df);
polyDF.show(false);
在 Spark 儲存庫中,於「examples/src/main/java/org/apache/spark/examples/ml/JavaPolynomialExpansionExample.java」中尋找完整的範例程式碼。

離散餘弦轉換 (DCT)

離散餘弦轉換 會將時域中的長度 $N$ 實值序列轉換成頻域中的另一個長度 $N$ 實值序列。 DCT 類別提供此功能,實作 DCT-II 並將結果縮放為 $1/\sqrt{2}$,以便轉換的表示矩陣為酉矩陣。不會對轉換後的序列套用位移(例如,轉換後序列的第 0 個元素為第 0 個 DCT 係數,而非第 $N/2$ 個)。

範例

有關 API 的詳細資訊,請參閱 DCT Python 文件

from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
    (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")

dctDf = dct.transform(df)

dctDf.select("featuresDCT").show(truncate=False)
在 Spark 儲存庫中,於「examples/src/main/python/ml/dct_example.py」中尋找完整的範例程式碼。

有關 API 的詳細資訊,請參閱 DCT Scala 文件

import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  Vectors.dense(0.0, 1.0, -2.0, 3.0),
  Vectors.dense(-1.0, 2.0, 4.0, -7.0),
  Vectors.dense(14.0, -2.0, -5.0, 1.0))

val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false)

val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(false)
在 Spark 儲存庫中,於「examples/src/main/scala/org/apache/spark/examples/ml/DCTExample.scala」中尋找完整的範例程式碼。

有關 API 的詳細資訊,請參閱 DCT Java 文件

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.DCT;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
  RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
  RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);

DCT dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false);

Dataset<Row> dctDf = dct.transform(df);

dctDf.select("featuresDCT").show(false);
在 Spark 儲存庫中,於「examples/src/main/java/org/apache/spark/examples/ml/JavaDCTExample.java」中尋找完整的範例程式碼。

StringIndexer

StringIndexer 將標籤的字串欄位編碼成標籤索引的欄位。 StringIndexer 可以編碼多個欄位。索引位於 [0, numLabels),並支援四種排序選項:「frequencyDesc」:依標籤頻率遞減排序(最頻繁的標籤指定為 0),「frequencyAsc」:依標籤頻率遞增排序(最不頻繁的標籤指定為 0),「alphabetDesc」:依字母順序遞減排序,以及「alphabetAsc」:依字母順序遞增排序(預設為「frequencyDesc」)。請注意,當在「frequencyDesc」/「frequencyAsc」下頻率相等時,字串將進一步依字母順序排序。

如果使用者選擇保留未見過的標籤,這些標籤將會放在索引 numLabels。如果輸入欄位是數字,我們會將其轉換成字串並索引字串值。當下游管道元件(例如 EstimatorTransformer)使用這個字串索引標籤時,您必須將元件的輸入欄位設定為這個字串索引欄位名稱。在許多情況下,您可以使用 setInputCol 設定輸入欄位。

範例

假設我們有以下包含欄位 idcategory 的 DataFrame

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | a
 4  | a
 5  | c

category 是包含三個標籤「a」、「b」和「c」的字串欄位。使用 category 作為輸入欄位和 categoryIndex 作為輸出欄位的 StringIndexer,我們應該會得到以下結果

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | a        | 0.0
 4  | a        | 0.0
 5  | c        | 1.0

「a」取得索引 0,因為它是出現最頻繁的標籤,其次是索引為 1 的「c」和索引為 2 的「b」。

此外,關於 StringIndexer 在您針對一個資料集調整 StringIndexer 然後使用它轉換另一個資料集時將如何處理未見過的標籤,有三個策略

範例

讓我們回到前一個範例,但這次在以下資料集上重複使用我們先前定義的 StringIndexer

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | d
 4  | e

如果您未設定 StringIndexer 如何處理未見標籤或將其設定為「錯誤」,系統將擲回例外。但是,如果您已呼叫 setHandleInvalid("skip"),將會產生下列資料集

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0

請注意,包含「d」或「e」的列未出現。

如果您呼叫 setHandleInvalid("keep"),將會產生下列資料集

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | d        | 3.0
 4  | e        | 3.0

請注意,包含「d」或「e」的列已對應至索引「3.0」

請參閱 StringIndexer Python 文件,以取得更多 API 詳細資料。

from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
請在 Spark 儲存庫中的「examples/src/main/python/ml/string_indexer_example.py」中尋找完整的範例程式碼。

請參閱 StringIndexer Scala 文件,以取得更多 API 詳細資料。

import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val indexed = indexer.fit(df).transform(df)
indexed.show()
請在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala」中尋找完整的範例程式碼。

請參閱 StringIndexer Java 文件,以取得更多 API 詳細資料。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("category", StringType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexer indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex");

Dataset<Row> indexed = indexer.fit(df).transform(df);
indexed.show();
請在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/ml/JavaStringIndexerExample.java」中尋找完整的範例程式碼。

IndexToString

StringIndexer 對稱的是,IndexToString 會將標籤索引的欄位對應回包含原始標籤(字串)的欄位。常見的用例是使用 StringIndexer 從標籤產生索引,使用這些索引訓練模型,並使用 IndexToString 從預測索引的欄位中擷取原始標籤。但是,您可以自由提供自己的標籤。

範例

StringIndexer 範例為基礎,假設我們有下列包含欄位 idcategoryIndex 的資料框

 id | categoryIndex
----|---------------
 0  | 0.0
 1  | 2.0
 2  | 1.0
 3  | 0.0
 4  | 0.0
 5  | 1.0

使用 IndexToString,並將 categoryIndex 作為輸入欄位,originalCategory 作為輸出欄位,我們就能夠擷取原始標籤(它們會從欄位的元資料中推論出來)

 id | categoryIndex | originalCategory
----|---------------|-----------------
 0  | 0.0           | a
 1  | 2.0           | b
 2  | 1.0           | c
 3  | 0.0           | a
 4  | 0.0           | a
 5  | 1.0           | c

請參閱 IndexToString Python 文件,以取得更多 API 詳細資料。

from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)

print("Transformed string column '%s' to indexed column '%s'"
      % (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()

print("StringIndexer will store labels in output column metadata\n")

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

print("Transformed indexed column '%s' back to original string column '%s' using "
      "labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()
請在 Spark 儲存庫中的「examples/src/main/python/ml/index_to_string_example.py」中尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 IndexToString Scala 文件

import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

println(s"Transformed string column '${indexer.getInputCol}' " +
    s"to indexed column '${indexer.getOutputCol}'")
indexed.show()

val inputColSchema = indexed.schema(indexer.getOutputCol)
println(s"StringIndexer will store labels in output column metadata: " +
    s"${Attribute.fromStructField(inputColSchema).toString}\n")

val converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory")

val converted = converter.transform(indexed)

println(s"Transformed indexed column '${converter.getInputCol}' back to original string " +
    s"column '${converter.getOutputCol}' using labels in metadata")
converted.select("id", "categoryIndex", "originalCategory").show()
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/ml/IndexToStringExample.scala」中,尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 IndexToString Java 文件

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexerModel indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df);
Dataset<Row> indexed = indexer.transform(df);

System.out.println("Transformed string column '" + indexer.getInputCol() + "' " +
    "to indexed column '" + indexer.getOutputCol() + "'");
indexed.show();

StructField inputColSchema = indexed.schema().apply(indexer.getOutputCol());
System.out.println("StringIndexer will store labels in output column metadata: " +
    Attribute.fromStructField(inputColSchema).toString() + "\n");

IndexToString converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory");
Dataset<Row> converted = converter.transform(indexed);

System.out.println("Transformed indexed column '" + converter.getInputCol() + "' back to " +
    "original string column '" + converter.getOutputCol() + "' using labels in metadata");
converted.select("id", "categoryIndex", "originalCategory").show();
在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/ml/JavaIndexToStringExample.java」中,尋找完整的範例程式碼。

OneHotEncoder

獨熱編碼會將表示為標籤索引的分類特徵,對應到一個二進制向量,其中至多只有一個非零值,表示所有特徵值集合中特定特徵值的存在。此編碼允許預期連續特徵的演算法(例如邏輯迴歸)使用分類特徵。對於字串類型輸入資料,通常會先使用 StringIndexer 編碼分類特徵。

OneHotEncoder 可以轉換多個欄位,並為每個輸入欄位傳回一個獨熱編碼的輸出向量欄位。通常會使用 VectorAssembler 將這些向量合併成一個特徵向量。

OneHotEncoder 支援 handleInvalid 參數,用於選擇在轉換資料期間如何處理無效輸入。可用選項包括「保留」(任何無效輸入都會指定給額外的分類索引)和「錯誤」(擲回錯誤)。

範例

有關 API 的更多詳細資訊,請參閱 OneHotEncoder Python 文件

from pyspark.ml.feature import OneHotEncoder

df = spark.createDataFrame([
    (0.0, 1.0),
    (1.0, 0.0),
    (2.0, 1.0),
    (0.0, 2.0),
    (0.0, 1.0),
    (2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])

encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
                        outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()
在 Spark 儲存庫中的「examples/src/main/python/ml/onehot_encoder_example.py」中,尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 OneHotEncoder Scala 文件

import org.apache.spark.ml.feature.OneHotEncoder

val df = spark.createDataFrame(Seq(
  (0.0, 1.0),
  (1.0, 0.0),
  (2.0, 1.0),
  (0.0, 2.0),
  (0.0, 1.0),
  (2.0, 0.0)
)).toDF("categoryIndex1", "categoryIndex2")

val encoder = new OneHotEncoder()
  .setInputCols(Array("categoryIndex1", "categoryIndex2"))
  .setOutputCols(Array("categoryVec1", "categoryVec2"))
val model = encoder.fit(df)

val encoded = model.transform(df)
encoded.show()
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/ml/OneHotEncoderExample.scala」中,尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 OneHotEncoder Java 文件

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.OneHotEncoderModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0.0, 1.0),
  RowFactory.create(1.0, 0.0),
  RowFactory.create(2.0, 1.0),
  RowFactory.create(0.0, 2.0),
  RowFactory.create(0.0, 1.0),
  RowFactory.create(2.0, 0.0)
);

StructType schema = new StructType(new StructField[]{
  new StructField("categoryIndex1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("categoryIndex2", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

OneHotEncoder encoder = new OneHotEncoder()
  .setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
  .setOutputCols(new String[] {"categoryVec1", "categoryVec2"});

OneHotEncoderModel model = encoder.fit(df);
Dataset<Row> encoded = model.transform(df);
encoded.show();
在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderExample.java」中,尋找完整的範例程式碼。

VectorIndexer

VectorIndexer 可協助索引 Vector 資料集中的分類特徵。它可以自動判斷哪些特徵是分類特徵,並將原始值轉換為類別索引。具體來說,它會執行下列動作

  1. 取得類型為 Vector 的輸入欄位和參數 maxCategories
  2. 根據不同值的數量,判斷哪些特徵應為分類特徵,其中最多具有 maxCategories 的特徵會宣告為分類特徵。
  3. 為每個分類特徵計算從 0 開始的類別索引。
  4. 索引分類特徵,並將原始特徵值轉換為索引。

索引分類特徵可讓決策樹和樹狀集合等演算法適當地處理分類特徵,進而提升效能。

範例

在以下範例中,我們讀取標記點的資料集,然後使用 VectorIndexer 來判斷哪些特徵應視為分類特徵。我們將分類特徵值轉換為其索引。此轉換後的資料接著可以傳遞給處理分類特徵的演算法,例如 DecisionTreeRegressor

有關 API 的更多詳細資訊,請參閱 VectorIndexer Python 文件

from pyspark.ml.feature import VectorIndexer

data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
在 Spark 儲存庫中,於「範例/src/main/python/ml/vector_indexer_example.py」中尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 VectorIndexer Scala 文件

import org.apache.spark.ml.feature.VectorIndexer

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)

val indexerModel = indexer.fit(data)

val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} " +
  s"categorical features: ${categoricalFeatures.mkString(", ")}")

// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
在 Spark 儲存庫中,於「範例/src/main/scala/org/apache/spark/examples/ml/VectorIndexerExample.scala」中尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 VectorIndexer Java 文件

import java.util.Map;

import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

VectorIndexer indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);

Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");

for (Integer feature : categoryMaps.keySet()) {
  System.out.print(" " + feature);
}
System.out.println();

// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();
在 Spark 儲存庫中,於「範例/src/main/java/org/apache/spark/examples/ml/JavaVectorIndexerExample.java」中尋找完整的範例程式碼。

互動

InteractionTransformer,它採用向量或雙值欄,並產生單一向量欄,其中包含每個輸入欄中一個值的乘積組合。

例如,如果您有 2 個向量類型欄,每個欄有 3 個維度作為輸入欄,則您將獲得 9 維向量作為輸出欄。

範例

假設我們有以下 DataFrame,其中包含「id1」、「vec1」和「vec2」欄

  id1|vec1          |vec2          
  ---|--------------|--------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] 
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] 
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] 
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] 
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0]     

使用 Interaction 應用這些輸入欄,則 interactedCol 作為輸出欄包含

  id1|vec1          |vec2          |interactedCol                                         
  ---|--------------|--------------|------------------------------------------------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]            
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]     
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]        
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0] 
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]       

有關 API 的更多詳細資訊,請參閱 Interaction Python 文件

from pyspark.ml.feature import Interaction, VectorAssembler

df = spark.createDataFrame(
    [(1, 1, 2, 3, 8, 4, 5),
     (2, 4, 3, 8, 7, 9, 8),
     (3, 6, 1, 9, 2, 3, 6),
     (4, 10, 8, 6, 9, 4, 5),
     (5, 9, 2, 7, 10, 7, 3),
     (6, 1, 1, 4, 2, 8, 4)],
    ["id1", "id2", "id3", "id4", "id5", "id6", "id7"])

assembler1 = VectorAssembler(inputCols=["id2", "id3", "id4"], outputCol="vec1")

assembled1 = assembler1.transform(df)

assembler2 = VectorAssembler(inputCols=["id5", "id6", "id7"], outputCol="vec2")

assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")

interaction = Interaction(inputCols=["id1", "vec1", "vec2"], outputCol="interactedCol")

interacted = interaction.transform(assembled2)

interacted.show(truncate=False)
在 Spark 儲存庫中,於「範例/src/main/python/ml/interaction_example.py」中尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 Interaction Scala 文件

import org.apache.spark.ml.feature.Interaction
import org.apache.spark.ml.feature.VectorAssembler

val df = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6),
  (4, 10, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 10, 7, 3),
  (6, 1, 1, 4, 2, 8, 4)
)).toDF("id1", "id2", "id3", "id4", "id5", "id6", "id7")

val assembler1 = new VectorAssembler().
  setInputCols(Array("id2", "id3", "id4")).
  setOutputCol("vec1")

val assembled1 = assembler1.transform(df)

val assembler2 = new VectorAssembler().
  setInputCols(Array("id5", "id6", "id7")).
  setOutputCol("vec2")

val assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")

val interaction = new Interaction()
  .setInputCols(Array("id1", "vec1", "vec2"))
  .setOutputCol("interactedCol")

val interacted = interaction.transform(assembled2)

interacted.show(truncate = false)
在 Spark 儲存庫中,於「範例/src/main/scala/org/apache/spark/examples/ml/InteractionExample.scala」中尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 Interaction Java 文件

List<Row> data = Arrays.asList(
  RowFactory.create(1, 1, 2, 3, 8, 4, 5),
  RowFactory.create(2, 4, 3, 8, 7, 9, 8),
  RowFactory.create(3, 6, 1, 9, 2, 3, 6),
  RowFactory.create(4, 10, 8, 6, 9, 4, 5),
  RowFactory.create(5, 9, 2, 7, 10, 7, 3),
  RowFactory.create(6, 1, 1, 4, 2, 8, 4)
);

StructType schema = new StructType(new StructField[]{
  new StructField("id1", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id2", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id3", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id4", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id5", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id6", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id7", DataTypes.IntegerType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

VectorAssembler assembler1 = new VectorAssembler()
        .setInputCols(new String[]{"id2", "id3", "id4"})
        .setOutputCol("vec1");

Dataset<Row> assembled1 = assembler1.transform(df);

VectorAssembler assembler2 = new VectorAssembler()
        .setInputCols(new String[]{"id5", "id6", "id7"})
        .setOutputCol("vec2");

Dataset<Row> assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2");

Interaction interaction = new Interaction()
        .setInputCols(new String[]{"id1","vec1","vec2"})
        .setOutputCol("interactedCol");

Dataset<Row> interacted = interaction.transform(assembled2);

interacted.show(false);
在 Spark 儲存庫中,於「範例/src/main/java/org/apache/spark/examples/ml/JavaInteractionExample.java」中尋找完整的範例程式碼。

Normalizer

NormalizerTransformer,它轉換 Vector 列的資料集,將每個 Vector 正規化為單位範數。它採用參數 p,它指定用於正規化的 p-範數。(預設為 $p = 2$。)此正規化有助於標準化您的輸入資料,並改善學習演算法的行為。

範例

以下範例示範如何載入 libsvm 格式的資料集,然後將每列正規化為單位 $L^1$ 範數和單位 $L^\infty$ 範數。

有關 API 的更多詳細資訊,請參閱 Normalizer Python 文件

from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
在 Spark 回存區中,請在「範例/src/main/python/ml/normalizer_example.py」中尋找完整的範例程式碼。

請參閱 Normalizer Scala 文件,以取得更多 API 詳細資料。

import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.5, -1.0)),
  (1, Vectors.dense(2.0, 1.0, 1.0)),
  (2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")

// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
println("Normalized using L^1 norm")
l1NormData.show()

// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()
在 Spark 回存區中,請在「範例/src/main/scala/org/apache/spark/examples/ml/NormalizerExample.scala」中尋找完整的範例程式碼。

請參閱 Normalizer Java 文件,以取得更多 API 詳細資料。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
    RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0);

Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();

// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
  normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();
在 Spark 回存區中,請在「範例/src/main/java/org/apache/spark/examples/ml/JavaNormalizerExample.java」中尋找完整的範例程式碼。

StandardScaler

StandardScaler 會轉換 Vector 列的資料集,將每個特徵正規化為單位標準差和/或平均值為零。它會採用下列參數

StandardScalerEstimator,可以在資料集上 fit 以產生 StandardScalerModel;這等同於計算摘要統計資料。然後,模型可以轉換資料集中的 Vector 欄,使其具有單位標準差和/或平均值為零的特徵。

請注意,如果特徵的標準差為零,它會在 Vector 中傳回該特徵的預設 0.0 值。

範例

下列範例示範如何載入 libsvm 格式的資料集,然後將每個特徵正規化為單位標準差。

請參閱 StandardScaler Python 文件,以取得更多 API 詳細資料。

from pyspark.ml.feature import StandardScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
在 Spark 回存區中,請在「範例/src/main/python/ml/standard_scaler_example.py」中尋找完整的範例程式碼。

請參閱 StandardScaler Scala 文件,以取得更多 API 詳細資料。

import org.apache.spark.ml.feature.StandardScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
在 Spark 回存區中,請在「範例/src/main/scala/org/apache/spark/examples/ml/StandardScalerExample.scala」中尋找完整的範例程式碼。

請參閱 StandardScaler Java 文件,以取得更多 API 詳細資料。

import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

StandardScaler scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false);

// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);

// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
在 Spark 回存區中,請在「範例/src/main/java/org/apache/spark/examples/ml/JavaStandardScalerExample.java」中尋找完整的範例程式碼。

RobustScaler

RobustScaler 轉換 Vector 列資料集,移除中位數並根據特定分位數範圍(預設為 IQR:四分位距,第 1 四分位數和第 3 四分位數之間的分位數範圍)縮放資料。其行為與 StandardScaler 非常相似,但使用中位數和分位數範圍,而不是平均值和標準差,這使其對異常值具有魯棒性。它採用參數

RobustScalerEstimator,可以在資料集上 fit 以產生 RobustScalerModel;這等於計算分位數統計資料。然後,模型可以轉換資料集中的 Vector 欄,使其具有單位分位數範圍和/或零中位數特徵。

請注意,如果特徵的分位數範圍為零,它將在 Vector 中為該特徵傳回預設 0.0 值。

範例

以下範例說明如何載入 libsvm 格式的資料集,然後將每個特徵正規化為單位分位數範圍。

請參閱 RobustScaler Python 文件,以取得更多有關 API 的詳細資訊。

from pyspark.ml.feature import RobustScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = RobustScaler(inputCol="features", outputCol="scaledFeatures",
                      withScaling=True, withCentering=False,
                      lower=0.25, upper=0.75)

# Compute summary statistics by fitting the RobustScaler
scalerModel = scaler.fit(dataFrame)

# Transform each feature to have unit quantile range.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
在 Spark 儲存庫中,於「examples/src/main/python/ml/robust_scaler_example.py」中尋找完整的範例程式碼。

請參閱 RobustScaler Scala 文件,以取得更多有關 API 的詳細資訊。

import org.apache.spark.ml.feature.RobustScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new RobustScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithScaling(true)
  .setWithCentering(false)
  .setLower(0.25)
  .setUpper(0.75)

// Compute summary statistics by fitting the RobustScaler.
val scalerModel = scaler.fit(dataFrame)

// Transform each feature to have unit quantile range.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
在 Spark 儲存庫中,於「examples/src/main/scala/org/apache/spark/examples/ml/RobustScalerExample.scala」中尋找完整的範例程式碼。

請參閱 RobustScaler Java 文件,以取得更多有關 API 的詳細資訊。

import org.apache.spark.ml.feature.RobustScaler;
import org.apache.spark.ml.feature.RobustScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

RobustScaler scaler = new RobustScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithScaling(true)
  .setWithCentering(false)
  .setLower(0.25)
  .setUpper(0.75);

// Compute summary statistics by fitting the RobustScaler
RobustScalerModel scalerModel = scaler.fit(dataFrame);

// Transform each feature to have unit quantile range.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
在 Spark 儲存庫中,於「examples/src/main/java/org/apache/spark/examples/ml/JavaRobustScalerExample.java」中尋找完整的範例程式碼。

MinMaxScaler

MinMaxScaler 轉換 Vector 列資料集,將每個特徵重新縮放到特定範圍(通常為 [0, 1])。它採用參數

MinMaxScaler 會計算資料集的摘要統計資料,並產生一個 MinMaxScalerModel。然後,模型可以個別轉換每個特徵,讓它在給定的範圍內。

特徵 E 的重新縮放值會計算為 \begin{equation} Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min \end{equation} 對於 $E_{max} == E_{min}$ 的情況,$Rescaled(e_i) = 0.5 * (max + min)$

請注意,由於零值可能會轉換為非零值,因此轉換器的輸出會是 DenseVector,即使是稀疏輸入也是如此。

範例

以下範例示範如何載入 libsvm 格式的資料集,然後將每個特徵重新縮放到 [0, 1]。

請參閱 MinMaxScaler Python 文件MinMaxScalerModel Python 文件,以取得更多 API 詳細資料。

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()
在 Spark 儲存庫中的「examples/src/main/python/ml/min_max_scaler_example.py」中,尋找完整的範例程式碼。

請參閱 MinMaxScaler Scala 文件MinMaxScalerModel Scala 文件,以取得更多 API 詳細資料。

import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -1.0)),
  (1, Vectors.dense(2.0, 1.1, 1.0)),
  (2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show()
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/ml/MinMaxScalerExample.scala」中,尋找完整的範例程式碼。

請參閱 MinMaxScaler Java 文件MinMaxScalerModel Java 文件,以取得更多 API 詳細資料。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -1.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.1, 1.0)),
    RowFactory.create(2, Vectors.dense(3.0, 10.1, 3.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

MinMaxScaler scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
System.out.println("Features scaled to range: [" + scaler.getMin() + ", "
    + scaler.getMax() + "]");
scaledData.select("features", "scaledFeatures").show();
在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/ml/JavaMinMaxScalerExample.java」中,尋找完整的範例程式碼。

MaxAbsScaler

MaxAbsScaler 會轉換 Vector 列的資料集,透過除以每個特徵中的最大絕對值,將每個特徵重新縮放到 [-1, 1] 範圍。它不會位移/置中資料,因此不會破壞任何稀疏性。

MaxAbsScaler 會計算資料集的摘要統計資料,並產生一個 MaxAbsScalerModel。然後,模型可以個別轉換每個特徵到 [-1, 1] 範圍。

範例

以下範例說明如何載入 libsvm 格式的資料集,然後將每個特徵重新縮放至 [-1, 1]。

請參閱 MaxAbsScaler Python 文件MaxAbsScalerModel Python 文件,以取得更多 API 詳細資料。

from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)

scaledData.select("features", "scaledFeatures").show()
在 Spark 儲存庫中,於「examples/src/main/python/ml/max_abs_scaler_example.py」中尋找完整的範例程式碼。

請參閱 MaxAbsScaler Scala 文件MaxAbsScalerModel Scala 文件,以取得更多 API 詳細資料。

import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -8.0)),
  (1, Vectors.dense(2.0, 1.0, -4.0)),
  (2, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "features")

val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
在 Spark 儲存庫中,於「examples/src/main/scala/org/apache/spark/examples/ml/MaxAbsScalerExample.scala」中尋找完整的範例程式碼。

請參閱 MaxAbsScaler Java 文件MaxAbsScalerModel Java 文件,以取得更多 API 詳細資料。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
    RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

MaxAbsScaler scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.select("features", "scaledFeatures").show();
在 Spark 儲存庫中,於「examples/src/main/java/org/apache/spark/examples/ml/JavaMaxAbsScalerExample.java」中尋找完整的範例程式碼。

Bucketizer

Bucketizer 將連續特徵的欄位轉換為特徵區段的欄位,其中區段由使用者指定。它需要一個參數

請注意,如果您不知道目標欄位的上限和下限,您應該將 Double.NegativeInfinityDouble.PositiveInfinity 新增為區段的界限,以防止潛在的 Bucketizer 界限範圍異常。

另請注意,您提供的區段必須嚴格遞增順序,即 s0 < s1 < s2 < ... < sn

更多詳細資料可在 Bucketizer 的 API 文件中找到。

範例

以下範例說明如何將 Double 的欄位區段化為另一個索引明智的欄位。

請參閱 Bucketizer Python 文件,以取得更多 API 詳細資料。

from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))
bucketedData.show()
在 Spark 儲存庫中,於「examples/src/main/python/ml/bucketizer_example.py」中尋找完整的範例程式碼。

請參閱 Bucketizer Scala 文件,以取得更多 API 詳細資料。

import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)

println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets")
bucketedData.show()

val splitsArray = Array(
  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
  Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity))

val data2 = Array(
  (-999.9, -999.9),
  (-0.5, -0.2),
  (-0.3, -0.1),
  (0.0, 0.0),
  (0.2, 0.4),
  (999.9, 999.9))
val dataFrame2 = spark.createDataFrame(data2).toDF("features1", "features2")

val bucketizer2 = new Bucketizer()
  .setInputCols(Array("features1", "features2"))
  .setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2"))
  .setSplitsArray(splitsArray)

// Transform original data into its bucket index.
val bucketedData2 = bucketizer2.transform(dataFrame2)

println(s"Bucketizer output with [" +
  s"${bucketizer2.getSplitsArray(0).length-1}, " +
  s"${bucketizer2.getSplitsArray(1).length-1}] buckets for each input column")
bucketedData2.show()
在 Spark 儲存庫中,於「examples/src/main/scala/org/apache/spark/examples/ml/BucketizerExample.scala」中尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 Bucketizer Java 文件

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};

List<Row> data = Arrays.asList(
  RowFactory.create(-999.9),
  RowFactory.create(-0.5),
  RowFactory.create(-0.3),
  RowFactory.create(0.0),
  RowFactory.create(0.2),
  RowFactory.create(999.9)
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Bucketizer bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits);

// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);

System.out.println("Bucketizer output with " + (bucketizer.getSplits().length-1) + " buckets");
bucketedData.show();

// Bucketize multiple columns at one pass.
double[][] splitsArray = {
  {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY},
  {Double.NEGATIVE_INFINITY, -0.3, 0.0, 0.3, Double.POSITIVE_INFINITY}
};

List<Row> data2 = Arrays.asList(
  RowFactory.create(-999.9, -999.9),
  RowFactory.create(-0.5, -0.2),
  RowFactory.create(-0.3, -0.1),
  RowFactory.create(0.0, 0.0),
  RowFactory.create(0.2, 0.4),
  RowFactory.create(999.9, 999.9)
);
StructType schema2 = new StructType(new StructField[]{
  new StructField("features1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("features2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame2 = spark.createDataFrame(data2, schema2);

Bucketizer bucketizer2 = new Bucketizer()
  .setInputCols(new String[] {"features1", "features2"})
  .setOutputCols(new String[] {"bucketedFeatures1", "bucketedFeatures2"})
  .setSplitsArray(splitsArray);
// Transform original data into its bucket index.
Dataset<Row> bucketedData2 = bucketizer2.transform(dataFrame2);

System.out.println("Bucketizer output with [" +
  (bucketizer2.getSplitsArray()[0].length-1) + ", " +
  (bucketizer2.getSplitsArray()[1].length-1) + "] buckets for each input column");
bucketedData2.show();
在 Spark 儲存庫的「examples/src/main/java/org/apache/spark/examples/ml/JavaBucketizerExample.java」中,尋找完整的範例程式碼。

ElementwiseProduct

ElementwiseProduct 使用逐元素乘法,將每個輸入向量乘以提供的「權重」向量。換句話說,它會將資料集的每一欄縮放為一個純量乘數。這表示輸入向量 v 和轉換向量 w 之間的 哈達瑪乘積,會產生一個結果向量。

\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]

範例

以下範例說明如何使用轉換向量值轉換向量。

有關 API 的更多詳細資訊,請參閱 ElementwiseProduct Python 文件

from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors

# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()
在 Spark 儲存庫的「examples/src/main/python/ml/elementwise_product_example.py」中,尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 ElementwiseProduct Scala 文件

import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors

// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector")

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()
在 Spark 儲存庫的「examples/src/main/scala/org/apache/spark/examples/ml/ElementwiseProductExample.scala」中,尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 ElementwiseProduct Java 文件

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
  RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
  RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);

List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));

StructType schema = DataTypes.createStructType(fields);

Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);

ElementwiseProduct transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector");

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();
在 Spark 儲存庫的「examples/src/main/java/org/apache/spark/examples/ml/JavaElementwiseProductExample.java」中,尋找完整的範例程式碼。

SQLTransformer

SQLTransformer 實作由 SQL 陳述式定義的轉換。目前,我們僅支援類似 "SELECT ... FROM __THIS__ ..." 的 SQL 語法,其中 "__THIS__" 代表輸入資料集的基礎資料表。選取子句會指定要顯示在輸出中的欄位、常數和運算式,而且可以是 Spark SQL 支援的任何選取子句。使用者也可以使用 Spark SQL 內建函式和 UDF 來操作這些選取的欄位。例如,SQLTransformer 支援類似下列的陳述式

範例

假設我們有以下 DataFrame,其中包含欄位 idv1v2

 id |  v1 |  v2
----|-----|-----
 0  | 1.0 | 3.0  
 2  | 2.0 | 5.0

這是 SQLTransformer 的輸出,其陳述式為 "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"

 id |  v1 |  v2 |  v3 |  v4
----|-----|-----|-----|-----
 0  | 1.0 | 3.0 | 4.0 | 3.0
 2  | 2.0 | 5.0 | 7.0 |10.0

請參閱 SQLTransformer Python 文件,以取得更多有關 API 的詳細資料。

from pyspark.ml.feature import SQLTransformer

df = spark.createDataFrame([
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
在 Spark 回存區中,請在「examples/src/main/python/ml/sql_transformer.py」中尋找完整的範例程式碼。

請參閱 SQLTransformer Scala 文件,以取得更多有關 API 的詳細資料。

import org.apache.spark.ml.feature.SQLTransformer

val df = spark.createDataFrame(
  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")

val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

sqlTrans.transform(df).show()
在 Spark 回存區中,請在「examples/src/main/scala/org/apache/spark/examples/ml/SQLTransformerExample.scala」中尋找完整的範例程式碼。

請參閱 SQLTransformer Java 文件,以取得更多有關 API 的詳細資料。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 1.0, 3.0),
  RowFactory.create(2, 2.0, 5.0)
);
StructType schema = new StructType(new StructField [] {
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

SQLTransformer sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");

sqlTrans.transform(df).show();
在 Spark 回存區中,請在「examples/src/main/java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java」中尋找完整的範例程式碼。

VectorAssembler

VectorAssembler 是一個轉換器,它會將給定的欄位清單組合成單一向量欄位。它對於將原始特徵和由不同特徵轉換器產生的特徵組合成單一特徵向量非常有用,以便訓練機器學習模型,例如邏輯迴歸和決策樹。 VectorAssembler 接受下列輸入欄位類型:所有數值類型、布林類型和向量類型。在每一列中,輸入欄位的值將會以指定順序串接成一個向量。

範例

假設我們有一個 DataFrame,其中包含欄位 idhourmobileuserFeaturesclicked

 id | hour | mobile | userFeatures     | clicked
----|------|--------|------------------|---------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0

userFeatures 是一個向量欄位,其中包含三個使用者特徵。我們想要將 hourmobileuserFeatures 組合到稱為 features 的單一特徵向量中,並使用它來預測是否 clicked。如果我們將 VectorAssembler 的輸入欄位設定為 hourmobileuserFeatures,並將輸出欄位設定為 features,在轉換後,我們應該會取得以下 DataFrame

 id | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|-----------------------------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]

請參閱 VectorAssembler Python 文件,以取得更多有關 API 的詳細資料。

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
在 Spark 儲存庫中的「examples/src/main/python/ml/vector_assembler_example.py」中找到完整的範例程式碼。

請參閱 VectorAssembler Scala 文件 以取得更多 API 詳細資料。

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/ml/VectorAssemblerExample.scala」中找到完整的範例程式碼。

請參閱 VectorAssembler Java 文件 以取得更多 API 詳細資料。

import java.util.Arrays;

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("hour", IntegerType, false),
  createStructField("mobile", DoubleType, false),
  createStructField("userFeatures", new VectorUDT(), false),
  createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row), schema);

VectorAssembler assembler = new VectorAssembler()
  .setInputCols(new String[]{"hour", "mobile", "userFeatures"})
  .setOutputCol("features");

Dataset<Row> output = assembler.transform(dataset);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
    "'features'");
output.select("features", "clicked").show(false);
在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/ml/JavaVectorAssemblerExample.java」中找到完整的範例程式碼。

VectorSizeHint

有時明確指定 VectorType 欄位的向量大小會很有用。例如,VectorAssembler 使用其輸入欄位的大小資訊來產生其輸出欄位的大小資訊和元資料。雖然在某些情況下,可以透過檢查欄位的內容來取得這些資訊,但在串流資料框中,內容在串流開始之前是不可用的。VectorSizeHint 允許使用者明確指定欄位的向量大小,以便 VectorAssembler 或可能需要知道向量大小的其他轉換器,可以使用該欄位作為輸入。

若要使用 VectorSizeHint,使用者必須設定 inputColsize 參數。將此轉換器套用至資料框會產生一個新的資料框,其中 inputCol 的更新元資料會指定向量大小。對結果資料框的下游作業可以使用元資料取得此大小。

VectorSizeHint 也可以採用一個選用的 handleInvalid 參數,用來控制當向量欄位包含 Null 或大小錯誤的向量時的行為。預設情況下,handleInvalid 設定為「error」,表示應該擲回例外。此參數也可以設定為「skip」,表示包含無效值的列應該從結果資料框中濾除,或「optimistic」,表示不應該檢查欄位是否有無效值,且應該保留所有列。請注意,使用「optimistic」可能會導致結果資料框處於不一致的狀態,表示套用 VectorSizeHint 的欄位元資料與該欄位的內容不符。使用者應該小心避免這種不一致的狀態。

請參閱 VectorSizeHint Python 文件 以取得更多 API 詳細資料。

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorSizeHint, VectorAssembler)

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),
     (0, 18, 1.0, Vectors.dense([0.0, 10.0]), 0.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

sizeHint = VectorSizeHint(
    inputCol="userFeatures",
    handleInvalid="skip",
    size=3)

datasetWithSize = sizeHint.transform(dataset)
print("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(truncate=False)

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

# This dataframe can be used by downstream transformers as before
output = assembler.transform(datasetWithSize)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
在 Spark 回存中,尋找完整的範例程式碼於「範例/src/main/python/ml/vector_size_hint_example.py」

有關 API 的更多詳細資訊,請參閱 VectorSizeHint Scala 文件

import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq(
    (0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0),
    (0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val sizeHint = new VectorSizeHint()
  .setInputCol("userFeatures")
  .setHandleInvalid("skip")
  .setSize(3)

val datasetWithSize = sizeHint.transform(dataset)
println("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(false)

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

// This dataframe can be used by downstream transformers as before
val output = assembler.transform(datasetWithSize)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
在 Spark 回存中,尋找完整的範例程式碼於「範例/src/main/scala/org/apache/spark/examples/ml/VectorSizeHintExample.scala」

有關 API 的更多詳細資訊,請參閱 VectorSizeHint Java 文件

import java.util.Arrays;

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorSizeHint;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("hour", IntegerType, false),
  createStructField("mobile", DoubleType, false),
  createStructField("userFeatures", new VectorUDT(), false),
  createStructField("clicked", DoubleType, false)
});
Row row0 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Row row1 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row0, row1), schema);

VectorSizeHint sizeHint = new VectorSizeHint()
  .setInputCol("userFeatures")
  .setHandleInvalid("skip")
  .setSize(3);

Dataset<Row> datasetWithSize = sizeHint.transform(dataset);
System.out.println("Rows where 'userFeatures' is not the right size are filtered out");
datasetWithSize.show(false);

VectorAssembler assembler = new VectorAssembler()
  .setInputCols(new String[]{"hour", "mobile", "userFeatures"})
  .setOutputCol("features");

// This dataframe can be used by downstream transformers as before
Dataset<Row> output = assembler.transform(datasetWithSize);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
    "'features'");
output.select("features", "clicked").show(false);
在 Spark 回存中,尋找完整的範例程式碼於「範例/src/main/java/org/apache/spark/examples/ml/JavaVectorSizeHintExample.java」

QuantileDiscretizer

QuantileDiscretizer 會取得具有連續特徵的欄位,並輸出具有分類特徵的欄位。欄位數目會由 numBuckets 參數設定。欄位數目可能會小於此值,例如,如果輸入的相異值太少,無法建立足夠的相異分位數。

NaN 值:NaN 值會在 QuantileDiscretizer 擬合期間從欄位中移除。這將產生一個 Bucketizer 模型,用於進行預測。在轉換期間,Bucketizer 會在資料集中找到 NaN 值時引發錯誤,但使用者也可以選擇透過設定 handleInvalid 來保留或移除資料集中的 NaN 值。如果使用者選擇保留 NaN 值,這些值將被特別處理並放置在自己的欄位中,例如,如果使用 4 個欄位,則非 NaN 資料會放入欄位 [0-3],但 NaN 會計入特殊欄位 [4]。

演算法:欄位範圍會使用近似演算法來選擇(有關詳細說明,請參閱 approxQuantile 的文件)。近似的精確度可以使用 relativeError 參數來控制。設定為零時,會計算精確分位數(注意:計算精確分位數是一項昂貴的操作)。較低和較高的欄位界線會是 -Infinity+Infinity,涵蓋所有實值。

範例

假設我們有一個具有欄位 idhour 的資料框

 id | hour
----|------
 0  | 18.0
----|------
 1  | 19.0
----|------
 2  | 8.0
----|------
 3  | 5.0
----|------
 4  | 2.2

hour 是一個連續特徵,其類型為 Double。我們希望將連續特徵轉換為分類特徵。假設 numBuckets = 3,我們應該取得下列 DataFrame

 id | hour | result
----|------|------
 0  | 18.0 | 2.0
----|------|------
 1  | 19.0 | 2.0
----|------|------
 2  | 8.0  | 1.0
----|------|------
 3  | 5.0  | 1.0
----|------|------
 4  | 2.2  | 0.0

請參閱 QuantileDiscretizer Python 文件 以取得更多 API 詳細資料。

from pyspark.ml.feature import QuantileDiscretizer

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

result = discretizer.fit(df).transform(df)
result.show()
在 Spark 儲存庫中,請在「examples/src/main/python/ml/quantile_discretizer_example.py」中尋找完整的範例程式碼。

請參閱 QuantileDiscretizer Scala 文件 以取得更多 API 詳細資料。

import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show(false)
在 Spark 儲存庫中,請在「examples/src/main/scala/org/apache/spark/examples/ml/QuantileDiscretizerExample.scala」中尋找完整的範例程式碼。

請參閱 QuantileDiscretizer Java 文件 以取得更多 API 詳細資料。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 18.0),
  RowFactory.create(1, 19.0),
  RowFactory.create(2, 8.0),
  RowFactory.create(3, 5.0),
  RowFactory.create(4, 2.2)
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

QuantileDiscretizer discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3);

Dataset<Row> result = discretizer.fit(df).transform(df);
result.show(false);
在 Spark 儲存庫中,請在「examples/src/main/java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java」中尋找完整的範例程式碼。

Imputer

Imputer 估計器會使用遺失值所在欄位的平均值、中位數或眾數,來完成資料集中的遺失值。輸入欄位應為數值類型。目前,Imputer 不支援分類特徵,而且可能會為包含分類特徵的欄位建立不正確的值。Imputer 可以透過 .setMissingValue(custom_value) 來估算「NaN」以外的客製值。例如,.setMissingValue(0) 將估算所有 (0) 的發生。

注意輸入欄位中的所有 null 值都會視為遺失值,因此也會估算。

範例

假設我們有一個包含欄位 ab 的 DataFrame

      a     |      b      
------------|-----------
     1.0    | Double.NaN
     2.0    | Double.NaN
 Double.NaN |     3.0   
     4.0    |     4.0   
     5.0    |     5.0   

在此範例中,Imputer 會將所有 Double.NaN (遺失值的預設值) 發生取代為平均值 (預設的估算策略),該平均值是由對應欄位中的其他值計算而得。在此範例中,欄位 ab 的替代值分別為 3.0 和 4.0。轉換後,輸出欄位中的遺失值將會由相關欄位的替代值取代。

      a     |      b     | out_a | out_b   
------------|------------|-------|-------
     1.0    | Double.NaN |  1.0  |  4.0 
     2.0    | Double.NaN |  2.0  |  4.0 
 Double.NaN |     3.0    |  3.0  |  3.0 
     4.0    |     4.0    |  4.0  |  4.0
     5.0    |     5.0    |  5.0  |  5.0 

請參閱 Imputer Python 文件 以取得更多 API 詳細資料。

from pyspark.ml.feature import Imputer

df = spark.createDataFrame([
    (1.0, float("nan")),
    (2.0, float("nan")),
    (float("nan"), 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])

imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)

model.transform(df).show()
在 Spark 儲存庫中,請在「examples/src/main/python/ml/imputer_example.py」中尋找完整的範例程式碼。

請參閱 Imputer Scala 文件 以取得更多 API 詳細資料。

import org.apache.spark.ml.feature.Imputer

val df = spark.createDataFrame(Seq(
  (1.0, Double.NaN),
  (2.0, Double.NaN),
  (Double.NaN, 3.0),
  (4.0, 4.0),
  (5.0, 5.0)
)).toDF("a", "b")

val imputer = new Imputer()
  .setInputCols(Array("a", "b"))
  .setOutputCols(Array("out_a", "out_b"))

val model = imputer.fit(df)
model.transform(df).show()
在 Spark 儲存庫中,請在「examples/src/main/scala/org/apache/spark/examples/ml/ImputerExample.scala」中尋找完整的範例程式碼。

請參閱 Imputer Java 文件 以取得更多 API 詳細資料。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Imputer;
import org.apache.spark.ml.feature.ImputerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1.0, Double.NaN),
  RowFactory.create(2.0, Double.NaN),
  RowFactory.create(Double.NaN, 3.0),
  RowFactory.create(4.0, 4.0),
  RowFactory.create(5.0, 5.0)
);
StructType schema = new StructType(new StructField[]{
  createStructField("a", DoubleType, false),
  createStructField("b", DoubleType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);

Imputer imputer = new Imputer()
  .setInputCols(new String[]{"a", "b"})
  .setOutputCols(new String[]{"out_a", "out_b"});

ImputerModel model = imputer.fit(df);
model.transform(df).show();
在 Spark 儲存庫中,請在「examples/src/main/java/org/apache/spark/examples/ml/JavaImputerExample.java」中尋找完整的範例程式碼。

特徵選取器

VectorSlicer

VectorSlicer 是一個變壓器,它會接收一個特徵向量並輸出一個新的特徵向量,其中包含原始特徵的子陣列。它對於從向量欄位中萃取特徵很有用。

VectorSlicer 接受一個具有指定索引的向量欄位,然後輸出一個新的向量欄位,其值是透過這些索引選取的。有兩種索引類型,

  1. 整數索引,代表向量中的索引,setIndices()

  2. 字串索引,代表向量中特徵的名稱,setNames()這需要向量欄位有一個 AttributeGroup,因為實作會比對 Attribute 的名稱欄位。

同時以整數和字串指定是可以接受的。此外,您可以同時使用整數索引和字串名稱。至少必須選取一個特徵。不允許重複特徵,因此選取的索引和名稱之間不能有重疊。請注意,如果選取了特徵名稱,則在遇到空輸入屬性時會擲回例外狀況。

輸出向量會先依序排列選取索引的特徵(依據提供的順序),然後再排列選取名稱的特徵(依據提供的順序)。

範例

假設我們有一個具有欄位 userFeatures 的 DataFrame

 userFeatures
------------------
 [0.0, 10.0, 0.5]

userFeatures 是包含三個使用者特徵的向量欄位。假設 userFeatures 的第一欄都是零,因此我們想要移除它並只選取最後兩欄。VectorSlicer 使用 setIndices(1, 2) 選取最後兩個元素,然後產生一個名為 features 的新向量欄位

 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]

假設我們也有 userFeatures 的潛在輸入屬性,例如 ["f1", "f2", "f3"],那麼我們可以使用 setNames("f2", "f3") 來選取它們。

 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]
 ["f1", "f2", "f3"] | ["f2", "f3"]

有關 API 的更多詳細資訊,請參閱 VectorSlicer Python 文件

from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row

df = spark.createDataFrame([
    Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
    Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])

slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

output = slicer.transform(df)

output.select("userFeatures", "features").show()
在 Spark 儲存庫中,於「examples/src/main/python/ml/vector_slicer_example.py」中尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 VectorSlicer Scala 文件

import java.util.Arrays

import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

val data = Arrays.asList(
  Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))),
  Row(Vectors.dense(-2.0, 2.3, 0.0))
)

val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])

val dataset = spark.createDataFrame(data, StructType(Array(attrGroup.toStructField())))

val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")

slicer.setIndices(Array(1)).setNames(Array("f3"))
// or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3"))

val output = slicer.transform(dataset)
output.show(false)
在 Spark 儲存庫中,於「examples/src/main/scala/org/apache/spark/examples/ml/VectorSlicerExample.scala」中尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 VectorSlicer Java 文件

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.attribute.AttributeGroup;
import org.apache.spark.ml.attribute.NumericAttribute;
import org.apache.spark.ml.feature.VectorSlicer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

Attribute[] attrs = {
  NumericAttribute.defaultAttr().withName("f1"),
  NumericAttribute.defaultAttr().withName("f2"),
  NumericAttribute.defaultAttr().withName("f3")
};
AttributeGroup group = new AttributeGroup("userFeatures", attrs);

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
  RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
);

Dataset<Row> dataset =
  spark.createDataFrame(data, (new StructType()).add(group.toStructField()));

VectorSlicer vectorSlicer = new VectorSlicer()
  .setInputCol("userFeatures").setOutputCol("features");

vectorSlicer.setIndices(new int[]{1}).setNames(new String[]{"f3"});
// or slicer.setIndices(new int[]{1, 2}), or slicer.setNames(new String[]{"f2", "f3"})

Dataset<Row> output = vectorSlicer.transform(dataset);
output.show(false);
在 Spark 儲存庫中,於「examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java」中尋找完整的範例程式碼。

RFormula

RFormula 選取由 R 模型公式 指定的欄位。目前我們支援 R 運算子的一個有限子集,包括 ‘~’、‘.’、‘:’、‘+’ 和 ‘-‘。基本運算子為

假設 ab 是雙精度欄位,我們使用以下簡單範例來說明 RFormula 的效果

RFormula 會產生特徵的向量欄位和標籤的雙精度或字串欄位。就像在 R 中將公式用於線性迴歸時,數值欄位會轉換為雙精度。至於字串輸入欄位,它們會先使用 StringIndexer 進行轉換,其排序由 stringOrderType 決定,並捨棄排序後的最後一個類別,然後對雙精度進行獨熱編碼。

假設一個字串特徵欄位包含值 {'b', 'a', 'b', 'a', 'c', 'b'},我們設定 stringOrderType 來控制編碼

stringOrderType | Category mapped to 0 by StringIndexer |  Category dropped by RFormula
----------------|---------------------------------------|---------------------------------
'frequencyDesc' | most frequent category ('b')          | least frequent category ('c')
'frequencyAsc'  | least frequent category ('c')         | most frequent category ('b')
'alphabetDesc'  | last alphabetical category ('c')      | first alphabetical category ('a')
'alphabetAsc'   | first alphabetical category ('a')     | last alphabetical category ('c')

如果標籤欄位是字串類型,它會先使用 StringIndexer 轉換成 double,使用 frequencyDesc 排序。如果標籤欄位不存在於 DataFrame 中,會從公式中指定的回應變數建立輸出標籤欄位。

注意: 排序選項 stringOrderType 不會用於標籤欄位。當標籤欄位被索引時,它會使用 StringIndexer 中預設的降頻排序。

範例

假設我們有一個 DataFrame,其中包含欄位 idcountryhourclicked

id | country | hour | clicked
---|---------|------|---------
 7 | "US"    | 18   | 1.0
 8 | "CA"    | 12   | 0.0
 9 | "NZ"    | 15   | 0.0

如果我們使用 RFormula,其中公式字串為 clicked ~ country + hour,表示我們想要根據 countryhour 來預測 clicked,在轉換後,我們應該會得到以下 DataFrame

id | country | hour | clicked | features         | label
---|---------|------|---------|------------------|-------
 7 | "US"    | 18   | 1.0     | [0.0, 0.0, 18.0] | 1.0
 8 | "CA"    | 12   | 0.0     | [0.0, 1.0, 12.0] | 0.0
 9 | "NZ"    | 15   | 0.0     | [1.0, 0.0, 15.0] | 0.0

請參閱 RFormula Python 文件,以取得更多有關 API 的詳細資料。

from pyspark.ml.feature import RFormula

dataset = spark.createDataFrame(
    [(7, "US", 18, 1.0),
     (8, "CA", 12, 0.0),
     (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])

formula = RFormula(
    formula="clicked ~ country + hour",
    featuresCol="features",
    labelCol="label")

output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
在 Spark 儲存庫中,於「examples/src/main/python/ml/rformula_example.py」中找到完整的範例程式碼。

請參閱 RFormula Scala 文件,以取得更多有關 API 的詳細資料。

import org.apache.spark.ml.feature.RFormula

val dataset = spark.createDataFrame(Seq(
  (7, "US", 18, 1.0),
  (8, "CA", 12, 0.0),
  (9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")

val formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label")

val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
在 Spark 儲存庫中,於「examples/src/main/scala/org/apache/spark/examples/ml/RFormulaExample.scala」中找到完整的範例程式碼。

請參閱 RFormula Java 文件,以取得更多有關 API 的詳細資料。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.RFormula;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("country", StringType, false),
  createStructField("hour", IntegerType, false),
  createStructField("clicked", DoubleType, false)
});

List<Row> data = Arrays.asList(
  RowFactory.create(7, "US", 18, 1.0),
  RowFactory.create(8, "CA", 12, 0.0),
  RowFactory.create(9, "NZ", 15, 0.0)
);

Dataset<Row> dataset = spark.createDataFrame(data, schema);
RFormula formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label");
Dataset<Row> output = formula.fit(dataset).transform(dataset);
output.select("features", "label").show();
在 Spark 儲存庫中,於「examples/src/main/java/org/apache/spark/examples/ml/JavaRFormulaExample.java」中找到完整的範例程式碼。

ChiSqSelector

ChiSqSelector 代表卡方特徵選擇。它在有類別特徵的標籤資料上運作。ChiSqSelector 使用 卡方獨立檢定 來決定要選擇哪些特徵。它支援五種選擇方法:numTopFeaturespercentilefprfdrfwe

範例

假設我們有一個 DataFrame,其欄位為 idfeaturesclicked,其中 clicked 用作我們要預測的目標

id | features              | clicked
---|-----------------------|---------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0

如果我們使用 ChiSqSelector,並設定 numTopFeatures = 1,則根據我們的標籤 clicked,我們的 features 中的最後一欄會被選為最有用的特徵

id | features              | clicked | selectedFeatures
---|-----------------------|---------|------------------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0     | [1.0]
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0     | [0.0]
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0     | [0.1]

請參閱 ChiSqSelector Python 文件,以取得更多 API 詳細資訊。

from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])

selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")

result = selector.fit(df).transform(df)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()
在 Spark 回存區中,請在「examples/src/main/python/ml/chisq_selector_example.py」中找到完整的範例程式碼。

請參閱 ChiSqSelector Scala 文件,以取得更多 API 詳細資訊。

import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)

val df = spark.createDataset(data).toDF("id", "features", "clicked")

val selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
result.show()
在 Spark 回存區中,請在「examples/src/main/scala/org/apache/spark/examples/ml/ChiSqSelectorExample.scala」中找到完整的範例程式碼。

請參閱 ChiSqSelector Java 文件,以取得更多 API 詳細資訊。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  RowFactory.create(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  RowFactory.create(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
  new StructField("clicked", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

ChiSqSelector selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("ChiSqSelector output with top " + selector.getNumTopFeatures()
    + " features selected");
result.show();
在 Spark 回存區中,請在「examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java」中找到完整的範例程式碼。

UnivariateFeatureSelector

UnivariateFeatureSelector 針對具有分類/連續特徵的分類/連續標籤運作。使用者可以設定 featureTypelabelType,而 Spark 會根據指定的 featureTypelabelType 選擇要使用的分數函數。

featureType |  labelType |score function
------------|------------|--------------
categorical |categorical | chi-squared (chi2)
continuous  |categorical | ANOVATest (f_classif)
continuous  |continuous  | F-value (f_regression)

它支援五種選取模式:numTopFeaturespercentilefprfdrfwe

預設情況下,選取模式為 numTopFeatures,預設的 selectionThreshold 設定為 50。

範例

假設我們有一個 DataFrame,其欄位為 idfeatureslabel,其中 label 用作我們要預測的目標

id | features                       | label
---|--------------------------------|---------
 1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0
 2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0
 3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0
 4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0
 5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0
 6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0

如果我們將 featureType 設定為 continuous,並將 labelType 設定為 categorical,且 numTopFeatures = 1,則我們 features 中的最後一欄將被選為最有用的特徵

id | features                       | label   | selectedFeatures
---|--------------------------------|---------|------------------
 1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0     | [2.3]
 2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0     | [4.1]
 3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0     | [2.5]
 4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0     | [3.8]
 5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0     | [3.0]
 6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0     | [2.1]

有關 API 的更多詳細資訊,請參閱 UnivariateFeatureSelector Python 文件

from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (1, Vectors.dense([1.7, 4.4, 7.6, 5.8, 9.6, 2.3]), 3.0,),
    (2, Vectors.dense([8.8, 7.3, 5.7, 7.3, 2.2, 4.1]), 2.0,),
    (3, Vectors.dense([1.2, 9.5, 2.5, 3.1, 8.7, 2.5]), 3.0,),
    (4, Vectors.dense([3.7, 9.2, 6.1, 4.1, 7.5, 3.8]), 2.0,),
    (5, Vectors.dense([8.9, 5.2, 7.8, 8.3, 5.2, 3.0]), 4.0,),
    (6, Vectors.dense([7.9, 8.5, 9.2, 4.0, 9.4, 2.1]), 4.0,)], ["id", "features", "label"])

selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
                                     labelCol="label", selectionMode="numTopFeatures")
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(1)

result = selector.fit(df).transform(df)

print("UnivariateFeatureSelector output with top %d features selected using f_classif"
      % selector.getSelectionThreshold())
result.show()
在 Spark 儲存庫中找到完整的範例程式碼,路徑為「examples/src/main/python/ml/univariate_feature_selector_example.py」。

有關 API 的更多詳細資訊,請參閱 UnivariateFeatureSelector Scala 文件

import org.apache.spark.ml.feature.UnivariateFeatureSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
  (2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
  (3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
  (4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
  (5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
  (6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
)

val df = spark.createDataset(data).toDF("id", "features", "label")

val selector = new UnivariateFeatureSelector()
  .setFeatureType("continuous")
  .setLabelType("categorical")
  .setSelectionMode("numTopFeatures")
  .setSelectionThreshold(1)
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"UnivariateFeatureSelector output with top ${selector.getSelectionThreshold}" +
  s" features selected using f_classif")
result.show()
在 Spark 儲存庫中找到完整的範例程式碼,路徑為「examples/src/main/scala/org/apache/spark/examples/ml/UnivariateFeatureSelectorExample.scala」。

有關 API 的更多詳細資訊,請參閱 UnivariateFeatureSelector Java 文件

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.UnivariateFeatureSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
  RowFactory.create(2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
  RowFactory.create(3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
  RowFactory.create(4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
  RowFactory.create(5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
  RowFactory.create(6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

UnivariateFeatureSelector selector = new UnivariateFeatureSelector()
  .setFeatureType("continuous")
  .setLabelType("categorical")
  .setSelectionMode("numTopFeatures")
  .setSelectionThreshold(1)
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("UnivariateFeatureSelector output with top "
    + selector.getSelectionThreshold() + " features selected using f_classif");
result.show();
在 Spark 儲存庫中找到完整的範例程式碼,路徑為「examples/src/main/java/org/apache/spark/examples/ml/JavaUnivariateFeatureSelectorExample.java」。

VarianceThresholdSelector

VarianceThresholdSelector 是一個用來移除低變異特徵的選擇器。變異 (樣本) 不大於 varianceThreshold 的特徵將會被移除。如果未設定,varianceThreshold 的預設值為 0,這表示只有變異為 0 的特徵 (即在所有樣本中具有相同值的特徵) 會被移除。

範例

假設我們有一個 DataFrame,其中包含欄位 idfeatures,而 features 被用作我們的目標,用於預測

id | features
---|--------------------------------
 1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0]
 2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0]
 3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0]
 4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0]
 5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0]
 6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0]

6 個特徵的樣本變異分別為 16.67、0.67、8.17、10.17、5.07 和 11.47。如果我們使用 VarianceThresholdSelector,且 varianceThreshold = 8.0,則會移除變異 <= 8.0 的特徵

id | features                       | selectedFeatures
---|--------------------------------|-------------------
 1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0] | [6.0,0.0,7.0,0.0]
 2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0] | [0.0,6.0,0.0,9.0]
 3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0] | [0.0,3.0,0.0,5.0]
 4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0] | [0.0,8.0,5.0,4.0]
 5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0] | [8.0,6.0,5.0,4.0]
 6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0] | [8.0,6.0,0.0,0.0]

有關 API 的更多詳細資訊,請參閱 VarianceThresholdSelector Python 文件

from pyspark.ml.feature import VarianceThresholdSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (1, Vectors.dense([6.0, 7.0, 0.0, 7.0, 6.0, 0.0])),
    (2, Vectors.dense([0.0, 9.0, 6.0, 0.0, 5.0, 9.0])),
    (3, Vectors.dense([0.0, 9.0, 3.0, 0.0, 5.0, 5.0])),
    (4, Vectors.dense([0.0, 9.0, 8.0, 5.0, 6.0, 4.0])),
    (5, Vectors.dense([8.0, 9.0, 6.0, 5.0, 4.0, 4.0])),
    (6, Vectors.dense([8.0, 9.0, 6.0, 0.0, 0.0, 0.0]))], ["id", "features"])

selector = VarianceThresholdSelector(varianceThreshold=8.0, outputCol="selectedFeatures")

result = selector.fit(df).transform(df)

print("Output: Features with variance lower than %f are removed." %
      selector.getVarianceThreshold())
result.show()
在 Spark 儲存庫中找到完整的範例程式碼,路徑為「examples/src/main/python/ml/variance_threshold_selector_example.py」。

有關 API 的更多詳細資訊,請參閱 VarianceThresholdSelector Scala 文件

import org.apache.spark.ml.feature.VarianceThresholdSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
  (2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
  (3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
  (4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
  (5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
  (6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
)

val df = spark.createDataset(data).toDF("id", "features")

val selector = new VarianceThresholdSelector()
  .setVarianceThreshold(8.0)
  .setFeaturesCol("features")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"Output: Features with variance lower than" +
  s" ${selector.getVarianceThreshold} are removed.")
result.show()
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/ml/VarianceThresholdSelectorExample.scala」中,尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 VarianceThresholdSelector Java 文件

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.VarianceThresholdSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
  RowFactory.create(2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
  RowFactory.create(3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
  RowFactory.create(4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
  RowFactory.create(5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
  RowFactory.create(6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

VarianceThresholdSelector selector = new VarianceThresholdSelector()
  .setVarianceThreshold(8.0)
  .setFeaturesCol("features")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("Output: Features with variance lower than "
    + selector.getVarianceThreshold() + " are removed.");
result.show();
在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/ml/JavaVarianceThresholdSelectorExample.java」中,尋找完整的範例程式碼。

局部敏感雜湊

局部敏感雜湊 (LSH) 是一類重要的雜湊技術,通常用於大型資料集的群集、近似最近鄰搜尋和異常值偵測。

LSH 的一般概念是使用函數系列(「LSH 系列」)將資料點雜湊到儲存區中,以便彼此接近的資料點很可能在同一個儲存區中,而彼此相距甚遠的資料點則很可能在不同的儲存區中。LSH 系列正式定義如下。

在度量空間 (M, d) 中,其中 M 是集合,dM 上的距離函數,LSH 系列是函數系列 h,滿足下列屬性:\[ \forall p, q \in M,\\ d(p,q) \leq r1 \Rightarrow Pr(h(p)=h(q)) \geq p1\\ d(p,q) \geq r2 \Rightarrow Pr(h(p)=h(q)) \leq p2 \] 此 LSH 系列稱為 (r1, r2, p1, p2) 敏感。

在 Spark 中,不同的 LSH 系列在不同的類別中實作(例如 MinHash),而各個類別中都提供特徵轉換、近似相似性聯結和近似最近鄰的 API。

在 LSH 中,我們將雜湊到同一個儲存區的遠距離輸入特徵($d(p,q) \geq r2$)定義為假陽性,而將雜湊到不同儲存區的鄰近特徵($d(p,q) \leq r1$)定義為假陰性。

LSH 操作

我們描述 LSH 可用於的主要作業類型。已配適的 LSH 模型有方法可執行這些作業。

特徵轉換

特徵轉換是將雜湊值新增為新欄位的基本功能。這對於降維很有用。使用者可以透過設定 inputColoutputCol 來指定輸入和輸出欄位名稱。

LSH 也支援多個 LSH hash 表格。使用者可以透過設定 numHashTables 來指定 hash 表格的數量。這也用於近似相似性聯結和近似最近鄰居中的 OR 放大。增加 hash 表格的數量會提高準確度,但也會增加通訊成本和執行時間。

outputCol 的類型是 Seq[Vector],其中陣列的維度等於 numHashTables,而向量的維度目前設定為 1。在未來的版本中,我們將實作 AND 放大,以便使用者可以指定這些向量的維度。

近似相似性聯結

近似相似性聯結會採用兩個資料集,並近似傳回資料集中距離小於使用者定義閾值的列對。近似相似性聯結支援聯結兩個不同的資料集和自我聯結。自我聯結會產生一些重複的列對。

近似相似性聯結接受已轉換和未轉換的資料集作為輸入。如果使用未轉換的資料集,它會自動轉換。在此情況下,hash 簽章會建立為 outputCol

在已聯結的資料集中,可以在 datasetAdatasetB 中查詢原始資料集。會將距離欄位新增到輸出資料集中,以顯示傳回的每一列對之間的真實距離。

近似最近鄰居搜尋會採用一個資料集(特徵向量的)和一個金鑰(單一特徵向量),並近似傳回資料集中與該向量最接近的指定數量列。

近似最近鄰居搜尋接受已轉換和未轉換的資料集作為輸入。如果使用未轉換的資料集,它會自動轉換。在此情況下,hash 簽章會建立為 outputCol

會將距離欄位新增到輸出資料集中,以顯示每個輸出列和搜尋金鑰之間的真實距離。

注意:當 hash 儲存區中沒有足夠的候選項時,近似最近鄰居搜尋會傳回少於 k 的列。

LSH 演算法

歐幾里得距離的加權隨機投影

分桶隨機投影是歐幾里得距離的 LSH 家族。歐幾里得距離定義如下:\[ d(\mathbf{x}, \mathbf{y}) = \sqrt{\sum_i (x_i - y_i)^2} \]其 LSH 家族將特徵向量 $\mathbf{x}$ 投影到隨機單位向量 $\mathbf{v}$ 上,並將投影結果分為雜湊分桶:\[ h(\mathbf{x}) = \Big\lfloor \frac{\mathbf{x} \cdot \mathbf{v}}{r} \Big\rfloor \]其中 r 是使用者定義的分桶長度。分桶長度可用於控制雜湊分桶的平均大小(從而控制分桶數)。較大分桶長度(即較少分桶)會增加特徵雜湊到同一分桶的機率(增加真陽性和假陽性的數量)。

分桶隨機投影接受任意向量作為輸入特徵,並支援稀疏和稠密向量。

有關 API 的更多詳細資訊,請參閱 BucketedRandomProjectionLSH Python 文件

from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(4, Vectors.dense([1.0, 0.0]),),
         (5, Vectors.dense([-1.0, 0.0]),),
         (6, Vectors.dense([0.0, 1.0]),),
         (7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.dense([1.0, 0.0])

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)
model = brp.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
在 Spark 回應程式中,請尋找「examples/src/main/python/ml/bucketed_random_projection_lsh_example.py」以尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 BucketedRandomProjectionLSH Scala 文件

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 1.0)),
  (1, Vectors.dense(1.0, -1.0)),
  (2, Vectors.dense(-1.0, -1.0)),
  (3, Vectors.dense(-1.0, 1.0))
)).toDF("id", "features")

val dfB = spark.createDataFrame(Seq(
  (4, Vectors.dense(1.0, 0.0)),
  (5, Vectors.dense(-1.0, 0.0)),
  (6, Vectors.dense(0.0, 1.0)),
  (7, Vectors.dense(0.0, -1.0))
)).toDF("id", "features")

val key = Vectors.dense(1.0, 0.0)

val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = brp.fit(dfA)

// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
println("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance")).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
在 Spark 回應程式中,請尋找「examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala」以尋找完整的範例程式碼。

有關 API 的更多詳細資訊,請參閱 BucketedRandomProjectionLSH Java 文件

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;

List<Row> dataA = Arrays.asList(
  RowFactory.create(0, Vectors.dense(1.0, 1.0)),
  RowFactory.create(1, Vectors.dense(1.0, -1.0)),
  RowFactory.create(2, Vectors.dense(-1.0, -1.0)),
  RowFactory.create(3, Vectors.dense(-1.0, 1.0))
);

List<Row> dataB = Arrays.asList(
    RowFactory.create(4, Vectors.dense(1.0, 0.0)),
    RowFactory.create(5, Vectors.dense(-1.0, 0.0)),
    RowFactory.create(6, Vectors.dense(0.0, 1.0)),
    RowFactory.create(7, Vectors.dense(0.0, -1.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);

Vector key = Vectors.dense(1.0, 0.0);

BucketedRandomProjectionLSH mh = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes");

BucketedRandomProjectionLSHModel model = mh.fit(dfA);

// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
System.out.println("Approximately joining dfA and dfB on distance smaller than 1.5:");
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance")).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
在 Spark 回應程式中,請尋找「examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java」以尋找完整的範例程式碼。

Jaccard 距離的 MinHash

MinHash 是 Jaccard 距離的 LSH 家族,其中輸入特徵是自然數集合。兩個集合的 Jaccard 距離由其交集和聯集的基數定義:\[ d(\mathbf{A}, \mathbf{B}) = 1 - \frac{|\mathbf{A} \cap \mathbf{B}|}{|\mathbf{A} \cup \mathbf{B}|} \] MinHash 對集合中的每個元素套用隨機雜湊函數 g,並取所有雜湊值的最小值:\[ h(\mathbf{A}) = \min_{a \in \mathbf{A}}(g(a)) \]

MinHash 的輸入集合表示為二進制向量,其中向量索引表示元素本身,而向量中的非零值表示該元素在集合中的存在。雖然支援稠密和稀疏向量,但通常建議使用稀疏向量以提高效率。例如,Vectors.sparse(10, Array[(2, 1.0), (3, 1.0), (5, 1.0)]) 表示空間中有 10 個元素。此集合包含元素 2、元素 3 和元素 5。所有非零值都視為二進制「1」值。

注意: MinHash 無法轉換空集合,這表示任何輸入向量都必須至少有 1 個非零項目。

有關 API 的更多詳細資訊,請參閱 MinHashLSH Python 文件

from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.sparse(6, [1, 3], [1.0, 1.0])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
在 Spark 回存中的「範例/src/main/python/ml/min_hash_lsh_example.py」中尋找完整的範例程式碼。

請參閱 MinHashLSH Scala 文件 以取得更多關於 API 的詳細資訊。

import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
  (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
  (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")

val dfB = spark.createDataFrame(Seq(
  (3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
  (4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
  (5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")

val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))

val mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = mh.fit(dfA)

// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
在 Spark 回存中的「範例/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala」中尋找完整的範例程式碼。

請參閱 MinHashLSH Java 文件 以取得更多關於 API 的詳細資訊。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MinHashLSH;
import org.apache.spark.ml.feature.MinHashLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;

List<Row> dataA = Arrays.asList(
  RowFactory.create(0, Vectors.sparse(6, new int[]{0, 1, 2}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 4}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(2, Vectors.sparse(6, new int[]{0, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);

List<Row> dataB = Arrays.asList(
  RowFactory.create(0, Vectors.sparse(6, new int[]{1, 3, 5}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 5}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(2, Vectors.sparse(6, new int[]{1, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);

int[] indices = {1, 3};
double[] values = {1.0, 1.0};
Vector key = Vectors.sparse(6, indices, values);

MinHashLSH mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes");

MinHashLSHModel model = mh.fit(dfA);

// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
System.out.println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:");
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
在 Spark 回存中的「範例/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java」中尋找完整的範例程式碼。