PySpark MLlib
NAME = "UserJY"Jupyter notebook:

0.1 MLlib
We will use multiple small methods to predict whether a customer is going to become a paid customer (paid_customer). We use logistic regression (https://en.wikipedia.org/wiki/Logistic_regression) to solve this classification problem.
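For reference, binomial logistic regression models the probability of the positive class by applying the logistic (sigmoid) function to a linear combination of the feature vector $x$ with learned weights $w$ and intercept $b$:

$$P(\mathrm{paid\_customer} = 1 \mid x) = \frac{1}{1 + e^{-(w^{\top}x + b)}}$$

The model predicts 1 (a paying customer) when this probability exceeds the default threshold of 0.5.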
A sample of the data is available at http://cs.hut.fi/u/arasalo1/resources/osge_pool-1-thread-1.data.zip.
We build a machine learning pipeline that transforms the data so that MLlib's logistic regression can predict whether a customer is going to pay for the service. The first method, convert, transforms the file into a DataFrame so that it is easier to process. Categorical features are transformed using the indexer method. featureAssembler combines the features into a single feature vector, which most of MLlib's machine learning algorithms require. scaler standardizes the variables. createModel creates and trains the logistic regression model on training data that has been transformed by the previous methods. Finally, predict uses the trained model to predict whether a user is a paying customer.
0.1.1 Data schema
| column_header | type | description |
|---|---|---|
| cid | uuid | customer id |
| cname | string | name of the user |
| email | string | email address of the user |
| gender | string | customer’s gender |
| age | int | age of the customer |
| address | string | address provided by the user during registration; only US-based addresses are stored, other countries get 'N/A' |
| country | string | country to which the customer belongs |
| register_date | long | date on which the user registered with us, in milliseconds |
| friend_count | int | number of friends a user has |
| lifetime | int | number of days a user has been active since registration date |
| citygame_played | int | number of times citygame has been played by user |
| pictionarygame_played | int | number of times pictionary game has been played by user |
| scramblegame_played | int | number of times scramble game has been played by user |
| snipergame_played | int | number of times sniper game has been played by user |
| revenue | int | revenue generated by the user |
| paid_subscriber | string | whether the customer is a paid customer or not, represented by yes or no |
Use the Binomial Logistic Regression algorithm from Spark's machine learning library MLlib.
https://spark.apache.org/docs/latest/ml-classification-regression.html#binomial-logistic-regression
Use these features for training your model:
* gender
* age
* country
* friend_count
* lifetime
* citygame_played
* pictionarygame_played
* scramblegame_played
* snipergame_played
* paid_subscriber (this is the feature to predict)
The data contains categorical features, so you need to change them accordingly.
https://spark.apache.org/docs/latest/ml-features.html
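As an overview, the steps described above could also be chained into a single pyspark.ml Pipeline. The sketch below is only for orientation and is not part of the graded solution; the exercise implements each stage as a separate function instead. The column names (country_index, features, scaledFeatures) match the ones used later in this notebook, and trainingDf/testDf are placeholder DataFrames assumed to come from the convert step.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression

# Chain indexing, assembling, scaling, and the classifier into one pipeline.
indexerStage = StringIndexer(inputCol="country", outputCol="country_index")
assemblerStage = VectorAssembler(
    inputCols=["gender", "age", "friend_count", "lifetime",
               "game1", "game2", "game3", "game4", "country_index"],
    outputCol="features")
scalerStage = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                             withStd=True, withMean=True)
lrStage = LogisticRegression(featuresCol="scaledFeatures", labelCol="paid_customer",
                             predictionCol="prediction", maxIter=5)
pipeline = Pipeline(stages=[indexerStage, assemblerStage, scalerStage, lrStage])
# pipelineModel = pipeline.fit(trainingDf)    # trainingDf: output of convert() on training data
# predictions = pipelineModel.transform(testDf)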
from pyspark.sql import SparkSession
from pyspark.ml import *
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.sql.types import *
spark = SparkSession.builder\
.master("local[*]")\
.appName("gaming")\
.config("spark.dynamicAllocation.enabled", "true")\
.config("spark.shuffle.service.enabled", "true")\
.getOrCreate()
sampleDataPath = "testData.data"
# Generate random sample
import random
randomData = "randomsample.data"
with open(sampleDataPath) as sampleFile:
    lines = random.sample(sampleFile.readlines(), 4000)
outF = open(randomData, "w")
outF.writelines(lines)
outF.close()
0.2 Convert
convert creates a DataFrame, removes unnecessary columns and converts the rest to the right format.
Data schema:
* gender: Double (1 if male else 0)
* age: Double
* country: String
* friend_count: Double
* lifetime: Double
* game1: Double (citygame_played)
* game2: Double (pictionarygame_played)
* game3: Double (scramblegame_played)
* game4: Double (snipergame_played)
* paid_customer: Double (1 if yes else 0)
The function already creates a SQL table called "gaming"; your job is to remove unnecessary columns and convert the rest to the right format. Hint: use a SQL SELECT query with CAST. You will also need to use IF to properly parse some of the variables, e.g. IF(gender='male',1,0).
param path: path to file
return: converted DataFrame
def convert(path):
    originalCols = StructType([\
        StructField("session_id", StringType(),False),\
        StructField("cname", StringType(),False),\
        StructField("email",StringType(),False),\
        StructField("gender",StringType(),False),\
        StructField("age",DoubleType(),False),\
        StructField("address",StringType(),False),\
        StructField("country",StringType(),True),\
        StructField("register_date",StringType(),False),\
        StructField("friend_count",DoubleType(),False),\
        StructField("lifetime",DoubleType(),False),\
        StructField("game1",DoubleType(),False),\
        StructField("game2",DoubleType(),False),\
        StructField("game3",DoubleType(),False),\
        StructField("game4",DoubleType(),False),\
        StructField("revenue",DoubleType(),False),\
        StructField("paid_customer",StringType(),False)])
    data = spark.read.option("header","false").schema(originalCols).csv(path)
    data.createOrReplaceTempView("gaming")
    a = spark.sql("""
        SELECT
        CAST (IF(gender='male',1,0) AS Double) AS gender,
        age,
        country,
        friend_count,
        lifetime,
        game1,
        game2,
        game3,
        game4,
        CAST (IF(paid_customer='yes',1,0) AS Double) AS paid_customer
        FROM gaming
    """)
    # YOUR CODE HERE
    return a

data = convert(sampleDataPath)
data.cache()
data.show()
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+
|gender| age|country|friend_count|lifetime|game1|game2|game3|game4|paid_customer|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+
| 1.0|23.0| USA| 0.0| 5.0| 0.0| 1.0| 1.0| 3.0| 0.0|
| 1.0|20.0| UK| 2.0| 4.0| 0.0| 0.0| 0.0| 4.0| 0.0|
| 1.0|24.0| UK| 9.0| 13.0| 1.0| 0.0| 2.0| 10.0| 0.0|
| 1.0|21.0| USA| 412.0| 80.0| 8.0| 7.0| 17.0| 48.0| 1.0|
| 1.0|20.0| USA| 0.0| 38.0| 0.0| 3.0| 6.0| 29.0| 0.0|
| 0.0|22.0| FRANCE| 86.0| 2.0| 0.0| 2.0| 0.0| 0.0| 0.0|
| 1.0|27.0|GERMANY| 2.0| 5.0| 1.0| 1.0| 1.0| 2.0| 0.0|
| 0.0|30.0| USA| 0.0| 45.0| 19.0| 21.0| 3.0| 2.0| 0.0|
| 0.0|26.0|GERMANY| 7.0| 20.0| 6.0| 8.0| 5.0| 1.0| 0.0|
| 0.0|18.0| UK| 0.0| 7.0| 5.0| 2.0| 0.0| 0.0| 0.0|
| 1.0|22.0| USA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|
| 1.0|24.0|GERMANY| 417.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0|
| 0.0|21.0| USA| 3.0| 3.0| 2.0| 1.0| 0.0| 0.0| 0.0|
| 1.0|18.0| USA| 8.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0|
| 0.0|23.0| USA| 7.0| 8.0| 4.0| 2.0| 2.0| 0.0| 0.0|
| 1.0|22.0| UK| 60.0| 8.0| 0.0| 0.0| 4.0| 4.0| 0.0|
| 1.0|18.0| USA| 3.0| 7.0| 0.0| 2.0| 1.0| 4.0| 0.0|
| 0.0|20.0| CANADA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|
| 1.0|22.0| USA| 5.0| 83.0| 6.0| 9.0| 10.0| 58.0| 0.0|
| 1.0|23.0| USA| 351.0| 5.0| 0.0| 0.0| 0.0| 5.0| 0.0|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+
only showing top 20 rows
'''convert schema test'''
correctCols = StructType([\
StructField("gender",DoubleType(),False),\
StructField("age",DoubleType(),True),\
StructField("country",StringType(),True),\
StructField("friend_count",DoubleType(),True),\
StructField("lifetime",DoubleType(),True),\
StructField("game1",DoubleType(),True),\
StructField("game2",DoubleType(),True),\
StructField("game3",DoubleType(),True),\
StructField("game4",DoubleType(),True),\
StructField("paid_customer",DoubleType(),False)])
fakeData = [(0.0,1.0,"A",1.0,1.0,1.0,1.0,1.0,1.0,0.0)]
fakeDf = spark.createDataFrame(fakeData, correctCols)
assert data.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, data.dtypes)
0.3 Indexer
indexer converts categorical features into doubles.
https://spark.apache.org/docs/latest/ml-features.html#stringindexer
country is the only categorical feature.
After these modifications the schema should be:
* gender: Double (1 if male else 0)
* age: Double
* country: String
* friend_count: Double
* lifetime: Double
* game1: Double (citygame_played)
* game2: Double (pictionarygame_played)
* game3: Double (scramblegame_played)
* game4: Double (snipergame_played)
* paid_customer: Double (1 if yes else 0)
* country_index: Double
param df: DataFrame
return: transformed Dataframe. The returned dataframe should have a new column called “country_index”.
def indexer(df):
    # YOUR CODE HERE
    #a=df[['country']]
    #b = a.drop_duplicates()
    stringIndexer = StringIndexer(inputCol="country", outputCol="country_index", stringOrderType="frequencyDesc")
    model = stringIndexer.fit(df).transform(df)
    return model

indexed = indexer(data)
indexed.show()
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+
|gender| age|country|friend_count|lifetime|game1|game2|game3|game4|paid_customer|country_index|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+
| 1.0|23.0| USA| 0.0| 5.0| 0.0| 1.0| 1.0| 3.0| 0.0| 0.0|
| 1.0|20.0| UK| 2.0| 4.0| 0.0| 0.0| 0.0| 4.0| 0.0| 1.0|
| 1.0|24.0| UK| 9.0| 13.0| 1.0| 0.0| 2.0| 10.0| 0.0| 1.0|
| 1.0|21.0| USA| 412.0| 80.0| 8.0| 7.0| 17.0| 48.0| 1.0| 0.0|
| 1.0|20.0| USA| 0.0| 38.0| 0.0| 3.0| 6.0| 29.0| 0.0| 0.0|
| 0.0|22.0| FRANCE| 86.0| 2.0| 0.0| 2.0| 0.0| 0.0| 0.0| 3.0|
| 1.0|27.0|GERMANY| 2.0| 5.0| 1.0| 1.0| 1.0| 2.0| 0.0| 2.0|
| 0.0|30.0| USA| 0.0| 45.0| 19.0| 21.0| 3.0| 2.0| 0.0| 0.0|
| 0.0|26.0|GERMANY| 7.0| 20.0| 6.0| 8.0| 5.0| 1.0| 0.0| 2.0|
| 0.0|18.0| UK| 0.0| 7.0| 5.0| 2.0| 0.0| 0.0| 0.0| 1.0|
| 1.0|22.0| USA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|
| 1.0|24.0|GERMANY| 417.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 2.0|
| 0.0|21.0| USA| 3.0| 3.0| 2.0| 1.0| 0.0| 0.0| 0.0| 0.0|
| 1.0|18.0| USA| 8.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0|
| 0.0|23.0| USA| 7.0| 8.0| 4.0| 2.0| 2.0| 0.0| 0.0| 0.0|
| 1.0|22.0| UK| 60.0| 8.0| 0.0| 0.0| 4.0| 4.0| 0.0| 1.0|
| 1.0|18.0| USA| 3.0| 7.0| 0.0| 2.0| 1.0| 4.0| 0.0| 0.0|
| 0.0|20.0| CANADA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 4.0|
| 1.0|22.0| USA| 5.0| 83.0| 6.0| 9.0| 10.0| 58.0| 0.0| 0.0|
| 1.0|23.0| USA| 351.0| 5.0| 0.0| 0.0| 0.0| 5.0| 0.0| 0.0|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+
only showing top 20 rows
'''indexer schema test'''
correctCols = StructType([\
StructField("gender",DoubleType(),False),\
StructField("age",DoubleType(),False),\
StructField("country",StringType(),True),\
StructField("friend_count",DoubleType(),False),\
StructField("lifetime",DoubleType(),False),\
StructField("game1",DoubleType(),False),\
StructField("game2",DoubleType(),False),\
StructField("game3",DoubleType(),False),\
StructField("game4",DoubleType(),False),\
StructField("paid_customer",DoubleType(),False),\
StructField("country_index",DoubleType(),False)])
fakeData = [(0.0,1.0,"A",1.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0)]
fakeDf = spark.createDataFrame(fakeData, correctCols)
assert indexed.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, indexed.dtypes)
0.4 Feature Assembler
featureAssembler combines the features into one vector. Most MLlib algorithms require this step.
https://spark.apache.org/docs/latest/ml-features.html#vectorassembler
In this task your vector assembler should take and combine the following columns, in the order listed:
["gender", "age","friend_count","lifetime","game1","game2","game3","game4","country_index"].
param df: Dataframe that is transformed using indexer
return transformed Dataframe. The returned dataframe should have a new column called “features”
def featureAssembler(df):
    assembler = VectorAssembler(
        inputCols = ["gender", "age","friend_count","lifetime","game1","game2","game3","game4","country_index"],
        outputCol = "features"
    )
    output = assembler.transform(df)
    # YOUR CODE HERE
    return output

assembled = featureAssembler(indexed)
assembled.show()
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+
|gender| age|country|friend_count|lifetime|game1|game2|game3|game4|paid_customer|country_index| features|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+
| 1.0|23.0| USA| 0.0| 5.0| 0.0| 1.0| 1.0| 3.0| 0.0| 0.0|[1.0,23.0,0.0,5.0...|
| 1.0|20.0| UK| 2.0| 4.0| 0.0| 0.0| 0.0| 4.0| 0.0| 1.0|[1.0,20.0,2.0,4.0...|
| 1.0|24.0| UK| 9.0| 13.0| 1.0| 0.0| 2.0| 10.0| 0.0| 1.0|[1.0,24.0,9.0,13....|
| 1.0|21.0| USA| 412.0| 80.0| 8.0| 7.0| 17.0| 48.0| 1.0| 0.0|[1.0,21.0,412.0,8...|
| 1.0|20.0| USA| 0.0| 38.0| 0.0| 3.0| 6.0| 29.0| 0.0| 0.0|[1.0,20.0,0.0,38....|
| 0.0|22.0| FRANCE| 86.0| 2.0| 0.0| 2.0| 0.0| 0.0| 0.0| 3.0|[0.0,22.0,86.0,2....|
| 1.0|27.0|GERMANY| 2.0| 5.0| 1.0| 1.0| 1.0| 2.0| 0.0| 2.0|[1.0,27.0,2.0,5.0...|
| 0.0|30.0| USA| 0.0| 45.0| 19.0| 21.0| 3.0| 2.0| 0.0| 0.0|[0.0,30.0,0.0,45....|
| 0.0|26.0|GERMANY| 7.0| 20.0| 6.0| 8.0| 5.0| 1.0| 0.0| 2.0|[0.0,26.0,7.0,20....|
| 0.0|18.0| UK| 0.0| 7.0| 5.0| 2.0| 0.0| 0.0| 0.0| 1.0|[0.0,18.0,0.0,7.0...|
| 1.0|22.0| USA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|(9,[0,1],[1.0,22.0])|
| 1.0|24.0|GERMANY| 417.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 2.0|[1.0,24.0,417.0,1...|
| 0.0|21.0| USA| 3.0| 3.0| 2.0| 1.0| 0.0| 0.0| 0.0| 0.0|[0.0,21.0,3.0,3.0...|
| 1.0|18.0| USA| 8.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0|[1.0,18.0,8.0,1.0...|
| 0.0|23.0| USA| 7.0| 8.0| 4.0| 2.0| 2.0| 0.0| 0.0| 0.0|[0.0,23.0,7.0,8.0...|
| 1.0|22.0| UK| 60.0| 8.0| 0.0| 0.0| 4.0| 4.0| 0.0| 1.0|[1.0,22.0,60.0,8....|
| 1.0|18.0| USA| 3.0| 7.0| 0.0| 2.0| 1.0| 4.0| 0.0| 0.0|[1.0,18.0,3.0,7.0...|
| 0.0|20.0| CANADA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 4.0|(9,[1,8],[20.0,4.0])|
| 1.0|22.0| USA| 5.0| 83.0| 6.0| 9.0| 10.0| 58.0| 0.0| 0.0|[1.0,22.0,5.0,83....|
| 1.0|23.0| USA| 351.0| 5.0| 0.0| 0.0| 0.0| 5.0| 0.0| 0.0|[1.0,23.0,351.0,5...|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+
only showing top 20 rows
'''assembler schema test'''
from pyspark.ml.linalg import *
from pyspark.ml.linalg import VectorUDT
correctCols = StructType([\
StructField("gender",DoubleType(),False),\
StructField("age",DoubleType(),False),\
StructField("country",StringType(),True),\
StructField("friend_count",DoubleType(),False),\
StructField("lifetime",DoubleType(),False),\
StructField("game1",DoubleType(),False),\
StructField("game2",DoubleType(),False),\
StructField("game3",DoubleType(),False),\
StructField("game4",DoubleType(),False),\
StructField("paid_customer",DoubleType(),False),\
StructField("country_index",DoubleType(),False),\
StructField("features", VectorUDT(),True)])
fakeData = [(0.0,1.0,"A",1.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0,(Vectors.dense([1.0, 2.0])))]
fakeDf = spark.createDataFrame(fakeData, correctCols)
assert assembled.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, assembled.dtypes)
0.5 Scaler
scaler standardizes data to improve performance.
https://spark.apache.org/docs/latest/ml-features.html#standardscaler.
For this task please remember to set the withStd and withMean parameters to true.
param df Dataframe that is transformed using featureAssembler
param outputColName name of the scaled feature vector (output column name)
return transformed Dataframe. The returned dataframe should have a new column named after the passed outputColName parameter.
def scaler(df, outputColName):
    # YOUR CODE HERE
    scaler = StandardScaler(inputCol="features", outputCol=outputColName, withStd=True, withMean=True)
    scalerModel = scaler.fit(df)
    scaledData = scalerModel.transform(df)
    return scaledData

scaled = scaler(assembled, "scaledFeatures")
scaled.show()
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+
|gender| age|country|friend_count|lifetime|game1|game2|game3|game4|paid_customer|country_index| features| scaledFeatures|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+
| 1.0|23.0| USA| 0.0| 5.0| 0.0| 1.0| 1.0| 3.0| 0.0| 0.0|[1.0,23.0,0.0,5.0...|[0.90079513989473...|
| 1.0|20.0| UK| 2.0| 4.0| 0.0| 0.0| 0.0| 4.0| 0.0| 1.0|[1.0,20.0,2.0,4.0...|[0.90079513989473...|
| 1.0|24.0| UK| 9.0| 13.0| 1.0| 0.0| 2.0| 10.0| 0.0| 1.0|[1.0,24.0,9.0,13....|[0.90079513989473...|
| 1.0|21.0| USA| 412.0| 80.0| 8.0| 7.0| 17.0| 48.0| 1.0| 0.0|[1.0,21.0,412.0,8...|[0.90079513989473...|
| 1.0|20.0| USA| 0.0| 38.0| 0.0| 3.0| 6.0| 29.0| 0.0| 0.0|[1.0,20.0,0.0,38....|[0.90079513989473...|
| 0.0|22.0| FRANCE| 86.0| 2.0| 0.0| 2.0| 0.0| 0.0| 0.0| 3.0|[0.0,22.0,86.0,2....|[-1.1099082973702...|
| 1.0|27.0|GERMANY| 2.0| 5.0| 1.0| 1.0| 1.0| 2.0| 0.0| 2.0|[1.0,27.0,2.0,5.0...|[0.90079513989473...|
| 0.0|30.0| USA| 0.0| 45.0| 19.0| 21.0| 3.0| 2.0| 0.0| 0.0|[0.0,30.0,0.0,45....|[-1.1099082973702...|
| 0.0|26.0|GERMANY| 7.0| 20.0| 6.0| 8.0| 5.0| 1.0| 0.0| 2.0|[0.0,26.0,7.0,20....|[-1.1099082973702...|
| 0.0|18.0| UK| 0.0| 7.0| 5.0| 2.0| 0.0| 0.0| 0.0| 1.0|[0.0,18.0,0.0,7.0...|[-1.1099082973702...|
| 1.0|22.0| USA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|(9,[0,1],[1.0,22.0])|[0.90079513989473...|
| 1.0|24.0|GERMANY| 417.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 2.0|[1.0,24.0,417.0,1...|[0.90079513989473...|
| 0.0|21.0| USA| 3.0| 3.0| 2.0| 1.0| 0.0| 0.0| 0.0| 0.0|[0.0,21.0,3.0,3.0...|[-1.1099082973702...|
| 1.0|18.0| USA| 8.0| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0|[1.0,18.0,8.0,1.0...|[0.90079513989473...|
| 0.0|23.0| USA| 7.0| 8.0| 4.0| 2.0| 2.0| 0.0| 0.0| 0.0|[0.0,23.0,7.0,8.0...|[-1.1099082973702...|
| 1.0|22.0| UK| 60.0| 8.0| 0.0| 0.0| 4.0| 4.0| 0.0| 1.0|[1.0,22.0,60.0,8....|[0.90079513989473...|
| 1.0|18.0| USA| 3.0| 7.0| 0.0| 2.0| 1.0| 4.0| 0.0| 0.0|[1.0,18.0,3.0,7.0...|[0.90079513989473...|
| 0.0|20.0| CANADA| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 4.0|(9,[1,8],[20.0,4.0])|[-1.1099082973702...|
| 1.0|22.0| USA| 5.0| 83.0| 6.0| 9.0| 10.0| 58.0| 0.0| 0.0|[1.0,22.0,5.0,83....|[0.90079513989473...|
| 1.0|23.0| USA| 351.0| 5.0| 0.0| 0.0| 0.0| 5.0| 0.0| 0.0|[1.0,23.0,351.0,5...|[0.90079513989473...|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+
only showing top 20 rows
'''scaler schema test'''
correctCols = StructType([\
StructField("gender",DoubleType(),False),\
StructField("age",DoubleType(),False),\
StructField("country",StringType(),True),\
StructField("friend_count",DoubleType(),False),\
StructField("lifetime",DoubleType(),False),\
StructField("game1",DoubleType(),False),\
StructField("game2",DoubleType(),False),\
StructField("game3",DoubleType(),False),\
StructField("game4",DoubleType(),False),\
StructField("paid_customer",DoubleType(),False),\
StructField("country_index",DoubleType(),False),\
StructField("features", VectorUDT(),True),\
StructField("scaledFeatures", VectorUDT(),True)])
fakeData = [(0.0,1.0,"A",1.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0,(Vectors.dense([1.0, 2.0])),(Vectors.dense([2.0, 0.0])))]
fakeDf = spark.createDataFrame(fakeData, correctCols)
assert scaled.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, scaled.dtypes)
0.6 Create Model
createModel creates a Logistic Regression model. When training, 5 iterations should be enough.
https://spark.apache.org/docs/latest/ml-classification-regression.html#binomial-logistic-regression
param training transformed dataframe
param featuresCol name of the features column
param labelCol name of the label col (paid_customer)
param predCol name of the prediction col
return trained Logistic Regression model
def createModel(training, featuresCol, labelCol, predCol):
    lr = LogisticRegression(maxIter = 5)
    lr.setFeaturesCol(featuresCol)
    lr.setLabelCol(labelCol)
    lr.setPredictionCol(predCol)
    lrModel = lr.fit(training)
    # YOUR CODE HERE
    #print(lrModel)
    return lrModel

# split the dataset into training (70%) and prediction (30%) sets
splitted = scaled.randomSplit([0.7,0.3])
model = createModel(splitted[0],"scaledFeatures","paid_customer","prediction")
LogisticRegressionModel: uid=LogisticRegression_4951db8e602c, numClasses=2, numFeatures=9
0.7 Predict
Given a transformed and normalized dataset, predict predicts whether the customer is going to subscribe to the service.
85% correct will give you 3 points.
70% correct will give you 2 points.
50% correct will give you 1 point.
param model trained logistic regression model
param dataToPredict normalized dataframe for prediction
return DataFrame with predicted scores (1.0 == yes, 0.0 == no)
def predict(model, dataToPredict):
    output = model.transform(dataToPredict)
    # YOUR CODE HERE
    return output

predictions = predict(model, splitted[1])
correct = predictions.where("prediction == paid_customer").count()
total = predictions.count()
print((correct / total) * 100, "% predicted correctly")
predictions.show()
95.80327868852459 % predicted correctly
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
|gender| age|country|friend_count|lifetime|game1|game2|game3|game4|paid_customer|country_index| features| scaledFeatures| rawPrediction| probability|prediction|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
| 0.0|18.0| CANADA| 0.0| 1.0| 0.0| 0.0| 1.0| 0.0| 0.0| 4.0|(9,[1,3,6,8],[18....|[-1.1099082973702...|[4.99559262560287...|[0.99327778480500...| 0.0|
| 0.0|18.0| CANADA| 0.0| 54.0| 28.0| 15.0| 7.0| 4.0| 0.0| 4.0|[0.0,18.0,0.0,54....|[-1.1099082973702...|[3.57955249526850...|[0.97286847322793...| 0.0|
| 0.0|18.0| CANADA| 3.0| 26.0| 11.0| 6.0| 8.0| 1.0| 0.0| 4.0|[0.0,18.0,3.0,26....|[-1.1099082973702...|[4.14194339582123...|[0.98435666573439...| 0.0|
| 0.0|18.0| CANADA| 7.0| 6.0| 2.0| 4.0| 0.0| 0.0| 0.0| 4.0|[0.0,18.0,7.0,6.0...|[-1.1099082973702...|[4.87021457153555...|[0.99238668820979...| 0.0|
| 0.0|18.0| CANADA| 71.0| 47.0| 26.0| 11.0| 6.0| 4.0| 0.0| 4.0|[0.0,18.0,71.0,47...|[-1.1099082973702...|[3.23307263069343...|[0.96206006484016...| 0.0|
| 0.0|18.0| EGYPT| 6.0| 100.0| 45.0| 32.0| 16.0| 7.0| 0.0| 6.0|[0.0,18.0,6.0,100...|[-1.1099082973702...|[1.93735846291887...|[0.87406165588822...| 0.0|
| 0.0|18.0| EGYPT| 9.0| 4.0| 3.0| 0.0| 1.0| 0.0| 0.0| 6.0|[0.0,18.0,9.0,4.0...|[-1.1099082973702...|[4.67385048293821...|[0.99075010816165...| 0.0|
| 0.0|18.0| EGYPT| 9.0| 8.0| 3.0| 0.0| 4.0| 1.0| 0.0| 6.0|[0.0,18.0,9.0,8.0...|[-1.1099082973702...|[4.45248997207777...|[0.98848462479986...| 0.0|
| 0.0|18.0| FRANCE| 0.0| 9.0| 3.0| 4.0| 2.0| 0.0| 0.0| 3.0|[0.0,18.0,0.0,9.0...|[-1.1099082973702...|[4.87584026342622...|[0.99242907475375...| 0.0|
| 0.0|18.0| FRANCE| 0.0| 81.0| 46.0| 17.0| 12.0| 6.0| 0.0| 3.0|[0.0,18.0,0.0,81....|[-1.1099082973702...|[2.89556186017502...|[0.94762660725686...| 0.0|
| 0.0|18.0| FRANCE| 2.0| 87.0| 43.0| 23.0| 16.0| 5.0| 0.0| 3.0|[0.0,18.0,2.0,87....|[-1.1099082973702...|[2.57911443534171...|[0.92950526434730...| 0.0|
| 0.0|18.0| FRANCE| 9.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 3.0|(9,[1,2,8],[18.0,...|[-1.1099082973702...|[5.08632173265312...|[0.99385725410626...| 0.0|
| 0.0|18.0|GERMANY| 0.0| 7.0| 5.0| 2.0| 0.0| 0.0| 0.0| 2.0|[0.0,18.0,0.0,7.0...|[-1.1099082973702...|[5.10646771499869...|[0.99397902988660...| 0.0|
| 0.0|18.0|GERMANY| 0.0| 9.0| 4.0| 2.0| 1.0| 2.0| 0.0| 2.0|[0.0,18.0,0.0,9.0...|[-1.1099082973702...|[4.99666173139974...|[0.99328491949002...| 0.0|
| 0.0|18.0|GERMANY| 0.0| 9.0| 6.0| 2.0| 0.0| 1.0| 0.0| 2.0|[0.0,18.0,0.0,9.0...|[-1.1099082973702...|[5.05345088485828...|[0.99365328438896...| 0.0|
| 0.0|18.0|GERMANY| 0.0| 19.0| 5.0| 7.0| 5.0| 2.0| 0.0| 2.0|[0.0,18.0,0.0,19....|[-1.1099082973702...|[4.60742115368499...|[0.99012105172261...| 0.0|
| 0.0|18.0|GERMANY| 0.0| 24.0| 13.0| 6.0| 3.0| 2.0| 0.0| 2.0|[0.0,18.0,0.0,24....|[-1.1099082973702...|[4.59950569851941...|[0.99004332671759...| 0.0|
| 0.0|18.0|GERMANY| 0.0| 38.0| 20.0| 13.0| 3.0| 2.0| 0.0| 2.0|[0.0,18.0,0.0,38....|[-1.1099082973702...|[4.29553129537215...|[0.98655393230056...| 0.0|
| 0.0|18.0|GERMANY| 0.0| 66.0| 29.0| 20.0| 11.0| 6.0| 0.0| 2.0|[0.0,18.0,0.0,66....|[-1.1099082973702...|[3.31748753051268...|[0.96502388763751...| 0.0|
| 0.0|18.0|GERMANY| 1.0| 2.0| 0.0| 1.0| 1.0| 0.0| 0.0| 2.0|[0.0,18.0,1.0,2.0...|[-1.1099082973702...|[5.15809258630837...|[0.99428024190099...| 0.0|
+------+----+-------+------------+--------+-----+-----+-----+-----+-------------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
only showing top 20 rows
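As a cross-check on the manual accuracy computation above, the same figure can be obtained with MLlib's MulticlassClassificationEvaluator. This is only a sketch, assuming the predictions DataFrame produced in the cell above.

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# accuracy = fraction of rows where prediction equals paid_customer
evaluator = MulticlassClassificationEvaluator(labelCol="paid_customer",
                                              predictionCol="prediction",
                                              metricName="accuracy")
print(evaluator.evaluate(predictions) * 100, "% predicted correctly")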
'''prediction correctness test'''
data = convert(randomData)
data.cache()
indexed = indexer(data)
assembled = featureAssembler(indexed)
scaled = scaler(assembled, "scaledFeatures")
splitted = scaled.randomSplit([0.7,0.3])
model = createModel(splitted[0],"scaledFeatures","paid_customer","prediction")
predictions = predict(model, splitted[1])
correct = predictions.where("prediction == paid_customer").count()
total = predictions.count()
answer = (correct / total) * 100
print(answer, "% predicted correctly")
assert answer >= 50.0, "less than 50% predicted correctly, you get 0 points"
LogisticRegressionModel: uid=LogisticRegression_9c24b6de42a1, numClasses=2, numFeatures=9
95.787700084246 % predicted correctly
assert answer >= 70.0, "less than 70% predicted correctly, you get 1 point"
assert answer >= 85.0, "less than 85% predicted correctly, you get 2 points"
spark.stop()