PySpark MLlib
= "UserJY" NAME
Jupyter notebook:
0.1 MLlib
We will use several small methods to predict whether a customer is going to be a paid_customer. We use logistic regression (https://en.wikipedia.org/wiki/Logistic_regression) to solve this classification problem. A sample of the data comes from http://cs.hut.fi/u/arasalo1/resources/osge_pool-1-thread-1.data.zip.
We build a machine learning pipeline that transforms the data so that MLlib's logistic regression can predict whether a customer is going to pay for the service. The first method, convert, transforms the file into a dataframe so that it is easier to process. Categorical features are transformed using the method indexer. featureAssembler creates a single feature vector, which most of MLlib's machine learning algorithms require. scaler scales the variables. createModel creates and trains the logistic regression model on training data that has been transformed by the previous methods. Finally, predict uses the trained model to predict whether a user is a paying customer.
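To see how the pieces fit together before diving in, the methods are chained roughly as sketched below. This mirrors the correctness test at the end of the notebook; the file path is a placeholder, and the cell can only run once all the methods are defined.

df = convert("path/to/data")                    # raw file -> typed DataFrame
indexed = indexer(df)                           # country -> country_index
assembled = featureAssembler(indexed)           # feature columns -> single "features" vector
scaled = scaler(assembled, "scaledFeatures")    # standardize the feature vector
train, test = scaled.randomSplit([0.7, 0.3])    # 70/30 train/prediction split
model = createModel(train, "scaledFeatures", "paid_customer", "prediction")
predictions = predict(model, test)              # adds a "prediction" column (1.0 == paid)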
0.1.1 Data schema
column_header | type | description |
---|---|---|
cid | uuid | customer id |
cname | string | name of the user |
email | string | email address of the user |
gender | string | customer’s gender |
age | int | age of the customer |
address | string | address provided by the user during registration; only US-based addresses are stored, other countries get 'N/A' |
country | string | country to which customer belongs to |
register_date | long | date on which user registered with us in milliseconds |
friend_count | int | number of friends a user has |
lifetime | int | number of days a user has been active since registration date |
citygame_played | int | number of times citygame has been played by user |
pictionarygame_played | int | number of times pictionary game has been played by user |
scramblegame_played | int | number of times scramble game has been played by user |
snipergame_played | int | number of times sniper game has been played by user |
revenue | int | revenue generated by the user |
paid_subscriber | string | whether the customer is paid customer or not, represented by yes or no |
Use the Binomial Logistic Regression algorithm from Spark's machine learning library MLlib.
https://spark.apache.org/docs/latest/ml-classification-regression.html#binomial-logistic-regression
Use these features for training your model:
* gender
* age
* country
* friend_count
* lifetime
* citygame_played
* pictionarygame_played
* scramblegame_played
* snipergame_played
* paid_subscriber (this is the feature to predict)
The data contains categorical features, so you need to change them accordingly.
https://spark.apache.org/docs/latest/ml-features.html
from pyspark.sql import SparkSession
from pyspark.ml import *
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.sql.types import *
spark = SparkSession.builder\
    .master("local[*]")\
    .appName("gaming")\
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.shuffle.service.enabled", "true")\
    .getOrCreate()
= "testData.data" sampleDataPath
#Generate random sample
import random
= "randomsample.data"
randomData
with open(sampleDataPath) as sampleFile:
= random.sample(sampleFile.readlines(), 4000)
lines
= open(randomData, "w")
outF
outF.writelines(lines) outF.close()
0.2 Convert
convert creates a dataframe, removes unnecessary columns and converts the rest to the right format.
Data schema:
* gender: Double (1 if male else 0)
* age: Double
* country: String
* friend_count: Double
* lifetime: Double
* game1: Double (citygame_played)
* game2: Double (pictionarygame_played)
* game3: Double (scramblegame_played)
* game4: Double (snipergame_played)
* paid_customer: Double (1 if yes else 0)
The function already creates a SQL table called "gaming"; your job is to remove unnecessary columns and convert the rest to the right format. Hint: use a SQL SELECT query with CAST. You will also need IF to properly parse some of the variables, e.g. IF(gender='male',1,0).
param path: path to file
return: converted DataFrame
def convert(path):
    originalCols = StructType([
        StructField("session_id", StringType(), False),
        StructField("cname", StringType(), False),
        StructField("email", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("age", DoubleType(), False),
        StructField("address", StringType(), False),
        StructField("country", StringType(), True),
        StructField("register_date", StringType(), False),
        StructField("friend_count", DoubleType(), False),
        StructField("lifetime", DoubleType(), False),
        StructField("game1", DoubleType(), False),
        StructField("game2", DoubleType(), False),
        StructField("game3", DoubleType(), False),
        StructField("game4", DoubleType(), False),
        StructField("revenue", DoubleType(), False),
        StructField("paid_customer", StringType(), False)])

    data = spark.read.option("header", "false").schema(originalCols).csv(path)
    data.createOrReplaceTempView("gaming")

    # YOUR CODE HERE
    a = spark.sql("""
        SELECT
            CAST(IF(gender='male',1,0) AS Double) AS gender,
            age,
            country,
            friend_count,
            lifetime,
            game1,
            game2,
            game3,
            game4,
            CAST(IF(paid_customer='yes',1,0) AS Double) AS paid_customer
        FROM gaming
        """)
    return a
data = convert(sampleDataPath)
data.cache()
data.show()
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+
|gender| age|country|friend_count|lifetime|game1|game2|game3|game4|paid_customer|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+
| 1.0|23.0| USA| 0.0| 5.0| 0.0| 1.0| 1.0| 3.0| 0.0|
| 1.0|20.0| UK| 2.0| 4.0| 0.0| 0.0| 0.0| 4.0| 0.0|
| 1.0|24.0| UK| 9.0| 13.0| 1.0| 0.0| 2.0| 10.0| 0.0|
| 1.0|21.0| USA| 412.0| 80.0| 8.0| 7.0| 17.0| 48.0| 1.0|
| 1.0|20.0| USA| 0.0| 38.0| 0.0| 3.0| 6.0| 29.0| 0.0|
| 0.0|22.0| FRANCE| 86.0| 2.0| 0.0| 2.0| 0.0| 0.0| 0.0|
| 1.0|27.0|GERMANY| 2.0| 5.0| 1.0| 1.0| 1.0| 2.0| 0.0|
| 0.0|30.0| USA| 0.0| 45.0| 19.0| 21.0| 3.0| 2.0| 0.0|
| 0.0|26.0|GERMANY| 7.0| 20.0| 6.0| 8.0| 5.0| 1.0| 0.0|
| 0.0|18.0| UK| 0.0| 7.0| 5.0| 2.0| 0.0| 0.0| 0.0|
| 1.0|22.0| USA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|
| 1.0|24.0|GERMANY| 417.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0|
| 0.0|21.0| USA| 3.0| 3.0| 2.0| 1.0| 0.0| 0.0| 0.0|
| 1.0|18.0| USA| 8.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0|
| 0.0|23.0| USA| 7.0| 8.0| 4.0| 2.0| 2.0| 0.0| 0.0|
| 1.0|22.0| UK| 60.0| 8.0| 0.0| 0.0| 4.0| 4.0| 0.0|
| 1.0|18.0| USA| 3.0| 7.0| 0.0| 2.0| 1.0| 4.0| 0.0|
| 0.0|20.0| CANADA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|
| 1.0|22.0| USA| 5.0| 83.0| 6.0| 9.0| 10.0| 58.0| 0.0|
| 1.0|23.0| USA| 351.0| 5.0| 0.0| 0.0| 0.0| 5.0| 0.0|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+
only showing top 20 rows
'''convert schema test'''
correctCols = StructType([
    StructField("gender", DoubleType(), False),
    StructField("age", DoubleType(), True),
    StructField("country", StringType(), True),
    StructField("friend_count", DoubleType(), True),
    StructField("lifetime", DoubleType(), True),
    StructField("game1", DoubleType(), True),
    StructField("game2", DoubleType(), True),
    StructField("game3", DoubleType(), True),
    StructField("game4", DoubleType(), True),
    StructField("paid_customer", DoubleType(), False)])

fakeData = [(0.0, 1.0, "A", 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0)]

fakeDf = spark.createDataFrame(fakeData, correctCols)
assert data.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, data.dtypes)
0.3 Indexer
indexer converts categorical features into doubles.
https://spark.apache.org/docs/latest/ml-features.html#stringindexer
country is the only categorical feature.
After these modifications the schema should be:
* gender: Double (1 if male else 0)
* age: Double
* country: String
* friend_count: Double
* lifetime: Double
* game1: Double (citygame_played)
* game2: Double (pictionarygame_played)
* game3: Double (scramblegame_played)
* game4: Double (snipergame_played)
* paid_customer: Double (1 if yes else 0)
* country_index: Double
param df: DataFrame
return: transformed DataFrame. The returned dataframe should have a new column called "country_index".
def indexer(df):
    # YOUR CODE HERE
    stringIndexer = StringIndexer(inputCol="country", outputCol="country_index",
                                  stringOrderType="frequencyDesc")
    model = stringIndexer.fit(df).transform(df)
    return model
indexed = indexer(data)
indexed.show()
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+
|gender| age|country|friend_count|lifetime|game1|game2|game3|game4|paid_customer|country_index|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+
| 1.0|23.0| USA| 0.0| 5.0| 0.0| 1.0| 1.0| 3.0| 0.0| 0.0|
| 1.0|20.0| UK| 2.0| 4.0| 0.0| 0.0| 0.0| 4.0| 0.0| 1.0|
| 1.0|24.0| UK| 9.0| 13.0| 1.0| 0.0| 2.0| 10.0| 0.0| 1.0|
| 1.0|21.0| USA| 412.0| 80.0| 8.0| 7.0| 17.0| 48.0| 1.0| 0.0|
| 1.0|20.0| USA| 0.0| 38.0| 0.0| 3.0| 6.0| 29.0| 0.0| 0.0|
| 0.0|22.0| FRANCE| 86.0| 2.0| 0.0| 2.0| 0.0| 0.0| 0.0| 3.0|
| 1.0|27.0|GERMANY| 2.0| 5.0| 1.0| 1.0| 1.0| 2.0| 0.0| 2.0|
| 0.0|30.0| USA| 0.0| 45.0| 19.0| 21.0| 3.0| 2.0| 0.0| 0.0|
| 0.0|26.0|GERMANY| 7.0| 20.0| 6.0| 8.0| 5.0| 1.0| 0.0| 2.0|
| 0.0|18.0| UK| 0.0| 7.0| 5.0| 2.0| 0.0| 0.0| 0.0| 1.0|
| 1.0|22.0| USA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|
| 1.0|24.0|GERMANY| 417.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 2.0|
| 0.0|21.0| USA| 3.0| 3.0| 2.0| 1.0| 0.0| 0.0| 0.0| 0.0|
| 1.0|18.0| USA| 8.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0|
| 0.0|23.0| USA| 7.0| 8.0| 4.0| 2.0| 2.0| 0.0| 0.0| 0.0|
| 1.0|22.0| UK| 60.0| 8.0| 0.0| 0.0| 4.0| 4.0| 0.0| 1.0|
| 1.0|18.0| USA| 3.0| 7.0| 0.0| 2.0| 1.0| 4.0| 0.0| 0.0|
| 0.0|20.0| CANADA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 4.0|
| 1.0|22.0| USA| 5.0| 83.0| 6.0| 9.0| 10.0| 58.0| 0.0| 0.0|
| 1.0|23.0| USA| 351.0| 5.0| 0.0| 0.0| 0.0| 5.0| 0.0| 0.0|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+
only showing top 20 rows
'''indexer schema test'''
correctCols = StructType([
    StructField("gender", DoubleType(), False),
    StructField("age", DoubleType(), False),
    StructField("country", StringType(), True),
    StructField("friend_count", DoubleType(), False),
    StructField("lifetime", DoubleType(), False),
    StructField("game1", DoubleType(), False),
    StructField("game2", DoubleType(), False),
    StructField("game3", DoubleType(), False),
    StructField("game4", DoubleType(), False),
    StructField("paid_customer", DoubleType(), False),
    StructField("country_index", DoubleType(), False)])

fakeData = [(0.0, 1.0, "A", 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0)]

fakeDf = spark.createDataFrame(fakeData, correctCols)
assert indexed.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, indexed.dtypes)
0.4 Feature Assembler
featureAssembler combines features into one vector. Most MLlib algorithms require this step.
https://spark.apache.org/docs/latest/ml-features.html#vectorassembler. In this task your vector assembler should take and combine the following columns, in the order listed:
["gender", "age", "friend_count", "lifetime", "game1", "game2", "game3", "game4", "country_index"].
param df: DataFrame that is transformed using indexer
return: transformed DataFrame. The returned dataframe should have a new column called "features".
def featureAssembler(df):
    # YOUR CODE HERE
    assembler = VectorAssembler(
        inputCols=["gender", "age", "friend_count", "lifetime", "game1", "game2", "game3", "game4", "country_index"],
        outputCol="features"
    )
    output = assembler.transform(df)
    return output
assembled = featureAssembler(indexed)
assembled.show()
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+
|gender| age|country|friend_count|lifetime|game1|game2|game3|game4|paid_customer|country_index| features|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+
| 1.0|23.0| USA| 0.0| 5.0| 0.0| 1.0| 1.0| 3.0| 0.0| 0.0|[1.0,23.0,0.0,5.0...|
| 1.0|20.0| UK| 2.0| 4.0| 0.0| 0.0| 0.0| 4.0| 0.0| 1.0|[1.0,20.0,2.0,4.0...|
| 1.0|24.0| UK| 9.0| 13.0| 1.0| 0.0| 2.0| 10.0| 0.0| 1.0|[1.0,24.0,9.0,13....|
| 1.0|21.0| USA| 412.0| 80.0| 8.0| 7.0| 17.0| 48.0| 1.0| 0.0|[1.0,21.0,412.0,8...|
| 1.0|20.0| USA| 0.0| 38.0| 0.0| 3.0| 6.0| 29.0| 0.0| 0.0|[1.0,20.0,0.0,38....|
| 0.0|22.0| FRANCE| 86.0| 2.0| 0.0| 2.0| 0.0| 0.0| 0.0| 3.0|[0.0,22.0,86.0,2....|
| 1.0|27.0|GERMANY| 2.0| 5.0| 1.0| 1.0| 1.0| 2.0| 0.0| 2.0|[1.0,27.0,2.0,5.0...|
| 0.0|30.0| USA| 0.0| 45.0| 19.0| 21.0| 3.0| 2.0| 0.0| 0.0|[0.0,30.0,0.0,45....|
| 0.0|26.0|GERMANY| 7.0| 20.0| 6.0| 8.0| 5.0| 1.0| 0.0| 2.0|[0.0,26.0,7.0,20....|
| 0.0|18.0| UK| 0.0| 7.0| 5.0| 2.0| 0.0| 0.0| 0.0| 1.0|[0.0,18.0,0.0,7.0...|
| 1.0|22.0| USA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|(9,[0,1],[1.0,22.0])|
| 1.0|24.0|GERMANY| 417.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 2.0|[1.0,24.0,417.0,1...|
| 0.0|21.0| USA| 3.0| 3.0| 2.0| 1.0| 0.0| 0.0| 0.0| 0.0|[0.0,21.0,3.0,3.0...|
| 1.0|18.0| USA| 8.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0|[1.0,18.0,8.0,1.0...|
| 0.0|23.0| USA| 7.0| 8.0| 4.0| 2.0| 2.0| 0.0| 0.0| 0.0|[0.0,23.0,7.0,8.0...|
| 1.0|22.0| UK| 60.0| 8.0| 0.0| 0.0| 4.0| 4.0| 0.0| 1.0|[1.0,22.0,60.0,8....|
| 1.0|18.0| USA| 3.0| 7.0| 0.0| 2.0| 1.0| 4.0| 0.0| 0.0|[1.0,18.0,3.0,7.0...|
| 0.0|20.0| CANADA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 4.0|(9,[1,8],[20.0,4.0])|
| 1.0|22.0| USA| 5.0| 83.0| 6.0| 9.0| 10.0| 58.0| 0.0| 0.0|[1.0,22.0,5.0,83....|
| 1.0|23.0| USA| 351.0| 5.0| 0.0| 0.0| 0.0| 5.0| 0.0| 0.0|[1.0,23.0,351.0,5...|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+
only showing top 20 rows
'''assembler schema test'''
from pyspark.ml.linalg import *
from pyspark.ml.linalg import VectorUDT

correctCols = StructType([
    StructField("gender", DoubleType(), False),
    StructField("age", DoubleType(), False),
    StructField("country", StringType(), True),
    StructField("friend_count", DoubleType(), False),
    StructField("lifetime", DoubleType(), False),
    StructField("game1", DoubleType(), False),
    StructField("game2", DoubleType(), False),
    StructField("game3", DoubleType(), False),
    StructField("game4", DoubleType(), False),
    StructField("paid_customer", DoubleType(), False),
    StructField("country_index", DoubleType(), False),
    StructField("features", VectorUDT(), True)])

fakeData = [(0.0, 1.0, "A", 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, Vectors.dense([1.0, 2.0]))]

fakeDf = spark.createDataFrame(fakeData, correctCols)
assert assembled.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, assembled.dtypes)
0.5 Scaler
scaler standardizes the data to improve model performance.
https://spark.apache.org/docs/latest/ml-features.html#standardscaler.
For this task, remember to set the withStd and withMean parameters to true, so that each feature is centered to zero mean and scaled to unit standard deviation.
param df: DataFrame that is transformed using featureAssembler
param outputColName: name of the scaled feature vector (output column name)
return: transformed DataFrame. The returned dataframe should have a new column named after the passed outputColName parameter.
def scaler(df, outputColName):
    # YOUR CODE HERE
    scaler = StandardScaler(inputCol="features", outputCol=outputColName, withStd=True, withMean=True)
    scalerModel = scaler.fit(df)
    scaledData = scalerModel.transform(df)
    return scaledData
= scaler(assembled, "scaledFeatures")
scaled scaled.show()
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+
|gender| age|country|friend_count|lifetime|game1|game2|game3|game4|paid_customer|country_index| features| scaledFeatures|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+
| 1.0|23.0| USA| 0.0| 5.0| 0.0| 1.0| 1.0| 3.0| 0.0| 0.0|[1.0,23.0,0.0,5.0...|[0.90079513989473...|
| 1.0|20.0| UK| 2.0| 4.0| 0.0| 0.0| 0.0| 4.0| 0.0| 1.0|[1.0,20.0,2.0,4.0...|[0.90079513989473...|
| 1.0|24.0| UK| 9.0| 13.0| 1.0| 0.0| 2.0| 10.0| 0.0| 1.0|[1.0,24.0,9.0,13....|[0.90079513989473...|
| 1.0|21.0| USA| 412.0| 80.0| 8.0| 7.0| 17.0| 48.0| 1.0| 0.0|[1.0,21.0,412.0,8...|[0.90079513989473...|
| 1.0|20.0| USA| 0.0| 38.0| 0.0| 3.0| 6.0| 29.0| 0.0| 0.0|[1.0,20.0,0.0,38....|[0.90079513989473...|
| 0.0|22.0| FRANCE| 86.0| 2.0| 0.0| 2.0| 0.0| 0.0| 0.0| 3.0|[0.0,22.0,86.0,2....|[-1.1099082973702...|
| 1.0|27.0|GERMANY| 2.0| 5.0| 1.0| 1.0| 1.0| 2.0| 0.0| 2.0|[1.0,27.0,2.0,5.0...|[0.90079513989473...|
| 0.0|30.0| USA| 0.0| 45.0| 19.0| 21.0| 3.0| 2.0| 0.0| 0.0|[0.0,30.0,0.0,45....|[-1.1099082973702...|
| 0.0|26.0|GERMANY| 7.0| 20.0| 6.0| 8.0| 5.0| 1.0| 0.0| 2.0|[0.0,26.0,7.0,20....|[-1.1099082973702...|
| 0.0|18.0| UK| 0.0| 7.0| 5.0| 2.0| 0.0| 0.0| 0.0| 1.0|[0.0,18.0,0.0,7.0...|[-1.1099082973702...|
| 1.0|22.0| USA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|(9,[0,1],[1.0,22.0])|[0.90079513989473...|
| 1.0|24.0|GERMANY| 417.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 2.0|[1.0,24.0,417.0,1...|[0.90079513989473...|
| 0.0|21.0| USA| 3.0| 3.0| 2.0| 1.0| 0.0| 0.0| 0.0| 0.0|[0.0,21.0,3.0,3.0...|[-1.1099082973702...|
| 1.0|18.0| USA| 8.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0|[1.0,18.0,8.0,1.0...|[0.90079513989473...|
| 0.0|23.0| USA| 7.0| 8.0| 4.0| 2.0| 2.0| 0.0| 0.0| 0.0|[0.0,23.0,7.0,8.0...|[-1.1099082973702...|
| 1.0|22.0| UK| 60.0| 8.0| 0.0| 0.0| 4.0| 4.0| 0.0| 1.0|[1.0,22.0,60.0,8....|[0.90079513989473...|
| 1.0|18.0| USA| 3.0| 7.0| 0.0| 2.0| 1.0| 4.0| 0.0| 0.0|[1.0,18.0,3.0,7.0...|[0.90079513989473...|
| 0.0|20.0| CANADA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 4.0|(9,[1,8],[20.0,4.0])|[-1.1099082973702...|
| 1.0|22.0| USA| 5.0| 83.0| 6.0| 9.0| 10.0| 58.0| 0.0| 0.0|[1.0,22.0,5.0,83....|[0.90079513989473...|
| 1.0|23.0| USA| 351.0| 5.0| 0.0| 0.0| 0.0| 5.0| 0.0| 0.0|[1.0,23.0,351.0,5...|[0.90079513989473...|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+
only showing top 20 rows
'''scaler schema test'''
correctCols = StructType([
    StructField("gender", DoubleType(), False),
    StructField("age", DoubleType(), False),
    StructField("country", StringType(), True),
    StructField("friend_count", DoubleType(), False),
    StructField("lifetime", DoubleType(), False),
    StructField("game1", DoubleType(), False),
    StructField("game2", DoubleType(), False),
    StructField("game3", DoubleType(), False),
    StructField("game4", DoubleType(), False),
    StructField("paid_customer", DoubleType(), False),
    StructField("country_index", DoubleType(), False),
    StructField("features", VectorUDT(), True),
    StructField("scaledFeatures", VectorUDT(), True)])

fakeData = [(0.0, 1.0, "A", 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, Vectors.dense([1.0, 2.0]), Vectors.dense([2.0, 0.0]))]

fakeDf = spark.createDataFrame(fakeData, correctCols)
assert scaled.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, scaled.dtypes)
0.6 Create Model
createModel creates a Logistic Regression model. When training, 5 iterations should be enough.
https://spark.apache.org/docs/latest/ml-classification-regression.html#binomial-logistic-regression
param training: transformed DataFrame
param featuresCol: name of the features column
param labelCol: name of the label column (paid_customer)
param predCol: name of the prediction column
return: trained Logistic Regression model
def createModel(training, featuresCol, labelCol, predCol):
    # YOUR CODE HERE
    lr = LogisticRegression(maxIter=5)
    lr.setFeaturesCol(featuresCol)
    lr.setLabelCol(labelCol)
    lr.setPredictionCol(predCol)
    lrModel = lr.fit(training)
    #print(lrModel)
    return lrModel
# split the dataset into training (70%) and prediction (30%) sets
splitted = scaled.randomSplit([0.7, 0.3])

model = createModel(splitted[0], "scaledFeatures", "paid_customer", "prediction")
LogisticRegressionModel: uid=LogisticRegression_4951db8e602c, numClasses=2, numFeatures=9
0.7 Predict
Given a transformed and normalized dataset, predict predicts whether the customer is going to subscribe to the service.
85% correct will give you 3 points.
70% correct will give you 2 points.
50% correct will give you 1 point.
param model: trained logistic regression model
param dataToPredict: normalized DataFrame for prediction
return: DataFrame with predicted scores (1.0 == yes, 0.0 == no)
def predict(model, dataToPredict):
    # YOUR CODE HERE
    output = model.transform(dataToPredict)
    return output
predictions = predict(model, splitted[1])
correct = predictions.where("prediction == paid_customer").count()
total = predictions.count()
print((correct / total) * 100, "% predicted correctly")
predictions.show()
95.80327868852459 % predicted correctly
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
|gender| age|country|friend_count|lifetime|game1|game2|game3|game4|paid_customer|country_index| features| scaledFeatures| rawPrediction| probability|prediction|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
| 0.0|18.0| CANADA| 0.0| 1.0| 0.0| 0.0| 1.0| 0.0| 0.0| 4.0|(9,[1,3,6,8],[18....|[-1.1099082973702...|[4.99559262560287...|[0.99327778480500...| 0.0|
| 0.0|18.0| CANADA| 0.0| 54.0| 28.0| 15.0| 7.0| 4.0| 0.0| 4.0|[0.0,18.0,0.0,54....|[-1.1099082973702...|[3.57955249526850...|[0.97286847322793...| 0.0|
| 0.0|18.0| CANADA| 3.0| 26.0| 11.0| 6.0| 8.0| 1.0| 0.0| 4.0|[0.0,18.0,3.0,26....|[-1.1099082973702...|[4.14194339582123...|[0.98435666573439...| 0.0|
| 0.0|18.0| CANADA| 7.0| 6.0| 2.0| 4.0| 0.0| 0.0| 0.0| 4.0|[0.0,18.0,7.0,6.0...|[-1.1099082973702...|[4.87021457153555...|[0.99238668820979...| 0.0|
| 0.0|18.0| CANADA| 71.0| 47.0| 26.0| 11.0| 6.0| 4.0| 0.0| 4.0|[0.0,18.0,71.0,47...|[-1.1099082973702...|[3.23307263069343...|[0.96206006484016...| 0.0|
| 0.0|18.0| EGYPT| 6.0| 100.0| 45.0| 32.0| 16.0| 7.0| 0.0| 6.0|[0.0,18.0,6.0,100...|[-1.1099082973702...|[1.93735846291887...|[0.87406165588822...| 0.0|
| 0.0|18.0| EGYPT| 9.0| 4.0| 3.0| 0.0| 1.0| 0.0| 0.0| 6.0|[0.0,18.0,9.0,4.0...|[-1.1099082973702...|[4.67385048293821...|[0.99075010816165...| 0.0|
| 0.0|18.0| EGYPT| 9.0| 8.0| 3.0| 0.0| 4.0| 1.0| 0.0| 6.0|[0.0,18.0,9.0,8.0...|[-1.1099082973702...|[4.45248997207777...|[0.98848462479986...| 0.0|
| 0.0|18.0| FRANCE| 0.0| 9.0| 3.0| 4.0| 2.0| 0.0| 0.0| 3.0|[0.0,18.0,0.0,9.0...|[-1.1099082973702...|[4.87584026342622...|[0.99242907475375...| 0.0|
| 0.0|18.0| FRANCE| 0.0| 81.0| 46.0| 17.0| 12.0| 6.0| 0.0| 3.0|[0.0,18.0,0.0,81....|[-1.1099082973702...|[2.89556186017502...|[0.94762660725686...| 0.0|
| 0.0|18.0| FRANCE| 2.0| 87.0| 43.0| 23.0| 16.0| 5.0| 0.0| 3.0|[0.0,18.0,2.0,87....|[-1.1099082973702...|[2.57911443534171...|[0.92950526434730...| 0.0|
| 0.0|18.0| FRANCE| 9.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 3.0|(9,[1,2,8],[18.0,...|[-1.1099082973702...|[5.08632173265312...|[0.99385725410626...| 0.0|
| 0.0|18.0|GERMANY| 0.0| 7.0| 5.0| 2.0| 0.0| 0.0| 0.0| 2.0|[0.0,18.0,0.0,7.0...|[-1.1099082973702...|[5.10646771499869...|[0.99397902988660...| 0.0|
| 0.0|18.0|GERMANY| 0.0| 9.0| 4.0| 2.0| 1.0| 2.0| 0.0| 2.0|[0.0,18.0,0.0,9.0...|[-1.1099082973702...|[4.99666173139974...|[0.99328491949002...| 0.0|
| 0.0|18.0|GERMANY| 0.0| 9.0| 6.0| 2.0| 0.0| 1.0| 0.0| 2.0|[0.0,18.0,0.0,9.0...|[-1.1099082973702...|[5.05345088485828...|[0.99365328438896...| 0.0|
| 0.0|18.0|GERMANY| 0.0| 19.0| 5.0| 7.0| 5.0| 2.0| 0.0| 2.0|[0.0,18.0,0.0,19....|[-1.1099082973702...|[4.60742115368499...|[0.99012105172261...| 0.0|
| 0.0|18.0|GERMANY| 0.0| 24.0| 13.0| 6.0| 3.0| 2.0| 0.0| 2.0|[0.0,18.0,0.0,24....|[-1.1099082973702...|[4.59950569851941...|[0.99004332671759...| 0.0|
| 0.0|18.0|GERMANY| 0.0| 38.0| 20.0| 13.0| 3.0| 2.0| 0.0| 2.0|[0.0,18.0,0.0,38....|[-1.1099082973702...|[4.29553129537215...|[0.98655393230056...| 0.0|
| 0.0|18.0|GERMANY| 0.0| 66.0| 29.0| 20.0| 11.0| 6.0| 0.0| 2.0|[0.0,18.0,0.0,66....|[-1.1099082973702...|[3.31748753051268...|[0.96502388763751...| 0.0|
| 0.0|18.0|GERMANY| 1.0| 2.0| 0.0| 1.0| 1.0| 0.0| 0.0| 2.0|[0.0,18.0,1.0,2.0...|[-1.1099082973702...|[5.15809258630837...|[0.99428024190099...| 0.0|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
only showing top 20 rows
'''prediction correctness test'''
data = convert(randomData)
data.cache()
indexed = indexer(data)
assembled = featureAssembler(indexed)
scaled = scaler(assembled, "scaledFeatures")
splitted = scaled.randomSplit([0.7, 0.3])
model = createModel(splitted[0], "scaledFeatures", "paid_customer", "prediction")
predictions = predict(model, splitted[1])
correct = predictions.where("prediction == paid_customer").count()
total = predictions.count()
answer = (correct / total) * 100
print(answer, "% predicted correctly")
assert answer >= 50.0, "less than 50% predicted correctly, you get 0 points"
LogisticRegressionModel: uid=LogisticRegression_9c24b6de42a1, numClasses=2, numFeatures=9
95.787700084246 % predicted correctly
assert answer >= 70.0, "less than 70% predicted correctly, you get 1 point"
assert answer >= 85.0, "less than 85% predicted correctly, you get 2 points"
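As an optional sanity check beyond the accuracy thresholds above (not part of the assignment), one could also report the area under the ROC curve with Spark's BinaryClassificationEvaluator; a minimal sketch, assuming the predictions dataframe still contains the default rawPrediction column produced by the logistic regression model:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under ROC for the held-out predictions (optional check, not graded)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                          labelCol="paid_customer",
                                          metricName="areaUnderROC")
print("AUC:", evaluator.evaluate(predictions))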
spark.stop()