PySpark Structured Streaming
= "UserJY" NAME
Jupyter notebook:
0.1 Structured Streaming
In this notebook we will use structured streaming to analyze made-up trail camera data. We will simulate real-time streaming by splitting the data into multiple files and loading them one by one.
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
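As a quick orientation, the pattern we will build up in this notebook follows the programming guide: read a directory of files as a stream, define an aggregation on it, and write the running result to a sink. A minimal sketch of that shape, with placeholder schema, path and column names (it assumes a SparkSession named spark like the one created below):

from pyspark.sql.types import StructType, StringType

# illustrative only: placeholder schema, directory and column name
exampleSchema = StructType().add("some_column", StringType())

streamingDf = (spark.readStream
    .schema(exampleSchema)
    .json("some/input/dir"))                # a directory of JSON files

runningCounts = streamingDf.groupBy("some_column").count()

exampleQuery = (runningCounts.writeStream   # keep the running result in memory
    .format("memory")
    .queryName("example")
    .outputMode("complete")
    .start())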
from pyspark.sql import SparkSession, Row
import pyspark.sql
import json
from pyspark.sql.types import *
import time
from IPython.display import display, clear_output
import pandas as pd
spark = SparkSession \
    .builder \
    .appName("StructuredStreaming") \
    .getOrCreate()

path = "data/"
0.2 Load data
First we'll do some normal dataframe exercises. Create a method that loads the trail camera data into a dataframe. The data is in JSON format. You might have to specify the schema with the StructType methods. The dataframe will contain null values (literal 'null' entries); you can either remove them now or leave them be. This dataframe simulates the input dataframe that we will use for streaming.
param path: path to the JSON dataset.
return: dataframe containing trail camera information.
schema:

| Name | Type |
| --- | --- |
| time | Timestamp (nullable = true) |
| animal_name | String (nullable = true) |
| weather | String (nullable = true) |
| battery | Double (nullable = true) |
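For reference, given this schema, a single record in the data files might look roughly like the following (illustrative values; this assumes the JSON Lines layout, one JSON object per line, that spark.read.json handles by default):

{"time": "2020-04-18 21:50:40", "animal_name": "Deer", "weather": "Clear", "battery": 7.0}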
# Create DataFrame representing data in the JSON files
def loadData(path):
    #path = "testdata/"
    jsonSchema = (StructType()
        .add("time", TimestampType())
        .add("animal_name", StringType())
        .add("weather", StringType())
        .add("battery", DoubleType())
    )
    outdf = spark.read.json(path, schema=jsonSchema).dropna("any")
    return outdf
#example print
loadData(path).show()
+-------------------+-----------+-------+-------+
| time|animal_name|weather|battery|
+-------------------+-----------+-------+-------+
|2020-04-18 21:50:40| Deer| Clear| 7.0|
|2020-03-10 11:23:34| Squirrel| Clear| 86.0|
|2019-11-09 20:36:04| Squirrel| Clear| 55.0|
|2019-10-31 07:22:20| Squirrel| Cloudy| 12.0|
|2020-05-04 10:59:35| Squirrel| Clear| 64.0|
|2020-01-30 14:21:35| Squirrel| Clear| 34.0|
|2019-10-29 19:20:05| Rabbit| Rainy| 6.0|
|2020-01-30 11:34:51| Squirrel| Rainy| 96.0|
|2020-07-17 19:32:23| Deer| Cloudy| 17.0|
|2020-05-27 09:42:41| Squirrel| Rainy| 5.0|
|2020-05-28 09:00:05| Bear| Storm| 53.0|
|2020-08-07 11:02:38| Deer| Cloudy| 14.0|
|2019-06-22 16:02:52| Rabbit| Clear| 13.0|
|2019-09-23 11:56:31| Squirrel| Clear| 95.0|
|2020-05-31 07:27:03| Squirrel| Clear| 100.0|
|2019-11-29 19:11:53| Rabbit| Clear| 71.0|
|2020-03-23 20:06:24| Rabbit| Clear| 19.0|
|2019-07-12 10:45:52| Squirrel| Rainy| 15.0|
|2020-05-30 13:07:53| Rabbit| Rainy| 84.0|
|2019-06-09 04:34:10| Rabbit| Clear| 95.0|
+-------------------+-----------+-------+-------+
only showing top 20 rows
'''loadData tests'''
= StructType([ StructField("time", TimestampType(), True),
cols "animal_name", StringType(), True),
StructField("weather", StringType(), True),
StructField("battery", DoubleType(), True)])
StructField(
from datetime import datetime
= datetime(2020, 1, 1)
testTs
= [(testTs, "dog", "cloudy", 100.0)]
fakeData
= spark.createDataFrame(fakeData, cols)
fakeDf
= loadData(path)
df
assert df.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, df.dtypes)
0.3 Animal count
Next we will simulate the output dataframe that we will use for streaming. Create a method that counts the number of appearances of each animal. The dataframe should be sorted by count in descending order. You should remove the null rows now if you didn't do it in the previous method.
param df: trail camera dataframe created using loadData.
return: dataframe containing the number of appearances per animal. The dataframe should include the columns "animal_name" and "count". "count" should be in Long format; this should happen automatically with Spark functions. The dataframe must not include a count for null values.
example output:

| animal_name | count |
| --- | --- |
| Dog | 1234 |
| Cat | 1111 |
| Mouse | 999 |
def animalCount(df):
    # YOUR CODE HERE
    df.createOrReplaceTempView("ntable")
    newdf = spark.sql("""
        SELECT animal_name, COUNT(animal_name) AS count FROM ntable
        GROUP BY animal_name
        ORDER BY count DESC
    """)
    return newdf
    #raise NotImplementedError()
#example print
animalCount(loadData(path)).show()
+-----------+-----+
|animal_name|count|
+-----------+-----+
| Rabbit| 1187|
| Squirrel| 1147|
| Deer| 351|
| Bear| 119|
| Wolf| 111|
+-----------+-----+
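The same counts can also be produced with the DataFrame API instead of Spark SQL; a minimal sketch under the same schema (the helper name animalCountDF is only for illustration):

from pyspark.sql.functions import col

def animalCountDF(df):
    # drop null rows, count rows per animal, sort by the count descending
    return (df.dropna("any")
              .groupBy("animal_name")
              .count()                       # adds a Long column named "count"
              .orderBy(col("count").desc()))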
'''animalCount tests'''
= StructType([ StructField("animal_name", StringType(), True),
cols "count", LongType(), False)])
StructField(
= [("dog", 1)]
fakeData
= spark.createDataFrame(fakeData, cols)
fakeDf
= animalCount(loadData(path))
df
assert df.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, df.dtypes)
assert df.count() == 5, "the number of rows was expected to be 5 but it was %s" % df.count()
df = df.toPandas()
assert df.loc[0][1] >= df.loc[1][1], "the first item was expected to have higher count than the second"
assert df.loc[3][1] >= df.loc[4][1], "the fourth item was expected to have higher count than the last"
assert df.loc[0][0] == "Rabbit", "the first item was expected to be Rabbit but it was %s" % df.loc[0][0]
assert df.loc[4][0] == "Wolf", "the last item was expected to be Wolf but it was %s" % df.loc[4][0]
0.4 inputDf
Now we will finally do the streaming. First you should specify the schema for the input dataframe; the schema is the same as in the Load Data exercise. Then you should create the input dataframe with the spark.readStream method. Remember to include the schema and the path. You will also have to include .option("maxFilesPerTrigger", 1) so that we can simulate real-time streaming by loading one file at a time.
param path: path to the JSON dataset.
return: input dataframe containing trail camera information.
def inputDf(path):
    # YOUR CODE HERE
    jsonSchema = (StructType()
        .add("time", TimestampType())
        .add("animal_name", StringType())
        .add("weather", StringType())
        .add("battery", DoubleType())
    )
    inputDF = (spark
        .readStream
        .schema(jsonSchema)
        .option("maxFilesPerTrigger", 1)
        .json(path)
        .dropna("any")
    )
    return inputDF
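A quick sanity check, not required by the assignment: dataframes created with spark.readStream report themselves as streaming, so the following would print True for the dataframe returned above.

# True for dataframes built with spark.readStream, False for spark.read
print(inputDf(path).isStreaming)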
0.5 outputDf
Next you should create the output dataframe, similar to the Animal Count exercise. You will have to exclude the null values and sort the dataframe by count in descending order.
param inputDF: input dataframe created by inputDf().
return: filtered and sorted dataframe containing the number of appearances per animal.
def outputDf(inputDF):
    # YOUR CODE HERE
    inputDF.createOrReplaceTempView("streamTable")
    newdf = spark.sql("""
        SELECT animal_name, COUNT(animal_name) AS count
        FROM streamTable
        GROUP BY animal_name
        ORDER BY count DESC
    """)
    return newdf
    #raise NotImplementedError()
0.6 createQuery
Finally, you should start streaming the output dataframe with the writeStream method. You will have to include the options format="memory", queryName="counts" and outputMode="complete".
param outputDF: output dataframe created by outputDf().
return: a query on the output dataframe.
def createQuery(outputDF):
    # YOUR CODE HERE
    query = (outputDF
        .writeStream
        .format("memory")
        .outputMode("complete")
        .queryName("counts")
        .start()
    )
    time.sleep(0.5)
    return query
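Once the query has started, the running result accumulates in an in-memory table named after queryName, and the query handle exposes progress information. A small illustrative snippet (query is the value returned by createQuery):

# the complete-mode result lands in an in-memory table called "counts"
spark.sql("SELECT * FROM counts").show()

# streaming-specific metadata on the query handle
print(query.status)        # e.g. whether new data is available or a trigger is active
print(query.lastProgress)  # metrics for the most recent micro-batch

# stop the query when you no longer need it (the tests below keep it running)
# query.stop()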
'''streaming tests'''
inputStreamDf = inputDf(path)
outputStreamDf = outputDf(inputStreamDf)
query = createQuery(outputStreamDf)
assert outputStreamDf.isStreaming, "the outputDF was expected to be streaming"
= spark.sql("select * from counts")
df
assert df.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, df.dtypes)
status = {'message': 'Processing new data',
          'isDataAvailable': True,
          'isTriggerActive': True}
assert query.status == status, "the status was expected to be %s but it was %s" % (status, query.status)
assert df.count() == 0, "the number of rows was expected to be 0 when the streaming just started but it was %s" % df.count()
time.sleep(20)
assert df.count() == 5, "the number of rows was expected to be 5 when the streaming has been going for a while but it was %s" % df.count()
df = df.toPandas()
assert df.loc[0][1] >= df.loc[1][1], "the first item was expected to have higher count than the second"
assert df.loc[3][1] >= df.loc[4][1], "the fourth item was expected to have higher count than the last"
assert df.loc[0][0] == "Rabbit" or df.loc[0][0] == "Squirrel", "the first item was expected to be Rabbit or Squirrel but it was %s" % df.loc[0][0]
assert df.loc[4][0] == "Wolf" or df.loc[4][0] == "Bear", "the last item was expected to be Wolf or Bear but it was %s" % df.loc[4][0]
# stream printing in real-time
# you can increase the total streaming simulation duration by increasing "nIter"
# However, when finally submitting the assignment make sure to change "nIter" back to a smaller value.
nIter = 5
for i in range(nIter):
    clear_output(wait=True)
    display(query.status)
    display()
    time.sleep(3)
{'message': 'Processing new data',
'isDataAvailable': True,
'isTriggerActive': True}
spark.stop()