PySpark ML recommender
import random

import pyspark.mllib.recommendation
import pyspark.sql.functions
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.ml.feature import StringIndexer
from pyspark.mllib.recommendation import ALS, Rating
sc = SparkContext("local", "music")
spark = SparkSession(sc)

sampleUsersPath = "sampleUsers.txt"
sampleTracksPath = "sampleTracks.txt"
# Generate random data from the sample user file
randomUsersPath = "randomusers.txt"

with open(sampleUsersPath) as sampleFile:
    lines = random.sample(sampleFile.readlines(), 2000)

outF = open(randomUsersPath, "w")
outF.writelines(lines)
outF.close()
0.1 Load
Load the data as a Spark DataFrame.
Columns are separated by a tab ("\t").
Counts higher than 20 are reduced to 20.
Name | Type |
---|---|
user | StringType() |
song | StringType() |
count | IntegerType() |
def load(path):
    csvSchema = (StructType()
                 .add("user", StringType())
                 .add("song", StringType())
                 .add("count", IntegerType()))
    df = spark.read.load(path, format="csv", sep="\t", schema=csvSchema)
    # Cap the count column at 20: counts above the limit are replaced
    # with 20, everything else keeps its original value.
    newdf = df.withColumn("count",
                          pyspark.sql.functions.when(
                              pyspark.sql.functions.col("count") > 20, 20
                          ).otherwise(pyspark.sql.functions.col("count")))
    return newdf
loaded = load(sampleUsersPath).persist()
loaded.show()
+--------------------+------------------+-----+
| user| song|count|
+--------------------+------------------+-----+
|b80344d063b5ccb32...|SOBBMDR12A8C13253B| 2|
|b80344d063b5ccb32...|SODZWFT12A8C13C0E4| 1|
|b80344d063b5ccb32...|SOHQWYZ12A6D4FA701| 1|
|b80344d063b5ccb32...|SOJNNUA12A8AE48C7A| 1|
|b80344d063b5ccb32...|SOLXHAI12A6D4FD6F3| 1|
|b80344d063b5ccb32...|SOOSIVQ12A6D4F8AE0| 1|
|b80344d063b5ccb32...|SORJNVW12A8C13BF90| 1|
|85c1f87fea955d09b...|SODJTHN12AF72A8FCD| 2|
|85c1f87fea955d09b...|SOIDFHN12A8C13ABAC| 2|
|4bd88bfb25263a75b...|SOWEHOM12A6BD4E09E| 1|
|9d6f0ead607ac2a6c...|SOCLQES12A58A7BB1D| 2|
|9d6f0ead607ac2a6c...|SOKLRPJ12A8C13C3FE| 2|
|9bb911319fbc04f01...|SOXBXBI12A8C13C71C| 5|
|b64cdd1a0bd907e5e...|SOBDWET12A6701F114| 2|
|b64cdd1a0bd907e5e...|SOLQYOG12B0B80BA71| 2|
|b64cdd1a0bd907e5e...|SOZPQES12A6D4F8E57| 2|
|17aa9f6dbdf753831...|SODHHEG12A58A779FB| 2|
|17aa9f6dbdf753831...|SODUANR12A6D4F5036| 1|
|17aa9f6dbdf753831...|SOJPFPR12AB018109D| 1|
|17aa9f6dbdf753831...|SOJUVYA12AB01860BA| 2|
+--------------------+------------------+-----+
only showing top 20 rows
'''load test'''
correctCols = StructType([
    StructField("user", StringType(), True),
    StructField("song", StringType(), True),
    StructField("count", IntegerType(), True)])

fakeData = [("abc123", "123abc", 2)]
fakeDf = spark.createDataFrame(fakeData, correctCols)

assert loaded.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, loaded.dtypes)
assert loaded.filter('count>20').count() == 0, "the number of counts higher than 20 was expected to be 0 but it was %s" % loaded.filter('count>20').count()
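To see the capping rule in isolation, here is a minimal sketch (not part of the exercise) that applies the same when/otherwise expression to a hypothetical two-row DataFrame:

toy = spark.createDataFrame([("u1", "s1", 25), ("u2", "s2", 3)], ["user", "song", "count"])
# 25 exceeds the limit and is capped to 20; 3 is kept as-is.
toy.withColumn("count",
               pyspark.sql.functions.when(pyspark.sql.functions.col("count") > 20, 20)
               .otherwise(pyspark.sql.functions.col("count"))).show()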
0.2 Convert
Convert the user and song fields into doubles using ML's StringIndexer (pyspark.ml.feature.StringIndexer).
Name | Type |
---|---|
user | StringType() |
song | StringType() |
count | IntegerType() |
user_indexed | DoubleType() |
song_indexed | DoubleType() |
param df: Dataframe which has been created using the load() method
return: Dataframe
def convert(df):
    colInputs = ["user", "song"]
    colOutputs = ["user_indexed", "song_indexed"]
    # Fit a StringIndexer that maps each distinct user/song string
    # to a double index, then apply it to the dataframe.
    stringIndexer = StringIndexer(inputCols=colInputs, outputCols=colOutputs)
    model = stringIndexer.fit(df)
    newdf = model.transform(df)
    return newdf
converted = convert(loaded).persist()
converted.show()
+--------------------+------------------+-----+------------+------------+
| user| song|count|user_indexed|song_indexed|
+--------------------+------------------+-----+------------+------------+
|b80344d063b5ccb32...|SOBBMDR12A8C13253B| 2| 162.0| 577.0|
|b80344d063b5ccb32...|SODZWFT12A8C13C0E4| 1| 162.0| 1053.0|
|b80344d063b5ccb32...|SOHQWYZ12A6D4FA701| 1| 162.0| 1646.0|
|b80344d063b5ccb32...|SOJNNUA12A8AE48C7A| 1| 162.0| 1945.0|
|b80344d063b5ccb32...|SOLXHAI12A6D4FD6F3| 1| 162.0| 2306.0|
|b80344d063b5ccb32...|SOOSIVQ12A6D4F8AE0| 1| 162.0| 2702.0|
|b80344d063b5ccb32...|SORJNVW12A8C13BF90| 1| 162.0| 3124.0|
|85c1f87fea955d09b...|SODJTHN12AF72A8FCD| 2| 810.0| 951.0|
|85c1f87fea955d09b...|SOIDFHN12A8C13ABAC| 2| 810.0| 1728.0|
|4bd88bfb25263a75b...|SOWEHOM12A6BD4E09E| 1| 1151.0| 3824.0|
|9d6f0ead607ac2a6c...|SOCLQES12A58A7BB1D| 2| 839.0| 803.0|
|9d6f0ead607ac2a6c...|SOKLRPJ12A8C13C3FE| 2| 839.0| 5.0|
|9bb911319fbc04f01...|SOXBXBI12A8C13C71C| 5| 1317.0| 3948.0|
|b64cdd1a0bd907e5e...|SOBDWET12A6701F114| 2| 560.0| 586.0|
|b64cdd1a0bd907e5e...|SOLQYOG12B0B80BA71| 2| 560.0| 245.0|
|b64cdd1a0bd907e5e...|SOZPQES12A6D4F8E57| 2| 560.0| 4289.0|
|17aa9f6dbdf753831...|SODHHEG12A58A779FB| 2| 115.0| 934.0|
|17aa9f6dbdf753831...|SODUANR12A6D4F5036| 1| 115.0| 1013.0|
|17aa9f6dbdf753831...|SOJPFPR12AB018109D| 1| 115.0| 1958.0|
|17aa9f6dbdf753831...|SOJUVYA12AB01860BA| 2| 115.0| 1988.0|
+--------------------+------------------+-----+------------+------------+
only showing top 20 rows
'''convert test'''
correctCols = StructType([
    StructField("user", StringType(), True),
    StructField("song", StringType(), True),
    StructField("count", IntegerType(), True),
    StructField("user_indexed", DoubleType(), True),
    StructField("song_indexed", DoubleType(), True)])

fakeData = [("abc123", "123abc", 2, 1.0, 2.0)]
fakeDf = spark.createDataFrame(fakeData, correctCols)

assert converted.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, converted.dtypes)
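Note that the indices themselves are ordered by label frequency: StringIndexer assigns 0.0 to the most frequent string, 1.0 to the next, and so on. A minimal sketch with hypothetical toy data:

toy = spark.createDataFrame([("a",), ("b",), ("b",)], ["user"])
# "b" occurs twice and gets index 0.0; "a" occurs once and gets 1.0.
StringIndexer(inputCols=["user"], outputCols=["user_indexed"]).fit(toy).transform(toy).show()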
0.3 To Rating
Create an RDD of Rating objects.
https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.Rating
The Rating constructor takes user=user_indexed, product=song_indexed and rating=count.
param df: Dataframe which has the user_indexed and song_indexed fields (output of the convert() method)
return: RDD containing Rating objects
def toRating(df):
    rowRDD = df.rdd
    # Map each Row to an mllib Rating; user and product ids are the indexed doubles.
    map1 = rowRDD.map(lambda x: Rating(user=x["user_indexed"], product=x["song_indexed"], rating=x["count"]))
    return map1
rated = toRating(converted).persist()
rated.take(10)
[Rating(user=162, product=577, rating=2.0),
Rating(user=162, product=1053, rating=1.0),
Rating(user=162, product=1646, rating=1.0),
Rating(user=162, product=1945, rating=1.0),
Rating(user=162, product=2306, rating=1.0),
Rating(user=162, product=2702, rating=1.0),
Rating(user=162, product=3124, rating=1.0),
Rating(user=810, product=951, rating=2.0),
Rating(user=810, product=1728, rating=2.0),
Rating(user=1151, product=3824, rating=1.0)]
'''toRating tests'''
rows = [Rating(user=162, product=577, rating=2.0),
        Rating(user=162, product=1053, rating=1.0),
        Rating(user=162, product=1646, rating=1.0),
        Rating(user=162, product=1945, rating=1.0),
        Rating(user=162, product=2306, rating=1.0)]

assert rated.take(5) == rows, "the first 5 rows were expected to be %s but they were %s" % (rows, rated.take(5))
0.4 Train ALS
Train an ALS model.
https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html#collaborative-filtering
param data: RDD of Rating objects created using the toRating() method
param seed: value used for testing purposes
return: trained ALS model
def trainALS(data, seed):
    # Matrix factorization with rank 2; the seed keeps runs reproducible.
    model = ALS.train(data, rank=2, seed=seed)
    return model
rSeed = random.randint(0, 10000)
model = trainALS(rated, rSeed)
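As an optional sanity check (not required by the exercise), the training error of the factorization can be estimated the way the Spark collaborative-filtering guide does: predict every observed (user, product) pair and compare against the known counts.

userProducts = rated.map(lambda r: (r.user, r.product))
predictions = model.predictAll(userProducts).map(lambda r: ((r.user, r.product), r.rating))
# Join observed ratings with predicted ones on the (user, product) key.
ratesAndPreds = rated.map(lambda r: ((r.user, r.product), r.rating)).join(predictions)
mse = ratesAndPreds.map(lambda rp: (rp[1][0] - rp[1][1]) ** 2).mean()
print("Training MSE = %s" % mse)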
0.5 Recommend Songs
Recommend 5 songs to a given user.
param model: trained ALS model
param user: user id that has been converted to an integer index (user_indexed) with convert()
return: recommendations in an Array
def recommendSongs(model, user):
    # Top-5 products (songs) for this user according to the factorization.
    prediction = model.recommendProducts(user, 5)
    return prediction
recommends = recommendSongs(model, 162)
recommends
[Rating(user=162, product=391, rating=20.684566349984323),
Rating(user=162, product=1334, rating=19.216739435985964),
Rating(user=162, product=3283, rating=18.964715987473852),
Rating(user=162, product=157, rating=18.746516460716293),
Rating(user=162, product=3674, rating=18.70421335719181)]
'''model and recommendSongs tests'''
assert type(recommends[0]) == pyspark.mllib.recommendation.Rating, "the type was expected to be pyspark.mllib.recommendation.Rating but it was %s" % type(recommends[0])
assert recommends[0].user == 162, "the user was expected to be 162 but it was %s" % recommends[0].user
assert len(recommends) == 5, "the amount of recommendations was expected to be 5 but it was %s" % len(recommends)
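Calling recommendProducts once per user does not scale if recommendations are needed for everyone; for reference, mllib's model also offers a batch variant (not used in this exercise):

# One (userId, (Rating, ...)) pair per user seen during training.
allTopFive = model.recommendProductsForUsers(5)
allTopFive.take(1)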
0.6 Get Song Names
Input: List of Rating objects
Output: List of [SongName, Artist] pairs
Map each Rating object to a pair of song name and artist.
Example input:
[Rating(182412,218057,29.471691093542958), Rating(182412,206693,28.39630617887081), Rating(182412,230654,28.090021579109706), Rating(182412,200542,27.43900484648816), Rating(182412,254771,24.82362529344695)]
Output:
[["My Business","Guy"], ["The Man With The Bag","Floyd/Animal/Zoot"], ["Challenger","Growing"], ["Una Ragazza In Due", "I Giganti"], ["That Is Why", "Say Anything"]]
Return [[SongName, NameOfBand], …]
Convert unique_tracks into a dataframe, with columns separated by <SEP>. The schema is shown below.
Name | Type |
---|---|
track_id | StringType() |
song_id | StringType() |
artist | StringType() |
title | StringType() |
- Filter the converted dataframe, keeping rows whose song_indexed value appears in the Rating object list.
- Join the filtered rows with the tracks dataframe and select the "title" and "artist" columns.
- Remove duplicates.
- Convert the dataframe into an RDD and collect() the results.
param converted: Dataframe created using the convert() method
param ar: Array of Rating objects produced by recommendSongs()
param path: path to the unique track names file
return: corresponding song + artist names inside an array, e.g., [['Our Song', 'Taylor Swift'], ['Boom (2006 Remastered Album Version)', 'P.O.D.']]
def getSongNames(converted, ar, path):
    recommendedSongIndexList = [i.product for i in ar]
    csvSchema = (StructType()
                 .add("track_id", StringType())
                 .add("song_id", StringType())
                 .add("artist", StringType())
                 .add("title", StringType()))
    df = spark.read.load(path, format="csv", sep="<SEP>", schema=csvSchema)
    # Keep only the rows whose indexed song id was recommended.
    filteredDF = converted[converted["song_indexed"].isin(recommendedSongIndexList)]
    # Attach artist/title via the raw song id, then drop the duplicate key column.
    joinedDF = filteredDF.join(df, filteredDF.song == df.song_id, "inner").drop(df.song_id)
    outDF = joinedDF[["title", "artist"]].drop_duplicates()
    outRDD = outDF.rdd.map(list)
    return outRDD.collect()
songNames = getSongNames(converted, recommends, sampleTracksPath)
songNames
[["The Emperor's New Clothes", "Sinéad O'Connor"],
['Alhos Verdes', 'GNR'],
['Whataya Want From Me', 'Adam Lambert'],
['Street Justice', 'MSTRKRFT'],
['North Sea Storm (Live)', 'Amon Amarth']]
'''getSongNames test'''
assert len(songNames) == 5, "the amount of song names was expected to be 5 but it was %s" % len(songNames)
assert type(songNames[0]) == list, "the type of a songNames element was expected to be list but it was %s" % type(songNames[0])
0.7 Recommend
Input: user_id
Output: List of [SongName, Artist] pairs representing the recommendations
Example return:
[["My Business","Guy"], ["The Man With The Bag","Floyd/Animal/Zoot"], ["Challenger","Growing"], ["Una Ragazza In Due", "I Giganti"], ["That Is Why", "Say Anything"]]
param path: path to the user data
param userId: user_id (String) that can be found in the user dataset
param tracksPath: path to the unique song names dataset
param seed: used for testing; pass it into the trainALS() method
return: corresponding song + artist names inside an array
def recommend(path, userId, tracksPath, seed):
    def getUserIndex(uId, conversionTable):
        # Look up the double index that StringIndexer assigned to this user id.
        cDF = conversionTable.where(conversionTable.user == uId).select(conversionTable.user_indexed)
        return int(cDF.rdd.first()["user_indexed"])

    loaded = load(path).persist()
    converted = convert(loaded).persist()
    convertedUserId = getUserIndex(userId, converted)

    rated = toRating(converted).persist()
    model = trainALS(rated, seed)
    recommends = recommendSongs(model, convertedUserId)
    songNames = getSongNames(converted, recommends, tracksPath)

    return songNames
recom = recommend(sampleUsersPath, "b80344d063b5ccb3212f76538f3d9e43d87dca9e", sampleTracksPath, rSeed)
recom
[["The Emperor's New Clothes", "Sinéad O'Connor"],
['Alhos Verdes', 'GNR'],
['Whataya Want From Me', 'Adam Lambert'],
['Street Justice', 'MSTRKRFT'],
['North Sea Storm (Live)', 'Amon Amarth']]
'''recommend test'''
assert len(recom) == 5, "the amount of recommendations was expected to be 5 but it was %s" % len(recom)
assert type(recom[0]) == list, "the type of a 'recommend' element was expected to be list but it was %s" % type(recom[0])
#test if the same user and seed returns the same as songNames
assert recom == songNames, "the song names were expected to be %s but they were %s" % (songNames, recom)
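The randomly sampled user file generated at the top can be fed through the same pipeline. A minimal sketch (not part of the exercise) that picks a user id guaranteed to be in the file, relying on the tab-separated format that load() expects:

# Read one user id from the sampled file so the lookup in getUserIndex succeeds.
with open(randomUsersPath) as f:
    someUser = f.readline().split("\t")[0]
recommend(randomUsersPath, someUser, sampleTracksPath, rSeed)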
spark.catalog.clearCache()
sc.stop()
spark.stop()