
Movie Recommendation with MLlib

Spark Summit 2014

https://databricks-training.s3.amazonaws.com/index.html

In this exercise, we will use MLlib to make personalized movie recommendations tailored to you. We will work with 10 million ratings from 72,000 users on 10,000 movies, collected by MovieLens. This dataset is pre-loaded in your USB drive under data/movielens/large. For quick testing of your code, you may want to use the smaller dataset under data/movielens/medium, which contains 1 million ratings from 6000 users on 4000 movies.

Dataset

We will use two files from this MovieLens dataset: “ratings.dat” and “movies.dat”. All ratings are contained in the file “ratings.dat” and are in the following format:

UserID::MovieID::Rating::Timestamp

Movie information is in the file “movies.dat” and is in the following format:

MovieID::Title::Genres

Collaborative filtering

Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix, in our case, the user-movie rating matrix. MLlib currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. In particular, we implement the alternating least squares (ALS) algorithm to learn these latent factors.

[Figure: collaborative filtering — predicting the missing entries of the user-movie rating matrix]
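To make the latent-factor idea concrete, here is a toy sketch (plain Python, not MLlib code; the factor values below are made up, not learned by ALS): each user and each movie is described by a small vector, and a predicted rating is simply the dot product of the two.

# Toy illustration only -- the rank-2 factors below are made up.
userFactors  = {0: [1.2, 0.3]}      # hypothetical factors for user 0
movieFactors = {1193: [0.9, 1.5]}   # hypothetical factors for movie 1193

# Predicted rating = dot product of the user vector and the movie vector.
predicted = sum(u * m for u, m in zip(userFactors[0], movieFactors[1193]))
print predicted   # 1.2 * 0.9 + 0.3 * 1.5 = 1.53

ALS learns such factor vectors for every user and every movie so that these dot products reproduce the observed ratings as closely as possible.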

Create training examples

https://github.com/ewanlee/spark-training

To make recommendations for you, we are going to learn your taste by asking you to rate a few movies. We have selected a small set of movies that have received the most ratings from users in the MovieLens dataset. You can rate those movies by running bin/rateMovies:

python bin/rateMovies

When you run the script, you should see a prompt similar to the following:

Please rate the following movie (1-5 (best), or 0 if not seen):
Toy Story (1995):

After you’re done rating the movies, we save your ratings in personalRatings.txt in the MovieLens format, where a special user id 0 is assigned to you.

rateMovies allows you to re-rate the movies if you’d like to see how your ratings affect your recommendations.

If you don’t have Python installed, please copy personalRatings.txt.template to personalRatings.txt and replace the ?s with your ratings.

Setup

We will be using a standalone project template for this exercise.

  • In the training USB drive, this has been set up in machine-learning/python/
  • You should find the following items in the directory:
      • MovieLensALS.py: Main Python program that you are going to edit and run
      • solution: Directory containing the solution code

MovieLensALS.py should look as follows:

#!/usr/bin/env python

import sys
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
    """
    # ...

def parseMovie(line):
    """
    Parses a movie record in MovieLens format movieId::movieTitle .
    """
    # ...

def loadRatings(ratingsFile):
    """
    Load ratings from file.
    """
    # ...

def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    # ...

if __name__ == "__main__":
    if (len(sys.argv) != 3):
        print "Usage: [usb root directory]/spark/bin/spark-submit --driver-memory 2g " + \
          "MovieLensALS.py movieLensDataDir personalRatingsFile"
        sys.exit(1)

    # set up environment
    conf = SparkConf() \
      .setAppName("MovieLensALS") \
      .set("spark.executor.memory", "2g")
    sc = SparkContext(conf=conf)

    # load personal ratings
    myRatings = loadRatings(sys.argv[2])
    myRatingsRDD = sc.parallelize(myRatings, 1)

    # load ratings and movie titles
    movieLensHomeDir = sys.argv[1]

    # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))
    ratings = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(parseRating)

    # movies is an RDD of (movieId, movieTitle)
    movies = dict(sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie).collect())

    # your code here

    # clean up
    sc.stop()

Let’s first take a closer look at our template code, then we’ll start adding code to it. Locate MovieLensALS.py and open it with a text editor.

usb/$ cd machine-learning/python
vim MovieLensALS.py # Or your editor of choice

For any Spark computation, we first create a SparkConf object and use it to create a SparkContext object. Since we will be using spark-submit to execute the programs in this tutorial (more on spark-submit in the next section), we only need to configure the executor memory allocation and give the program a name, e.g. “MovieLensALS”, to identify it in Spark’s web UI. In local mode, the web UI can be accessed at localhost:4040 during the execution of a program.

This is what it looks like in our template code:

conf = SparkConf() \
  .setAppName("MovieLensALS") \
  .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)

Next, the code uses the SparkContext to read in the ratings. Recall that the rating file is a text file with “::” as the delimiter. The code parses each line into a pair whose key is the last digit of the timestamp (used as a random key) and whose value is the (userId, movieId, rating) triple. In the Scala API this triple is wrapped in a Rating class; in the Python API it is a plain (user, product, rating) tuple.

movieLensHomeDir = sys.argv[1]
# ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))
ratings = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(parseRating)
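The parseRating helper referenced here splits each line on “::” and keeps only the last digit of the timestamp as the key; this is the same implementation you will find in the solution code at the end of this exercise:

def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
    """
    fields = line.strip().split("::")
    return long(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))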

Next, the code reads in the movie IDs and titles and collects them into a map from movie ID to title.

def parseMovie(line):
    fields = line.split("::")
    return int(fields[0]), fields[1]

movies = dict(sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie).collect())

Now, let’s make our first edit to add code to get a summary of the ratings.

numRatings = ratings.count()
numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
numMovies = ratings.values().map(lambda r: r[1]).distinct().count()
print "Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies)

Running the program

Before we compute movie recommendations, here is a quick reminder on how you can run the program at any point during this exercise. As mentioned above, we will use spark-submit to execute your program in local mode for this tutorial.

Starting with Spark 1.0, spark-submit is the recommended way for running Spark applications, both on clusters and locally in standalone mode.

usb/$ cd machine-learning/python
# change the folder name from "medium" to "large" to run on the large data set
[usb root directory]/spark/bin/spark-submit MovieLensALS.py [usb root directory]/data/movielens/medium/ ../personalRatings.txt

You should see output similar to the following on your screen:

Got 1000209 ratings from 6040 users on 3706 movies.

Splitting training data

We will use MLlib’s ALS to train a MatrixFactorizationModel, which takes an RDD[Rating] object as input in Scala and an RDD of (user, product, rating) tuples in Python. ALS has training parameters such as the rank of the matrix factors and the regularization constant. To determine a good combination of the training parameters, we split the data into three non-overlapping subsets, named training, validation, and test, based on the last digit of the timestamp, and cache them. We will train multiple models on the training set, select the best model on the validation set based on RMSE (Root Mean Squared Error), and finally evaluate the best model on the test set. We also add your ratings to the training set so that we can make recommendations for you. We hold the training, validation, and test sets in memory by calling cache because we need to visit them multiple times.

numPartitions = 4
training = ratings.filter(lambda x: x[0] < 6) \
  .values() \
  .union(myRatingsRDD) \
  .repartition(numPartitions) \
  .cache()
validation = ratings.filter(lambda x: x[0] >= 6 and x[0] < 8) \
  .values() \
  .repartition(numPartitions) \
  .cache()
test = ratings.filter(lambda x: x[0] >= 8).values().cache()

numTraining = training.count()
numValidation = validation.count()
numTest = test.count()

print "Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest)

After the split, you should see

Training: 602251, validation: 198919, test: 199049.

Training using ALS

In this section, we will use ALS.train to train a number of models, then select and evaluate the best one. Among the training parameters of ALS, the most important ones are rank, lambda (the regularization constant), and the number of iterations. The train method of ALS we are going to use is defined as follows:

class ALS(object):

    def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
        # ...
        return MatrixFactorizationModel(sc, mod)
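For example, a single model can be trained on the training set in one call (the parameter values below are purely illustrative):

# Train one model: rank 8, 10 iterations, regularization 0.1 (illustrative values)
model = ALS.train(training, 8, 10, 0.1)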

Ideally, we want to try a large number of combinations of them in order to find the best one. Due to time constraints, we will test only 8 combinations resulting from the cross product of two different ranks (8 and 12), two different lambdas (1.0 and 10.0), and two different numbers of iterations (10 and 20). We use the provided method computeRmse to compute the RMSE on the validation set for each model. The model with the smallest RMSE on the validation set is selected, and its RMSE on the test set is used as the final metric.

ranks = [8, 12]
lambdas = [1.0, 10.0]
numIters = [10, 20]
bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1

for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    model = ALS.train(training, rank, numIter, lmbda)
    validationRmse = computeRmse(model, validation, numValidation)
    print "RMSE (validation) = %f for the model trained with " % validationRmse + \
          "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)
    if (validationRmse < bestValidationRmse):
        bestModel = model
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lmbda
        bestNumIter = numIter

# evaluate the best model on the test set
testRmse = computeRmse(bestModel, test, numTest)

print "The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda) \
  + "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse)

Spark might take a minute or two to train the models. You should see the following on the screen:

The best model was trained using rank 8 and lambda 10.0, and its RMSE on test is 0.8808492431998702.

Recommending movies for you

As the last part of our tutorial, let’s take a look at what movies our model recommends for you. This is done by generating (0, movieId) pairs for all movies you haven’t rated and calling the model’s predict method to get predictions. 0 is the special user id assigned to you.

class MatrixFactorizationModel(object):

    def predictAll(self, usersProducts):
        # ...
        return RDD(self._java_model.predict(usersProductsJRDD._jrdd),
                   self._context, RatingDeserializer())

After we get all predictions, let us list the top 50 recommendations and see whether they look good to you.

myRatedMovieIds = set([x[1] for x in myRatings])
candidates = sc.parallelize([m for m in movies if m not in myRatedMovieIds])
predictions = bestModel.predictAll(candidates.map(lambda x: (0, x))).collect()
recommendations = sorted(predictions, key=lambda x: x[2], reverse=True)[:50]

print "Movies recommended for you:"
for i in xrange(len(recommendations)):
    print ("%2d: %s" % (i + 1, movies[recommendations[i][1]])).encode('ascii', 'ignore')

The output should be similar to

Movies recommended for you:
1: Silence of the Lambs, The (1991)
2: Saving Private Ryan (1998)
3: Godfather, The (1972)
4: Star Wars: Episode IV - A New Hope (1977)
5: Braveheart (1995)
6: Schindler's List (1993)
7: Shawshank Redemption, The (1994)
8: Star Wars: Episode V - The Empire Strikes Back (1980)
9: Pulp Fiction (1994)
10: Alien (1979)
...

Comparing to a naive baseline

Does ALS output a non-trivial model? We can compare the evaluation result with a naive baseline model that always outputs the average rating (or you may try one that outputs the average rating per movie; a sketch is given at the end of this section). Computing the baseline’s RMSE is straightforward:

meanRating = training.union(validation).map(lambda x: x[2]).mean()
baselineRmse = sqrt(test.map(lambda x: (meanRating - x[2]) ** 2).reduce(add) / numTest)
improvement = (baselineRmse - testRmse) / baselineRmse * 100
print "The best model improves the baseline by %.2f" % (improvement) + "%."

The output should be similar to

The best model improves the baseline by 20.96%.

It may seem obvious that the trained model should outperform the naive baseline, but a bad combination of training parameters can produce a model that does worse than this baseline. Choosing the right set of parameters is quite important for this task.
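If you want to try the per-movie-average baseline mentioned above, here is a minimal sketch (it reuses meanRating, numTest, and the RDDs defined earlier; the variable names are our own, not part of the template):

# Sketch: predict each movie's average rating, falling back to the global mean
# for movies that never appear in training + validation.
movieSums = training.union(validation) \
  .map(lambda x: (x[1], (x[2], 1))) \
  .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
movieMeans = dict(movieSums.map(lambda (movieId, (s, c)): (movieId, s / c)).collect())
perMovieRmse = sqrt(test.map(lambda x: (movieMeans.get(x[1], meanRating) - x[2]) ** 2) \
  .reduce(add) / numTest)
print "Per-movie-average baseline RMSE: %f" % perMovieRmse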

Solution code

#!/usr/bin/env python

import sys
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
    """
    fields = line.strip().split("::")
    return long(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))

def parseMovie(line):
    """
    Parses a movie record in MovieLens format movieId::movieTitle .
    """
    fields = line.strip().split("::")
    return int(fields[0]), fields[1]

def loadRatings(ratingsFile):
    """
    Load ratings from file.
    """
    if not isfile(ratingsFile):
        print "File %s does not exist." % ratingsFile
        sys.exit(1)
    f = open(ratingsFile, 'r')
    ratings = filter(lambda r: r[2] > 0, [parseRating(line)[1] for line in f])
    f.close()
    if not ratings:
        print "No ratings provided."
        sys.exit(1)
    else:
        return ratings

def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
      .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
      .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))

if __name__ == "__main__":
    if (len(sys.argv) != 3):
        print "Usage: /path/to/spark/bin/spark-submit --driver-memory 2g " + \
          "MovieLensALS.py movieLensDataDir personalRatingsFile"
        sys.exit(1)

    # set up environment
    conf = SparkConf() \
      .setAppName("MovieLensALS") \
      .set("spark.executor.memory", "2g")
    sc = SparkContext(conf=conf)

    # load personal ratings
    myRatings = loadRatings(sys.argv[2])
    myRatingsRDD = sc.parallelize(myRatings, 1)

    # load ratings and movie titles
    movieLensHomeDir = sys.argv[1]

    # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))
    ratings = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(parseRating)

    # movies is an RDD of (movieId, movieTitle)
    movies = dict(sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie).collect())

    numRatings = ratings.count()
    numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
    numMovies = ratings.values().map(lambda r: r[1]).distinct().count()

    print "Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies)

    # split ratings into train (60%), validation (20%), and test (20%) based on the
    # last digit of the timestamp, add myRatings to train, and cache them
    # training, validation, test are all RDDs of (userId, movieId, rating)
    numPartitions = 4
    training = ratings.filter(lambda x: x[0] < 6) \
      .values() \
      .union(myRatingsRDD) \
      .repartition(numPartitions) \
      .cache()
    validation = ratings.filter(lambda x: x[0] >= 6 and x[0] < 8) \
      .values() \
      .repartition(numPartitions) \
      .cache()
    test = ratings.filter(lambda x: x[0] >= 8).values().cache()

    numTraining = training.count()
    numValidation = validation.count()
    numTest = test.count()

    print "Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest)

    # train models and evaluate them on the validation set
    ranks = [8, 12]
    lambdas = [0.1, 10.0]
    numIters = [10, 20]
    bestModel = None
    bestValidationRmse = float("inf")
    bestRank = 0
    bestLambda = -1.0
    bestNumIter = -1

    for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
        model = ALS.train(training, rank, numIter, lmbda)
        validationRmse = computeRmse(model, validation, numValidation)
        print "RMSE (validation) = %f for the model trained with " % validationRmse + \
              "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)
        if (validationRmse < bestValidationRmse):
            bestModel = model
            bestValidationRmse = validationRmse
            bestRank = rank
            bestLambda = lmbda
            bestNumIter = numIter

    # evaluate the best model on the test set
    testRmse = computeRmse(bestModel, test, numTest)

    print "The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda) \
      + "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse)

    # compare the best model with a naive baseline that always returns the mean rating
    meanRating = training.union(validation).map(lambda x: x[2]).mean()
    baselineRmse = sqrt(test.map(lambda x: (meanRating - x[2]) ** 2).reduce(add) / numTest)
    improvement = (baselineRmse - testRmse) / baselineRmse * 100
    print "The best model improves the baseline by %.2f" % (improvement) + "%."

    # make personalized recommendations
    myRatedMovieIds = set([x[1] for x in myRatings])
    candidates = sc.parallelize([m for m in movies if m not in myRatedMovieIds])
    predictions = bestModel.predictAll(candidates.map(lambda x: (0, x))).collect()
    recommendations = sorted(predictions, key=lambda x: x[2], reverse=True)[:50]

    print "Movies recommended for you:"
    for i in xrange(len(recommendations)):
        print ("%2d: %s" % (i + 1, movies[recommendations[i][1]])).encode('ascii', 'ignore')

    # clean up
    sc.stop()