PySpark

Posted on June 2, 2019
Tags: machinelearning
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as f
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import pandas as pd
import numpy as np
%matplotlib inline


spark = SparkSession.builder\
    .master("local[*]")\
    .appName("main")\
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.shuffle.service.enabled", "true")\
    .getOrCreate()

#names of tables
airTraffic = "airtraffic"
carriers = "carriers"
airports = "airports"

carriersTable = spark.read.csv("carriers.csv", inferSchema="true", header="true")
carriersTable.createOrReplaceTempView(carriers)

airportsTable = spark.read.csv("airports.csv", inferSchema="true", header="true")
airportsTable.createOrReplaceTempView(airports)

sampleFile = "2008_sample.csv"
testFile = "2008_testsample.csv"
# Helper: assert that two arrays of Rows are equal, element by element
def correctRows(testArray, correctArray):
    for i in range(len(correctArray)):
        assert testArray[i].asDict() == correctArray[i].asDict(), "the row was expected to be %s but it was %s" % (correctArray[i].asDict(), testArray[i].asDict())

0.1 Load Data and Bind

Schema for airTraffic

Name Type
Year integer (nullable = true)
Month integer (nullable = true)
DayofMonth integer (nullable = true)
DayOfWeek integer (nullable = true)
DepTime integer (nullable = true)
CRSDepTime integer (nullable = true)
ArrTime integer (nullable = true)
CRSArrTime integer (nullable = true)
UniqueCarrier string (nullable = true)
FlightNum integer (nullable = true)
TailNum string (nullable = true)
ActualElapsedTime integer (nullable = true)
CRSElapsedTime integer (nullable = true)
AirTime integer (nullable = true)
ArrDelay integer (nullable = true)
DepDelay integer (nullable = true)
Origin string (nullable = true)
Dest string (nullable = true)
Distance integer (nullable = true)
TaxiIn integer (nullable = true)
TaxiOut integer (nullable = true)
Cancelled integer (nullable = true)
CancellationCode string (nullable = true)
Diverted integer (nullable = true)
CarrierDelay integer (nullable = true)
WeatherDelay integer (nullable = true)
NASDelay integer (nullable = true)
SecurityDelay integer (nullable = true)
LateAircraftDelay integer (nullable = true)
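
Since the schema above is fixed and known, it could also be declared explicitly instead of paying for inferSchema's extra pass over the CSV. A minimal sketch, not part of the original exercise (airTrafficSchema is a hypothetical name):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# columns in file order; everything is an integer except these five
stringCols = {"UniqueCarrier", "TailNum", "Origin", "Dest", "CancellationCode"}
colNames = ["Year", "Month", "DayofMonth", "DayOfWeek", "DepTime", "CRSDepTime",
            "ArrTime", "CRSArrTime", "UniqueCarrier", "FlightNum", "TailNum",
            "ActualElapsedTime", "CRSElapsedTime", "AirTime", "ArrDelay", "DepDelay",
            "Origin", "Dest", "Distance", "TaxiIn", "TaxiOut", "Cancelled",
            "CancellationCode", "Diverted", "CarrierDelay", "WeatherDelay",
            "NASDelay", "SecurityDelay", "LateAircraftDelay"]

# hypothetical explicit schema mirroring the table above
airTrafficSchema = StructType([
    StructField(c, StringType() if c in stringCols else IntegerType(), True)
    for c in colNames])

# usage: spark.read.csv(path, schema=airTrafficSchema, header="true", nullValue="NA")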
def loadBind(path, name):
    # load the raw CSV into a Spark DataFrame; "NA" values become nulls
    df = spark.read.csv(path, nullValue="NA", inferSchema="true", header="true")

    # register the DataFrame under the given name so it can be referred to in SQL
    # gotcha: the name is bound to this specific DataFrame
    df.createOrReplaceTempView(name)

    return df

loadDataAndRegister = lambda x: loadBind(x, airTraffic)
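
To illustrate the gotcha: a temp view name always refers to the DataFrame most recently registered under it, so calling loadBind again with the same name silently replaces the old binding. A small sketch, not in the original (the "demo" view is hypothetical):

df1 = loadBind(sampleFile, "demo")
df2 = loadBind(testFile, "demo")          # "demo" now refers to df2
spark.sql("SELECT COUNT(*) FROM demo")    # queries df2, not df1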
# example print
data = loadDataAndRegister(sampleFile)
data.show(2) 
print("schema")
#carrierData = loadBind(sampleFile,carriers)
#print(carrierData.schema)
#spark.read.table(airports)
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    2|        28|        4|   1806|      1805|   1818|      1820|           WN|     1642| N392SW|               72|            75|     55|      -2|       1|   SLC| LAS|     368|     6|     11|        1|               D|       0|        null|        null|    null|         null|             null|
|2008|    4|         6|        7|   1527|      1531|   1636|      1627|           NW|     1757|  N9337|               69|            56|     30|       9|      -4|   DTW| CMH|     155|     2|     37|        0|            null|       0|        null|        null|    null|         null|             null|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
only showing top 2 rows

'''loadDataAndRegister tests'''

df = loadBind(testFile,airTraffic)

# Table "airtraffic" should exists
assert spark.sql("SHOW TABLES Like 'airtraffic'").count() == 1, "there was expected to be a table called 'airtraffic'"

# Columns should have correct values
third = df.collect()[2]
correctRow = Row(Year=2008, Month=5, DayofMonth=6, DayOfWeek=2, DepTime=611,
                             CRSDepTime=615, ArrTime=729, CRSArrTime=735, UniqueCarrier='EV',
                             FlightNum=4794, TailNum='N916EV', ActualElapsedTime=78,
                             CRSElapsedTime=80, AirTime=58, ArrDelay=-6, DepDelay=-4,
                             Origin='ROA', Dest='ATL', Distance=357, TaxiIn=9, TaxiOut=11,
                             Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=None,
                             WeatherDelay=None, NASDelay=None, SecurityDelay=None,
                             LateAircraftDelay=None).asDict()

assert third.asDict() == correctRow, "the row was expected to be %s but it was %s" % (correctRow, third.asDict())

0.2 Count Flights

TailNum = identification code of an airplane
Each appearance of a TailNum in the airtraffic DataFrame represents one flight
CountFlight returns the number of flights for each airplane
by counting the occurrences of each TailNum in the airtraffic DataFrame

TailNum count
N693BR 1526
N646BR 1505
N476HA 1490
N485HA 1441
N486HA 1439

def CountFlight(df):
    CountDesc = (df
                 .groupby("TailNum")
                 .count()
                 .sort(f.desc("count"))
                )
    # groupby() groups the DataFrame by TailNum
    # count() counts the rows in each group
    # sort(f.desc("count")) orders by the count, largest first

    CountDescExNull = (CountDesc
                       .filter(CountDesc.TailNum.isNotNull())
                      )
    # filter() keeps only the non-null TailNums

    return CountDescExNull
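
Since the rest of the post leans on Spark SQL, the same aggregation can also be written against the registered view. An equivalent sketch (CountFlightSQL is a hypothetical name, not part of the exercise):

def CountFlightSQL():
    # hypothetical helper; equivalent to CountFlight(df) once df is registered as "airtraffic"
    return spark.sql("""
        SELECT TailNum, COUNT(*) AS count
        FROM airtraffic
        WHERE TailNum IS NOT NULL
        GROUP BY TailNum
        ORDER BY count DESC
    """)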
# example print
data = loadBind(testFile,airTraffic)
CountFlight(data).show(20)
+-------+-----+
|TailNum|count|
+-------+-----+
| N824AS|    2|
| N873AS|    1|
| N856AS|    1|
| N916EV|    1|
| N886AS|    1|
| N881AS|    1|
+-------+-----+
'''CountFlight tests'''

data = loadBind(testFile,airTraffic)
        
correct = [Row(TailNum='N824AS', count=2),
           Row(TailNum='N856AS', count=1),
           Row(TailNum='N886AS', count=1),
           Row(TailNum='N916EV', count=1),
           Row(TailNum='N873AS', count=1),
           Row(TailNum='N881AS', count=1)]

correctRows(CountFlight(data).collect(), correct)

0.3 Flights Cancelled Due to Security

CancellationCode = 'D' marks a flight that was cancelled for security reasons
SecurityCancel returns a DataFrame of those flights and their destinations

FlightNum Dest
4285 DHN
4790 ATL
3631 LEX
3632 DFW
def SecurityCancel(df):
    # queries the registered "airtraffic" view that df is bound to
    output = spark.sql("""SELECT FlightNum, Dest FROM airtraffic WHERE CancellationCode = 'D'""")
    return output
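
The df argument is unused above because the query goes through the registered view. With the DataFrame API the argument can be used directly. An equivalent sketch (SecurityCancelDF is a hypothetical name):

def SecurityCancelDF(df):
    # hypothetical DataFrame-API equivalent of SecurityCancel
    return df.filter(df.CancellationCode == 'D').select("FlightNum", "Dest")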
# example print
data = loadBind(sampleFile,airTraffic)
SecurityCancel(data).show(5)
+---------+----+
|FlightNum|Dest|
+---------+----+
|     1642| LAS|
|      585| MSP|
+---------+----+
'''cancelledDueToSecurity tests'''

data = loadDataAndRegister(testFile)
correct = [Row(FlightNum=4794, Dest='JFK'), Row(FlightNum=4794, Dest='ATL')]

correctRows(SecurityCancel(data).collect(), correct)

0.4 Weather Delay

MaxWeatherDelay :: Airtraffic_df -> delay_df
delay_df is a single-row DataFrame containing the longest weather delay
between the start of January and the end of March

_c0
1148
def MaxWeatherDelay(df):
    output = spark.sql(""" SELECT MAX(WeatherDelay) as _c0 from airtraffic WHERE Month >= 1 and Month <= 3""")
    return output
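For comparison, the same query with the DataFrame API, using the f alias imported at the top. A sketch (MaxWeatherDelayDF is a hypothetical name):

def MaxWeatherDelayDF(df):
    # hypothetical DataFrame-API equivalent of MaxWeatherDelay
    return (df.filter(f.col("Month").between(1, 3))
              .agg(f.max("WeatherDelay").alias("_c0")))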
    
# example print
data = loadBind(sampleFile,airTraffic)
MaxWeatherDelay(data).show()

delay = MaxWeatherDelay(data).first()[0]
print(f"longest weather delay between jan and mar is {delay}")
+---+
|_c0|
+---+
| 40|
+---+

longest weather delay between jan and mar is 40
'''MaxWeatherDelay tests'''

data = loadDataAndRegister(testFile)
test = MaxWeatherDelay(data).first()[0]

assert test == 7, "the longest weather delay was expected to be 7 but it was %s" % test

0.5 No Flights

NoFlight_df is a DataFrame that contains the names of the airlines that did not fly.
carriers_df has the relation (Code, Description)
airtraffic_df has a carrier code (UniqueCarrier) in its schema for every flight flown

Goal. Get the subset of carriers_df that did not fly (see the anti-join sketch below)

every row of airtraffic_df represents a flight that flew
carriers_df - (carriers_df intersect airtraffic_df) = airlines that did not fly
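
The same set difference can be expressed directly as a left anti join, which keeps exactly the carriers rows with no matching flight. A sketch using the carriersTable DataFrame loaded earlier (didNotFlyAntiJoin is a hypothetical name):

def didNotFlyAntiJoin(df):
    # hypothetical alternative: left anti join keeps carriers with no matching flight
    return (carriersTable
            .join(df, carriersTable.Code == df.UniqueCarrier, "left_anti")
            .select("Description"))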

def didNotFly(df):
    a = spark.sql("""
        SELECT Description FROM carriers
        WHERE Code NOT IN
            (SELECT Code FROM airtraffic
             INNER JOIN carriers ON carriers.Code = airtraffic.UniqueCarrier)
    """)
    return a
data = loadBind(testFile,airTraffic)
didNotFly(data).show(5)
+--------------------+
|         Description|
+--------------------+
|  Tradewind Aviation|
|     Comlux Aviation|
|Master Top Linhas...|
| Flair Airlines Ltd.|
|           Swift Air|
+--------------------+
only showing top 5 rows
'''didNotFly tests'''

data = loadDataAndRegister(testFile)
test = didNotFly(data).count()

assert test == 1489, "the amount of airlines that didn't fly was expected to be 1489 but it was %s" % test

0.6 Flights Vegas to JFK

Return the description and number of flights for each carrier that flew from Las Vegas (LAS) to JFK
returned schema: (Description, Num)
carriers_df schema: (Code, Description)

Description Num
JetBlue Airways 566
Delta Air Lines Inc. 441
US Airways Inc. (… 344
American Airlines… 121
def flightsFromVegasToJFK(df):
    # register the per-plane flight counts so they can be joined in SQL
    fdata = CountFlight(df)
    fdata.createOrReplaceTempView("flightCount")
    output = spark.sql("""
    SELECT Description, count AS Num FROM
        flightCount
        INNER JOIN
        (SELECT * FROM
            carriers
            INNER JOIN
            -- planes (TailNums) that flew LAS -> JFK, with their carrier code
            (SELECT DISTINCT UniqueCarrier, TailNum FROM airtraffic WHERE Origin = 'LAS' AND Dest = 'JFK') AS T1
            ON
            carriers.Code = T1.UniqueCarrier) AS T2
        ON flightCount.TailNum = T2.TailNum
            ORDER BY Num DESC
    """)
    return output
# example print
data = loadBind(testFile,airTraffic)
flightsFromVegasToJFK(data).show(5)
+--------------------+---+
|         Description|Num|
+--------------------+---+
|       Titan Airways|  1|
|Atlantic Southeas...|  1|
+--------------------+---+
'''flightsFromVegasToJFK tests'''

data = loadDataAndRegister(testFile)
correct = [Row(Description='Titan Airways', Num=1),
           Row(Description='Atlantic Southeast Airlines', Num=1)]
correctRows(flightsFromVegasToJFK(data).collect(), correct)

0.7 Taxi Time

TaxiTime returns a DataFrame of the average time spent taxiing at each airport
airtraffic relevant schema: (Origin, Dest, TaxiIn, TaxiOut)
Origin = airport of origin
Dest = destination airport
TaxiIn = time spent taxiing, counted towards Origin
TaxiOut = time spent taxiing, counted towards Dest

airport taxi
DLG 4.0
BRW 5.051010191310567
OME 6.034800675790983
AKN 6.75
SCC 6.842553191489362
def TaxiTime(df):
    output = spark.sql("""
        SELECT T1.Origin AS airport, (A1 + A2) / 2 AS taxi FROM
            (SELECT Origin, AVG(TaxiIn) AS A1 FROM airtraffic GROUP BY Origin) AS T1
            INNER JOIN
            (SELECT Dest, AVG(TaxiOut) AS A2 FROM airtraffic GROUP BY Dest) AS T2
            ON
                T1.Origin = T2.Dest
            ORDER BY
                taxi ASC
    """)
    return output
data = loadBind(testFile,airTraffic)
TaxiTime(data).show(5)
+-------+-----+
|airport| taxi|
+-------+-----+
|    LAS| 11.0|
|    JFK|13.25|
+-------+-----+
'''timeSpentTaxiing tests'''

data = loadDataAndRegister(testFile)
correct = [Row(airport='LAS', taxi=11.0), Row(airport='JFK', taxi=13.25)]
correctRows(TaxiTime(data).collect(), correct)

0.8 Median Distance

distanceMedian returns a DataFrame containing the median travel distance.

_ c0
583.0
def distanceMedian(df):
    a = spark.sql("""
        SELECT percentile(T1, 0.5) AS `_ c0` FROM
            (SELECT Distance FROM airtraffic) AS Tab(T1)
    """)
    return a
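percentile computes an exact result, which requires buffering all values for the group; on a table too large for that, Spark's percentile_approx trades a bounded error for scalability. A sketch (distanceMedianApprox is a hypothetical name):

def distanceMedianApprox(df):
    # hypothetical scalable variant using the approximate percentile
    return spark.sql("SELECT percentile_approx(Distance, 0.5) AS `_c0` FROM airtraffic")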
data = loadBind(testFile,airTraffic)
distanceMedian(data).show()
+-----+
| _ c0|
+-----+
|357.0|
+-----+
'''distanceMedian tests'''

data = loadDataAndRegister(testFile)
test = distanceMedian(data).first()[0]
assert test == 357.0, "the distance median was expected to be 357.0 but it was %s" % test

0.9 95th Percentile

DataFrame containing the 95th percentile of carrier delay.

Example output:

_ c0
77.0
def score95(df):
    a = spark.sql("""
        SELECT percentile(T1, 0.95) AS `_ c0` FROM
            (SELECT CarrierDelay FROM airtraffic) AS Tab(T1)
    """)
    return a
data = loadBind(testFile,airTraffic)
score95(data).show()
+----+
|_ c0|
+----+
|17.0|
+----+
'''score95 tests'''

data = loadDataAndRegister(testFile)
test = score95(data).first()[0]
assert test == 17.0, "the score95 was expected to be 17.0 but it was %s" % test

0.10 Cancelled Flights

cancelledFlights finds the airports where flights were cancelled, ordered by the share of cancellations.

return: DataFrame containing the columns "airport", "city" and "percentage"
airports relevant schema: (iata, airport, city)
percentage = (# cancelled flights / total flights) per origin airport

Example output:

airport city percentage
Pellston Regional… Pellston 0.3157894736842105
Waterloo Municipal Waterloo 0.25
Telluride Regional Telluride 0.21084337349397592
Houghton County M… Hancock 0.19834710743801653
Rhinelander-Oneid… Rhinelander 0.15625
def cancelledFlights(df):
    a = spark.sql("""
        SELECT airports.airport, airports.city, T1.percentage
        FROM
            (SELECT Origin, SUM(Cancelled) / COUNT(Origin) AS percentage FROM airtraffic
            GROUP BY Origin) AS T1
        INNER JOIN
            airports
        ON
            T1.Origin = airports.iata
        ORDER BY
            T1.percentage DESC,
            airports.airport DESC
    """)
    return a
data = loadBind(testFile,airTraffic)
cancelledFlights(data).show(5)
+--------------------+---------+----------+
|             airport|     city|percentage|
+--------------------+---------+----------+
|McCarran Internat...|Las Vegas|       0.5|
|Roanoke Regional/...|  Roanoke|      0.25|
| John F Kennedy Intl| New York|       0.0|
+--------------------+---------+----------+
'''cancelledFlights tests'''

data = loadDataAndRegister(testFile)
correct = [Row(airport='McCarran International', city='Las Vegas', percentage=0.5),
           Row(airport='Roanoke Regional/ Woodrum ', city='Roanoke', percentage=0.25)]
correctRows(cancelledFlights(data).collect(), correct)

0.11 Least Squares

leastSquares calculates the linear least squares fit for the relationship between DepDelay and WeatherDelay.
y = Bx + c, where B is the slope and c the y-intercept
WeatherDelay = B*(DepDelay) + c

returns (y-intercept, B)

Rows with DepDelay <= 0 are filtered out. If a DepDelay value has multiple WeatherDelay values, they are averaged first.
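
The code below uses the closed-form normal-equation solution: with x = DepDelay, y = the averaged WeatherDelay, and N distinct points,

B = (N*Σxy - Σx*Σy) / (N*Σx² - (Σx)²)
c = (Σy*Σx² - Σx*Σxy) / (N*Σx² - (Σx)²)

which is exactly what the sums of DD, WD, Prod and Sqx feed into.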

def leastSquares(df):
    try:
        # x = DepDelay (DD), y = averaged WeatherDelay (WD),
        # plus the products x*y (Prod) and x*x (Sqx) needed for the sums
        a = spark.sql("""
            SELECT DD, WD, DD*WD AS Prod, DD*DD AS Sqx
            FROM
            (SELECT DepDelay AS DD, AVG(WeatherDelay) AS WD FROM airtraffic
            WHERE DepDelay > 0
            GROUP BY DepDelay) AS T1
        """)
        # collect the four sums in a single pass
        xsum, ysum, psum, sqxsum = a.groupBy().sum('DD', 'WD', 'Prod', 'Sqx').first()
        N = a.count()
        varM = ((N * psum) - (xsum * ysum)) / ((N * sqxsum) - (xsum ** 2))
        varB = ((ysum * sqxsum) - (xsum * psum)) / ((N * sqxsum) - (xsum ** 2))
        return varB, varM
    except Exception:
        return 0.0, 0.0
data = loadBind(testFile,airTraffic)
leastSquares(data)
(952.0, -56.0)
'''leastSquares tests'''

data = loadDataAndRegister(testFile)
test = leastSquares(data)
assert test == (952.0, -56.0), "the answer was expected to be (952.0, -56.0) but it was %s" % test
spark.stop()