We start by importing training data and creating a Resilient Distributed Dataset (RDD) of users grouped with their review history.
# Build an RDD of (user_id, iterable of [item_id, normalized_rating]) pairs.
# Ratings are divided by 5 to scale them from the 1-5 star scale into (0, 1].
# NOTE: the original had an inline comment after the line-continuation
# backslash, which is a SyntaxError in Python -- it has been removed.
users_with_recs = sc.textFile('/train.txt')\
    .map(lambda x: x.split(u'\t'))\
    .map(lambda x: (int(x[0]), [int(x[1]), float(x[2])/5]))\
    .groupByKey()\
    .cache()
users_with_recs.first()
Next, since we are doing item-based collaborative filtering, we want to create an RDD of item-item pairs where both items have been rated by the same user. Then we group each pair together with its list of ratings. Formatted: $$((item_1,item_2),[(item_1rating_1,item_2rating_1),(item_1rating_2,item_2rating_2),\dots])$$
from itertools import combinations
def itemItemPairs(user_id, items_with_rating):
    """Yield ((item1, item2), (rating1, rating2)) for every pair of items
    this user has rated.

    BUG FIX: the original used `return` inside the loop, so only the FIRST
    combination per user was ever produced and all other co-rating evidence
    was silently dropped.  Yielding every pair (paired with flatMap below)
    restores the full item-item co-occurrence data.
    """
    for (item1, rating1), (item2, rating2) in combinations(items_with_rating, 2):
        yield (item1, item2), (rating1, rating2)

# flatMap (not map) so that every pair from every user lands in one flat RDD,
# then group the rating pairs under their (item1, item2) key.
item_item_pairs = users_with_recs.filter(lambda p: len(p[1]) > 1)\
    .flatMap(lambda p: itemItemPairs(p[0], p[1]))\
    .groupByKey()
item_item_pairs.take(3)
From the item-item ratings lists above we create an item-item similarity matrix using a simple distance-based similarity metric. We then preprocess that matrix into a dictionary and broadcast it so that it can be used to make recommendations.
from math import sqrt
def calcSim(item_pair, rating_pair):
    """Return (item_pair, similarity) for one item-item rating list.

    similarity = 1 / (1 + Euclidean distance between the two rating
    vectors): identical rating histories score 1.0 and the score decays
    toward 0 as the ratings diverge.

    BUG FIX: the original returned a raw *distance* (sum of |a-b|, despite
    the prose claiming Euclidean), yet downstream code treated the value as
    a *similarity* -- `nearest` sorted it descending and `makeRecommendation`
    used it as an averaging weight -- so the pipeline effectively favoured
    the LEAST similar items.  Inverting the distance here fixes every
    consumer at once without changing any interface.
    """
    sq_dist = 0.0
    for rating1, rating2 in rating_pair:
        sq_dist += (rating1 - rating2) ** 2
    return (item_pair, 1.0 / (1.0 + sqrt(sq_dist)))
def nearest(item_id, items_and_sims, n):
    """Return (item_id, top-n neighbours) ordered by similarity, highest first."""
    ranked = sorted(items_and_sims, key=lambda pair: pair[1], reverse=True)
    return item_id, ranked[:n]
# Score every item pair, re-key each score under the first item of the pair,
# keep only each item's 50 most similar neighbours, and collect to the driver.
item_item_similarities = item_item_pairs.map(lambda pair: calcSim(pair[0], pair[1]))\
    .map(lambda scored: (scored[0][0], (scored[0][1], scored[1])))\
    .groupByKey()\
    .map(lambda grouped: nearest(grouped[0], grouped[1], 50)).collect()

