PySpark RDD
Jupyter notebook:

0.1 RDD

This notebook implements multiple small methods that process and analyze country, city and location data.

The sample data is taken from the "allCountries.txt" file in http://download.geonames.org/export/dump/allCountries.zip. You can test your functions in the cell below each of them. The variable testFile contains the data.

Read https://spark.apache.org/docs/latest/rdd-programming-guide.html for a guide to RDD programming.
0.1.1 Data schema
Name | Description |
---|---|
geonameid | integer id of record in geonames database |
name | name of geographical point (utf8) varchar(200) |
asciiname | name of geographical point in plain ascii characters, varchar(200) |
alternatenames | alternatenames, comma separated, ascii names automatically transliterated, convenience attribute from alternatename table, varchar(10000) |
latitude | latitude in decimal degrees (wgs84) |
longitude | longitude in decimal degrees (wgs84) |
feature class | see http://www.geonames.org/export/codes.html, char(1) |
feature code | see http://www.geonames.org/export/codes.html, varchar(10) |
country code | ISO-3166 2-letter country code, 2 characters |
cc2 | alternate country codes, comma separated, ISO-3166 2-letter country code, 200 characters |
admin1 code | fipscode (subject to change to iso code), see exceptions below, see file admin1Codes.txt for display names of this code; varchar(20) |
admin2 code | code for the second administrative division, a county in the US, see file admin2Codes.txt; varchar(80) |
admin3 code | code for third level administrative division, varchar(20) |
admin4 code | code for fourth level administrative division, varchar(20) |
population | bigint (8 byte int) |
elevation | in meters, integer |
dem | digital elevation model, srtm3 or gtopo30, average elevation of 3''x3'' (ca 90m x 90m) or 30''x30'' (ca 900m x 900m) area in meters, integer. srtm processed by cgiar/ciat. |
timezone | the iana timezone id (see file timeZone.txt) varchar(40) |
modification date | date of last modification in yyyy-MM-dd format |
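For quick reference when indexing split lines in the exercises below, here is the 0-based column order implied by the schema (a convenience sketch; the COLUMNS list is not part of the assignment):

```python
# 0-based column order from the schema above (reference only)
COLUMNS = ["geonameid", "name", "asciiname", "alternatenames", "latitude",
           "longitude", "feature class", "feature code", "country code",
           "cc2", "admin1 code", "admin2 code", "admin3 code", "admin4 code",
           "population", "elevation", "dem", "timezone", "modification date"]
```

For example, name is column 1, country code is column 8 and dem is column 16.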
```python
from pyspark import SparkContext, SparkConf

sc = SparkContext("local", "GeoProcessor")
testFile = sc.textFile("allCountries_sample.txt")
```
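To sanity-check the load, you can peek at the first element; each element of testFile is one raw tab-separated line (a quick check, assuming the sample file is present):

```python
# Peek at one raw line and count its tab-separated fields
line = testFile.first()
len(line.split("\t"))  # 19 fields per the schema above
```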
0.2 Extract Data
extractData removes unnecessary fields and splits the data so that the RDD looks like RDD(Array("name", "countryCode", "dem"), ...).
Fields to include:
* name
* countryCode
* dem (digital elevation model)
param data
: the data set loaded into Spark as RDD[String]
return
: RDD containing filtered location data. There should be an Array for each location.
Hint: you can first split each line into an array. Columns are separated by the tab ("\t") character. Finally, you should take the appropriate fields. The field indices follow the order of the columns in the original data schema. Despite the method's name, you might only need the map function.
```python
def extractData(data):
    # YOUR CODE HERE
    # split each tab-separated line into a list of fields
    splitline = data.map(lambda x: x.split("\t"))
    # keep only name (index 1), country code (index 8) and dem (index 16)
    select = splitline.map(lambda x: [x[1], x[8], x[16]])
    return select
    #raise NotImplementedError()
```
```python
# Example print
extractData(testFile).take(5)
```
```
[['Tosa de la Llosada', 'AD', '2475'],
 ['Riu de la Llosada', 'AD', '1900'],
 ['Obaga de la Llosada', 'AD', '2300'],
 ['Emprius de la Llosada', 'AD', '2299'],
 ['Basers de la Llosada', 'AD', '2321']]
```
```python
'''extractData tests'''
filtered = extractData(testFile)
testObject = filtered.collect()[1]
assert testObject[0] == "Riu de la Llosada", "the name value of the object was expected to be 'Riu de la Llosada' but it was %s" % testObject[0]
assert testObject[1] == "AD", "the country code value of the object was expected to be 'AD' but it was %s" % testObject[1]
assert testObject[2] == "1900", "the dem value of the object was expected to be 1900 but it was %s" % testObject[2]
assert len(testObject) == 3, "the length of the array was expected to be 3 but it was %s" % len(testObject)
assert type(testObject) is list, "the type of the RDD element was expected to be list but it was %s" % type(testObject)
```
0.3 Filter Elevation
filterElevation filters the data down to the given countryCode and returns an RDD containing only the dem information. You will have to convert the dem values to int.
param countryCode
: country code, e.g. "AD"
param data
: an RDD containing multiple Array["name", "countryCode", "dem"] (as returned by the extractData function)
return
: RDD[int] containing only dem information
```python
def filterElevation(countryCode, data):
    # keep only the rows matching the given country code
    filt = data.filter(lambda x: x[1] == countryCode)
    # keep only the dem field, converted to int
    convert = filt.map(lambda x: int(x[2]))
    return convert
    #raise NotImplementedError()
```
```python
# Example print
filterElevation("AD", extractData(testFile)).take(5)
```
```
[2475, 1900, 2300, 2299, 2321]
```
```python
'''filterElevation tests'''
filtered = extractData(testFile)
first = filterElevation("SE", filtered).first()
assert type(first) is int, "the type of the RDD element was expected to be int but it was %s" % type(first)
assert first == 56, "the value of the RDD element was expected to be 56 but it was %s" % first
object = filterElevation("AD", filtered).collect()[4]
assert object == 2321, "the value of the RDD element was expected to be 2321 but it was %s" % object
```
0.4 Elevation Average
elevationAverage calculates the average dem value of a given dataset.
param data
: RDD[int] containing only dem information
return
: The average elevation
```python
def elevationAverage(data):
    # YOUR CODE HERE
    # sum all elevations, then divide by the number of elements
    summ = data.reduce(lambda a, b: a + b)
    countr = data.count()
    avg = summ / countr
    return avg
    #raise NotImplementedError()
```
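For comparison, PySpark's built-in RDD.mean() computes the same value in one pass; a minimal sketch (elevationAverageAlt is just an illustrative name):

```python
# Equivalent sketch using the built-in mean() of numeric RDDs
def elevationAverageAlt(data):
    return data.mean()
```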
```python
# Example print
elevationAverage(sc.parallelize(filterElevation("AD", extractData(testFile)).take(5)))
```
```
2259.0
```
```python
'''elevationAverage tests'''
avg = elevationAverage(sc.parallelize([1, 2, 3, 4]))
assert avg == 2.5, "the average was expected to be 2.5 but it was %s" % avg
filtered = extractData(testFile)
elevations = filterElevation("AD", filtered)
avg2 = elevationAverage(elevations)
assert avg2 == 1792.25, "the average was expected to be 1792.25 but it was %s" % avg2
assert type(avg2) is float, "the type of the RDD element was expected to be float but it was %s" % type(avg2)
```
0.5 Most Common Words
mostCommonWords calculates the most common words in place names and returns an RDD[(String, Int)]. You can assume that words are separated by a single space ' '.
param data
: an RDD containing multiple Array[“name”, “countryCode”, “dem”].
return
: RDD[(String, Int)] where the string is the word and the int is the number of occurrences. The RDD should be in descending order (sorted by number of occurrences), e.g. ("hotel", 234), ("airport", 120), ("new", 12).
Example:
Assume that the place name is "Andorra la Vella Heliport". We split the name so that we have 4 separate words: "Andorra", "la", "Vella" and "Heliport".
```python
def mostCommonWords(data):
    # split each place name into its words
    wordlist = data.map(lambda x: x[0].split(" "))
    # turn each word list into (word, 1) pairs
    pairlist = wordlist.map(lambda x: [(i, 1) for i in x])
    # concatenate all pair lists into one list on the driver
    summ = pairlist.reduce(lambda a, b: a + b)
    inter = sc.parallelize(summ)
    # count occurrences per word
    tot = inter.reduceByKey(lambda a, b: a + b)
    # sort by count in descending order, then restore (word, count) order
    rev = tot.map(lambda x: (x[1], x[0]))
    final = rev.sortByKey(False)
    rev2 = final.map(lambda x: (x[1], x[0]))
    return rev2
```
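The driver-side reduce above concatenates every per-name pair list into one Python list on the driver, which only scales to small samples. A fully distributed sketch of the same computation using flatMap and sortBy (mostCommonWordsAlt is an illustrative name, not part of the assignment):

```python
# Distributed word count: no driver-side list concatenation
def mostCommonWordsAlt(data):
    return (data.flatMap(lambda x: x[0].split(" "))   # one record per word
                .map(lambda word: (word, 1))
                .reduceByKey(lambda a, b: a + b)      # occurrences per word
                .sortBy(lambda pair: pair[1], ascending=False))
```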
```python
# Example print
mostCommonWords(extractData(testFile)).take(5)
```
```
[('Hotel', 22), ('de', 15), ('la', 12), ('Hotell', 7), ('dels', 6)]
```
```python
'''mostCommonWords tests'''
filtered = extractData(testFile)
words = mostCommonWords(filtered).collect()
first = words[0]
second = words[1]
third = words[2]
assert type(first[0]) is str, "the type of the first value in array was expected to be str but it was %s" % type(first[0])
assert type(first[1]) is int, "the type of the second value in array was expected to be int but it was %s" % type(first[1])
assert first[1] >= second[1], "the first element in RDD was expected to have more occurrences than the second"
assert first[0] == "Hotel", "the first element was expected to be named Hotel but it was %s" % first[0]
assert first[1] == 22, "the count of the first element was expected to be 22 but it was %s" % first[1]
assert third[0] == "la", "the third element was expected to be named 'la' but it was %s" % third[0]
```
0.6 Most Common Country
mostCommonCountry tells which country has the most entries in the geolocation data. The correct name for a specific country code can be found from countrycodes.csv. The columns in countrycodes.csv are separated by ",". More specifically, the file is structured like this:
```
Fiji,FJ
Finland,FI
France,FR
```
param data
: an RDD containing multiple Array[“name”, “countryCode”, “dem”].
param codeData
: data from countrycodes.csv file
return
: the most common country as a String, e.g. Finland, or an empty string "" if countrycodes.csv doesn't have that entry.
= sc.textFile("countrycodes.csv")
countryCodes
```python
def mostCommonCountry(data, codeData):
    # YOUR CODE HERE
    # count entries per country code
    counts = data.map(lambda x: (x[1], 1))
    summ = counts.reduceByKey(lambda a, b: a + b)
    # build (code, countryName) pairs from countrycodes.csv
    iterm = codeData.map(lambda x: x.split(","))
    revind = iterm.map(lambda x: (x[1], x[0]))
    # join on the country code: (code, (countryName, count))
    out = revind.join(summ).collect()
    a1 = sc.parallelize(out)
    # rearrange to (count, countryName) for sorting
    a2 = a1.map(lambda x: (x[1][1], x[1][0]))
    if a2.isEmpty():
        return ""
    a3 = a2.sortByKey(False)
    return a3.first()[1]
    #raise NotImplementedError()
```
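The collect/parallelize round trip above pulls the joined data to the driver only to redistribute it. A sketch that stays distributed and takes the maximum by count directly (mostCommonCountryAlt is an illustrative name):

```python
# Distributed variant: join counts with country names, take the max by count
def mostCommonCountryAlt(data, codeData):
    counts = data.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)
    names = codeData.map(lambda line: line.split(",")).map(lambda x: (x[1], x[0]))
    joined = names.join(counts)  # (code, (countryName, count))
    if joined.isEmpty():
        return ""
    return joined.max(key=lambda pair: pair[1][1])[1][0]
```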
```python
# Example print
mostCommonCountry(extractData(testFile), countryCodes)
```
```
'Sweden'
```
```python
'''mostCommonCountry tests'''
filtered = extractData(testFile)
mostCommon = mostCommonCountry(filtered, countryCodes)
assert type(mostCommon) is str, "the type of the returned object was expected to be str but it was %s" % type(mostCommon)
assert mostCommon == "Sweden", "the most common was expected to be Sweden but it was %s" % mostCommon
mostCommon2 = mostCommonCountry(sc.parallelize(filtered.take(40)), countryCodes)
assert mostCommon2 == "Andorra", "the most common was expected to be Andorra but it was %s" % mostCommon2
false = sc.parallelize([["a", "AA", 123], ["b", "AA", 1234]])
assert mostCommonCountry(false, countryCodes) == "", "The method was expected to return empty when called with false data"
```
0.7 Hotels In Area
hotelsInArea determines how many hotels there are within 10 km (<= 10000.0 m) of the given latitude and longitude.

Use the Haversine formula (https://en.wikipedia.org/wiki/Haversine_formula; shown below). The Earth radius is 6371000 meters.

Don't use the feature code field. You should start by reading the data and extracting the correct fields (name, latitude, longitude) similarly to the extractData function. After that, use the Haversine formula to filter the places within a 10 km radius of the given latitude and longitude. You will probably want to use a helper function; Python lets you create functions inside functions. Finally, filter the places whose name contains the word "hotel". A location is a hotel if its name contains the word "hotel" in any capitalization ("Hotel" or "hOtel", for instance). There can exist multiple hotels in the same location.

Note that both latitude and longitude in the data are in decimal degrees, so you have to convert them to radians (https://en.wikipedia.org/wiki/Decimal_degrees). They should also be converted to double values, e.g. math.radians(float(x)).
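For reference, with the latitudes $\varphi$ and longitudes $\lambda$ converted to radians and Earth radius $R = 6371000$ m, the haversine distance between two points is

$$d = 2R\,\arcsin\!\left(\sqrt{\sin^2\frac{\varphi_2-\varphi_1}{2}+\cos\varphi_1\cos\varphi_2\,\sin^2\frac{\lambda_2-\lambda_1}{2}}\right)$$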
param lat
: latitude as Double
param long
: longitude as Double
param data
: the original data set loaded into Spark as RDD[String].
return
: number of hotels in area
```python
import math

testFile = sc.textFile("testdata.txt")
```
```python
def hotelsInArea(lat, long, data):
    # YOUR CODE HERE
    # haversine distance in meters between two points in decimal degrees
    def haversine(lat1, lon1, lat2, lon2):
        rlat1 = math.radians(float(lat1))
        rlat2 = math.radians(float(lat2))
        rlon1 = math.radians(float(lon1))
        rlon2 = math.radians(float(lon2))
        radius = 6371000  # Earth radius in meters
        dlat = (rlat2 - rlat1) / 2
        dlon = (rlon2 - rlon1) / 2
        a = math.sin(dlat)**2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon)**2
        return 2 * radius * math.asin(a**(1/2))

    boundary = 10000  # 10 km in meters
    distfun = lambda a, b: haversine(lat, long, a, b)
    # split lines and keep asciiname (2), latitude (4) and longitude (5)
    formatt = data.map(lambda x: x.split("\t"))
    formatt2 = formatt.map(lambda x: [x[2], float(x[4]), float(x[5])])
    # keep places within the boundary whose name contains "hotel"
    f3 = formatt2.filter(lambda x: distfun(x[1], x[2]) <= boundary and "hotel" in x[0].lower())
    # count one hotel per distinct coordinate pair
    f4 = f3.map(lambda x: ((x[1], x[2]), 1))
    f5 = f4.reduceByKey(lambda a, b: a + b)
    return f5.count()
    #raise NotImplementedError()
```
```python
# Example print
hotelsInArea(48, 18.063240, testFile)
```
```
0
```
```python
'''hotelsInArea tests'''
a1 = hotelsInArea(42.5423, 1.5977, testFile)
a2 = hotelsInArea(59.334591, 18.063240, testFile)
a3 = hotelsInArea(63.8532, 15.5652, testFile)
assert a1 == 0, "the number of hotels was expected to be 0 but it was %s" % a1
assert a2 == 3, "the number of hotels was expected to be 3 but it was %s" % a2
assert a3 == 1, "the number of hotels was expected to be 1 but it was %s" % a3
```
```python
sc.stop()
```