協同過濾

協同過濾

協同過濾通常用於推薦系統。這些技術旨在填補使用者項目關聯矩陣中遺失的項目。spark.ml目前支援基於模型的協同過濾,其中使用者和產品由一組小的潛在因子描述,可用於預測遺失的項目。spark.ml使用交替最小平方法 (ALS)演算法來學習這些潛在因子。spark.ml中的實作具有以下參數

注意:ALS 的基於 DataFrame 的 API 目前僅支援使用者和項目 ID 的整數。使用者和項目 ID 欄位支援其他數字類型,但 ID 必須在整數值範圍內。

明確回饋與隱含回饋

基於矩陣分解的協同過濾標準方法將使用者-項目矩陣中的條目視為使用者給予項目的明確偏好,例如使用者給予電影的評分。

在許多實際使用案例中,通常只能存取隱含回饋(例如檢視、按讚、購買、按讚、分享等)。spark.ml 中用於處理此類資料的方法取自 Collaborative Filtering for Implicit Feedback Datasets。基本上,此方法並非嘗試直接對評分矩陣進行建模,而是將資料視為表示使用者動作觀察結果的強度的數字(例如按讚次數或某人觀看電影的累積時間)。然後,這些數字與對觀察到的使用者偏好的信心程度相關,而不是給予項目的明確評分。接著,模型會嘗試找出可用於預測使用者對項目預期偏好的潛在因子。

正則化參數的縮放

我們在解決每個最小平方問題時,會根據使用者在更新使用者因子時產生的評分數目或產品在更新產品因子時收到的評分數目,調整正規化參數 regParam 的比例。此方法稱為「ALS-WR」,並在論文「Large-Scale Parallel Collaborative Filtering for the Netflix Prize」中討論。它讓 regParam 較不受資料集規模的影響,因此我們可以將從取樣子集學到的最佳參數套用至完整資料集,並預期有類似的效能。

冷啟動策略

使用 ALSModel 進行預測時,通常會在測試資料集中遇到在訓練模型期間不存在的使用者和/或項目。這通常會在兩種情況下發生

  1. 在實際應用中,對於沒有評分記錄且模型未受過訓練的新使用者或項目(這是「冷啟動問題」)。
  2. 在交叉驗證期間,資料會在訓練和評估集之間分割。當使用 Spark 中的 CrossValidatorTrainValidationSplit 中的簡單隨機分割時,實際上會非常常在評估集中遇到不在訓練集中的使用者和/或項目

在預設情況下,當使用者和/或項目因子不出現在模型中時,Spark 會在 ALSModel.transform 期間指派 NaN 預測。這在生產系統中可能很有用,因為它表示新的使用者或項目,因此系統可以決定使用某些替代方案作為預測。

然而,這在交叉驗證期間是不理想的,因為任何 NaN 預測值都會導致評估指標的 NaN 結果(例如,在使用 RegressionEvaluator 時)。這使得模型選擇變得不可能。

Spark 允許使用者將 coldStartStrategy 參數設定為「drop」,以刪除包含 NaN 值的預測 DataFrame 中的任何列。然後評估指標將在非 NaN 資料上計算,並且會有效。此參數的使用在下方的範例中說明。

注意:目前支援的冷啟動策略為「nan」(上面提到的預設行為)和「drop」。未來可能會支援更多策略。

範例

在以下範例中,我們從 MovieLens 資料集 載入評分資料,每一列包含使用者、電影、評分和時間戳記。然後我們訓練一個 ALS 模型,它預設假設評分是明確的(implicitPrefsFalse)。我們透過測量評分預測的均方根誤差來評估推薦模型。

請參閱 ALS Python 文件,以取得更多關於 API 的詳細資料。

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
在 Spark 回應程式中的「examples/src/main/python/ml/als_example.py」中,找到完整的範例程式碼。

如果評分矩陣來自其他資訊來源(即從其他訊號推論),您可以將 implicitPrefs 設定為 True 以取得更好的結果

als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,
          userCol="userId", itemCol="movieId", ratingCol="rating")