# Broadcast the neighbour lists as a plain dict (item_id -> [(item_id, sim)])
# so every executor can look items up locally while making recommendations.
similarity_dict = sc.broadcast(dict(item_item_similarities))
Next we use the similarity dictionary to make recommendations for each user using the already existing ratings history for each one. We keep only the top 500 recommendations for simplicity.
def makeRecommendation(user_id, user_ratings, item_item_sim_dict):
    """Predict ratings for items this user has not rated.

    Each candidate item's prediction is the similarity-weighted average of the
    user's existing ratings over that item's neighbour list.

    Args:
        user_id: the user being recommended to (passed through unchanged).
        user_ratings: iterable of (item_id, rating) pairs the user has rated.
        item_item_sim_dict: item_id -> [(other_item_id, similarity), ...].

    Returns:
        (user_id, [(predicted_rating, item_id), ...]) with the top 500
        predictions sorted best-first.

    FIX: the original used two bare `except:` clauses as control flow, which
    swallow *every* exception (including KeyboardInterrupt) and hide real
    bugs; they are replaced with explicit `dict.get` lookups.
    """
    totals = {}
    sim_sums = {}
    for item_id, rating in user_ratings:
        # Items that were never co-rated with anything have no neighbour list.
        neighbours = item_item_sim_dict.get(item_id)
        if neighbours is None:
            continue
        for other_id, similarity in neighbours:
            if other_id == item_id:
                continue
            totals[other_id] = totals.get(other_id, 0.0) + float(rating) * float(similarity)
            sim_sums[other_id] = sim_sums.get(other_id, 0.0) + float(similarity)
    recommendations_list = []
    for item, total in totals.items():
        # Guard against a zero weight sum (would divide by zero).
        if sim_sums[item] != 0:
            recommendations_list.append((total / sim_sums[item], item))
    recommendations_list.sort(reverse=True)
    return (user_id, recommendations_list[:500])
# Score every user against the broadcast similarity table.
user_recs = users_with_recs.map(
    lambda user: makeRecommendation(user[0], user[1], similarity_dict.value))
Finally, we will evaluate our recommendations using the mean absolute error. Since the entire user recommendations list is too large to bring back to the driver program, we will evaluate the algorithm on a subset of the predictions.
# Evaluate on a driver-side sample; the full recommendation RDD is too large
# to collect.
user_recs_sub_list = user_recs.take(2000)
user_recs_id_list = [user_id for (user_id, recs_list) in user_recs_sub_list]
# PERF FIX: membership tests against a list are O(n) per record inside the
# Spark filter below; a set makes each test O(1).
user_recs_id_set = set(user_recs_id_list)
input_ratings = {}
test_file = sc.textFile('/test.txt').map(lambda x: x.split(u'\t'))\
    .map(lambda x: (int(x[0]), int(x[1]), float(x[2])/5))\
    .filter(lambda x: x[0] in user_recs_id_set).collect()
# Group the held-out test ratings by user: user -> [(item, rating), ...].
# FIX: replaces the original bare try/except append idiom (which swallowed
# every exception type) with dict.setdefault.
for line in test_file:
    user, item, rating = line
    input_ratings.setdefault(user, []).append((item, rating))
# Pair each predicted rating with the user's actual held-out rating.
# PERF FIX: the original scanned every input rating for every prediction
# (O(preds * actuals) per user) and compared via redundant str() casts;
# a per-user dict lookup is O(1).  Also guards against sampled users with
# no test ratings (the original raised KeyError on input_ratings[user]).
prediction_list = []
for user, recs in user_recs_sub_list:
    actual = dict(input_ratings.get(user, []))
    for predicted_rating, item in recs:
        if item in actual:
            prediction_list.append((predicted_rating, actual[item]))
# Mean absolute error over the matched (prediction, actual) pairs.
sum_error = 0.0
for prediction, rating in prediction_list:
    sum_error += abs(prediction - rating)
# FIX: guard against an empty prediction list (ZeroDivisionError), and use
# the parenthesised print form, valid in both Python 2 and Python 3.
if prediction_list:
    print("Mean Average Error: " + str(sum_error / len(prediction_list)))
else:
    print("No overlapping predictions to evaluate")
Conclusion: Using Apache Spark we were able to implement a version of the collaborative filtering algorithm that easily scales to large datasets. Additionally, we found a workflow that would be excellent for exploratory data science and for prototyping machine learning algorithms on large datasets, as it provides access to large datasets in an easy development and testing environment. We could greatly increase the accuracy and efficiency of this algorithm by experimenting with different similarity metrics and by down-sampling our training vectors. However, my goal with this project was simply to demonstrate the possibility of such an implementation. Apache Spark provides an optimized implementation of collaborative filtering for anyone looking to implement this in a production environment.
I found the following links extremely valuable, especially the top two for setting up the cluster.
Setting up iPython with PySpark