PySpark MLlib

Posted on June 2, 2019
Tags: machinelearning
NAME = "UserJY"

Jupyter notebook:

0.1 Mllib

We will use several small methods to predict whether a customer is going to be a paid_customer. We use logistic regression (https://en.wikipedia.org/wiki/Logistic_regression) to solve this classification problem.

The sample data comes from http://cs.hut.fi/u/arasalo1/resources/osge_pool-1-thread-1.data.zip.

The machine learning pipeline transforms the data so that mllib’s logistic regression can predict whether a customer is going to pay for the service. The first method, convert, turns the file into a dataframe so that it is easier to process. Categorical features are transformed by the method indexer. featureAssembler creates a single feature vector, which most of mllib’s machine learning algorithms require. scaler scales the variables. createModel creates and trains the logistic regression model on training data that has been transformed by the previous methods. Finally, predict uses the trained model to predict whether a user is a paying customer.

0.1.1 Data schema

column_header | type | description
cid | uuid | customer id
cname | string | name of the user
email | string | email address of the user
gender | string | customer’s gender
age | int | age of the customer
address | string | address provided by the user during registration; only US-based addresses are stored, other countries get ‘N/A’
country | string | country to which the customer belongs
register_date | long | date on which the user registered, in milliseconds
friend_count | int | number of friends a user has
lifetime | int | number of days a user has been active since registration date
citygame_played | int | number of times citygame has been played by user
pictionarygame_played | int | number of times pictionary game has been played by user
scramblegame_played | int | number of times scramble game has been played by user
snipergame_played | int | number of times sniper game has been played by user
revenue | int | revenue generated by the user
paid_subscriber | string | whether the customer is a paid customer or not, represented by yes or no

Use the Binomial Logistic Regression algorithm from mllib, Spark’s machine learning library.
https://spark.apache.org/docs/latest/ml-classification-regression.html#binomial-logistic-regression

Use these features for training your model:
* gender
* age
* country
* friend_count
* lifetime
* citygame_played
* pictionarygame_played
* scramblegame_played
* snipergame_played
* paid_subscriber (this is the label to predict)

The data contains categorical features, so you need to change them accordingly.
https://spark.apache.org/docs/latest/ml-features.html

from pyspark.sql import SparkSession
from pyspark.ml import *
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.sql.types import *

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("gaming")\
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.shuffle.service.enabled", "true")\
    .getOrCreate()

sampleDataPath = "testData.data"
#Generate random sample
import random

randomData = "randomsample.data"

with open(sampleDataPath) as sampleFile:
    lines = random.sample(sampleFile.readlines(), 4000)

outF = open(randomData, "w")
outF.writelines(lines)
outF.close()
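
One caveat with the sampling step above: random.sample raises a ValueError when asked for more items than the list contains, so the cell fails on a file with fewer than 4000 lines. A minimal sketch of a guard follows. The helper name sample_lines and the seed parameter are assumptions for illustration and are not part of the original notebook.

```python
import random

def sample_lines(lines, k, seed=None):
    """Return up to k lines chosen without replacement.

    sample_lines is a hypothetical helper, not part of the notebook.
    """
    rng = random.Random(seed)   # passing a seed makes the sample reproducible
    k = min(k, len(lines))      # random.sample raises ValueError if k > len(lines)
    return rng.sample(lines, k)

lines = ["row %d\n" % i for i in range(10)]
print(len(sample_lines(lines, 4000)))  # 10
print(len(sample_lines(lines, 4)))     # 4
```

With a fixed seed the same lines are drawn on every run, which makes it easier to compare results between notebook sessions.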

0.2 Convert

convert creates a dataframe, removes unnecessary columns, and converts the rest to the right format.
Data schema:
* gender: Double (1 if male else 0)
* age: Double
* country: String
* friend_count: Double
* lifetime: Double
* game1: Double (citygame_played)
* game2: Double (pictionarygame_played)
* game3: Double (scramblegame_played)
* game4: Double (snipergame_played)
* paid_customer: Double (1 if yes else 0)

The function already creates a SQL table called “gaming”; your job is to remove the unnecessary columns and convert the rest to the right format. Hint: use a SQL SELECT query with CAST. You will also need IF to parse some of the variables, e.g. IF(gender='male',1,0).

param path: path to file
return: converted DataFrame
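
To make the IF and CAST logic concrete, here is a plain-Python sketch of the same row conversion, without Spark. The column names follow the schema used in convert. The two sample rows are made up for illustration and are not taken from the data file, and the helper name convert_row is hypothetical.

```python
import csv
import io

# Column order as read by convert; the sample rows below are made up.
COLUMNS = ["session_id", "cname", "email", "gender", "age", "address",
           "country", "register_date", "friend_count", "lifetime",
           "game1", "game2", "game3", "game4", "revenue", "paid_customer"]
NUMERIC = ["age", "friend_count", "lifetime", "game1", "game2", "game3", "game4"]

def convert_row(fields):
    """Keep only the model columns and turn them into floats."""
    row = dict(zip(COLUMNS, fields))
    out = {"gender": 1.0 if row["gender"] == "male" else 0.0}  # IF(gender='male',1,0)
    for name in NUMERIC:
        out[name] = float(row[name])                            # numeric columns as doubles
    out["country"] = row["country"]                             # stays a string for now
    out["paid_customer"] = 1.0 if row["paid_customer"] == "yes" else 0.0
    return out

sample = io.StringIO(
    "id-1,Ann,ann@example.com,female,22,N/A,FRANCE,1400000000000,86,2,0,2,0,0,0,no\n"
    "id-2,Bob,bob@example.com,male,21,N/A,USA,1400000000000,412,80,8,7,17,48,30,yes\n"
)
rows = [convert_row(fields) for fields in csv.reader(sample)]
print(rows[0]["gender"], rows[1]["paid_customer"])  # 0.0 1.0
```

Columns such as email, address and revenue are dropped simply by not copying them into the output.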


def convert(path):
    originalCols = StructType([\
    StructField("session_id", StringType(),False),\
    StructField("cname", StringType(),False),\
    StructField("email",StringType(),False),\
    StructField("gender",StringType(),False),\
    StructField("age",DoubleType(),False),\
    StructField("address",StringType(),False),\
    StructField("country",StringType(),True),\
    StructField("register_date",StringType(),False),\
    StructField("friend_count",DoubleType(),False),\
    StructField("lifetime",DoubleType(),False),\
    StructField("game1",DoubleType(),False),\
    StructField("game2",DoubleType(),False),\
    StructField("game3",DoubleType(),False),\
    StructField("game4",DoubleType(),False),\
    StructField("revenue",DoubleType(),False),\
    StructField("paid_customer",StringType(),False)])
    data = spark.read.option("header","false").schema(originalCols).csv(path)
    data.createOrReplaceTempView("gaming")
    a = spark.sql("""
        SELECT 
            CAST (IF(gender='male',1,0) AS Double) AS gender,
            age,
            country,
            friend_count,
            lifetime,
            game1,
            game2,
            game3,
            game4,
            CAST (IF(paid_customer='yes',1,0) AS Double) AS paid_customer
        
        FROM gaming
        
        """)
    # YOUR CODE HERE
    return a
data = convert(sampleDataPath)
data.cache()
data.show()
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+
|gender| age|country|friend_count|lifetime|game1|game2|game3|game4|paid_customer|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+
|   1.0|23.0|    USA|         0.0|     5.0|  0.0|  1.0|  1.0|  3.0|          0.0|
|   1.0|20.0|     UK|         2.0|     4.0|  0.0|  0.0|  0.0|  4.0|          0.0|
|   1.0|24.0|     UK|         9.0|    13.0|  1.0|  0.0|  2.0| 10.0|          0.0|
|   1.0|21.0|    USA|       412.0|    80.0|  8.0|  7.0| 17.0| 48.0|          1.0|
|   1.0|20.0|    USA|         0.0|    38.0|  0.0|  3.0|  6.0| 29.0|          0.0|
|   0.0|22.0| FRANCE|        86.0|     2.0|  0.0|  2.0|  0.0|  0.0|          0.0|
|   1.0|27.0|GERMANY|         2.0|     5.0|  1.0|  1.0|  1.0|  2.0|          0.0|
|   0.0|30.0|    USA|         0.0|    45.0| 19.0| 21.0|  3.0|  2.0|          0.0|
|   0.0|26.0|GERMANY|         7.0|    20.0|  6.0|  8.0|  5.0|  1.0|          0.0|
|   0.0|18.0|     UK|         0.0|     7.0|  5.0|  2.0|  0.0|  0.0|          0.0|
|   1.0|22.0|    USA|         0.0|     0.0|  0.0|  0.0|  0.0|  0.0|          0.0|
|   1.0|24.0|GERMANY|       417.0|     1.0|  0.0|  0.0|  0.0|  1.0|          0.0|
|   0.0|21.0|    USA|         3.0|     3.0|  2.0|  1.0|  0.0|  0.0|          0.0|
|   1.0|18.0|    USA|         8.0|     1.0|  0.0|  0.0|  0.0|  1.0|          0.0|
|   0.0|23.0|    USA|         7.0|     8.0|  4.0|  2.0|  2.0|  0.0|          0.0|
|   1.0|22.0|     UK|        60.0|     8.0|  0.0|  0.0|  4.0|  4.0|          0.0|
|   1.0|18.0|    USA|         3.0|     7.0|  0.0|  2.0|  1.0|  4.0|          0.0|
|   0.0|20.0| CANADA|         0.0|     0.0|  0.0|  0.0|  0.0|  0.0|          0.0|
|   1.0|22.0|    USA|         5.0|    83.0|  6.0|  9.0| 10.0| 58.0|          0.0|
|   1.0|23.0|    USA|       351.0|     5.0|  0.0|  0.0|  0.0|  5.0|          0.0|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+
only showing top 20 rows
'''convert schema test'''
correctCols = StructType([\
StructField("gender",DoubleType(),False),\
StructField("age",DoubleType(),True),\
StructField("country",StringType(),True),\
StructField("friend_count",DoubleType(),True),\
StructField("lifetime",DoubleType(),True),\
StructField("game1",DoubleType(),True),\
StructField("game2",DoubleType(),True),\
StructField("game3",DoubleType(),True),\
StructField("game4",DoubleType(),True),\
StructField("paid_customer",DoubleType(),False)])

fakeData = [(0.0,1.0,"A",1.0,1.0,1.0,1.0,1.0,1.0,0.0)]

fakeDf = spark.createDataFrame(fakeData, correctCols)

assert data.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, data.dtypes)

0.3 Indexer

indexer converts categorical features into doubles using StringIndexer (https://spark.apache.org/docs/latest/ml-features.html#stringindexer). country is the only categorical feature.
After these modifications the schema should be:
* gender: Double (1 if male else 0)
* age: Double
* country: String
* friend_count: Double
* lifetime: Double
* game1: Double (citygame_played)
* game2: Double (pictionarygame_played)
* game3: Double (scramblegame_played)
* game4: Double (snipergame_played)
* paid_customer: Double (1 if yes else 0)
* country_index: Double

param df: DataFrame
return: transformed Dataframe. The returned dataframe should have a new column called “country_index”.
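
With the frequencyDesc ordering, the most frequent label gets index 0.0, the next most frequent 1.0, and so on. The sketch below reproduces that idea in plain Python; it is not Spark's implementation. The helper name fit_string_index is hypothetical, and the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending count. Ties are broken alphabetically here, which is
    # an assumption of this sketch and may differ between Spark versions.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

countries = ["USA", "UK", "UK", "USA", "USA", "FRANCE",
             "GERMANY", "USA", "GERMANY", "UK"]
mapping = fit_string_index(countries)
print(mapping)  # {'USA': 0.0, 'UK': 1.0, 'GERMANY': 2.0, 'FRANCE': 3.0}
print([mapping[c] for c in countries])
```

Note that the resulting numbers only encode frequency rank. Feeding them into a linear model as a single numeric feature treats the countries as if they were ordered.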

def indexer(df):
    # Index countries by descending frequency: the most common country gets 0.0.
    stringIndexer = StringIndexer(inputCol="country", outputCol="country_index", stringOrderType="frequencyDesc")
    # fit learns the label-to-index mapping, transform adds the country_index column
    indexedDf = stringIndexer.fit(df).transform(df)
    return indexedDf
indexed = indexer(data)
indexed.show()
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+
|gender| age|country|friend_count|lifetime|game1|game2|game3|game4|paid_customer|country_index|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+
|   1.0|23.0|    USA|         0.0|     5.0|  0.0|  1.0|  1.0|  3.0|          0.0|          0.0|
|   1.0|20.0|     UK|         2.0|     4.0|  0.0|  0.0|  0.0|  4.0|          0.0|          1.0|
|   1.0|24.0|     UK|         9.0|    13.0|  1.0|  0.0|  2.0| 10.0|          0.0|          1.0|
|   1.0|21.0|    USA|       412.0|    80.0|  8.0|  7.0| 17.0| 48.0|          1.0|          0.0|
|   1.0|20.0|    USA|         0.0|    38.0|  0.0|  3.0|  6.0| 29.0|          0.0|          0.0|
|   0.0|22.0| FRANCE|        86.0|     2.0|  0.0|  2.0|  0.0|  0.0|          0.0|          3.0|
|   1.0|27.0|GERMANY|         2.0|     5.0|  1.0|  1.0|  1.0|  2.0|          0.0|          2.0|
|   0.0|30.0|    USA|         0.0|    45.0| 19.0| 21.0|  3.0|  2.0|          0.0|          0.0|
|   0.0|26.0|GERMANY|         7.0|    20.0|  6.0|  8.0|  5.0|  1.0|          0.0|          2.0|
|   0.0|18.0|     UK|         0.0|     7.0|  5.0|  2.0|  0.0|  0.0|          0.0|          1.0|
|   1.0|22.0|    USA|         0.0|     0.0|  0.0|  0.0|  0.0|  0.0|          0.0|          0.0|
|   1.0|24.0|GERMANY|       417.0|     1.0|  0.0|  0.0|  0.0|  1.0|          0.0|          2.0|
|   0.0|21.0|    USA|         3.0|     3.0|  2.0|  1.0|  0.0|  0.0|          0.0|          0.0|
|   1.0|18.0|    USA|         8.0|     1.0|  0.0|  0.0|  0.0|  1.0|          0.0|          0.0|
|   0.0|23.0|    USA|         7.0|     8.0|  4.0|  2.0|  2.0|  0.0|          0.0|          0.0|
|   1.0|22.0|     UK|        60.0|     8.0|  0.0|  0.0|  4.0|  4.0|          0.0|          1.0|
|   1.0|18.0|    USA|         3.0|     7.0|  0.0|  2.0|  1.0|  4.0|          0.0|          0.0|
|   0.0|20.0| CANADA|         0.0|     0.0|  0.0|  0.0|  0.0|  0.0|          0.0|          4.0|
|   1.0|22.0|    USA|         5.0|    83.0|  6.0|  9.0| 10.0| 58.0|          0.0|          0.0|
|   1.0|23.0|    USA|       351.0|     5.0|  0.0|  0.0|  0.0|  5.0|          0.0|          0.0|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+
only showing top 20 rows
'''indexer schema test'''
correctCols = StructType([\
StructField("gender",DoubleType(),False),\
StructField("age",DoubleType(),False),\
StructField("country",StringType(),True),\
StructField("friend_count",DoubleType(),False),\
StructField("lifetime",DoubleType(),False),\
StructField("game1",DoubleType(),False),\
StructField("game2",DoubleType(),False),\
StructField("game3",DoubleType(),False),\
StructField("game4",DoubleType(),False),\
StructField("paid_customer",DoubleType(),False),\
StructField("country_index",DoubleType(),False)])

fakeData = [(0.0,1.0,"A",1.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0)]

fakeDf = spark.createDataFrame(fakeData, correctCols)

assert indexed.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, indexed.dtypes)

0.4 Feature Assembler

featureAssembler combines features into one vector using VectorAssembler (https://spark.apache.org/docs/latest/ml-features.html#vectorassembler). Most mllib algorithms require this step.
In this task your vector assembler should take and combine the following columns in the order listed: ["gender", "age","friend_count","lifetime","game1","game2","game3","game4","country_index"].

param df: Dataframe that is transformed using indexer
return transformed Dataframe. The returned dataframe should have a new column called “features”

def featureAssembler(df):
    assembler = VectorAssembler(
        inputCols = ["gender", "age","friend_count","lifetime","game1","game2","game3","game4","country_index"],
        outputCol = "features"
    )
    output = assembler.transform(df)
    # YOUR CODE HERE
    return output
assembled = featureAssembler(indexed)
assembled.show()
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+
|gender| age|country|friend_count|lifetime|game1|game2|game3|game4|paid_customer|country_index|            features|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+
|   1.0|23.0|    USA|         0.0|     5.0|  0.0|  1.0|  1.0|  3.0|          0.0|          0.0|[1.0,23.0,0.0,5.0...|
|   1.0|20.0|     UK|         2.0|     4.0|  0.0|  0.0|  0.0|  4.0|          0.0|          1.0|[1.0,20.0,2.0,4.0...|
|   1.0|24.0|     UK|         9.0|    13.0|  1.0|  0.0|  2.0| 10.0|          0.0|          1.0|[1.0,24.0,9.0,13....|
|   1.0|21.0|    USA|       412.0|    80.0|  8.0|  7.0| 17.0| 48.0|          1.0|          0.0|[1.0,21.0,412.0,8...|
|   1.0|20.0|    USA|         0.0|    38.0|  0.0|  3.0|  6.0| 29.0|          0.0|          0.0|[1.0,20.0,0.0,38....|
|   0.0|22.0| FRANCE|        86.0|     2.0|  0.0|  2.0|  0.0|  0.0|          0.0|          3.0|[0.0,22.0,86.0,2....|
|   1.0|27.0|GERMANY|         2.0|     5.0|  1.0|  1.0|  1.0|  2.0|          0.0|          2.0|[1.0,27.0,2.0,5.0...|
|   0.0|30.0|    USA|         0.0|    45.0| 19.0| 21.0|  3.0|  2.0|          0.0|          0.0|[0.0,30.0,0.0,45....|
|   0.0|26.0|GERMANY|         7.0|    20.0|  6.0|  8.0|  5.0|  1.0|          0.0|          2.0|[0.0,26.0,7.0,20....|
|   0.0|18.0|     UK|         0.0|     7.0|  5.0|  2.0|  0.0|  0.0|          0.0|          1.0|[0.0,18.0,0.0,7.0...|
|   1.0|22.0|    USA|         0.0|     0.0|  0.0|  0.0|  0.0|  0.0|          0.0|          0.0|(9,[0,1],[1.0,22.0])|
|   1.0|24.0|GERMANY|       417.0|     1.0|  0.0|  0.0|  0.0|  1.0|          0.0|          2.0|[1.0,24.0,417.0,1...|
|   0.0|21.0|    USA|         3.0|     3.0|  2.0|  1.0|  0.0|  0.0|          0.0|          0.0|[0.0,21.0,3.0,3.0...|
|   1.0|18.0|    USA|         8.0|     1.0|  0.0|  0.0|  0.0|  1.0|          0.0|          0.0|[1.0,18.0,8.0,1.0...|
|   0.0|23.0|    USA|         7.0|     8.0|  4.0|  2.0|  2.0|  0.0|          0.0|          0.0|[0.0,23.0,7.0,8.0...|
|   1.0|22.0|     UK|        60.0|     8.0|  0.0|  0.0|  4.0|  4.0|          0.0|          1.0|[1.0,22.0,60.0,8....|
|   1.0|18.0|    USA|         3.0|     7.0|  0.0|  2.0|  1.0|  4.0|          0.0|          0.0|[1.0,18.0,3.0,7.0...|
|   0.0|20.0| CANADA|         0.0|     0.0|  0.0|  0.0|  0.0|  0.0|          0.0|          4.0|(9,[1,8],[20.0,4.0])|
|   1.0|22.0|    USA|         5.0|    83.0|  6.0|  9.0| 10.0| 58.0|          0.0|          0.0|[1.0,22.0,5.0,83....|
|   1.0|23.0|    USA|       351.0|     5.0|  0.0|  0.0|  0.0|  5.0|          0.0|          0.0|[1.0,23.0,351.0,5...|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+
only showing top 20 rows
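
Most rows in the features column print as a dense list, but a few print as (9,[0,1],[1.0,22.0]). That is the sparse form: vector size, indices of the non-zero entries, and their values. Spark appears to show whichever form is more compact for the row. The plain-Python sketch below assembles one row from the table above and converts between the two forms. The helper names assemble, to_sparse and to_dense are hypothetical.

```python
FEATURE_COLS = ["gender", "age", "friend_count", "lifetime",
                "game1", "game2", "game3", "game4", "country_index"]

def assemble(row):
    """Collect the feature columns of one row into a list, in a fixed order."""
    return [row[name] for name in FEATURE_COLS]

def to_sparse(dense):
    """Return (size, indices, values), keeping only the non-zero entries."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    return (len(dense), indices, [dense[i] for i in indices])

def to_dense(size, indices, values):
    """Expand the sparse triple back into a full list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# Eleventh row of the output above: a 22-year-old male from the USA with no activity.
row = {"gender": 1.0, "age": 22.0, "country": "USA", "friend_count": 0.0,
       "lifetime": 0.0, "game1": 0.0, "game2": 0.0, "game3": 0.0,
       "game4": 0.0, "paid_customer": 0.0, "country_index": 0.0}
features = assemble(row)
print(to_sparse(features))              # (9, [0, 1], [1.0, 22.0])
print(to_dense(9, [1, 8], [20.0, 4.0]))
```

Both forms describe the same vector, so downstream stages such as the scaler treat them identically.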
'''assembler schema test'''
from pyspark.ml.linalg import Vectors, VectorUDT

correctCols = StructType([\
StructField("gender",DoubleType(),False),\
StructField("age",DoubleType(),False),\
StructField("country",StringType(),True),\
StructField("friend_count",DoubleType(),False),\
StructField("lifetime",DoubleType(),False),\
StructField("game1",DoubleType(),False),\
StructField("game2",DoubleType(),False),\
StructField("game3",DoubleType(),False),\
StructField("game4",DoubleType(),False),\
StructField("paid_customer",DoubleType(),False),\
StructField("country_index",DoubleType(),False),\
StructField("features", VectorUDT(),True)])

fakeData = [(0.0,1.0,"A",1.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0,(Vectors.dense([1.0, 2.0])))]

fakeDf = spark.createDataFrame(fakeData, correctCols)

assert assembled.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, assembled.dtypes)

0.5 Scaler

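scaler standardizes each feature to zero mean and unit standard deviation, which usually helps the optimizer behind logistic regression converge.
See https://spark.apache.org/docs/latest/ml-features.html#standardscaler. For this task please remember to set the withStd and withMean parameters to true.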

param df Dataframe that is transformed using featureAssembler
param outputColName name of the scaled feature vector (output column name)
return transformed Dataframe. The returned dataframe should have a new column named after the passed outputColName parameter.
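
With withMean and withStd both set to true, each feature x becomes (x - mean) / std. As a sketch of that arithmetic for a single column in plain Python (not Spark's implementation): the helper name standardize is hypothetical, and the sample standard deviation (dividing by n - 1) is used, which is what the Spark documentation describes for StandardScaler.

```python
from statistics import mean, stdev

def standardize(column):
    """Scale values to zero mean and unit sample standard deviation."""
    mu = mean(column)
    sigma = stdev(column)  # sample standard deviation (divides by n - 1)
    if sigma == 0.0:
        # Constant column: return zeros to avoid dividing by zero.
        return [0.0 for _ in column]
    return [(x - mu) / sigma for x in column]

print(standardize([1.0, 2.0, 3.0]))  # [-1.0, 0.0, 1.0]
```

After scaling, a feature with large raw values such as friend_count no longer dominates features with small raw values such as gender.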

def scaler(df, outputColName):
    # YOUR CODE HERE
    scaler = StandardScaler(inputCol="features",outputCol=outputColName,withStd=True,withMean=True)
    scalerModel = scaler.fit(df)
    scaledData = scalerModel.transform(df)
    return scaledData
scaled = scaler(assembled, "scaledFeatures")
scaled.show()
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+
|gender| age|country|friend_count|lifetime|game1|game2|game3|game4|paid_customer|country_index|            features|      scaledFeatures|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+
|   1.0|23.0|    USA|         0.0|     5.0|  0.0|  1.0|  1.0|  3.0|          0.0|          0.0|[1.0,23.0,0.0,5.0...|[0.90079513989473...|
|   1.0|20.0|     UK|         2.0|     4.0|  0.0|  0.0|  0.0|  4.0|          0.0|          1.0|[1.0,20.0,2.0,4.0...|[0.90079513989473...|
|   1.0|24.0|     UK|         9.0|    13.0|  1.0|  0.0|  2.0| 10.0|          0.0|          1.0|[1.0,24.0,9.0,13....|[0.90079513989473...|
|   1.0|21.0|    USA|       412.0|    80.0|  8.0|  7.0| 17.0| 48.0|          1.0|          0.0|[1.0,21.0,412.0,8...|[0.90079513989473...|
|   1.0|20.0|    USA|         0.0|    38.0|  0.0|  3.0|  6.0| 29.0|          0.0|          0.0|[1.0,20.0,0.0,38....|[0.90079513989473...|
|   0.0|22.0| FRANCE|        86.0|     2.0|  0.0|  2.0|  0.0|  0.0|          0.0|          3.0|[0.0,22.0,86.0,2....|[-1.1099082973702...|
|   1.0|27.0|GERMANY|         2.0|     5.0|  1.0|  1.0|  1.0|  2.0|          0.0|          2.0|[1.0,27.0,2.0,5.0...|[0.90079513989473...|
|   0.0|30.0|    USA|         0.0|    45.0| 19.0| 21.0|  3.0|  2.0|          0.0|          0.0|[0.0,30.0,0.0,45....|[-1.1099082973702...|
|   0.0|26.0|GERMANY|         7.0|    20.0|  6.0|  8.0|  5.0|  1.0|          0.0|          2.0|[0.0,26.0,7.0,20....|[-1.1099082973702...|
|   0.0|18.0|     UK|         0.0|     7.0|  5.0|  2.0|  0.0|  0.0|          0.0|          1.0|[0.0,18.0,0.0,7.0...|[-1.1099082973702...|
|   1.0|22.0|    USA|         0.0|     0.0|  0.0|  0.0|  0.0|  0.0|          0.0|          0.0|(9,[0,1],[1.0,22.0])|[0.90079513989473...|
|   1.0|24.0|GERMANY|       417.0|     1.0|  0.0|  0.0|  0.0|  1.0|          0.0|          2.0|[1.0,24.0,417.0,1...|[0.90079513989473...|
|   0.0|21.0|    USA|         3.0|     3.0|  2.0|  1.0|  0.0|  0.0|          0.0|          0.0|[0.0,21.0,3.0,3.0...|[-1.1099082973702...|
|   1.0|18.0|    USA|         8.0|     1.0|  0.0|  0.0|  0.0|  1.0|          0.0|          0.0|[1.0,18.0,8.0,1.0...|[0.90079513989473...|
|   0.0|23.0|    USA|         7.0|     8.0|  4.0|  2.0|  2.0|  0.0|          0.0|          0.0|[0.0,23.0,7.0,8.0...|[-1.1099082973702...|
|   1.0|22.0|     UK|        60.0|     8.0|  0.0|  0.0|  4.0|  4.0|          0.0|          1.0|[1.0,22.0,60.0,8....|[0.90079513989473...|
|   1.0|18.0|    USA|         3.0|     7.0|  0.0|  2.0|  1.0|  4.0|          0.0|          0.0|[1.0,18.0,3.0,7.0...|[0.90079513989473...|
|   0.0|20.0| CANADA|         0.0|     0.0|  0.0|  0.0|  0.0|  0.0|          0.0|          4.0|(9,[1,8],[20.0,4.0])|[-1.1099082973702...|
|   1.0|22.0|    USA|         5.0|    83.0|  6.0|  9.0| 10.0| 58.0|          0.0|          0.0|[1.0,22.0,5.0,83....|[0.90079513989473...|
|   1.0|23.0|    USA|       351.0|     5.0|  0.0|  0.0|  0.0|  5.0|          0.0|          0.0|[1.0,23.0,351.0,5...|[0.90079513989473...|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+
only showing top 20 rows
'''scaler schema test'''
correctCols = StructType([\
StructField("gender",DoubleType(),False),\
StructField("age",DoubleType(),False),\
StructField("country",StringType(),True),\
StructField("friend_count",DoubleType(),False),\
StructField("lifetime",DoubleType(),False),\
StructField("game1",DoubleType(),False),\
StructField("game2",DoubleType(),False),\
StructField("game3",DoubleType(),False),\
StructField("game4",DoubleType(),False),\
StructField("paid_customer",DoubleType(),False),\
StructField("country_index",DoubleType(),False),\
StructField("features", VectorUDT(),True),\
StructField("scaledFeatures", VectorUDT(),True)])

fakeData = [(0.0,1.0,"A",1.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0,(Vectors.dense([1.0, 2.0])),(Vectors.dense([2.0, 0.0])))]

fakeDf = spark.createDataFrame(fakeData, correctCols)

assert scaled.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, scaled.dtypes)

0.6 Create Model

createModel creates a Logistic Regression model. When training, 5 iterations should be enough.
https://spark.apache.org/docs/latest/ml-classification-regression.html#binomial-logistic-regression

param training transformed dataframe
param featuresCol name of the features column
param labelCol name of the label col (paid_customer)
param predCol name of the prediction col
return trained Logistic Regression model
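
For intuition, a trained binomial logistic regression model scores a row by taking a weighted sum of the features plus an intercept and passing it through the logistic (sigmoid) function. The sketch below shows only that scoring step, not the training. The coefficients are made up for illustration, and the helper names are hypothetical.

```python
import math

def sigmoid(z):
    """Logistic function: maps any real number into (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict_proba(weights, intercept, features):
    """Probability of the positive class for one feature vector."""
    margin = intercept + sum(w * x for w, x in zip(weights, features))
    return sigmoid(margin)

# Made-up coefficients for two features, for illustration only.
weights = [0.8, -0.5]
intercept = -1.0
p = predict_proba(weights, intercept, [2.0, 1.0])
label = 1.0 if p > 0.5 else 0.0   # 0.5 is the usual default threshold
print(round(p, 4), label)
```

Training, which lr.fit performs, is the search for the weights and intercept that make these probabilities agree with the labels in the training data.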

def createModel(training, featuresCol, labelCol, predCol):
    lr = LogisticRegression(maxIter = 5)
    lr.setFeaturesCol(featuresCol)
    lr.setLabelCol(labelCol)
    lr.setPredictionCol(predCol)
    lrModel = lr.fit(training)
    # YOUR CODE HERE
    #print(lrModel)
    return lrModel
#split the dataset into training(70%) and prediction(30%) sets
splitted = scaled.randomSplit([0.7,0.3])

model = createModel(splitted[0],"scaledFeatures","paid_customer","prediction")
LogisticRegressionModel: uid=LogisticRegression_4951db8e602c, numClasses=2, numFeatures=9
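
A note on the split: randomSplit assigns rows at random, so the two parts are only approximately 70% and 30% of the data, and without a seed the split (and therefore the accuracy) changes from run to run. PySpark's randomSplit accepts an optional seed argument for reproducibility. The plain-Python sketch below illustrates the idea of a per-row random assignment; it is not Spark's implementation, and the helper name random_split is hypothetical.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split by an independent random draw."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:                 # cumulative, normalized boundaries
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, bound in enumerate(bounds):
            # The last split catches any rounding leftovers.
            if r < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Every row lands in exactly one split, but the sizes vary around the requested proportions.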

0.7 Predict

Given a transformed and normalized dataset, predict predicts whether the customer is going to subscribe to the service.

85% correct will give you 3 points.
70% correct will give you 2 points.
50% correct will give you 1 point.

param model trained logistic regression model
param dataToPredict normalized dataframe for prediction
return DataFrame with predicted scores (1.0 == yes, 0.0 == no)
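
The accuracy check and the point thresholds above can be sketched in plain Python. The helper names accuracy_percent and points_for are hypothetical, and the four (prediction, label) pairs are made up for illustration.

```python
def accuracy_percent(pairs):
    """pairs: iterable of (prediction, label) tuples."""
    pairs = list(pairs)
    correct = sum(1 for pred, label in pairs if pred == label)
    return 100.0 * correct / len(pairs)

def points_for(accuracy):
    """Map accuracy in percent to points using the thresholds above."""
    if accuracy >= 85.0:
        return 3
    if accuracy >= 70.0:
        return 2
    if accuracy >= 50.0:
        return 1
    return 0

pairs = [(0.0, 0.0), (1.0, 1.0), (0.0, 1.0), (0.0, 0.0)]
acc = accuracy_percent(pairs)
print(acc, points_for(acc))  # 75.0 2
```

The same comparison is done on the dataframe by counting the rows where prediction equals paid_customer and dividing by the total row count.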

def predict(model, dataToPredict):
    output = model.transform(dataToPredict)
    # YOUR CODE HERE
    return output
predictions = predict(model, splitted[1])
correct = predictions.where("prediction == paid_customer").count()
total = predictions.count()
print((correct / total) * 100, "% predicted correctly")
predictions.show()
95.80327868852459 % predicted correctly
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
|gender| age|country|friend_count|lifetime|game1|game2|game3|game4|paid_customer|country_index|            features|      scaledFeatures|       rawPrediction|         probability|prediction|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
|   0.0|18.0| CANADA|         0.0|     1.0|  0.0|  0.0|  1.0|  0.0|          0.0|          4.0|(9,[1,3,6,8],[18....|[-1.1099082973702...|[4.99559262560287...|[0.99327778480500...|       0.0|
|   0.0|18.0| CANADA|         0.0|    54.0| 28.0| 15.0|  7.0|  4.0|          0.0|          4.0|[0.0,18.0,0.0,54....|[-1.1099082973702...|[3.57955249526850...|[0.97286847322793...|       0.0|
|   0.0|18.0| CANADA|         3.0|    26.0| 11.0|  6.0|  8.0|  1.0|          0.0|          4.0|[0.0,18.0,3.0,26....|[-1.1099082973702...|[4.14194339582123...|[0.98435666573439...|       0.0|
|   0.0|18.0| CANADA|         7.0|     6.0|  2.0|  4.0|  0.0|  0.0|          0.0|          4.0|[0.0,18.0,7.0,6.0...|[-1.1099082973702...|[4.87021457153555...|[0.99238668820979...|       0.0|
|   0.0|18.0| CANADA|        71.0|    47.0| 26.0| 11.0|  6.0|  4.0|          0.0|          4.0|[0.0,18.0,71.0,47...|[-1.1099082973702...|[3.23307263069343...|[0.96206006484016...|       0.0|
|   0.0|18.0|  EGYPT|         6.0|   100.0| 45.0| 32.0| 16.0|  7.0|          0.0|          6.0|[0.0,18.0,6.0,100...|[-1.1099082973702...|[1.93735846291887...|[0.87406165588822...|       0.0|
|   0.0|18.0|  EGYPT|         9.0|     4.0|  3.0|  0.0|  1.0|  0.0|          0.0|          6.0|[0.0,18.0,9.0,4.0...|[-1.1099082973702...|[4.67385048293821...|[0.99075010816165...|       0.0|
|   0.0|18.0|  EGYPT|         9.0|     8.0|  3.0|  0.0|  4.0|  1.0|          0.0|          6.0|[0.0,18.0,9.0,8.0...|[-1.1099082973702...|[4.45248997207777...|[0.98848462479986...|       0.0|
|   0.0|18.0| FRANCE|         0.0|     9.0|  3.0|  4.0|  2.0|  0.0|          0.0|          3.0|[0.0,18.0,0.0,9.0...|[-1.1099082973702...|[4.87584026342622...|[0.99242907475375...|       0.0|
|   0.0|18.0| FRANCE|         0.0|    81.0| 46.0| 17.0| 12.0|  6.0|          0.0|          3.0|[0.0,18.0,0.0,81....|[-1.1099082973702...|[2.89556186017502...|[0.94762660725686...|       0.0|
|   0.0|18.0| FRANCE|         2.0|    87.0| 43.0| 23.0| 16.0|  5.0|          0.0|          3.0|[0.0,18.0,2.0,87....|[-1.1099082973702...|[2.57911443534171...|[0.92950526434730...|       0.0|
|   0.0|18.0| FRANCE|         9.0|     0.0|  0.0|  0.0|  0.0|  0.0|          0.0|          3.0|(9,[1,2,8],[18.0,...|[-1.1099082973702...|[5.08632173265312...|[0.99385725410626...|       0.0|
|   0.0|18.0|GERMANY|         0.0|     7.0|  5.0|  2.0|  0.0|  0.0|          0.0|          2.0|[0.0,18.0,0.0,7.0...|[-1.1099082973702...|[5.10646771499869...|[0.99397902988660...|       0.0|
|   0.0|18.0|GERMANY|         0.0|     9.0|  4.0|  2.0|  1.0|  2.0|          0.0|          2.0|[0.0,18.0,0.0,9.0...|[-1.1099082973702...|[4.99666173139974...|[0.99328491949002...|       0.0|
|   0.0|18.0|GERMANY|         0.0|     9.0|  6.0|  2.0|  0.0|  1.0|          0.0|          2.0|[0.0,18.0,0.0,9.0...|[-1.1099082973702...|[5.05345088485828...|[0.99365328438896...|       0.0|
|   0.0|18.0|GERMANY|         0.0|    19.0|  5.0|  7.0|  5.0|  2.0|          0.0|          2.0|[0.0,18.0,0.0,19....|[-1.1099082973702...|[4.60742115368499...|[0.99012105172261...|       0.0|
|   0.0|18.0|GERMANY|         0.0|    24.0| 13.0|  6.0|  3.0|  2.0|          0.0|          2.0|[0.0,18.0,0.0,24....|[-1.1099082973702...|[4.59950569851941...|[0.99004332671759...|       0.0|
|   0.0|18.0|GERMANY|         0.0|    38.0| 20.0| 13.0|  3.0|  2.0|          0.0|          2.0|[0.0,18.0,0.0,38....|[-1.1099082973702...|[4.29553129537215...|[0.98655393230056...|       0.0|
|   0.0|18.0|GERMANY|         0.0|    66.0| 29.0| 20.0| 11.0|  6.0|          0.0|          2.0|[0.0,18.0,0.0,66....|[-1.1099082973702...|[3.31748753051268...|[0.96502388763751...|       0.0|
|   0.0|18.0|GERMANY|         1.0|     2.0|  0.0|  1.0|  1.0|  0.0|          0.0|          2.0|[0.0,18.0,1.0,2.0...|[-1.1099082973702...|[5.15809258630837...|[0.99428024190099...|       0.0|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
only showing top 20 rows
'''prediction correctness test'''
data = convert(randomData)
data.cache()
indexed = indexer(data)
assembled = featureAssembler(indexed)
scaled = scaler(assembled, "scaledFeatures")
splitted = scaled.randomSplit([0.7,0.3])
model = createModel(splitted[0],"scaledFeatures","paid_customer","prediction")
predictions = predict(model, splitted[1])
correct = predictions.where("prediction == paid_customer").count()
total = predictions.count()
answer = (correct / total) * 100
print(answer, "% predicted correctly")
assert answer >= 50.0, "less than 50% predicted correctly, you get 0 points"
LogisticRegressionModel: uid=LogisticRegression_9c24b6de42a1, numClasses=2, numFeatures=9
95.787700084246 % predicted correctly
assert answer >= 70.0, "less than 70% predicted correctly, you get 1 point"
assert answer >= 85.0, "less than 85% predicted correctly, you get 2 points"
spark.stop()