協同過濾 - 基於 RDD 的 API
協同過濾
協同過濾通常用於推薦系統。這些技術旨在填補使用者-項目關聯矩陣中的遺失項目。 spark.mllib
目前支援基於模型的協同過濾,其中使用者和產品由一組小的潛在因子描述,這些因子可用於預測遺失的項目。 spark.mllib
使用交替最小平方 (ALS)演算法來學習這些潛在因子。 spark.mllib
中的實作具有下列參數
- numBlocks 是用於並行化運算的區塊數目 (設為 -1 以自動設定)。
- rank 是要使用的特徵數目 (也稱為潛在因子的數目)。
- iterations 是要執行的 ALS 迭代次數。ALS 通常在 20 次迭代或更少次數內收斂至合理的解。
- lambda 指定 ALS 中的正規化參數。
- implicitPrefs 指定是要使用明確回饋 ALS 變體,還是改編為隱含回饋資料的變體。
- alpha 是適用於 ALS 隱含回饋變體的參數,用於控制對偏好觀察的基線信心。
明確回饋與隱含回饋
基於矩陣分解的協同過濾標準方法將使用者-項目矩陣中的項目視為使用者給予項目的明確偏好,例如使用者對電影給予評分。
在許多實際使用案例中,通常只能存取隱含回饋 (例如檢視、點擊、購買、按讚、分享等)。 spark.mllib
中用於處理此類資料的方法取自隱含回饋資料集的協同過濾。基本上,此方法並非嘗試直接對評分矩陣建模,而是將資料視為代表使用者動作觀察強度的數字 (例如點擊次數,或某人觀看電影的累積時間)。然後將這些數字與觀察到的使用者偏好信心程度相關聯,而非給予項目的明確評分。接著,模型會嘗試找出可預測使用者對項目預期偏好的潛在因子。
正規化參數的縮放
自 v1.1 起,我們透過使用者在更新使用者因素時產生的評分數量,或產品在更新產品因素時收到的評分數量,來調整求解每個最小平方問題的正規化參數 lambda
。此方法稱為「ALS-WR」,並在論文「Netflix 獎項的大規模平行協同過濾」中討論。它讓 lambda
較不受資料集規模的影響,因此我們可以將從抽樣子集學到的最佳參數套用至完整資料集,並預期有類似的效能。
範例
在下列範例中,我們載入評分資料。每一列包含一位使用者、一個產品和一個評分。我們使用預設的 ALS.train() 方法,它假設評分是明確的。我們透過評分預測的平均平方誤差來評估建議。
請參閱 ALS
Python 文件,以取得更多有關 API 的詳細資訊。
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
# Load and parse the data
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda l: l.split(','))\
.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)
# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))
# Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
如果評分矩陣是由其他資訊來源衍生而來(即從其他訊號推論而來),您可以使用 trainImplicit 方法來取得更好的結果。
# Build the recommendation model using Alternating Least Squares based on implicit ratings
model = ALS.trainImplicit(ratings, rank, numIterations, alpha=0.01)
在下列範例中,我們載入評分資料。每一列包含一位使用者、一個產品和一個評分。我們使用預設的 ALS.train() 方法,它假設評分是明確的。我們透過評分預測的平均平方誤差來評估建議模型。
請參閱 ALS
Scala 文件,以取得更多有關 API 的詳細資訊。
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating
// Load and parse the data
val data = sc.textFile("data/mllib/als/test.data")
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
Rating(user.toInt, item.toInt, rate.toDouble)
})
// Build the recommendation model using ALS
val rank = 10
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations, 0.01)
// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) =>
(user, product)
}
val predictions =
model.predict(usersProducts).map { case Rating(user, product, rate) =>
((user, product), rate)
}
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
val err = (r1 - r2)
err * err
}.mean()
println(s"Mean Squared Error = $MSE")
// Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
如果評分矩陣是由另一個資訊來源衍生而來(即從其他訊號推論而來),您可以使用 trainImplicit
方法來取得更好的結果。
val alpha = 0.01
val lambda = 0.01
val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)
MLlib 的所有方法都使用 Java 友善類型,因此您可以像在 Scala 中一樣匯入並呼叫它們。唯一的注意事項是,這些方法採用 Scala RDD 物件,而 Spark Java API 使用一個獨立的 JavaRDD
類別。您可以透過在 JavaRDD
物件上呼叫 .rdd()
,將 Java RDD 轉換為 Scala RDD。以下是與 Scala 中提供的範例等效的自含式應用程式範例
請參閱 ALS
Java 文件,以取得更多有關 API 的詳細資訊。
import scala.Tuple2;
import org.apache.spark.api.java.*;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.SparkConf;
SparkConf conf = new SparkConf().setAppName("Java Collaborative Filtering Example");
JavaSparkContext jsc = new JavaSparkContext(conf);
// Load and parse the data
String path = "data/mllib/als/test.data";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Rating> ratings = data.map(s -> {
String[] sarray = s.split(",");
return new Rating(Integer.parseInt(sarray[0]),
Integer.parseInt(sarray[1]),
Double.parseDouble(sarray[2]));
});
// Build the recommendation model using ALS
int rank = 10;
int numIterations = 10;
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);
// Evaluate the model on rating data
JavaRDD<Tuple2<Object, Object>> userProducts =
ratings.map(r -> new Tuple2<>(r.user(), r.product()));
JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD()
.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()))
);
JavaRDD<Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(
ratings.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())))
.join(predictions).values();
double MSE = ratesAndPreds.mapToDouble(pair -> {
double err = pair._1() - pair._2();
return err * err;
}).mean();
System.out.println("Mean Squared Error = " + MSE);
// Save and load model
model.save(jsc.sc(), "target/tmp/myCollaborativeFilter");
MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(),
"target/tmp/myCollaborativeFilter");
若要執行上述應用程式,請遵循 Spark 快速入門指南的 獨立應用程式 區段中提供的說明。請務必將 spark-mllib 也包含在建置檔案中,做為相依性。
教學
Spark Summit 2014 的 訓練練習 包含一個動手操作教學課程,說明如何使用 spark.mllib
進行 個人化電影推薦。