PySpark ML recommender

Posted on June 2, 2019
Tags: machinelearning
import pyspark.mllib
import pyspark.sql.functions
from pyspark.sql import *
from pyspark import *
from pyspark.rdd import *
from pyspark.ml import *
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.sql.types import *
from pyspark.mllib.recommendation import *
import random


sc = SparkContext("local","music")
spark = SparkSession(sc)

sampleUsersPath = "sampleUsers.txt"
sampleTracksPath = "sampleTracks.txt"
# Generate a smaller random dataset from the sample: pick 2000 random
# lines from the user file
randomUsersPath = "randomusers.txt"

with open(sampleUsersPath) as sampleFile:
    lines = random.sample(sampleFile.readlines(), 2000)

with open(randomUsersPath, "w") as outF:
    outF.writelines(lines)

0.1 Load

Load the data as a Spark dataframe.
Columns are separated by a tab ("\t").
Counts higher than 20 are reduced to 20.

Name Type
user StringType()
song StringType()
count IntegerType()
def load(path):
    csvSchema = (StructType()
                .add("user", StringType())
                .add("song", StringType())
                .add("count", IntegerType()))

    df = spark.read.load(path, format="csv", sep="\t", schema=csvSchema)

    # Cap listen counts: replace any count above 20 with 20, otherwise
    # keep the original value
    newdf = df.withColumn("count",
                          pyspark.sql.functions.when(
                              pyspark.sql.functions.col("count") > 20, 20
                          ).otherwise(
                              pyspark.sql.functions.col("count")
                          ))

    return newdf

loaded = load(sampleUsersPath).persist()
loaded.show()
+--------------------+------------------+-----+
|                user|              song|count|
+--------------------+------------------+-----+
|b80344d063b5ccb32...|SOBBMDR12A8C13253B|    2|
|b80344d063b5ccb32...|SODZWFT12A8C13C0E4|    1|
|b80344d063b5ccb32...|SOHQWYZ12A6D4FA701|    1|
|b80344d063b5ccb32...|SOJNNUA12A8AE48C7A|    1|
|b80344d063b5ccb32...|SOLXHAI12A6D4FD6F3|    1|
|b80344d063b5ccb32...|SOOSIVQ12A6D4F8AE0|    1|
|b80344d063b5ccb32...|SORJNVW12A8C13BF90|    1|
|85c1f87fea955d09b...|SODJTHN12AF72A8FCD|    2|
|85c1f87fea955d09b...|SOIDFHN12A8C13ABAC|    2|
|4bd88bfb25263a75b...|SOWEHOM12A6BD4E09E|    1|
|9d6f0ead607ac2a6c...|SOCLQES12A58A7BB1D|    2|
|9d6f0ead607ac2a6c...|SOKLRPJ12A8C13C3FE|    2|
|9bb911319fbc04f01...|SOXBXBI12A8C13C71C|    5|
|b64cdd1a0bd907e5e...|SOBDWET12A6701F114|    2|
|b64cdd1a0bd907e5e...|SOLQYOG12B0B80BA71|    2|
|b64cdd1a0bd907e5e...|SOZPQES12A6D4F8E57|    2|
|17aa9f6dbdf753831...|SODHHEG12A58A779FB|    2|
|17aa9f6dbdf753831...|SODUANR12A6D4F5036|    1|
|17aa9f6dbdf753831...|SOJPFPR12AB018109D|    1|
|17aa9f6dbdf753831...|SOJUVYA12AB01860BA|    2|
+--------------------+------------------+-----+
only showing top 20 rows
'''load test'''
correctCols = StructType([
    StructField("user", StringType(), True),
    StructField("song", StringType(), True),
    StructField("count", IntegerType(), True)])

fakeData = [("abc123", "123abc", 2)]

fakeDf = spark.createDataFrame(fakeData, correctCols)

assert loaded.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, loaded.dtypes)

assert loaded.filter('count>20').count() == 0, "the number of counts higher than 20 was expected to be 0 but it was %s" % loaded.filter('count>20').count()

0.2 Convert

Convert the user and song fields into doubles. Use StringIndexer (from pyspark.ml.feature).

Name Type
user StringType()
song StringType()
count IntegerType()
user_indexed DoubleType()
song_indexed DoubleType()

param df Dataframe created with the load() method
return Dataframe

def convert(df):
    colInputs = ["user", "song"]
    colOutputs = ["user_indexed", "song_indexed"]
    # Multi-column inputCols/outputCols requires Spark 3.0+; on older
    # versions, chain two single-column StringIndexers instead
    stringIndexer = StringIndexer(inputCols=colInputs, outputCols=colOutputs)
    model = stringIndexer.fit(df)
    newdf = model.transform(df)

    return newdf
converted = convert(loaded).persist()
converted.show()
+--------------------+------------------+-----+------------+------------+
|                user|              song|count|user_indexed|song_indexed|
+--------------------+------------------+-----+------------+------------+
|b80344d063b5ccb32...|SOBBMDR12A8C13253B|    2|       162.0|       577.0|
|b80344d063b5ccb32...|SODZWFT12A8C13C0E4|    1|       162.0|      1053.0|
|b80344d063b5ccb32...|SOHQWYZ12A6D4FA701|    1|       162.0|      1646.0|
|b80344d063b5ccb32...|SOJNNUA12A8AE48C7A|    1|       162.0|      1945.0|
|b80344d063b5ccb32...|SOLXHAI12A6D4FD6F3|    1|       162.0|      2306.0|
|b80344d063b5ccb32...|SOOSIVQ12A6D4F8AE0|    1|       162.0|      2702.0|
|b80344d063b5ccb32...|SORJNVW12A8C13BF90|    1|       162.0|      3124.0|
|85c1f87fea955d09b...|SODJTHN12AF72A8FCD|    2|       810.0|       951.0|
|85c1f87fea955d09b...|SOIDFHN12A8C13ABAC|    2|       810.0|      1728.0|
|4bd88bfb25263a75b...|SOWEHOM12A6BD4E09E|    1|      1151.0|      3824.0|
|9d6f0ead607ac2a6c...|SOCLQES12A58A7BB1D|    2|       839.0|       803.0|
|9d6f0ead607ac2a6c...|SOKLRPJ12A8C13C3FE|    2|       839.0|         5.0|
|9bb911319fbc04f01...|SOXBXBI12A8C13C71C|    5|      1317.0|      3948.0|
|b64cdd1a0bd907e5e...|SOBDWET12A6701F114|    2|       560.0|       586.0|
|b64cdd1a0bd907e5e...|SOLQYOG12B0B80BA71|    2|       560.0|       245.0|
|b64cdd1a0bd907e5e...|SOZPQES12A6D4F8E57|    2|       560.0|      4289.0|
|17aa9f6dbdf753831...|SODHHEG12A58A779FB|    2|       115.0|       934.0|
|17aa9f6dbdf753831...|SODUANR12A6D4F5036|    1|       115.0|      1013.0|
|17aa9f6dbdf753831...|SOJPFPR12AB018109D|    1|       115.0|      1958.0|
|17aa9f6dbdf753831...|SOJUVYA12AB01860BA|    2|       115.0|      1988.0|
+--------------------+------------------+-----+------------+------------+
only showing top 20 rows
'''convert test'''
correctCols = StructType([
    StructField("user", StringType(), True),
    StructField("song", StringType(), True),
    StructField("count", IntegerType(), True),
    StructField("user_indexed", DoubleType(), True),
    StructField("song_indexed", DoubleType(), True)])

fakeData = [("abc123", "123abc", 2, 1.0, 2.0)]

fakeDf = spark.createDataFrame(fakeData, correctCols)

assert converted.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, converted.dtypes)
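
By default StringIndexer assigns indices in descending order of label frequency, so index 0.0 goes to the most frequent user and song. A minimal sketch of that behaviour on made-up demo data:

# "b" occurs twice so it is indexed 0.0; "a" occurs once and gets 1.0
demo = spark.createDataFrame([("a",), ("b",), ("b",)], ["user"])
StringIndexer(inputCol="user", outputCol="user_indexed") \
    .fit(demo).transform(demo).show()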

0.3 To Rating

Create an RDD of Rating objects.
https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.Rating

The params of the Rating constructor: user=user_indexed, product=song_indexed, and rating=count.

param df Dataframe which has the user_indexed and song_indexed fields (output of the convert() method)
return RDD containing Rating objects

def toRating(df):
    # Map each row to an mllib Rating(user, product, rating) tuple
    return df.rdd.map(lambda x: Rating(user=x["user_indexed"],
                                       product=x["song_indexed"],
                                       rating=x["count"]))
rated = toRating(converted).persist()
rated.take(10)
[Rating(user=162, product=577, rating=2.0),
 Rating(user=162, product=1053, rating=1.0),
 Rating(user=162, product=1646, rating=1.0),
 Rating(user=162, product=1945, rating=1.0),
 Rating(user=162, product=2306, rating=1.0),
 Rating(user=162, product=2702, rating=1.0),
 Rating(user=162, product=3124, rating=1.0),
 Rating(user=810, product=951, rating=2.0),
 Rating(user=810, product=1728, rating=2.0),
 Rating(user=1151, product=3824, rating=1.0)]
'''toRating tests'''
rows = [Rating(user=162, product=577, rating=2.0),
 Rating(user=162, product=1053, rating=1.0),
 Rating(user=162, product=1646, rating=1.0),
 Rating(user=162, product=1945, rating=1.0),
 Rating(user=162, product=2306, rating=1.0)]
assert rated.take(5) == rows, "the first 5 rows were expected to be %s but they were %s" % (rows, rated.take(5))

0.4 Train ALS

Train an ALS model. https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html#collaborative-filtering

param data RDD of Rating objects created with the toRating() method
param seed seed value used for testing purposes
return trained ALS model

def trainALS(data, seed):
    # rank = number of latent factors; iterations and regularization are
    # left at their mllib defaults
    model = ALS.train(data, rank=2, seed=seed)

    return model
rSeed = random.randint(0, 10000)
model = trainALS(rated, rSeed)
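
The notebook never evaluates the fit; the collaborative-filtering guide linked above sanity-checks a model by reconstructing the ratings it was trained on. A hedged sketch of that check, reusing the rated RDD from above:

# Predict every (user, product) pair seen in training and compare the
# prediction against the observed (capped) play count, per the MLlib guide
testdata = rated.map(lambda r: (r.user, r.product))
predictions = model.predictAll(testdata) \
                   .map(lambda r: ((r.user, r.product), r.rating))
ratesAndPreds = rated.map(lambda r: ((r.user, r.product), r.rating)) \
                     .join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean()
print("Mean Squared Error = " + str(MSE))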

0.5 Recommend Songs

Recommend 5 songs to a given user.

param model trained ALS model
param user user id that has been indexed with convert() (the user_indexed value, as an Integer)
return recommendations in an Array

def recommendSongs(model, user):
    # Ask the model for the top-5 products (songs) for the indexed user
    prediction = model.recommendProducts(user, 5)

    return prediction
recommends = recommendSongs(model, 162)
recommends
[Rating(user=162, product=391, rating=20.684566349984323),
 Rating(user=162, product=1334, rating=19.216739435985964),
 Rating(user=162, product=3283, rating=18.964715987473852),
 Rating(user=162, product=157, rating=18.746516460716293),
 Rating(user=162, product=3674, rating=18.70421335719181)]
'''model and recommendSongs tests'''
assert type(recommends[0]) == pyspark.mllib.recommendation.Rating, "the type was expected to be pyspark.mllib.recommendation.Rating  but it was %s" % type(recommends[0]) 
assert recommends[0].user == 162, "the user was expected to be 162 but it was %s" % recommends[0].user
assert len(recommends) == 5, "the amount of recommendations was expected to be 5 but it was %s" % len(recommends)
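
As an aside, mllib's MatrixFactorizationModel can also score every user in a single pass; a short sketch, assuming the trained model from above:

# Top-5 recommendations for all users at once, as an RDD of
# (userId, list-of-Rating) pairs
allRecs = model.recommendProductsForUsers(5)
allRecs.take(1)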

0.6 Get Song Names

Input: List RatingObject
Output: List (List SongName Artist)
Map each RatingObject to a Pair of SongName and Artist

Example Input:

[Rating(182412,218057,29.471691093542958), Rating(182412,206693,28.39630617887081), Rating(182412,230654,28.090021579109706), Rating(182412,200542,27.43900484648816), Rating(182412,254771,24.82362529344695)]

Output:

[["My Business","Guy"], ["The Man With The Bag","Floyd/Animal/Zoot"], ["Challenger","Growing"], ["Una Ragazza In Due", "I Giganti"], ["That Is Why", "Say Anything"]]

Return [[SongName,NameOfBand]…]

Convert unique_tracks into a dataframe, with columns separated by <SEP>. The schema is shown below.

Name Type
track_id StringType()
song_id StringType()
artist StringType()
title StringType()
  1. Filter the converted dataframe to rows whose song_indexed value appears in the Rating object list.
  2. Join the filtered rows with the tracks dataframe on the song id and keep the "title" and "artist" columns.
  3. Remove duplicates.
  4. Collect the result into a Python list of [title, artist] pairs.

param converted Dataframe created using convert() method
param ar Array of Rating objects produced by recommendSongs()
param path path to unique track names file
return corresponding song + artist names inside array, e.g., [[‘Our Song’, ‘Taylor Swift’], [‘Boom (2006 Remastered Album Version)’, ‘P.O.D.’]]

def getSongNames(converted, ar, path):
    recommendedSongIndexes = [i.product for i in ar]

    csvSchema = (StructType()
                .add("track_id", StringType())
                .add("song_id", StringType())
                .add("artist", StringType())
                .add("title", StringType()))
    df = spark.read.load(path, format="csv", sep="<SEP>", schema=csvSchema)

    # Keep only the rows whose indexed song id was recommended, then join
    # with the track metadata on the raw song id to resolve title and artist
    filteredDF = converted[converted["song_indexed"].isin(recommendedSongIndexes)]
    joinedDF = filteredDF.join(df, filteredDF.song == df.song_id, "inner").drop(df.song_id)
    outDF = joinedDF[["title", "artist"]].drop_duplicates()

    # Collect as plain [title, artist] lists
    return outDF.rdd.map(list).collect()
songNames = getSongNames(converted, recommends, sampleTracksPath)
songNames
[["The Emperor's New Clothes", "Sinéad O'Connor"],
 ['Alhos Verdes', 'GNR'],
 ['Whataya Want From Me', 'Adam Lambert'],
 ['Street Justice', 'MSTRKRFT'],
 ['North Sea Storm (Live)', 'Amon Amarth']]
'''getSongNames test'''
assert len(songNames) == 5, "the amount of song names was expected to be 5 but it was %s" % len(songNames)
assert type(songNames[0]) == list, "the type of a songNames element was expected to be list but it was %s" % type(songNames[0])
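
A design note: getSongNames filters and joins against the full tracks file on every call. A hedged alternative sketch (variable names are my own) that resolves the recommended indexes back to raw song ids through a small driver-side lookup instead (the tracks file is still needed for titles), fine while the distinct-song catalog fits in memory:

# Build a song_indexed -> song dictionary on the driver and translate the
# recommendations locally; keys are doubles because song_indexed is one
songLookup = dict(converted.select("song_indexed", "song").distinct().collect())
recommendedIds = [songLookup[float(r.product)] for r in recommends]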

0.7 Recommend

Input: user_id
Output: List (List SongName Artist) representing the recommendations

Example return

[["My Business","Guy"], ["The Man With The Bag","Floyd/Animal/Zoot"], ["Challenger","Growing"], ["Una Ragazza In Due", "I Giganti"], ["That Is Why", "Say Anything"]]

param path path to user data
param userId user_id (String) that can be found from user dataset
param tracksPath path to unique song names dataset
param seed used for testing; pass it on to the trainALS() method
return corresponding song + artist names inside array

def recommend(path, userId, tracksPath, seed):

    def getUserIndex(uId, conversionTable):
        # .where is an alias for .filter; look up the indexed id of this user
        cDF = (conversionTable.where(conversionTable.user == uId)
                              .select(conversionTable.user_indexed))
        return int(cDF.rdd.first()["user_indexed"])

    loaded = load(path).persist()
    converted = convert(loaded).persist()

    convertedUserId = getUserIndex(userId, converted)

    rated = toRating(converted).persist()
    model = trainALS(rated, seed)
    recommends = recommendSongs(model, convertedUserId)
    songNames = getSongNames(converted, recommends, tracksPath)

    return songNames
recom = recommend(sampleUsersPath, "b80344d063b5ccb3212f76538f3d9e43d87dca9e" ,sampleTracksPath, rSeed)
recom
[["The Emperor's New Clothes", "Sinéad O'Connor"],
 ['Alhos Verdes', 'GNR'],
 ['Whataya Want From Me', 'Adam Lambert'],
 ['Street Justice', 'MSTRKRFT'],
 ['North Sea Storm (Live)', 'Amon Amarth']]
'''recommend test'''
assert len(recom) == 5, "the amount of recommendations was expected to be 5 but it was %s" % len(recom)
assert type(recom[0]) == list, "the type of a 'recommend' element was expected to be list but it was %s" % type(recom[0])
#test if the same user and seed returns the same as songNames
assert recom == songNames, "the song names were expected to be %s but they were %s" % (songNames, recom)
spark.catalog.clearCache()
sc.stop()
spark.stop()