Scalable Collaborative Filtering with Apache Spark on EC2 Cluster

Author: Paul Calley, paul.c.calley@gmail.com

This notebook demonstrates training a simple item based collaborative filtering algorithm on a cluster using Apache Spark. The algorithm is run on a 10 node cluster of m1.xlarge EC2 instances deployed with the spark-ec2 script.

We start by importing training data and creating a Resilient Distributed Dataset (RDD) of users grouped with their review history.

In [1]:
users_with_recs = sc.textFile('/train.txt').map(lambda x: x.split(u'\t'))\ #Short Comments Here
                      .map(lambda x: (int(x[0]), [int(x[1]), float(x[2])/5]))\
                .groupByKey()\
                .cache()
            
users_with_recs.first()
Out[1]:
(1093634, <pyspark.resultiterable.ResultIterable at 0x257c590>)

Next since we are doing Item based collaborative filtering we want to create an RDD of item-item pairs where both items have been rated by the same user. Then we group each pair along with its list of ratings. Formatted: $$((item1,item2),[(item1rating1,item2rating1),(item1rating2,item2rating2)...])$$

In [2]:
from itertools import combinations
def itemItemPairs(user_id,items_with_rating):
    for item1,item2 in combinations(items_with_rating,2):
        return (item1[0],item2[0]),(item1[1],item2[1])

item_item_pairs = users_with_recs.filter(lambda p: len(p[1]) > 1)\
                    .map(lambda p: itemItemPairs(p[0],p[1]))\
                    .groupByKey()
item_item_pairs.take(3)
Out[2]:
[((123, 137), <pyspark.resultiterable.ResultIterable at 0x2582090>),
 ((499, 1329), <pyspark.resultiterable.ResultIterable at 0x1825bd0>),
 ((1499, 5139), <pyspark.resultiterable.ResultIterable at 0x2582710>)]

From the item-item ratings lists above we create an item-item similarity matrix using the simple Euclidean distance metric. We then preprocess that matrix into a dictionary and broadcast it so that it can be used to make recommendations.

In [3]:
from math import sqrt
def calcSim(item_pair, rating_pair):
    similarity = 0
    for rating in rating_pair:
        similarity += sqrt(pow(rating[0] - rating[1], 2))
    return (item_pair, similarity)

def nearest(item_id, items_and_sims, n):
    items_and_sims = list(items_and_sims)
    items_and_sims.sort(key=lambda x: x[1], reverse=True)
    return item_id, items_and_sims[:n]

item_item_similarities = item_item_pairs.map(lambda p: calcSim(p[0], p[1]))\
               .map(lambda x: (x[0][0], (x[0][1], x[1])))\ 
               .groupByKey()\
               .map(lambda x: nearest(x[0],x[1],50)).collect()
            
similarity_dict = {}
for (item_id, items_and_sims) in item_item_similarities:
    similarity_dict[item_id] = items_and_sims
similarity_dict = sc.broadcast(similarity_dict)

Next we use the similarity dictionary to make recommendations for each user using the already existing ratings history for each one. We keep only the top 500 recommendations for simplicity.

In [4]:
def makeRecommendation(user_id, user_ratings, item_item_sim_dict):
    totals = {}
    sim_sums = {}
    for (item_id, rating) in user_ratings:
        try:
            item_similarity_list = item_item_sim_dict[item_id]
        except:
            continue
        for (item_id2, similarity) in item_similarity_list:
            if item_id2 != item_id:
                try:
                    totals[item_id2] += float(rating) * float(similarity)
                    sim_sums[item_id2] += float(similarity)
                except:
                    totals[item_id2] = float(rating) * float(similarity)
                    sim_sums[item_id2] = float(similarity)
                    
    recommendations_list = []
    for item, total in totals.items():
        if sim_sums[item] != 0:
            recommendations_list.append((total/sim_sums[item], item))
    
    recommendations_list.sort(reverse=True)
    return (user_id, recommendations_list[:500])
            
user_recs = users_with_recs.map(lambda x: makeRecommendation(x[0], x[1], similarity_dict.value))

Finally we will evaluate our recommendations using the mean average error. Since the entire user recommendations list it too large to read to the driver program we will evaluate the algorithms on a subset of the predictions.

In [5]:
user_recs_sub_list = user_recs.take(2000)
user_recs_id_list = [user_id for (user_id, recs_list) in user_recs_sub_list]

input_ratings = {}
test_file = sc.textFile('/test.txt').map(lambda x: x.split(u'\t')).map(lambda x: (int(x[0]), int(x[1]), float(x[2])/5))\
                                    .filter(lambda x: x[0] in user_recs_id_list).collect()
for line in test_file:
    user =  line[0]
    item = line[1]
    rating = line[2]
    try:
        input_ratings[user] += [(item, rating)]
    except:
        input_ratings[user] = [(item, rating)]
        
prediction_list = []
for (user, ratings) in user_recs_sub_list:
    for (rating, item) in ratings:
        for (input_item, input_rating) in input_ratings[user]:
             if str(item) == str(input_item):
                    prediction_list.append((rating, input_rating))
                    
sum_error = 0
for (prediction, rating) in prediction_list:
    sum_error += abs(prediction - rating)
print "Mean Average Error: " + str(sum_error/len(prediction_list))
Mean Average Error: 0.222786636775

Conclusion: Using Apache Spark we were able to implement a version of the collaborative filtering algorithm that easily scales to large datasets. Additionally we found a workflow that would be excellent for exploratory data science and prototyping machine learning algorithms on large datasets as it provides access to large datasets in an easy development and testing environment. We could could greatly increase the accuracy and efficiency of this algorithm by experimenting with different similarity metrics and by down sampling our training vectors. However my goal with this project was simply to demonstrate the possibility of such an implementation. Apache Spark provides an optimized implementation of collaborative filtering for anyone looking to implement this in a production environment.

I found the following links extremely valuable, especially the top two for setting up the cluster.

Provisioning the EC2 Cluster

Setting up iPython with PySpark

The original data

Collaborative Filtering on MPP systems

A nice Collaborative Filtering introduction