PySpark
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as f
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import pandas as pd
import numpy as np
%matplotlib inline
spark = SparkSession.builder\
    .master("local[*]")\
    .appName("main")\
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.shuffle.service.enabled", "true")\
    .getOrCreate()
#names of tables
airTraffic = "airtraffic"
carriers = "carriers"
airports = "airports"
carriersTable = spark.read.csv("carriers.csv", inferSchema="true", header="true")
carriersTable.createOrReplaceTempView(carriers)
airportsTable = spark.read.csv("airports.csv", inferSchema="true", header="true")
airportsTable.createOrReplaceTempView(airports)
sampleFile = "2008_sample.csv"
testFile = "2008_testsample.csv"

# Test if arrays that contain Row are equal
def correctRows(testArray, correctArray):
    for i in range(0, len(correctArray)):
        assert testArray[i].asDict() == correctArray[i].asDict(), \
            "the row was expected to be %s but it was %s" % (correctArray[i].asDict(), testArray[i].asDict())

0.1 Load Data and Bind
Schema for airTraffic
| Name | Type |
|---|---|
| Year | integer (nullable = true) |
| Month | integer (nullable = true) |
| DayofMonth | integer (nullable = true) |
| DayOfWeek | integer (nullable = true) |
| DepTime | integer (nullable = true) |
| CRSDepTime | integer (nullable = true) |
| ArrTime | integer (nullable = true) |
| CRSArrTime | integer (nullable = true) |
| UniqueCarrier | string (nullable = true) |
| FlightNum | integer (nullable = true) |
| TailNum | string (nullable = true) |
| ActualElapsedTime | integer (nullable = true) |
| CRSElapsedTime | integer (nullable = true) |
| AirTime | integer (nullable = true) |
| ArrDelay | integer (nullable = true) |
| DepDelay | integer (nullable = true) |
| Origin | string (nullable = true) |
| Dest | string (nullable = true) |
| Distance | integer (nullable = true) |
| TaxiIn | integer (nullable = true) |
| TaxiOut | integer (nullable = true) |
| Cancelled | integer (nullable = true) |
| CancellationCode | string (nullable = true) |
| Diverted | integer (nullable = true) |
| CarrierDelay | integer (nullable = true) |
| WeatherDelay | integer (nullable = true) |
| NASDelay | integer (nullable = true) |
| SecurityDelay | integer (nullable = true) |
| LateAircraftDelay | integer (nullable = true) |
def loadBind(path, name):
    # load the raw dataset into a Spark DataFrame
    df = spark.read.csv(path, nullValue="NA", inferSchema="true", header="true")
    # bind the DataFrame to a name that can be referred to in SQL.
    # Note the gotcha: the name is bound to the Spark DataFrame, not to the CSV file.
    df.createOrReplaceTempView(name)
    return df
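As a quick sanity check (a minimal sketch; "airtraffic_demo" is a hypothetical view name used only here), the same rows should be reachable both through the returned DataFrame and through SQL against the registered name:
# hypothetical view name, used only for this check
demo = loadBind(sampleFile, "airtraffic_demo")
assert demo.count() == spark.sql("SELECT COUNT(*) FROM airtraffic_demo").first()[0]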
loadDataAndRegister = lambda x: loadBind(x, airTraffic)

# example print
data = loadDataAndRegister(sampleFile)
data.show(2)
print("schema")
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008| 2| 28| 4| 1806| 1805| 1818| 1820| WN| 1642| N392SW| 72| 75| 55| -2| 1| SLC| LAS| 368| 6| 11| 1| D| 0| null| null| null| null| null|
|2008| 4| 6| 7| 1527| 1531| 1636| 1627| NW| 1757| N9337| 69| 56| 30| 9| -4| DTW| CMH| 155| 2| 37| 0| null| 0| null| null| null| null| null|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
only showing top 2 rows
schema
'''loadDataAndRegister tests'''
df = loadBind(testFile,airTraffic)
# Table "airtraffic" should exists
assert spark.sql("SHOW TABLES Like 'airtraffic'").count() == 1, "there was expected to be a table called 'airtraffic'"
# Columns should have correct values
third = df.collect()[2]
correctRow = Row(Year=2008, Month=5, DayofMonth=6, DayOfWeek=2, DepTime=611,
CRSDepTime=615, ArrTime=729, CRSArrTime=735, UniqueCarrier='EV',
FlightNum=4794, TailNum='N916EV', ActualElapsedTime=78,
CRSElapsedTime=80, AirTime=58, ArrDelay=-6, DepDelay=-4,
Origin='ROA', Dest='ATL', Distance=357, TaxiIn=9, TaxiOut=11,
Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=None,
WeatherDelay=None, NASDelay=None, SecurityDelay=None,
LateAircraftDelay=None).asDict()
assert third.asDict() == correctRow, "the row was expected to be %s but it was %s" % (correctRow, third.asDict())

0.2 Count Flights
TailNum is the identification code of an airplane.
Each appearance of a TailNum in the airtraffic DataFrame represents one flight.
CountFlight returns the number of flights for each airplane
by counting the occurrences of each TailNum in the airtraffic DataFrame.
| TailNum | count |
|---|---|
| N693BR | 1526 |
| N646BR | 1505 |
| N476HA | 1490 |
| N485HA | 1441 |
| N486HA | 1439 |
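The implementation below uses the DataFrame API; an equivalent SQL sketch (assuming the data has been registered as airtraffic by loadBind) would be:
flightCountSql = spark.sql("""
    SELECT TailNum, COUNT(*) AS count
    FROM airtraffic
    WHERE TailNum IS NOT NULL
    GROUP BY TailNum
    ORDER BY count DESC
    """)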
def CountFlight(df):
    # groupby() partitions the DataFrame into groups by TailNum,
    # count() counts the rows in each group, and
    # sort(f.desc("count")) orders the result by count, descending
    CountDesc = (df
        .groupby("TailNum")
        .count()
        .sort(f.desc("count"))
    )
    # filter() keeps only rows with a non-null TailNum
    CountDescExNull = (CountDesc
        .filter(CountDesc.TailNum.isNotNull())
    )
    return CountDescExNull

# example print
data = loadBind(testFile,airTraffic)
CountFlight(data).show(20)
+-------+-----+
|TailNum|count|
+-------+-----+
| N824AS| 2|
| N873AS| 1|
| N856AS| 1|
| N916EV| 1|
| N886AS| 1|
| N881AS| 1|
+-------+-----+
'''flightCount tests'''
data = loadBind(testFile,airTraffic)
correct = [Row(TailNum='N824AS', count=2),
Row(TailNum='N856AS', count=1),
Row(TailNum='N886AS', count=1),
Row(TailNum='N916EV', count=1),
Row(TailNum='N873AS', count=1),
Row(TailNum='N881AS', count=1)]
correctRows(CountFlight(data).collect(), correct)

0.3 Cancelled Flights
CancellationCode = 'D' marks a flight that was cancelled for security reasons.
SecurityCancel returns a DataFrame showing which flights were cancelled due to security, together with their destinations.
| FlightNum | Dest |
|---|---|
| 4285 | DHN |
| 4790 | ATL |
| 3631 | LEX |
| 3632 | DFW |
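Before the SQL implementation below, a DataFrame-API sketch of the same filter (assuming df is the DataFrame returned by loadBind):
securityCancelledSketch = (df
    .filter(f.col("CancellationCode") == "D")
    .select("FlightNum", "Dest"))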
def SecurityCancel(df):
    output = spark.sql("""SELECT FlightNum, Dest FROM airtraffic WHERE CancellationCode = 'D'""")
    return output

# example print
data = loadBind(sampleFile,airTraffic)
SecurityCancel(data).show(5)

+---------+----+
|FlightNum|Dest|
+---------+----+
| 1642| LAS|
| 585| MSP|
+---------+----+
'''cancelledDueToSecurity tests'''
data = loadDataAndRegister(testFile)
correct = [Row(FlightNum=4794, Dest='JFK'), Row(FlightNum=4794, Dest='ATL')]
correctRows(SecurityCancel(data).collect(), correct)

0.4 Weather Delay
MaxWeatherDelay :: Airtraffic_df -> delay_df
delay_df is a single-row DataFrame containing the longest weather delay
between the start of January and the end of March.
| _c0 |
|---|
| 1148 |
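A DataFrame-API sketch of the same aggregation (assuming df is the loaded airtraffic DataFrame; f.max ignores nulls, as MAX does in SQL):
maxWeatherDelaySketch = (df
    .filter(f.col("Month").between(1, 3))
    .agg(f.max("WeatherDelay").alias("_c0")))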
def MaxWeatherDelay(df):
    output = spark.sql("""SELECT MAX(WeatherDelay) AS _c0 FROM airtraffic WHERE Month >= 1 AND Month <= 3""")
    return output
# example print
data = loadBind(sampleFile,airTraffic)
MaxWeatherDelay(data).show()
delay = MaxWeatherDelay(data).first()[0]
print(f"longest weather delay between jan and mar is {delay}")

+---+
|_c0|
+---+
| 40|
+---+
longest weather delay between jan and mar is 40
'''longestWeatherDelay tests'''
data = loadDataAndRegister(testFile)
test = MaxWeatherDelay(data).first()[0]
assert test == 7, "the longest weather delay was expected to be 7 but it was %s" % test
0.5 No Flights
NoFlight_df is a DataFrame that contains the names of the airlines that did not fly.
carriers_df has the relation (Description, Code);
airtraffic_df has a carrier code (UniqueCarrier) in its schema for every plane that flew.
Goal: get the subset of carriers_df that did not fly.
Since every element of airtraffic_df flew,
carriers_df - (carriers_df intersect airtraffic_df) = the carriers that did not fly (a DataFrame-API sketch of this set difference follows below).
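The SQL implementation below expresses the set difference with NOT IN; a left anti-join sketch in the DataFrame API (assuming carriersTable from the setup cell and a loaded airtraffic DataFrame df):
notFlownSketch = (carriersTable
    .join(df.select(f.col("UniqueCarrier").alias("Code")).distinct(),
          on="Code", how="left_anti")
    .select("Description"))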
def didNotFly(df):
    # anti-join: keep only the carriers whose Code never appears as a UniqueCarrier
    a = spark.sql("""SELECT Description FROM carriers
                     WHERE Code NOT IN (SELECT DISTINCT UniqueCarrier FROM airtraffic
                                        WHERE UniqueCarrier IS NOT NULL)""")
    return a

data = loadBind(testFile, airTraffic)
didNotFly(data).show(5)

+--------------------+
| Description|
+--------------------+
| Tradewind Aviation|
| Comlux Aviation|
|Master Top Linhas...|
| Flair Airlines Ltd.|
| Swift Air|
+--------------------+
only showing top 5 rows
'''didNotFly tests'''
data = loadDataAndRegister(testFile)
test = didNotFly(data).count()
assert test == 1489, "the amount of airlines that didn't fly was expected to be 1489 but it was %s" % test

0.6 Flights Vegas to JFK
Return the description of each carrier and the number of flights from Vegas (LAS) to JFK.
The result has schema (Description, Num); the counts come from CountFlight (0.2),
and carriers is joined on its Code column (a DataFrame-API sketch follows the table below).
| Description | Num |
|---|---|
| JetBlue Airways | 566 |
| Delta Air Lines Inc. | 441 |
| US Airways Inc. (… | 344 |
| American Airlines… | 121 |
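A DataFrame-API sketch of the same join chain (assuming df, carriersTable and CountFlight from above):
vegasJfkTails = (df
    .filter((f.col("Origin") == "LAS") & (f.col("Dest") == "JFK"))
    .select("UniqueCarrier", "TailNum")
    .distinct())
vegasJfkSketch = (CountFlight(df)
    .join(vegasJfkTails, "TailNum")
    .join(carriersTable, f.col("UniqueCarrier") == f.col("Code"))
    .select("Description", f.col("count").alias("Num"))
    .orderBy(f.desc("Num")))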
def flightsFromVegasToJFK(df):
    # register the per-tail flight counts so SQL can join against them
    fdata = CountFlight(df)
    fdata.createOrReplaceTempView("flightCount")
    output = spark.sql("""
        SELECT Description, count AS Num FROM
            flightCount
        INNER JOIN
            (SELECT * FROM
                carriers
             INNER JOIN
                (SELECT DISTINCT UniqueCarrier, TailNum FROM airtraffic
                 WHERE Origin = 'LAS' AND Dest = 'JFK') AS T1
             ON carriers.Code = T1.UniqueCarrier) AS T2
        ON flightCount.TailNum = T2.TailNum
        ORDER BY Num DESC
        """)
    return output

# example print
data = loadBind(testFile,airTraffic)
flightsFromVegasToJFK(data).show(5)

+--------------------+---+
| Description|Num|
+--------------------+---+
| Titan Airways| 1|
|Atlantic Southeas...| 1|
+--------------------+---+
'''flightsFromVegasToJFK tests'''
data = loadDataAndRegister(testFile)
correct = [Row(Description='Titan Airways', Num=1),
Row(Description='Atlantic Southeast Airlines', Num=1)]
correctRows(flightsFromVegasToJFK(data).collect(), correct)

0.7 Taxi Time
TaxiTime returns a DataFrame of the average time spent taxiing at each airport.
Relevant airtraffic schema: (Origin, Dest, TaxiIn, TaxiOut), where
Origin = airport of origin
Dest = destination airport
TaxiIn = taxi time, attributed here to Origin
TaxiOut = taxi time, attributed here to Dest
| airport | taxi |
|---|---|
| DLG | 4.0 |
| BRW | 5.051010191310567 |
| OME | 6.034800675790983 |
| AKN | 6.75 |
| SCC | 6.842553191489362 |
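A DataFrame-API sketch of the same computation (assuming df is the loaded airtraffic DataFrame):
inAvg = df.groupBy("Origin").agg(f.avg("TaxiIn").alias("A1"))
outAvg = df.groupBy("Dest").agg(f.avg("TaxiOut").alias("A2"))
taxiSketch = (inAvg
    .join(outAvg, inAvg.Origin == outAvg.Dest)
    .select(f.col("Origin").alias("airport"),
            ((f.col("A1") + f.col("A2")) / 2).alias("taxi"))
    .orderBy("taxi"))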
def TaxiTime(df):
    output = spark.sql("""
        SELECT T1.Origin AS airport, (A1 + A2) / 2 AS taxi FROM
            (SELECT Origin, AVG(TaxiIn) AS A1 FROM airtraffic GROUP BY Origin) AS T1
        INNER JOIN
            (SELECT Dest, AVG(TaxiOut) AS A2 FROM airtraffic GROUP BY Dest) AS T2
        ON T1.Origin = T2.Dest
        ORDER BY taxi ASC
        """)
    return output

data = loadBind(testFile, airTraffic)
TaxiTime(data).show(5)

+-------+-----+
|airport| taxi|
+-------+-----+
| LAS| 11.0|
| JFK|13.25|
+-------+-----+
'''timeSpentTaxiing tests'''
data = loadDataAndRegister(testFile)
correct = [Row(airport='LAS', taxi=11.0), Row(airport='JFK', taxi=13.25)]
correctRows(TaxiTime(data).collect(), correct)

0.8 Median Distance
distanceMedian returns a DataFrame containing the median travel distance.
| _ c0 |
|---|
| 583.0 |
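SQL's percentile returns the exact, interpolated median. A DataFrame-API alternative is approxQuantile (a sketch, assuming df is the loaded airtraffic DataFrame); with relativeError=0.0 the computation is exact, but it returns an actual data point rather than interpolating, so it can differ from percentile on even-sized columns:
medianSketch = df.approxQuantile("Distance", [0.5], 0.0)[0]  # a float, not a DataFrame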
def distanceMedian(df):
    # percentile(col, 0.5) computes the exact median
    a = spark.sql("""
        SELECT percentile(T1, 0.5) AS `_ c0` FROM
            (SELECT Distance FROM airtraffic) AS Tab(T1)
        """)
    return a

data = loadBind(testFile, airTraffic)
distanceMedian(data).show()

+-----+
| _ c0|
+-----+
|357.0|
+-----+
'''distanceMedian tests'''
data = loadDataAndRegister(testFile)
test = distanceMedian(data).first()[0]
assert test == 357.0, "the distance median was expected to be 357.0 but it was %s" % test

0.9 95th Percentile
score95 returns a DataFrame containing the 95th percentile of CarrierDelay.
Example output:
| _ c0 |
|---|
| 77.0 |
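The same approxQuantile sketch applies here (null CarrierDelay values are ignored by both approaches, and the interpolation caveat from 0.8 applies again):
p95Sketch = df.approxQuantile("CarrierDelay", [0.95], 0.0)[0]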
def score95(df):
    # percentile(col, 0.95) computes the exact 95th percentile
    a = spark.sql("""
        SELECT percentile(T1, 0.95) AS `_ c0` FROM
            (SELECT CarrierDelay FROM airtraffic) AS Tab(T1)
        """)
    return a

data = loadBind(testFile, airTraffic)
score95(data).show()

+----+
|_ c0|
+----+
|17.0|
+----+
'''score95 tests'''
data = loadDataAndRegister(testFile)
test = score95(data).first()[0]
assert test == 17.0, "the score95 was expected to be 17.0 but it was %s" % test

0.10 Cancelled Flights
cancelledFlights finds the airports where flights were cancelled.
Return: a DataFrame containing the columns "airport", "city" and "percentage".
Relevant airports schema: (airport, city).
percentage = (number of cancelled flights) / (total flights) per origin airport.
Example output:
| airport | city | percentage |
|---|---|---|
| Pellston Regional… | Pellston | 0.3157894736842105 |
| Waterloo Municipal | Waterloo | 0.25 |
| Telluride Regional | Telluride | 0.21084337349397592 |
| Houghton County M… | Hancock | 0.19834710743801653 |
| Rhinelander-Oneid… | Rhinelander | 0.15625 |
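A DataFrame-API sketch of the same query (assuming df and the airportsTable registered in the setup cell):
perOrigin = (df
    .groupBy("Origin")
    .agg((f.sum("Cancelled") / f.count("Origin")).alias("percentage")))
cancelledSketch = (perOrigin
    .join(airportsTable, perOrigin.Origin == airportsTable.iata)
    .select("airport", "city", "percentage")
    .orderBy(f.desc("percentage"), f.desc("airport")))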
def cancelledFlights(df):
    a = spark.sql("""
        SELECT airports.airport, airports.city, T1.percentage
        FROM
            (SELECT Origin, SUM(Cancelled) / COUNT(Origin) AS percentage
             FROM airtraffic GROUP BY Origin) AS T1
        INNER JOIN airports
        ON T1.Origin = airports.iata
        ORDER BY T1.percentage DESC, airports.airport DESC
        """)
    return a

data = loadBind(testFile, airTraffic)
cancelledFlights(data).show(5)

+--------------------+---------+----------+
| airport| city|percentage|
+--------------------+---------+----------+
|McCarran Internat...|Las Vegas| 0.5|
|Roanoke Regional/...| Roanoke| 0.25|
| John F Kennedy Intl| New York| 0.0|
+--------------------+---------+----------+
'''cancelledFlights tests'''
data = loadDataAndRegister(testFile)
correct = [Row(airport='McCarran International', city='Las Vegas', percentage=0.5),
Row(airport='Roanoke Regional/ Woodrum ', city='Roanoke', percentage=0.25)]
correctRows(cancelledFlights(data).collect(), correct)

0.11 Least Squares
leastSquares calculates the linear least squares approximation for the relationship between DepDelay and WeatherDelay:
y = Bx + c, where B is the slope, i.e.
WeatherDelay = B * DepDelay + c.
Returns the tuple (y-intercept, B).
Only rows with DepDelay > 0 are kept; if one DepDelay value has several WeatherDelay values, they are averaged (a numpy sanity check of the formulas follows below).
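For points (x_i, y_i) = (DepDelay, averaged WeatherDelay), the closed-form normal equations used below are B = (N*Σxy - Σx*Σy) / (N*Σx² - (Σx)²) and c = (Σy*Σx² - Σx*Σxy) / (N*Σx² - (Σx)²). A tiny numpy sanity check of these formulas (hypothetical data, not from the flight set; np.polyfit(..., 1) returns [slope, intercept]):
# hypothetical points, used only to check the formulas
xs = np.array([1.0, 2.0, 3.0, 4.0])
ys = np.array([2.0, 4.5, 6.0, 8.5])
n = len(xs)
denom = n * (xs ** 2).sum() - xs.sum() ** 2
slope = (n * (xs * ys).sum() - xs.sum() * ys.sum()) / denom
intercept = (ys.sum() * (xs ** 2).sum() - xs.sum() * (xs * ys).sum()) / denom
assert np.allclose([slope, intercept], np.polyfit(xs, ys, 1))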
def leastSquares(df):
    try:
        # one row per distinct DepDelay: x, mean y, x*y and x^2 for the normal equations
        a = spark.sql("""
            SELECT DD, WD, DD * WD AS Prod, DD * DD AS Sqx
            FROM
                (SELECT DepDelay AS DD, AVG(WeatherDelay) AS WD FROM airtraffic
                 WHERE DepDelay > 0
                 GROUP BY DepDelay) AS T1
            """)
        # collect the sums needed by the closed-form solution
        xsum = a.groupBy().sum('DD').collect()[0][0]
        ysum = a.groupBy().sum('WD').collect()[0][0]
        psum = a.groupBy().sum('Prod').collect()[0][0]
        sqxsum = a.groupBy().sum('Sqx').collect()[0][0]
        N = a.count()
        # normal equations: slope varM and intercept varB of the least-squares line
        varM = ((N * psum) - (xsum * ysum)) / ((N * sqxsum) - (xsum ** 2))
        varB = ((ysum * sqxsum) - (xsum * psum)) / ((N * sqxsum) - (xsum ** 2))
        return varB, varM
    except Exception:
        return 0.0, 0.0

data = loadBind(testFile, airTraffic)
leastSquares(data)

(952.0, -56.0)
'''leastSquares tests'''
data = loadDataAndRegister(testFile)
test = leastSquares(data)
assert test == (952.0, -56.0), "the answer was expected to be (952.0, -56.0) but it was %s" % test

spark.stop()