PySpark Structured Streaming

Posted on June 2, 2019
Tags: machinelearning
NAME = "UserJY"

Jupyter notebook:

0.1 Structured Streaming

We will use Structured Streaming to analyze made-up trail camera data. We will simulate real-time streaming by having multiple data files and loading them one at a time.

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
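
At a high level the pattern is: read a streaming dataframe, define an aggregation on it, and start a query that keeps the running result up to date. A rough sketch of that pattern (the schema and path placeholders are defined further down):

# overall pattern (sketch): read a stream, aggregate, start a query on the result
stream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(path)
counts = stream.groupBy("animal_name").count()
query = counts.writeStream.format("memory").queryName("counts").outputMode("complete").start()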

from pyspark.sql import SparkSession, Row
import pyspark.sql
import json
from pyspark.sql.types import *
import time
from IPython.display import display, clear_output
import pandas as pd

spark = SparkSession \
    .builder \
    .appName("StructuredStreaming") \
    .getOrCreate()

path = "data/"

0.2 Load data

First we'll start with normal dataframe exercises. Create a method that loads the trail camera data into a dataframe. The data is in JSON format, so you might have to specify the schema with the StructType methods. The dataframe will contain null values; you can either remove them now or leave them be. This dataframe simulates the input dataframe that we will use for streaming.

param path: path to the JSON dataset.

return: dataframe containing trail camera information.

schema:

Name          Type
time          Timestamp (nullable = true)
animal_name   String (nullable = true)
weather       String (nullable = true)
battery       Double (nullable = true)
# Create DataFrame representing data in the JSON files
def loadData(path):
    # schema for the trail camera JSON files
    jsonSchema = (StructType()
                  .add("time", TimestampType())
                  .add("animal_name", StringType())
                  .add("weather", StringType())
                  .add("battery", DoubleType()))
    # read the JSON files with the explicit schema and drop rows containing nulls
    outdf = spark.read.json(path, schema=jsonSchema).dropna("any")
    return outdf
#example print
loadData(path).show()
+-------------------+-----------+-------+-------+
|               time|animal_name|weather|battery|
+-------------------+-----------+-------+-------+
|2020-04-18 21:50:40|       Deer|  Clear|    7.0|
|2020-03-10 11:23:34|   Squirrel|  Clear|   86.0|
|2019-11-09 20:36:04|   Squirrel|  Clear|   55.0|
|2019-10-31 07:22:20|   Squirrel| Cloudy|   12.0|
|2020-05-04 10:59:35|   Squirrel|  Clear|   64.0|
|2020-01-30 14:21:35|   Squirrel|  Clear|   34.0|
|2019-10-29 19:20:05|     Rabbit|  Rainy|    6.0|
|2020-01-30 11:34:51|   Squirrel|  Rainy|   96.0|
|2020-07-17 19:32:23|       Deer| Cloudy|   17.0|
|2020-05-27 09:42:41|   Squirrel|  Rainy|    5.0|
|2020-05-28 09:00:05|       Bear|  Storm|   53.0|
|2020-08-07 11:02:38|       Deer| Cloudy|   14.0|
|2019-06-22 16:02:52|     Rabbit|  Clear|   13.0|
|2019-09-23 11:56:31|   Squirrel|  Clear|   95.0|
|2020-05-31 07:27:03|   Squirrel|  Clear|  100.0|
|2019-11-29 19:11:53|     Rabbit|  Clear|   71.0|
|2020-03-23 20:06:24|     Rabbit|  Clear|   19.0|
|2019-07-12 10:45:52|   Squirrel|  Rainy|   15.0|
|2020-05-30 13:07:53|     Rabbit|  Rainy|   84.0|
|2019-06-09 04:34:10|     Rabbit|  Clear|   95.0|
+-------------------+-----------+-------+-------+
only showing top 20 rows
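As a side note, the same schema could also be given as a DDL-formatted string instead of chained StructType.add calls; a small sketch (assuming Spark 2.3 or newer):

# equivalent schema as a DDL string (sketch)
ddl = "time TIMESTAMP, animal_name STRING, weather STRING, battery DOUBLE"
spark.read.schema(ddl).json(path).dropna("any").show(5)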
'''loadData tests'''

cols = StructType([ StructField("time", TimestampType(), True),
                    StructField("animal_name", StringType(), True),
                    StructField("weather", StringType(), True),
                    StructField("battery", DoubleType(), True)])


from datetime import datetime
testTs = datetime(2020, 1, 1)

fakeData = [(testTs, "dog", "cloudy", 100.0)]

fakeDf = spark.createDataFrame(fakeData, cols)

df = loadData(path)

assert df.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, df.dtypes)

0.3 Animal count

Next we will simulate the output dataframe that we will use for streaming. Create a method that counts the number of appearances of each animal. The dataframe should be sorted by count in descending order. You should remove the null rows now if you didn't do it in the last method.

param df: trail camera dataframe created using loadData.

return: dataframe containing the number of appearances per animal. The dataframe should include the columns “animal_name” and “count”. “count” should be of Long type, which happens automatically with Spark functions. The dataframe must not include a count for null values.

example output:

animal_name   count
Dog           1234
Cat           1111
Mouse         999
def animalCount(df):
    # register the dataframe as a temp view so it can be queried with Spark SQL
    df.createOrReplaceTempView("ntable")
    # count appearances per animal, sorted by count in descending order
    newdf = spark.sql("""
        SELECT animal_name, COUNT(animal_name) AS count FROM ntable
        GROUP BY animal_name
        ORDER BY count DESC
    """)
    return newdf
#example print
animalCount(loadData(path)).show()
+-----------+-----+
|animal_name|count|
+-----------+-----+
|     Rabbit| 1187|
|   Squirrel| 1147|
|       Deer|  351|
|       Bear|  119|
|       Wolf|  111|
+-----------+-----+
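For reference, the same aggregation can also be written with the DataFrame API instead of Spark SQL; a sketch, not the solution used above:

from pyspark.sql.functions import col, desc

# same result via the DataFrame API: drop nulls, group, count, sort descending (sketch)
def animalCountDF(df):
    return (df.filter(col("animal_name").isNotNull())
              .groupBy("animal_name")
              .count()
              .orderBy(desc("count")))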
'''animalCount tests'''

cols = StructType([ StructField("animal_name", StringType(), True),
                    StructField("count", LongType(), False)])


fakeData = [("dog", 1)]

fakeDf = spark.createDataFrame(fakeData, cols)

df = animalCount(loadData(path))

assert df.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, df.dtypes)

assert df.count() == 5, "the number of rows was expected to be 5 but it was %s" % df.count()

df = df.toPandas()

assert df.loc[0][1] >= df.loc[1][1], "the first item was expected to have higher count than the second"
assert df.loc[3][1] >= df.loc[4][1], "the fourth item was expected to have higher count than the last"
assert df.loc[0][0] == "Rabbit", "the first item was expected to be Rabbit but it was %s" % df.loc[0][0]
assert df.loc[4][0] == "Wolf", "the last item was expected to be Wolf but it was %s" % df.loc[4][0]

0.4 inputDf

Now we will finally do the streaming. First you should specify the schema for the input dataframe; it is the same schema as in the Load Data exercise. Then create the input dataframe with the spark.readStream method. Remember to include the schema and the path. You will also have to include .option("maxFilesPerTrigger", 1) so that we can simulate real-time streaming by loading one file at a time.

param path: path to the JSON dataset.

return: input dataframe containing trail camera information.

def inputDf(path):
    # same schema as in the Load Data exercise
    jsonSchema = (StructType()
                  .add("time", TimestampType())
                  .add("animal_name", StringType())
                  .add("weather", StringType())
                  .add("battery", DoubleType()))
    # read the files as a stream, one file per trigger, and drop rows with nulls
    inputDF = (spark
               .readStream
               .schema(jsonSchema)
               .option("maxFilesPerTrigger", 1)
               .json(path)
               .dropna("any"))
    return inputDF
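
A quick sanity check (not part of the exercise): a dataframe built with readStream reports that it is streaming.

# sanity check (sketch): should print True for a streaming dataframe
print(inputDf(path).isStreaming)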

0.5 outputDf

Next you should create the output dataframe, similar to the Animal Count exercise. You will have to exclude the null values and sort the dataframe by count in descending order.

param inputDF: input dataframe created by inputDf().

return: filtered and sorted dataframe containing the number of appearances per animal.

def outputDf(inputDF):
    # register the streaming dataframe as a temp view
    # (createOrReplaceTempView returns None, so there is nothing to assign)
    inputDF.createOrReplaceTempView("streamTable")
    # running count of appearances per animal, sorted in descending order
    newdf = spark.sql("""
        SELECT animal_name, COUNT(animal_name) AS count
        FROM streamTable
        GROUP BY animal_name
        ORDER BY count DESC
    """)
    return newdf

0.6 createQuery

Finally, you should start streaming the output dataframe with the writeStream method. You will have to include the options format=“memory”, queryName=“counts” and outputMode=“complete”.

param outputDF: output dataframe created by outputDf().

return: a query on the output dataframe

def createQuery(outputDF):
    # stream the running counts into an in-memory table named "counts"
    query = (outputDF
             .writeStream
             .format("memory")
             .outputMode("complete")
             .queryName("counts")
             .start())
    # give the query a moment to start before returning
    time.sleep(0.5)
    return query
'''streaming tests'''
inputStreamDf = inputDf(path)
outputStreamDf= outputDf(inputStreamDf)
query = createQuery(outputStreamDf)

assert outputStreamDf.isStreaming, "the outputDF was expected to be streaming"

df = spark.sql("select * from counts")

assert df.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, df.dtypes)

status = {'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

assert query.status == status, "the status was expected to be %s but it was %s" % (status, query.status)

assert df.count() == 0, "the number of rows was expected to be 0 when the streaming just started but it was %s" % df.count()
time.sleep(20)
assert df.count() == 5, "the number of rows was expected to be 5 when the streaming has been going for a while but it was %s" % df.count()

df = df.toPandas()

assert df.loc[0][1] >= df.loc[1][1], "the first item was expected to have higher count than the second"
assert df.loc[3][1] >= df.loc[4][1], "the fourth item was expected to have higher count than the last"
assert df.loc[0][0] == "Rabbit" or df.loc[0][0] == "Squirrel", "the first item was expected to be Rabbit or Squirrel but it was %s" % df.loc[0][0]
assert df.loc[4][0] == "Wolf" or df.loc[4][0] == "Bear", "the last item was expected to be Wolf or Bear but it was %s" % df.loc[4][0]
# stream printing in real-time
# you can increase the total streaming simulation duration by increasing "nIter"
# However, when finally submitting the assignment make sure to change "nIter" back to a smaller value. 
nIter = 5
for i in range(nIter):
    clear_output(wait=True)
    display(query.status)
    display()
    time.sleep(3)
{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}
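Before shutting down the Spark session, the streaming query itself can also be stopped explicitly; a small addition that was not part of the original run:

query.stop()  # stop the streaming query before stopping the session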
spark.stop()