PySpark
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as f
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import pandas as pd
import numpy as np
%matplotlib inline
spark = SparkSession.builder\
    .master("local[*]")\
    .appName("main")\
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.shuffle.service.enabled", "true")\
    .getOrCreate()
# names of tables
airTraffic = "airtraffic"
carriers = "carriers"
airports = "airports"
= spark.read.csv("carriers.csv", inferSchema="true", header="true")
carriersTable
carriersTable.createOrReplaceTempView(carriers)
= spark.read.csv("airports.csv", inferSchema="true", header="true")
airportsTable
airportsTable.createOrReplaceTempView(airports)
= "2008_sample.csv"
sampleFile = "2008_testsample.csv" testFile
# Test if arrays that contain Row are equal
def correctRows(testArray, correctArray):
for i in range(0, len(correctArray)):
assert testArray[i].asDict() == correctArray[i].asDict(), "the row was expected to be %s but it was %s" % (correctArray[i].asDict(), testArray[i].asDict())
0.1 Load Data and Bind
Schema for airTraffic
Name | Type |
---|---|
Year | integer (nullable = true) |
Month | integer (nullable = true) |
DayofMonth | integer (nullable = true) |
DayOfWeek | integer (nullable = true) |
DepTime | integer (nullable = true) |
CRSDepTime | integer (nullable = true) |
ArrTime | integer (nullable = true) |
CRSArrTime | integer (nullable = true) |
UniqueCarrier | string (nullable = true) |
FlightNum | integer (nullable = true) |
TailNum | string (nullable = true) |
ActualElapsedTime | integer (nullable = true) |
CRSElapsedTime | integer (nullable = true) |
AirTime | integer (nullable = true) |
ArrDelay | integer (nullable = true) |
DepDelay | integer (nullable = true) |
Origin | string (nullable = true) |
Dest | string (nullable = true) |
Distance | integer (nullable = true) |
TaxiIn | integer (nullable = true) |
TaxiOut | integer (nullable = true) |
Cancelled | integer (nullable = true) |
CancellationCode | string (nullable = true) |
Diverted | integer (nullable = true) |
CarrierDelay | integer (nullable = true) |
WeatherDelay | integer (nullable = true) |
NASDelay | integer (nullable = true) |
SecurityDelay | integer (nullable = true) |
LateAircraftDelay | integer (nullable = true) |
def loadBind(path, name):
    # load the raw dataset into a Spark DataFrame
    df = spark.read.csv(path, nullValue="NA", inferSchema="true", header="true")
    # bind the DataFrame to a name that can be referred to in SQL
    # (note the gotcha: it is the Spark DataFrame that binds the name)
    df.createOrReplaceTempView(name)
    return df

loadDataAndRegister = lambda x: loadBind(x, airTraffic)
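Since the expected types are already known from the schema table above, the read could alternatively bind an explicit schema instead of paying for inferSchema's extra pass over the data. A minimal sketch under that assumption (only the first nine fields spelled out; airTrafficSchema is a hypothetical name):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# hypothetical explicit schema matching the airTraffic schema table above
airTrafficSchema = StructType([
    StructField("Year", IntegerType(), True),
    StructField("Month", IntegerType(), True),
    StructField("DayofMonth", IntegerType(), True),
    StructField("DayOfWeek", IntegerType(), True),
    StructField("DepTime", IntegerType(), True),
    StructField("CRSDepTime", IntegerType(), True),
    StructField("ArrTime", IntegerType(), True),
    StructField("CRSArrTime", IntegerType(), True),
    StructField("UniqueCarrier", StringType(), True),
    # ... the remaining 20 fields follow the schema table above ...
])

# usage sketch: df = spark.read.csv(path, nullValue="NA", header="true", schema=airTrafficSchema)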
# example print
data = loadDataAndRegister(sampleFile)
data.show(2)
print("schema")
#carrierData = loadBind(sampleFile,carriers)
#print(carrierData.schema)
#spark.read.table(airports)
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008| 2| 28| 4| 1806| 1805| 1818| 1820| WN| 1642| N392SW| 72| 75| 55| -2| 1| SLC| LAS| 368| 6| 11| 1| D| 0| null| null| null| null| null|
|2008| 4| 6| 7| 1527| 1531| 1636| 1627| NW| 1757| N9337| 69| 56| 30| 9| -4| DTW| CMH| 155| 2| 37| 0| null| 0| null| null| null| null| null|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
only showing top 2 rows
schema
'''loadDataAndRegister tests'''
df = loadBind(testFile, airTraffic)

# Table "airtraffic" should exist
assert spark.sql("SHOW TABLES LIKE 'airtraffic'").count() == 1, "there was expected to be a table called 'airtraffic'"

# Columns should have correct values
third = df.collect()[2]
correctRow = Row(Year=2008, Month=5, DayofMonth=6, DayOfWeek=2, DepTime=611,
                 CRSDepTime=615, ArrTime=729, CRSArrTime=735, UniqueCarrier='EV',
                 FlightNum=4794, TailNum='N916EV', ActualElapsedTime=78,
                 CRSElapsedTime=80, AirTime=58, ArrDelay=-6, DepDelay=-4,
                 Origin='ROA', Dest='ATL', Distance=357, TaxiIn=9, TaxiOut=11,
                 Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=None,
                 WeatherDelay=None, NASDelay=None, SecurityDelay=None,
                 LateAircraftDelay=None).asDict()

assert third.asDict() == correctRow, "the row was expected to be %s but it was %s" % (correctRow, third.asDict())
0.2 Count Flights
TailNum is the identification code of an airplane. Each appearance of a TailNum in the airtraffic DataFrame represents one flight.
CountFlight returns the number of flights for each airplane by counting the occurrences of each TailNum in the airtraffic DataFrame.
TailNum | count |
---|---|
N693BR | 1526 |
N646BR | 1505 |
N476HA | 1490 |
N485HA | 1441 |
N486HA | 1439 |
def CountFlight(df):
    # YOUR CODE HERE
    # groupby() aggregates the dataframe into groups by TailNum,
    # count() counts the elements in each group,
    # sort(f.desc("count")) sorts by the count, descending
    CountDesc = (df
        .groupby("TailNum")
        .count()
        .sort(f.desc("count"))
    )
    # filter() keeps only non-null TailNum
    CountDescExNull = (CountDesc
        .filter(CountDesc.TailNum.isNotNull())
    )
    return CountDescExNull
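Since loadBind registers the data as the airtraffic view, the same result can also be expressed in Spark SQL. A sketch of the equivalent query (an alternative, not the implementation used above; countFlightSql is a hypothetical name):

# Spark SQL sketch equivalent to CountFlight, assuming the "airtraffic"
# view has been registered by loadBind
countFlightSql = spark.sql("""
    SELECT TailNum, COUNT(*) AS count
    FROM airtraffic
    WHERE TailNum IS NOT NULL
    GROUP BY TailNum
    ORDER BY count DESC
""")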
# example print
data = loadBind(testFile, airTraffic)
CountFlight(data).show(20)
#flightCount(data)[:5]
+-------+-----+
|TailNum|count|
+-------+-----+
| N824AS| 2|
| N873AS| 1|
| N856AS| 1|
| N916EV| 1|
| N886AS| 1|
| N881AS| 1|
+-------+-----+
'''flightCount tests'''
data = loadBind(testFile, airTraffic)

correct = [Row(TailNum='N824AS', count=2),
           Row(TailNum='N856AS', count=1),
           Row(TailNum='N886AS', count=1),
           Row(TailNum='N916EV', count=1),
           Row(TailNum='N873AS', count=1),
           Row(TailNum='N881AS', count=1)]

correctRows(CountFlight(data).collect(), correct)
0.3 Flights Cancelled Due to Security
SecurityCancel returns a DataFrame showing which flights were cancelled for security reasons (CancellationCode = 'D') and their destinations.
FlightNum | Dest |
---|---|
4285 | DHN |
4790 | ATL |
3631 | LEX |
3632 | DFW |
def SecurityCancel(df):
    output = spark.sql("""SELECT FlightNum, Dest FROM airtraffic WHERE CancellationCode = 'D'""")
    return output
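The same filter can be written with the DataFrame API directly, without going through the SQL view. A sketch, assuming df is the DataFrame returned by loadBind (securityCancelDf is a hypothetical name):

# DataFrame-API sketch equivalent to SecurityCancel
securityCancelDf = (df
    .filter(f.col("CancellationCode") == "D")
    .select("FlightNum", "Dest"))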
# example print
data = loadBind(sampleFile, airTraffic)
SecurityCancel(data).show(5)
+---------+----+
|FlightNum|Dest|
+---------+----+
| 1642| LAS|
| 585| MSP|
+---------+----+
'''cancelledDueToSecurity tests'''
data = loadDataAndRegister(testFile)
correct = [Row(FlightNum=4794, Dest='JFK'), Row(FlightNum=4794, Dest='ATL')]

correctRows(SecurityCancel(data).collect(), correct)
0.4 Weather Delay
MaxWeatherDelay :: airtraffic_df -> delay_df
delay_df is a single-row DataFrame containing the longest weather delay between the start of January and the end of March.
_c0 |
---|
1148 |
def MaxWeatherDelay(df):
    output = spark.sql("""SELECT MAX(WeatherDelay) AS _c0 FROM airtraffic WHERE Month >= 1 AND Month <= 3""")
    return output

# alias matching the name used by the example print and the tests
longestWeatherDelay = MaxWeatherDelay
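For comparison, a DataFrame-API sketch of the same aggregation, assuming df is the loaded airtraffic DataFrame (maxDelayDf is a hypothetical name):

# filter January-March, then take the maximum WeatherDelay
maxDelayDf = (df
    .filter((f.col("Month") >= 1) & (f.col("Month") <= 3))
    .agg(f.max("WeatherDelay").alias("_c0")))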
# example print
data = loadBind(sampleFile, airTraffic)

MaxWeatherDelay(data).show()
delay = longestWeatherDelay(data).first()[0]
print(f"longest weather delay between jan and mar is {delay}")
+---+
|_c0|
+---+
| 40|
+---+
longest weather delay between jan and mar is 40
'''longestWeatherDelay tests'''
data = loadDataAndRegister(testFile)
test = longestWeatherDelay(data).first()[0]

assert test == 7, "the longest weather delay was expected to be 7 but it was %s" % test
0.5 No Flights
NoFlight_df is a DataFrame that contains the names of airlines that did not fly.
carriers_df has the relation (Description, Code).
airtraffic_df has a carrier code (UniqueCarrier) in its schema for every plane that flew.
Goal: get the subset of carriers_df that did not fly. Since every element of airtraffic_df flew:
carriers_df - (carriers_df intersect airtraffic_df) = carriers that did not fly
def didNotFly(df):
    #a = spark.sql("SELECT Description FROM airtraffic INNER JOIN carriers ON carriers.Code = airtraffic.UniqueCarrier")
    a = spark.sql("SELECT Description FROM carriers WHERE Code NOT IN (SELECT Code FROM airtraffic INNER JOIN carriers ON carriers.Code = airtraffic.UniqueCarrier)")
    #a = spark.sql("SELECT Code FROM carriers")
    return a
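The set difference described above also maps naturally onto Spark's left anti join, which keeps exactly the left-side rows with no match on the right. A sketch, assuming carriersTable is the carriers DataFrame loaded earlier and df is the airtraffic DataFrame (didNotFlyDf is a hypothetical name):

# left_anti keeps the carriers rows whose Code never appears as a UniqueCarrier
didNotFlyDf = (carriersTable
    .join(df, carriersTable.Code == df.UniqueCarrier, "left_anti")
    .select("Description"))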
data = loadBind(testFile, airTraffic)
didNotFly(data).show(5)
+--------------------+
| Description|
+--------------------+
| Tradewind Aviation|
| Comlux Aviation|
|Master Top Linhas...|
| Flair Airlines Ltd.|
| Swift Air|
+--------------------+
only showing top 5 rows
'''didNotFly tests'''
data = loadDataAndRegister(testFile)
test = didNotFly(data).count()

assert test == 1489, "the amount of airlines that didn't fly was expected to be 1489 but it was %s" % test
0.6 Flights Vegas to JFK
Return the carrier description and the number of flights from Las Vegas (LAS) to JFK.
Output schema: (Description, Num); carriers_df schema: (Code, Description).
Description | Num |
---|---|
JetBlue Airways | 566 |
Delta Air Lines Inc. | 441 |
US Airways Inc. (… | 344 |
American Airlines… | 121 |
def flightsFromVegasToJFK(df):
    fdata = CountFlight(df)
    #loadDataAndRegister("flightCount")
    fdata.createOrReplaceTempView("flightCount")
    output = spark.sql("""
        SELECT Description, count AS Num FROM
        flightCount
        INNER JOIN
        (SELECT * FROM
            carriers
            INNER JOIN
            (SELECT DISTINCT UniqueCarrier, TailNum FROM airtraffic
             WHERE Origin = 'LAS' AND Dest = 'JFK') AS T1
            ON carriers.Code = T1.UniqueCarrier) AS T2
        ON flightCount.TailNum = T2.TailNum
        ORDER BY Num DESC
        """)
    # YOUR CODE HERE
    return output
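The same join chain can be sketched with the DataFrame API; the DISTINCT on (UniqueCarrier, TailNum) is what prevents a plane that flew the route several times from being counted more than once. A hypothetical equivalent, assuming carriersTable from the setup cell (vegasJfk and vegasJfkCounts are illustration names):

# planes that flew LAS -> JFK, one row per (carrier, plane)
vegasJfk = (df
    .filter((f.col("Origin") == "LAS") & (f.col("Dest") == "JFK"))
    .select("UniqueCarrier", "TailNum")
    .distinct())
# attach each plane's total flight count and its carrier description
vegasJfkCounts = (CountFlight(df)
    .join(vegasJfk, "TailNum")
    .join(carriersTable, f.col("UniqueCarrier") == carriersTable.Code)
    .select("Description", f.col("count").alias("Num"))
    .orderBy(f.desc("Num")))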
# example print
data = loadBind(testFile, airTraffic)
flightsFromVegasToJFK(data).show(5)
+--------------------+---+
| Description|Num|
+--------------------+---+
| Titan Airways| 1|
|Atlantic Southeas...| 1|
+--------------------+---+
'''flightsFromVegasToJFK tests'''
data = loadDataAndRegister(testFile)
correct = [Row(Description='Titan Airways', Num=1),
           Row(Description='Atlantic Southeast Airlines', Num=1)]

correctRows(flightsFromVegasToJFK(data).collect(), correct)
0.7 Taxi Time
TaxiTime returns a DataFrame of the average time spent taxiing at each airport.
Relevant airtraffic schema: (Origin, Dest, TaxiIn, TaxiOut)
Origin = airport of origin
Dest = destination airport
TaxiIn = time spent taxiing at the origin
TaxiOut = time spent taxiing at the destination
airport | taxi |
---|---|
DLG | 4.0 |
BRW | 5.051010191310567 |
OME | 6.034800675790983 |
AKN | 6.75 |
SCC | 6.842553191489362 |
def TaxiTime(df):
    output = spark.sql("""
        SELECT T1.Origin AS airport, (A1+A2)/2 AS taxi FROM
        (SELECT Origin, AVG(TaxiIn) AS A1 FROM airtraffic GROUP BY Origin) AS T1
        INNER JOIN
        (SELECT Dest, AVG(TaxiOut) AS A2 FROM airtraffic GROUP BY Dest) AS T2
        ON T1.Origin = T2.Dest
        ORDER BY taxi ASC
        """)
    return output
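A DataFrame-API sketch of the same computation; note that the inner join drops an airport that appears only as an Origin or only as a Dest (true of the SQL version above as well). Assumes df is the loaded airtraffic DataFrame (taxiIn, taxiOut, taxiTimes are illustration names):

taxiIn = df.groupBy("Origin").agg(f.avg("TaxiIn").alias("A1"))
taxiOut = df.groupBy("Dest").agg(f.avg("TaxiOut").alias("A2"))
taxiTimes = (taxiIn
    .join(taxiOut, taxiIn.Origin == taxiOut.Dest)
    .select(taxiIn.Origin.alias("airport"),
            ((f.col("A1") + f.col("A2")) / 2).alias("taxi"))
    .orderBy("taxi"))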
data = loadBind(testFile, airTraffic)
TaxiTime(data).show(5)
+-------+-----+
|airport| taxi|
+-------+-----+
| LAS| 11.0|
| JFK|13.25|
+-------+-----+
'''timeSpentTaxiing tests'''
data = loadDataAndRegister(testFile)
correct = [Row(airport='LAS', taxi=11.0), Row(airport='JFK', taxi=13.25)]

correctRows(TaxiTime(data).collect(), correct)
0.8 Median Distance
distanceMedian returns a DataFrame containing the median travel distance.
_ c0 |
---|
583.0 |
def distanceMedian(df):
    a = spark.sql("""
        SELECT percentile(T1, 0.5) AS `_ c0` FROM
        (SELECT Distance FROM airtraffic) AS Tab(T1)
        """)
    # YOUR CODE HERE
    return a
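An alternative worth knowing is DataFrame.approxQuantile, where a relative error of 0.0 requests the exact quantile; the same call with [0.95] would cover section 0.9 below. A sketch, assuming df is the loaded DataFrame (medianDistance is a hypothetical name):

# exact median of Distance via approxQuantile (relativeError=0.0)
medianDistance = df.approxQuantile("Distance", [0.5], 0.0)[0]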
data = loadBind(testFile, airTraffic)
distanceMedian(data).show()
+-----+
| _ c0|
+-----+
|357.0|
+-----+
'''distanceMedian tests'''
data = loadDataAndRegister(testFile)
test = distanceMedian(data).first()[0]

assert test == 357.0, "the distance median was expected to be 357.0 but it was %s" % test
0.9 95th Percentile
score95 returns a DataFrame containing the 95th percentile of CarrierDelay.
Example output:
_ c0 |
---|
77.0 |
def score95(df):
    a = spark.sql("""
        SELECT percentile(T1, 0.95) AS `_ c0` FROM
        (SELECT CarrierDelay FROM airtraffic) AS Tab(T1)
        """)
    # YOUR CODE HERE
    return a
data = loadBind(testFile, airTraffic)
score95(data).show()
+----+
|_ c0|
+----+
|17.0|
+----+
'''score95 tests'''
data = loadDataAndRegister(testFile)
test = score95(data).first()[0]

assert test == 17.0, "the score95 was expected to be 17.0 but it was %s" % test
0.10 Cancelled Flights
cancelledFlights finds the airports where flights were cancelled.
Returns: a DataFrame containing the columns "airport", "city" and "percentage".
Relevant airports schema: (airport, city)
percentage = (number of cancelled flights) / (total flights)
Example output:
airport | city | percentage |
---|---|---|
Pellston Regional… | Pellston | 0.3157894736842105 |
Waterloo Municipal | Waterloo | 0.25 |
Telluride Regional | Telluride | 0.21084337349397592 |
Houghton County M… | Hancock | 0.19834710743801653 |
Rhinelander-Oneid… | Rhinelander | 0.15625 |
def cancelledFlights(df):
    a = spark.sql("""
        SELECT airports.airport, airports.city, T1.percentage
        FROM
        (SELECT Origin, SUM(Cancelled)/COUNT(Origin) AS percentage FROM airtraffic
         GROUP BY Origin) AS T1
        INNER JOIN airports
        ON T1.Origin = airports.iata
        ORDER BY T1.percentage DESC, airports.airport DESC
        """)
    # YOUR CODE HERE
    return a
data = loadBind(testFile, airTraffic)
cancelledFlights(data).show(5)
+--------------------+---------+----------+
| airport| city|percentage|
+--------------------+---------+----------+
|McCarran Internat...|Las Vegas| 0.5|
|Roanoke Regional/...| Roanoke| 0.25|
| John F Kennedy Intl| New York| 0.0|
+--------------------+---------+----------+
'''cancelledFlights tests'''
data = loadDataAndRegister(testFile)
correct = [Row(airport='McCarran International', city='Las Vegas', percentage=0.5),
           Row(airport='Roanoke Regional/ Woodrum ', city='Roanoke', percentage=0.25)]

correctRows(cancelledFlights(data).collect(), correct)
0.11 Least Squares
leastSquares calculates the linear least squares approximation for the relationship between DepDelay and WeatherDelay:
y = Bx + c, where B is the slope, i.e.
WeatherDelay = B * DepDelay + c
Returns (y-intercept, B).
Filter out DepDelay < 0. If there are multiple WeatherDelay values for the same DepDelay, average them. The closed-form solution is shown below.
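With $x_i$ = DepDelay, $y_i$ = the averaged WeatherDelay, and $N$ the number of distinct DepDelay values, the standard closed-form least-squares solution computed by the code below is

$$B = \frac{N\sum x_i y_i - \sum x_i \sum y_i}{N\sum x_i^2 - \left(\sum x_i\right)^2}, \qquad c = \frac{\sum y_i \sum x_i^2 - \sum x_i \sum x_i y_i}{N\sum x_i^2 - \left(\sum x_i\right)^2}$$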
def leastSquares(df):
    try:
        # average WeatherDelay per DepDelay value, keeping only positive delays,
        # and precompute the products needed for the normal equations
        a = spark.sql("""
            SELECT DD, WD, DD*WD AS Prod, DD*DD AS Sqx
            FROM
            (SELECT DepDelay AS DD, AVG(WeatherDelay) AS WD FROM airtraffic
             WHERE DepDelay > 0
             GROUP BY DepDelay) AS T1
            """)
        # YOUR CODE HERE
        xsum = a.groupBy().sum('DD').collect()[0][0]
        ysum = a.groupBy().sum('WD').collect()[0][0]
        psum = a.groupBy().sum('Prod').collect()[0][0]
        sqxsum = a.groupBy().sum('Sqx').collect()[0][0]
        #print(xsum)
        N = a.count()
        varM = ((N * psum) - (xsum * ysum)) / ((N * sqxsum) - (xsum ** 2))
        varB = ((ysum * sqxsum) - (xsum * psum)) / ((N * sqxsum) - (xsum ** 2))
        return varB, varM
    except Exception:
        return 0.0, 0.0
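As a sanity check, the closed form can be compared against numpy.polyfit on the same aggregated pairs; a sketch (np is already imported above, and polyfit with degree 1 returns the slope first, then the intercept):

rows = spark.sql("""
    SELECT DepDelay AS DD, AVG(WeatherDelay) AS WD
    FROM airtraffic
    WHERE DepDelay > 0
    GROUP BY DepDelay
""").collect()
pairs = [(r.DD, r.WD) for r in rows if r.WD is not None]  # guard against all-null groups
xs, ys = np.array(pairs, dtype=float).T
B, c = np.polyfit(xs, ys, 1)
print((c, B))  # expected to agree with leastSquares(data)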
data = loadBind(testFile, airTraffic)

leastSquares(data)
#leastSquares(data)[0].show(4)
#print(leastSquares(data)[1])
#print(leastSquares(data)[2])
(952.0, -56.0)
'''leastSquaresTests'''
data = loadDataAndRegister(testFile)
test = leastSquares(data)

assert test == (952.0, -56.0), "the answer was expected to be (952.0, -56.0) but it was %s" % test
spark.stop()