在以下範例中,我們從 MovieLens 資料集 載入評分資料,每一列包含使用者、電影、評分和時間戳記。然後,我們訓練一個 ALS 模型,預設假設評分是明確的(implicitPrefsfalse)。我們透過評量評分預測的均方根誤差來評估推薦模型。

請參閱 ALS Scala 文件 以取得更多 API 詳細資料。

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
  .map(parseRating)
  .toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

// Build the recommendation model using ALS on the training data
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
val model = als.fit(training)

// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions = model.transform(test)

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)

// Generate top 10 movie recommendations for a specified set of users
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
在 Spark 儲存庫中的「examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala」中尋找完整的範例程式碼。

如果評分矩陣來自其他資訊來源(也就是從其他訊號推論而來),您可以將 implicitPrefs 設為 true 以取得更好的結果

val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")

在以下範例中,我們從 MovieLens 資料集 載入評分資料,每一列包含使用者、電影、評分和時間戳記。然後,我們訓練一個 ALS 模型,預設假設評分是明確的(implicitPrefsfalse)。我們透過評量評分預測的均方根誤差來評估推薦模型。

請參閱 ALS Java 文件 以取得更多 API 詳細資料。

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;

public static class Rating implements Serializable {
  private int userId;
  private int movieId;
  private float rating;
  private long timestamp;

  public Rating() {}

  public Rating(int userId, int movieId, float rating, long timestamp) {
    this.userId = userId;
    this.movieId = movieId;
    this.rating = rating;
    this.timestamp = timestamp;
  }

  public int getUserId() {
    return userId;
  }

  public int getMovieId() {
    return movieId;
  }

  public float getRating() {
    return rating;
  }

  public long getTimestamp() {
    return timestamp;
  }

  public static Rating parseRating(String str) {
    String[] fields = str.split("::");
    if (fields.length != 4) {
      throw new IllegalArgumentException("Each line must contain 4 fields");
    }
    int userId = Integer.parseInt(fields[0]);
    int movieId = Integer.parseInt(fields[1]);
    float rating = Float.parseFloat(fields[2]);
    long timestamp = Long.parseLong(fields[3]);
    return new Rating(userId, movieId, rating, timestamp);
  }
}

JavaRDD<Rating> ratingsRDD = spark
  .read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
  .map(Rating::parseRating);
Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating.class);
Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> training = splits[0];
Dataset<Row> test = splits[1];

// Build the recommendation model using ALS on the training data
ALS als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating");
ALSModel model = als.fit(training);

// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop");
Dataset<Row> predictions = model.transform(test);

RegressionEvaluator evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root-mean-square error = " + rmse);

// Generate top 10 movie recommendations for each user
Dataset<Row> userRecs = model.recommendForAllUsers(10);
// Generate top 10 user recommendations for each movie
Dataset<Row> movieRecs = model.recommendForAllItems(10);

// Generate top 10 movie recommendations for a specified set of users
Dataset<Row> users = ratings.select(als.getUserCol()).distinct().limit(3);
Dataset<Row> userSubsetRecs = model.recommendForUserSubset(users, 10);
// Generate top 10 user recommendations for a specified set of movies
Dataset<Row> movies = ratings.select(als.getItemCol()).distinct().limit(3);
Dataset<Row> movieSubSetRecs = model.recommendForItemSubset(movies, 10);
在 Spark 儲存庫中的「examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java」中尋找完整的範例程式碼。

如果評分矩陣來自其他資訊來源(也就是從其他訊號推論而來),您可以將 implicitPrefs 設為 true 以取得更好的結果

ALS als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating");

請參閱 R API 文件 以取得更多詳細資料。

# Load training data
data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0),
             list(1, 2, 4.0), list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(data, c("userId", "movieId", "rating"))
training <- df
test <- df

# Fit a recommendation model using ALS with spark.als
model <- spark.als(training, maxIter = 5, regParam = 0.01, userCol = "userId",
                   itemCol = "movieId", ratingCol = "rating")

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
在 Spark 儲存庫中的「examples/src/main/r/ml/als.R」中尋找完整的範例程式碼。