
Collaborative Filtering with Apache Spark

In this post, we’ll build a scalable movie recommendations system from scratch using Machine Learning (specifically, Collaborative Filtering) and Apache Spark.

Collaborative Filtering

Collaborative filtering is a method of making automatic predictions (filtering) about the interests of a user by collecting preferences or taste information from other users. The underlying assumption of the collaborative filtering approach is that if person A has the same opinion as person B on one issue, A is more likely to share B’s opinion on a different issue x than to share the opinion of a randomly chosen person.

The image below shows an example of predicting a user’s rating using collaborative filtering. At first, people rate different items (like videos, images, or games). The system then predicts a user’s rating for an item the user has not rated yet. These predictions are built upon the existing ratings of other users who have similar tastes to the active user. For instance, in the image below the system predicts that the active user will not like the video.

[Image: collaborative filtering example]

Read more on collaborative filtering
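
To make the idea concrete, here is a toy user-based sketch in plain Python with NumPy. The ratings matrix and the choice of cosine similarity are illustrative assumptions only; the ALS approach we use later in this post is a model-based variant of the same idea.

import numpy as np

# A made-up ratings matrix: rows are users, columns are items, 0 means "not rated yet".
ratings = np.array([
    [5.0, 4.0, 0.0, 1.0],   # the active user; item 2 is unrated
    [5.0, 5.0, 4.0, 1.0],
    [4.0, 4.0, 5.0, 2.0],
    [1.0, 2.0, 1.0, 5.0],
])

def cosine(u, v):
    return np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v))

active, others = ratings[0], ratings[1:]

# Predict the active user's rating for item 2 as a similarity-weighted average of
# the other users' ratings for it. (A real system would only compare co-rated items.)
similarities = np.array([cosine(active, other) for other in others])
predicted = np.dot(similarities, others[:, 2]) / similarities.sum()
print(predicted)  # about 3.6, pulled up by the two similar users who liked item 2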

Spark

Apache Spark is an open source cluster computing framework that lets you do large-scale data processing, with APIs in Java, Scala, Python, and R. Spark has efficient implementations of parallel transformations and actions built-in, such as map, reduce, filter, groupByKey, and many others.
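
As a quick taste of the core API, here is a minimal pySpark sketch. It assumes a running SparkContext named sc (which the pyspark shell provides) and is purely illustrative:

# Transformations like map and filter are lazy; actions like collect and reduce
# trigger the actual computation.
words = sc.parallelize(['spark', 'makes', 'big', 'data', 'simple'])

longWords = words.map(lambda w: (w, len(w))).filter(lambda pair: pair[1] > 4)
print(longWords.collect())  # [('spark', 5), ('makes', 5), ('simple', 6)]

totalChars = words.map(len).reduce(lambda a, b: a + b)
print(totalChars)           # 23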

It also has a set of supporting libraries such as GraphX and MLlib, as well as a growing ecosystem of third-party packages.

We’ll be using Python and pySpark since we’ll have convenient access to most of the scientific computing libraries such as NumPy and SciPy.

Refer here for Spark and pySpark installation instructions.

We’ll also use MLlib, Spark’s scalable machine learning library.

MLlib 1.3 contains the following algorithms:

  • linear SVM and logistic regression
  • classification and regression tree
  • random forest and gradient-boosted trees
  • recommendation via alternating least squares
  • clustering via k-means, Gaussian mixtures, and power iteration clustering
  • topic modeling via latent Dirichlet allocation
  • singular value decomposition
  • linear regression with L1- and L2-regularization
  • isotonic regression
  • multinomial naive Bayes
  • frequent itemset mining via FP-growth
  • basic statistics
  • feature transformations

Refer to the MLlib guide for usage examples.

Machine Learning Process

Building ML applications is an iterative process that involves a sequence of steps. To build an ML application, we follow these general steps:

  1. Frame the core ML problem(s) in terms of what is observed and what answer you want the model to predict based on your needs.
  2. Collect, clean, and prepare data to make it suitable for consumption by ML model training algorithms. Visualize and analyze the data to run sanity checks to validate the quality of the data and to understand the data.
  3. Often, the raw data (input variables) and answer (target) are not represented in a way that can be used to train a highly predictive model. Therefore, you typically should attempt to construct more predictive input representations or features from the raw variables.
  4. Feed the resulting features to the learning algorithm to build models and evaluate the quality of the models on data that was held out from model building.
  5. Use the model to generate predictions of the target answer for new data instances.

Framing the Problem & Acquiring the Data

We’ll use an existing, cleaned, and validated MovieLens dataset to build a movie recommendations system. This way, we don’t have to collect, clean, and validate a dataset on our own.

Feel free to start with the 100k dataset, although our system will be able to handle the 20M dataset as well.

Feature Extraction

Looking at the raw data and the accompanying README, we have the following input data:

u.data (ratings dataset)

user id | item id | rating | timestamp
196     | 242     | 3      | 881250949

u.item (movies dataset)

movie id | movie title      | release date | video release date | imdb url                                               | genres
1        | Toy Story (1995) | 01-Jan-1995  |                    | https://us.imdb.com/M/title-exact?Toy%20Story%20(1995) | *omitted*

We’ll need to process this data into vectors containing relevant features or attributes before we can perform any analytics. This process is called feature extraction.

In order to use Spark and its API we will need to use a SparkContext. When running Spark, you start a new Spark application by creating a SparkContext. sc here refers to the SparkContext.

To verify you have a working Spark setup, try the following in your interpreter: type(sc) should return pyspark.context.SparkContext
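
If you are not inside the pyspark shell, you can create the context yourself; the master URL and application name below are just example values:

from pyspark import SparkContext

# local[*] runs Spark locally with one worker thread per core; the app name is arbitrary
sc = SparkContext('local[*]', 'movielens-recommender')

print(type(sc))     # <class 'pyspark.context.SparkContext'>
print(sc.version)   # the Spark version you installed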

Parsing the two files yields two Resilient Distributed Datasets (RDDs):

import sys
import os

baseDir = os.path.join('spark-tutorial')
inputPath = os.path.join('movielens')

ratingsFilename = os.path.join(baseDir, inputPath, 'u.data')
moviesFilename = os.path.join(baseDir, inputPath, 'u.item')

numPartitions = 2
rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
rawMovies = sc.textFile(moviesFilename)

def get_ratings_tuple(entry):
    # u.data is tab-separated: user id, item id, rating, timestamp
    items = entry.split('\t')
    return int(items[0]), int(items[1]), float(items[2])

def get_movie_tuple(entry):
    # u.item is pipe-separated: movie id, movie title, release date, ...
    items = entry.split('|')
    return int(items[0]), items[1]

ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
moviesRDD = rawMovies.map(get_movie_tuple).cache()

  • For each line in the ratings dataset, we create a tuple of (UserID, MovieID, Rating).
  • For each line in the movies dataset, we create a tuple of (MovieID, Title).

ratingsRDD

user id | movie id | rating
196     | 242      | 3.0

moviesRDD

movie id | movie title
1        | Toy Story (1995)
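
You can spot-check both RDDs directly from the interpreter; the exact first element of ratingsRDD may differ because of the earlier repartition:

print(ratingsRDD.take(1))   # e.g. [(196, 242, 3.0)]
print(moviesRDD.take(1))    # e.g. [(1, u'Toy Story (1995)')]
print(ratingsRDD.count())   # 100000 ratings in the 100k dataset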

Split your Dataset

Before we jump into using machine learning, we need to break up the ratingsRDD dataset into three pieces:

  • A training set (RDD), which we will use to train models
  • A validation set (RDD), which we will use to choose the best model, based on each model’s Root Mean Squared Error (RMSE)
  • A test set (RDD), which we will use for a final, unbiased measure of how well the chosen model performs

To randomly split the dataset into multiple groups, we can use the pySpark randomSplit() transformation. randomSplit() takes a list of weights and a seed and returns multiple RDDs.

trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0L)
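
randomSplit() normalizes the weights, so [6, 2, 2] behaves like a 60/20/20 split. A quick sanity check (exact counts depend on the seed), plus caching, since ALS will pass over the training and validation data repeatedly:

trainingRDD.cache()
validationRDD.cache()
testRDD.cache()

print('Training: %s, validation: %s, test: %s' % (
    trainingRDD.count(), validationRDD.count(), testRDD.count()))
# roughly 60%, 20%, and 20% of the 100,000 ratings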

Validating Your Model

In the next part, we’ll generate a few models, score each, and decide which model is best. We will use the Root Mean Square Error (RMSE) or Root Mean Square Deviation (RMSD) to compute the error of each model.

RMSE is a frequently used measure of the differences between values (sample and population values) predicted by a model or an estimator and the values actually observed. The RMSD represents the sample standard deviation of the differences between predicted values and observed values. These individual differences are called residuals when the calculations are performed over the data sample that was used for estimation, and are called prediction errors when computed out-of-sample. The RMSE serves to aggregate the magnitudes of the errors of the individual predictions into a single measure of predictive power. RMSE is a good measure of accuracy, but only to compare forecasting errors of different models for a particular variable and not between variables, as it is scale-dependent.

Versions of Spark MLlib beginning with Spark 1.4 include a RegressionMetrics module that can be used to compute the RMSE.

from pyspark.mllib.evaluation import RegressionMetrics

predictionAndObservationsRDD = sc.parallelize([(2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)])
metrics = RegressionMetrics(predictionAndObservationsRDD)
metrics.rootMeanSquaredError
>> 0.61...
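
To see where that number comes from, you can reproduce it by hand: RMSE is the square root of the mean of the squared differences between each prediction and its observation.

import math

pairs = [(2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)]
squaredErrors = [(prediction - observation) ** 2 for (prediction, observation) in pairs]
print(math.sqrt(sum(squaredErrors) / len(pairs)))  # sqrt(1.5 / 4) = 0.6123...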

Training Your Model

The process of training an ML model involves providing an ML algorithm (that is, the learning algorithm) with training data to learn from. The term ML model refers to the model artifact that is created by the training process.

The training data must contain the correct answer, which is known as a target or target attribute. The learning algorithm finds patterns in the training data that map the input data attributes to the target (the answer that you want to predict), and it outputs an ML model that captures these patterns.

You can use the ML model to get predictions on new data for which you do not know the target. For example, let’s say that you want to train an ML model to predict if an email is spam or not spam.

We will use the MLlib implementation of Alternating Least Squares, ALS.train(). ALS takes a training dataset (RDD) and several parameters that control the model creation process. To determine the best values for the parameters, we will use ALS to train several models before choosing the best one.

from pyspark.mllib.recommendation import ALS

# For the prediction step, create an input RDD, validationForPredictRDD, consisting of (UserID, MovieID) pairs
validationForPredictRDD = validationRDD.map(lambda t: (t[0], t[1]))

# Model Parameters
seed = 5L
iterations = 5
regularizationParameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.03

def computeError(predictedRDD, observedRDD):
    # Key both RDDs by (UserID, MovieID), join them, and compare the two ratings
    predictedReformattedRDD = predictedRDD.map(lambda t: ((t[0], t[1]), t[2]))
    observedReformattedRDD = observedRDD.map(lambda t: ((t[0], t[1]), t[2]))
    predictionAndObservationRDD = (predictedReformattedRDD
                                   .join(observedReformattedRDD)
                                   .values())
    return RegressionMetrics(predictionAndObservationRDD).rootMeanSquaredError

minError = float('inf')
bestRank = -1
bestIteration = -1
for rank in ranks:
    # Train a model on an RDD of (UserID, MovieID, rating) tuples, with a given rank
    # (4, 8, or 12), a number of iterations, and a regularization coefficient
    model = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularizationParameter)
    # predictAll accepts an RDD of (UserID, MovieID) pairs and returns an RDD of
    # (UserID, MovieID, rating) predictions
    predictedRatingsRDD = model.predictAll(validationForPredictRDD)
    # Evaluate the model by computing its RMSE against the held-out validation ratings
    error = computeError(predictedRatingsRDD, validationRDD)
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < minError:
        minError = error
        bestRank = rank

Testing Your Model

Finally, we can test the quality of our best model by training it with ALS.train() using our bestRank parameter and evaluating it on the test set.

We used the trainingRDD and validationRDD datasets to select the best model. Since we used these two datasets to determine what model is best, we cannot use them to test how good the model is - otherwise we would be very vulnerable to overfitting.

Your model is overfitting your training data when you see that the model performs well on the training data but does not perform well on the real life data. This is because the model is memorizing the data it has seen and is unable to generalize to unseen examples.

To decide how good our model is, we need to use the testRDD dataset.

The steps are similar to the previous section:

seed = 5L
iterations = 5
regularizationParameter = 0.1
rank = bestRank

myModel = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
                    lambda_=regularizationParameter)
testForPredictingRDD = testRDD.map(lambda t: (t[0], t[1]))
predictedTestRDD = myModel.predictAll(testForPredictingRDD)
testRMSE = computeError(predictedTestRDD, testRDD)

print 'The model had an RMSE on the test set of %s' % testRMSE

That’s it! You’ve trained, validated, and tested your ML model! Now, you can use the model to predict how users will rate any as-yet-unrated movies.

We can add a new user’s rated movies to the training dataset using Spark’s union transformation and train a new model herModel on the combined data. Given an RDD unratedMoviesRDD containing (UserID, MovieID) pairs for the movies she has not rated, we can then call herModel.predictAll(unratedMoviesRDD) to generate her predicted ratings, as sketched below. Remember, these predictions are built upon the existing ratings of other users who have similar ratings to the active user. Mathematical!
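
Here is a sketch of that last step. The new user’s ID and her ratings below are made-up values for illustration; any user ID that does not already appear in the dataset will do:

# A hypothetical new user with a handful of (UserID, MovieID, Rating) tuples
myUserID = 0
myRatingsRDD = sc.parallelize([
    (myUserID, 1, 5.0),     # Toy Story (1995)
    (myUserID, 50, 4.0),
    (myUserID, 121, 3.0),
])

# Retrain on the original training data plus the new user's ratings
herModel = ALS.train(trainingRDD.union(myRatingsRDD), bestRank, seed=seed,
                     iterations=iterations, lambda_=regularizationParameter)

# Build (UserID, MovieID) pairs for every movie she has not rated yet
myRatedMovieIDs = set(movieID for (_, movieID, _) in myRatingsRDD.collect())
unratedMoviesRDD = (moviesRDD
                    .map(lambda t: (myUserID, t[0]))
                    .filter(lambda pair: pair[1] not in myRatedMovieIDs))

# Predict her ratings for the unrated movies and keep the ten highest
predictedRatingsRDD = herModel.predictAll(unratedMoviesRDD)
topTen = predictedRatingsRDD.takeOrdered(10, key=lambda r: -r.rating)

Each element of topTen is a Rating(user, product, rating) tuple, so topTen[0].product is the MovieID of her top recommendation, which you can look up in moviesRDD.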

