PySpark ML recommender
import random

import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, DoubleType)
from pyspark.ml.feature import StringIndexer
from pyspark.mllib.recommendation import ALS, Rating

sc = SparkContext("local", "music")
spark = SparkSession(sc)
sampleUsersPath = "sampleUsers.txt"
sampleTracksPath = "sampleTracks.txt"

# Generate a random 2000-line subset of the sample user data
randomUsersPath = "randomusers.txt"
with open(sampleUsersPath) as sampleFile:
    lines = random.sample(sampleFile.readlines(), 2000)
with open(randomUsersPath, "w") as outF:
    outF.writelines(lines)

0.1 Load
Load the data as a Spark DataFrame.
Columns are separated by tabs ("\t").
Counts higher than 20 are reduced to 20.
| Name | Type |
|---|---|
| user | StringType() |
| song | StringType() |
| count | IntegerType() |
def load(path):
    csvSchema = (StructType()
                 .add("user", StringType())
                 .add("song", StringType())
                 .add("count", IntegerType()))
    df = spark.read.load(path, format="csv", sep="\t", schema=csvSchema)
    # Cap counts above 20 at 20, otherwise keep the original count
    newdf = df.withColumn("count",
                          F.when(F.col("count") > 20, 20)
                           .otherwise(F.col("count")))
    return newdf
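As an aside, the when/otherwise capping can be collapsed into a single expression with F.least; a minimal sketch, assuming a DataFrame df with the schema above (capped is an illustrative name):

# Element-wise minimum of count and the literal 20: same effect as when/otherwise
capped = df.withColumn("count", F.least(F.col("count"), F.lit(20)))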
loaded = load(sampleUsersPath).persist()
loaded.show()

+--------------------+------------------+-----+
| user| song|count|
+--------------------+------------------+-----+
|b80344d063b5ccb32...|SOBBMDR12A8C13253B| 2|
|b80344d063b5ccb32...|SODZWFT12A8C13C0E4| 1|
|b80344d063b5ccb32...|SOHQWYZ12A6D4FA701| 1|
|b80344d063b5ccb32...|SOJNNUA12A8AE48C7A| 1|
|b80344d063b5ccb32...|SOLXHAI12A6D4FD6F3| 1|
|b80344d063b5ccb32...|SOOSIVQ12A6D4F8AE0| 1|
|b80344d063b5ccb32...|SORJNVW12A8C13BF90| 1|
|85c1f87fea955d09b...|SODJTHN12AF72A8FCD| 2|
|85c1f87fea955d09b...|SOIDFHN12A8C13ABAC| 2|
|4bd88bfb25263a75b...|SOWEHOM12A6BD4E09E| 1|
|9d6f0ead607ac2a6c...|SOCLQES12A58A7BB1D| 2|
|9d6f0ead607ac2a6c...|SOKLRPJ12A8C13C3FE| 2|
|9bb911319fbc04f01...|SOXBXBI12A8C13C71C| 5|
|b64cdd1a0bd907e5e...|SOBDWET12A6701F114| 2|
|b64cdd1a0bd907e5e...|SOLQYOG12B0B80BA71| 2|
|b64cdd1a0bd907e5e...|SOZPQES12A6D4F8E57| 2|
|17aa9f6dbdf753831...|SODHHEG12A58A779FB| 2|
|17aa9f6dbdf753831...|SODUANR12A6D4F5036| 1|
|17aa9f6dbdf753831...|SOJPFPR12AB018109D| 1|
|17aa9f6dbdf753831...|SOJUVYA12AB01860BA| 2|
+--------------------+------------------+-----+
only showing top 20 rows
'''load test'''
correctCols = StructType([
    StructField("user", StringType(), True),
    StructField("song", StringType(), True),
    StructField("count", IntegerType(), True)])
fakeData = [("abc123", "123abc", 2)]
fakeDf = spark.createDataFrame(fakeData, correctCols)
assert loaded.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, loaded.dtypes)
assert loaded.filter('count>20').count() == 0, "counts higher than 20 was expected to be 0 but it was %s" % loaded.filter('count>20').count()

0.2 Convert
Convert the user and song fields into doubles using StringIndexer (from pyspark.ml.feature).
| Name | Type |
|---|---|
| user | StringType() |
| song | StringType() |
| count | IntegerType() |
| user_indexed | DoubleType() |
| song_indexed | DoubleType() |
param df Dataframe which has been created using method load()
return Dataframe
def convert(df):
    colInputs = ["user", "song"]
    colOutputs = ["user_indexed", "song_indexed"]
    # StringIndexer maps each distinct user/song string to a double index,
    # ordered by frequency (the most frequent value gets index 0.0)
    stringIndexer = StringIndexer(inputCols=colInputs, outputCols=colOutputs)
    model = stringIndexer.fit(df)
    newdf = model.transform(df)
    return newdf
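The multi-column inputCols/outputCols form of StringIndexer used above requires Spark 3.0 or newer; on older versions the same result can be produced by chaining two single-column indexers in a Pipeline. A minimal sketch, assuming a DataFrame df as produced by load() (converted_old is an illustrative name):

from pyspark.ml import Pipeline

# One single-column StringIndexer per field, chained in a Pipeline (Spark < 3.0)
indexers = [StringIndexer(inputCol=c, outputCol=c + "_indexed")
            for c in ("user", "song")]
converted_old = Pipeline(stages=indexers).fit(df).transform(df)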
converted = convert(loaded).persist()
converted.show()

+--------------------+------------------+-----+------------+------------+
| user| song|count|user_indexed|song_indexed|
+--------------------+------------------+-----+------------+------------+
|b80344d063b5ccb32...|SOBBMDR12A8C13253B| 2| 162.0| 577.0|
|b80344d063b5ccb32...|SODZWFT12A8C13C0E4| 1| 162.0| 1053.0|
|b80344d063b5ccb32...|SOHQWYZ12A6D4FA701| 1| 162.0| 1646.0|
|b80344d063b5ccb32...|SOJNNUA12A8AE48C7A| 1| 162.0| 1945.0|
|b80344d063b5ccb32...|SOLXHAI12A6D4FD6F3| 1| 162.0| 2306.0|
|b80344d063b5ccb32...|SOOSIVQ12A6D4F8AE0| 1| 162.0| 2702.0|
|b80344d063b5ccb32...|SORJNVW12A8C13BF90| 1| 162.0| 3124.0|
|85c1f87fea955d09b...|SODJTHN12AF72A8FCD| 2| 810.0| 951.0|
|85c1f87fea955d09b...|SOIDFHN12A8C13ABAC| 2| 810.0| 1728.0|
|4bd88bfb25263a75b...|SOWEHOM12A6BD4E09E| 1| 1151.0| 3824.0|
|9d6f0ead607ac2a6c...|SOCLQES12A58A7BB1D| 2| 839.0| 803.0|
|9d6f0ead607ac2a6c...|SOKLRPJ12A8C13C3FE| 2| 839.0| 5.0|
|9bb911319fbc04f01...|SOXBXBI12A8C13C71C| 5| 1317.0| 3948.0|
|b64cdd1a0bd907e5e...|SOBDWET12A6701F114| 2| 560.0| 586.0|
|b64cdd1a0bd907e5e...|SOLQYOG12B0B80BA71| 2| 560.0| 245.0|
|b64cdd1a0bd907e5e...|SOZPQES12A6D4F8E57| 2| 560.0| 4289.0|
|17aa9f6dbdf753831...|SODHHEG12A58A779FB| 2| 115.0| 934.0|
|17aa9f6dbdf753831...|SODUANR12A6D4F5036| 1| 115.0| 1013.0|
|17aa9f6dbdf753831...|SOJPFPR12AB018109D| 1| 115.0| 1958.0|
|17aa9f6dbdf753831...|SOJUVYA12AB01860BA| 2| 115.0| 1988.0|
+--------------------+------------------+-----+------------+------------+
only showing top 20 rows
'''convert test'''
correctCols = StructType([
    StructField("user", StringType(), True),
    StructField("song", StringType(), True),
    StructField("count", IntegerType(), True),
    StructField("user_indexed", DoubleType(), True),
    StructField("song_indexed", DoubleType(), True)])
fakeData = [("abc123", "123abc", 2, 1.0, 2.0)]
fakeDf = spark.createDataFrame(fakeData, correctCols)
assert converted.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, converted.dtypes)

0.3 To Rating
Create an RDD of Rating objects.
https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.Rating
The Rating constructor takes user=user_indexed, product=song_indexed and rating=count.
param df Dataframe which has user_indexed and song_indexed fields (output from the convert() method)
return RDD containing Rating objects
def toRating(df):
    rowRDD = df.rdd
    # Rating coerces user and product to int and rating to float,
    # so the double-valued indices become integer ids
    ratings = rowRDD.map(lambda x: Rating(user=x["user_indexed"],
                                          product=x["song_indexed"],
                                          rating=x["count"]))
    return ratings

rated = toRating(converted).persist()
rated.take(10)

[Rating(user=162, product=577, rating=2.0),
Rating(user=162, product=1053, rating=1.0),
Rating(user=162, product=1646, rating=1.0),
Rating(user=162, product=1945, rating=1.0),
Rating(user=162, product=2306, rating=1.0),
Rating(user=162, product=2702, rating=1.0),
Rating(user=162, product=3124, rating=1.0),
Rating(user=810, product=951, rating=2.0),
Rating(user=810, product=1728, rating=2.0),
Rating(user=1151, product=3824, rating=1.0)]
'''toRating tests'''
rows = [Rating(user=162, product=577, rating=2.0),
Rating(user=162, product=1053, rating=1.0),
Rating(user=162, product=1646, rating=1.0),
Rating(user=162, product=1945, rating=1.0),
Rating(user=162, product=2306, rating=1.0)]
assert rated.take(5) == rows, "the first 5 rows were expected to be %s but they were %s" % (rows, rated.take(5))

0.4 Train ALS
Train an ALS model. https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html#collaborative-filtering
param data RDD of Rating objects created using the toRating() method
param seed seed value, used for testing purposes
return trained ALS model
def trainALS(data, seed):
    # rank is the number of latent factors in the factorization
    model = ALS.train(data, rank=2, seed=seed)
    return model
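Play counts are arguably implicit feedback rather than explicit ratings, so MLlib's trainImplicit variant (described in the guide linked above) may suit this data better; a hedged sketch with the same signature (trainALSImplicit is an illustrative name):

def trainALSImplicit(data, seed):
    # Counts are treated as confidence weights rather than literal ratings
    return ALS.trainImplicit(data, rank=2, seed=seed)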
rSeed = random.randint(0, 10000)
model = trainALS(rated, rSeed)

0.5 Recommend Songs
Recommend 5 songs to a given user.
param model trained ALS model
param user user id as an integer, i.e. the user_indexed value produced by convert()
return recommendations in an array
def recommendSongs(model, user):
    # recommendProducts expects an integer user id and returns the
    # top-N Rating objects sorted by predicted rating
    prediction = model.recommendProducts(user, 5)
    return prediction
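If recommendations were needed for every user at once, the mllib model also exposes a batch method; a small sketch (allRecs is an illustrative name):

# RDD of (userId, [Rating, ...]) pairs, 5 recommendations per user
allRecs = model.recommendProductsForUsers(5)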
recommends = recommendSongs(model, 162)
recommends

[Rating(user=162, product=391, rating=20.684566349984323),
Rating(user=162, product=1334, rating=19.216739435985964),
Rating(user=162, product=3283, rating=18.964715987473852),
Rating(user=162, product=157, rating=18.746516460716293),
Rating(user=162, product=3674, rating=18.70421335719181)]
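Before the tests, the fit can be sanity-checked with the mean-squared-error recipe from the MLlib guide linked in 0.4; a minimal sketch, reusing the rated RDD and model from above:

# Predict every (user, product) pair seen in training, key both predictions
# and true counts by that pair, join, and average the squared error
userProducts = rated.map(lambda r: (r.user, r.product))
predictions = model.predictAll(userProducts).map(lambda r: ((r.user, r.product), r.rating))
ratesAndPreds = rated.map(lambda r: ((r.user, r.product), r.rating)).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean()
print("training MSE = %s" % MSE)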
'''model and recommendSongs tests'''
assert type(recommends[0]) == pyspark.mllib.recommendation.Rating, "the type was expected to be pyspark.mllib.recommendation.Rating but it was %s" % type(recommends[0])
assert recommends[0].user == 162, "the user was expected to be 162 but it was %s" % recommends[0].user
assert len(recommends) == 5, "the amount of recommendations was expected to be 5 but it was %s" % len(recommends)

0.6 Get Song Names
Input: List RatingObject
Output: List (List SongName Artist)
Map each RatingObject to a Pair of SongName and Artist
Example Input:
[Rating(182412,218057,29.471691093542958), Rating(182412,206693,28.39630617887081), Rating(182412,230654,28.090021579109706), Rating(182412,200542,27.43900484648816), Rating(182412,254771,24.82362529344695)]
Output:
[["My Business","Guy"], ["The Man With The Bag","Floyd/Animal/Zoot"], ["Challenger","Growing"], ["Una Ragazza In Due", "I Giganti"], ["That Is Why", "Say Anything"]]
Return [[SongName,NameOfBand]…]
Convert unique_tracks into a dataframe; columns are separated by <SEP>. The schema is shown below:
| Name | Type |
|---|---|
| track_id | StringType() |
| song_id | StringType() |
| artist | StringType() |
| title | StringType() |
- Filter the converted dataframe to rows whose song_indexed value appears in the Rating object list.
- Join the filtered rows with the tracks dataframe (on the song id) and keep the "title" and "artist" columns.
- Remove duplicates.
- Convert the dataframe into an array using .rdd and collect().
param converted Dataframe created using convert() method
param ar Array of Rating object produced using recommendSongs()
param path path to unique track names file
return corresponding song + artist names inside array, e.g., [[‘Our Song’, ‘Taylor Swift’],
[‘Boom (2006 Remastered Album Version)’, ‘P.O.D.’]]
def getSongNames(converted, ar, path):
    # Song indices the model recommended
    recommendedSongIndexList = [i.product for i in ar]
    csvSchema = (StructType()
                 .add("track_id", StringType())
                 .add("song_id", StringType())
                 .add("artist", StringType())
                 .add("title", StringType()))
    df = spark.read.load(path, format="csv", sep="<SEP>", schema=csvSchema)
    # Keep only the rows of converted that were recommended
    filteredDF = converted[converted["song_indexed"].isin(recommendedSongIndexList)]
    # Resolve song ids to track metadata, then keep title and artist
    joinedDF = filteredDF.join(df, filteredDF.song == df.song_id, "inner").drop(df.song_id)
    outDF = joinedDF[["title", "artist"]].drop_duplicates()
    outRDD = outDF.rdd.map(list)
    return outRDD.collect()
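Since filteredDF only holds the plays of the five recommended songs, the join inside getSongNames() could hint a broadcast to avoid shuffling the larger tracks dataframe; a hedged variant of that single line (names as inside the function):

from pyspark.sql.functions import broadcast

# Ship the small filtered side to every executor instead of shuffling both sides
joinedDF = df.join(broadcast(filteredDF), filteredDF.song == df.song_id, "inner").drop(df.song_id)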
songNames = getSongNames(converted, recommends, sampleTracksPath)
songNames

[["The Emperor's New Clothes", "Sinéad O'Connor"],
['Alhos Verdes', 'GNR'],
['Whataya Want From Me', 'Adam Lambert'],
['Street Justice', 'MSTRKRFT'],
['North Sea Storm (Live)', 'Amon Amarth']]
'''getSongNames test'''
assert len(songNames) == 5, "the amount of song names was expected to be 5 but it was %s" % len(songNames)
assert type(songNames[0]) == list, "the type of a songNames element was expected to be list but it was %s" % type(songNames[0])

0.7 Recommend
Input: user_id Output: List (List SongName Artist) representing recommendations
Example return
[["My Business","Guy"], ["The Man With The Bag","Floyd/Animal/Zoot"], ["Challenger","Growing"], ["Una Ragazza In Due", "I Giganti"], ["That Is Why", "Say Anything"]]
param path path to user data
param userId user_id (String) that can be found from user dataset
param tracksPath path to unique song names dataset
param seed used for testing; pass it to the trainALS() method
return corresponding song + artist names inside array
def recommend(path, userId, tracksPath, seed):
    def getUserIndex(uId, conversionTable):
        # .where is an alias for .filter; look up the indexed id of this user
        cDF = (conversionTable.where(conversionTable.user == uId)
                              .select(conversionTable.user_indexed))
        return int(cDF.rdd.first()["user_indexed"])

    loaded = load(path).persist()
    converted = convert(loaded).persist()
    convertedUserId = getUserIndex(userId, converted)
    rated = toRating(converted).persist()
    model = trainALS(rated, seed)
    recommends = recommendSongs(model, convertedUserId)
    songNames = getSongNames(converted, recommends, tracksPath)
    return songNames

recom = recommend(sampleUsersPath, "b80344d063b5ccb3212f76538f3d9e43d87dca9e", sampleTracksPath, rSeed)
recom

[["The Emperor's New Clothes", "Sinéad O'Connor"],
['Alhos Verdes', 'GNR'],
['Whataya Want From Me', 'Adam Lambert'],
['Street Justice', 'MSTRKRFT'],
['North Sea Storm (Live)', 'Amon Amarth']]
'''recommend test'''
assert len(recom) == 5, "the amount of recommendations was expected to be 5 but it was %s" % len(recom)
assert type(recom[0]) == list, "the type of a 'recommend' element was expected to be list but it was %s" % type(recom[0])
# Test that the same user and seed return the same result as songNames
assert recom == songNames, "the song names were expected to be %s but they were %s" % (songNames, recom)
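The randomly sampled file generated at the top is never exercised; the same pipeline runs on it unchanged. A hedged usage sketch (someUser is illustrative; any user id present in randomusers.txt works):

# Take the first user id from the random sample (tab-separated: user, song, count)
with open(randomUsersPath) as f:
    someUser = f.readline().split("\t")[0]
recomRandom = recommend(randomUsersPath, someUser, sampleTracksPath, rSeed)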
spark.catalog.clearCache()
sc.stop()
spark.stop()