PySpark Graph Frames

Posted on June 2, 2019
Tags: machinelearning

0.1 GraphFrame on Social Media

Analyze people’s social data using GraphFrames.

The examples use a small sample of “socialgraph.dat” from https://archive.org/download/201309_foursquare_dataset_umn/fsq.zip, which is found inside the “umn_foursquare_datasets” folder.

The “socialgraph.dat” file contains the social graph edges (connections) that exist between users. Each social connection is a relation represented by a tuple of two unique ids (first_user_id and second_user_id). The connections are directed. Suppose we have the data shown below:

first_user_id second_user_id
1 2
2 1

This data set shows that there is a connection from user1 (whose id is 1) to user2 (whose id is 2), and another connection from user2 to user1.

https://graphframes.github.io/graphframes/docs/_site/api/python/graphframes.html#module-graphframes.

from graphframes import GraphFrame
from pyspark.sql import SparkSession, Row
import re
from pyspark.sql.functions import *

# Start (or reuse) a Spark session running locally on all available cores
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('main')
    .getOrCreate()
)

# Path of smaller data set
testFile = "socialgraph_testsample.dat"

# Path of the sample data set used by the example cells
sampleFile = "socialgraph_sample.dat"
---------------------------------------------------------------------------

ModuleNotFoundError                       Traceback (most recent call last)

<ipython-input-6-5e321d6a43ae> in <module>
----> 1 from graphframes import GraphFrame
      2 from pyspark.sql import SparkSession, Row
      3 import re
      4 from pyspark.sql.functions import *
      5 


ModuleNotFoundError: No module named 'graphframes'
# Shared values used by more than one test cell

# File name of the random-sample data set
sampleDat = "socialgraph_randomsample.dat"
# SparkContext of the session, needed for the RDD API
sc = spark.sparkContext

# Test if two arrays that contain Rows are equal
def equalArray(array1, array2):
    """Assert that two sequences of Rows hold the same rows in the same order.

    Rows are compared through ``asDict()``, so field order inside a Row does
    not matter, only field names and values.

    param array1: the rows actually produced (any sequence of Row-like objects)
    param array2: the rows that were expected

    raises AssertionError: if the sequences differ in length or in any row
    """
    # Compare lengths first: iterating over only one of the two sequences
    # would let extra rows pass unnoticed, or fail with an IndexError
    # instead of a readable test failure.
    assert len(array1) == len(array2), "the number of rows was expected to be %s but it was %s" % (len(array2), len(array1))
    for actual, expected in zip(array1, array2):
        assert actual.asDict() == expected.asDict(), "the row was expected to be %s but it was %s" % (expected.asDict(), actual.asDict())

# Test if two dataframes contain same rows
def equalDF(df1, df2, *columns):
    """Assert that two DataFrames contain the same rows.

    Both DataFrames are sorted by the given columns and collected, and the
    resulting row lists are compared with equalArray.

    param df1: the DataFrame actually produced
    param df2: the expected DataFrame
    param columns: names of the columns to sort by before comparing
    """
    sortKeys = list(columns)
    # Sorting both sides by the same keys makes the row order comparable
    actualRows, expectedRows = (frame.orderBy(sortKeys).collect() for frame in (df1, df2))
    equalArray(actualRows, expectedRows)
---------------------------------------------------------------------------

NameError                                 Traceback (most recent call last)

<ipython-input-7-312e62ea857f> in <module>
      1 # Variable and methods that will be used in more than one test
      2 
----> 3 sc = spark.sparkContext
      4 sampleDat = "socialgraph_randomsample.dat"
      5 


NameError: name 'spark' is not defined

0.2 Create Graph

createGraph creates a GraphFrame. Use user_id as both the vertex id and the vertex attribute. Use the number of connections recorded from one user to another as the edge weight, so a connection that is listed twice gets weight 2.

Example: Suppose we have the data shown below:

first_user_id second_user_id
1 2
1 2
2 1
1 3
2 3

The graph should be:

param path: path of file whose data should be used to create the GraphFrame

return: GraphFrame

Load the file into an RDD, then use RDD transformations to extract and parse the data.
Use a regular expression for fine-grained selection of the valid lines.

def createGraph(path):
    """Build a GraphFrame from a social-graph text file.

    Every line that contains two integers separated by " | " is read as one
    directed connection from the first user id to the second. Lines that do
    not match are skipped.

    param path: path of file whose data should be used to create the GraphFrame

    return: GraphFrame whose vertices have columns "id" and "name" (both the
    user id) and whose edges have columns "src", "dst" and "relationship",
    where "relationship" is the number of lines holding that (src, dst) pair.
    """
    edgePattern = re.compile(r" *([0-9]+) \| *([0-9]+)")

    def parseLine(line):
        # Returning a list lets flatMap drop the lines that do not match,
        # so each line is scanned by the regex only once.
        match = edgePattern.search(line)
        if match is None:
            return []
        return [(int(match.group(1)), int(match.group(2)))]

    # One (src, dst) tuple per connection line in the file
    connections = sc.textFile(path).flatMap(parseLine)

    # Every id that appears on either end of a connection is a vertex;
    # the id is reused as the vertex name.
    vertexRows = connections.flatMap(lambda pair: pair).distinct().map(lambda userId: (userId, userId))

    # Count how many times each directed pair occurs to get the edge weight
    edgeRows = connections.map(lambda pair: (pair, 1)) \
        .reduceByKey(lambda left, right: left + right) \
        .map(lambda counted: (counted[0][0], counted[0][1], counted[1]))

    graphVertices = spark.createDataFrame(vertexRows, ["id", "name"])
    graphEdges = spark.createDataFrame(edgeRows, ["src", "dst", "relationship"])
    return GraphFrame(graphVertices, graphEdges)
# example print
# Build the graph from the sample file, keep it persisted for the later
# example cells, and display its vertex and edge DataFrames.

graph = createGraph(sampleFile).persist()
graph.vertices.show()
graph.edges.show()
---------------------------------------------------------------------------

NameError                                 Traceback (most recent call last)

<ipython-input-9-4ec2ad282db8> in <module>
      1 # example print
      2 
----> 3 graph = createGraph(sampleFile).persist()
      4 graph.vertices.show()
      5 graph.edges.show()


NameError: name 'sampleFile' is not defined
'''createGraph tests'''

import random

# Expected vertices of the test graph: each user id is also the vertex name
correctVertices = sc.parallelize(
    [Row(id=userId, name=userId) for userId in (2, 10, 8, 3, 7, 4, 1, 9)]
).toDF()

# Expected edges of the test graph, written as (src, dst, relationship)
correctEdges = sc.parallelize(
    [Row(src=source, dst=target, relationship=weight)
     for source, target, weight in [
         (2, 10, 1), (2, 8, 1), (3, 7, 1), (3, 10, 1), (2, 3, 1),
         (10, 4, 1), (4, 10, 1), (4, 2, 1), (1, 9, 1), (1, 10, 2),
         (7, 9, 1), (1, 3, 1), (10, 1, 1),
     ]]
).toDF()

# Graph under test; persisted because the later test cells reuse it
testGraph = createGraph(testFile).persist()
testVertices, testEdges = testGraph.vertices, testGraph.edges

# Check the sizes first so a missing or extra row is reported clearly,
# then compare the sorted contents row by row.
assert testVertices.count() == correctVertices.count(), "the vertices count was expected to be %s but it was %s" % (correctVertices.count(), testVertices.count())
assert testEdges.count() == correctEdges.count(), "the edges count was expected to be %s but it was %s" % (correctEdges.count(), testEdges.count())
equalDF(testVertices, correctVertices, "id")
equalDF(testEdges, correctEdges, "src", "dst", "relationship")

0.3 Both Directions

bothDirections finds pairs of users who are connected by an edge in both directions.

param graph: GraphFrame containing social data (created by createGraph).

return: DataFrame which has columns “start”, “end” and “connections”, corresponding to the starting user id, ending user id and number of connections between two users.

Example: Suppose we have the graph below. The result is:

start end connections
10 4 1
4 10 1
1 10 2
10 1 1

find function from GraphFrames

def bothDirections(graph):
    """Find the pairs of users that are connected by an edge in both directions.

    param graph: GraphFrame containing social data (created by createGraph).

    return: DataFrame with columns "start", "end" and "connections", taken
    from the "src", "dst" and "relationship" fields of every edge that has a
    matching edge in the opposite direction.
    """
    # Motif: an edge e from a to b together with an edge e2 from b back to a.
    # Each reciprocal pair matches twice (once per direction), so both
    # directed edges appear in the result.
    reciprocal = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")
    return reciprocal.select(
        reciprocal.e.src.alias("start"),
        reciprocal.e.dst.alias("end"),
        reciprocal.e.relationship.alias("connections"),
    )
# example print

bothDirections(graph).show()
'''bothDirections tests'''

# Expected reciprocal edges of the test graph, written as (start, end, connections)
correctEdges = sc.parallelize(
    [Row(start=source, end=target, connections=weight)
     for source, target, weight in [(10, 4, 1), (4, 10, 1), (1, 10, 2), (10, 1, 1)]]
).toDF()
equalDF(bothDirections(testGraph), correctEdges, "start", "end", "connections")

0.4 Most Active User

mostActiveUser finds which user has the most outward connections.

param graph: GraphFrame containing social data.

return: id of the user who has the most outward connections. Return the smallest id if more than one user has the same number of outward connections.

def mostActiveUser(graph):
    """Find the user with the most outward connections.

    param graph: GraphFrame containing social data.

    return: id of the user with the highest out-degree; the smallest id wins
    a tie. Returns None when the graph has no outgoing edges.
    """
    outDegrees = graph.outDegrees
    # Sort before projecting so both sort columns are still present;
    # the ascending id is the tie-breaker.
    topRow = outDegrees.orderBy(outDegrees.outDegree.desc(), outDegrees.id.asc()).first()
    # first() yields None for an empty DataFrame
    return None if topRow is None else topRow.id
# example print
# Evaluate mostActiveUser on the graph built from the sample file.

mostActiveUser(graph)

0.5 Connection Ratio

connectionRatio ranks users by their ratio of inward to outward connections, so users with many inward but few outward connections come first.

param graph: GraphFrame containing social data.

return: DataFrame which has columns “id” and “connectionRatio”, where “id” is the id of a user and “connectionRatio” = number of inward connections / number of outward connections. Users without inward or outward connections are filtered out.
The DataFrame is sorted by connectionRatio in descending order. If more than one user has the same connection ratio, these users are sorted by their id in ascending order.

example output:

id connectionRatio
10 2.0
3 1.0
7 1.0
4 0.5
1 0.3333333333333333
2 0.3333333333333333
def connectionRatio(graph):
    """Compute the ratio of inward to outward connections for every user.

    param graph: GraphFrame containing social data.

    return: DataFrame with columns "id" and "connectionRatio", where
    "connectionRatio" is inDegree / outDegree. It is sorted by
    "connectionRatio" in descending order and then by "id" in ascending order.
    Only users present in both graph.inDegrees and graph.outDegrees are kept.
    """
    # Joining on the column name keeps a single "id" column and drops every
    # user that is missing from either degree table.
    degrees = graph.inDegrees.join(graph.outDegrees, on="id", how="inner")
    ratios = degrees.select(
        degrees.id,
        (degrees.inDegree / degrees.outDegree).alias("connectionRatio"),
    )
    return ratios.orderBy(ratios.connectionRatio.desc(), ratios.id.asc())
# example print

connectionRatio(graph).show()
'''connectionRatio tests'''
# Expected (id, connectionRatio) pairs for the test graph, already in result order
correct = [Row(id=userId, connectionRatio=ratio)
           for userId, ratio in [(10, 2.0), (3, 1.0), (7, 1.0), (4, 0.5), (1, 1/3), (2, 1/3)]]

test = connectionRatio(testGraph)
equalArray(test.collect(), correct)

0.6 Communities

communities uses label propagation algorithm (LPA) to detect communities for a graph.

param graph: GraphFrame containing social data.

return: DataFrame containing columns “community” and “count”. “community” is the label assigned by LPA and “count” is the number of users who belong to the community. The DataFrame should be sorted by “count” in descending order. If more than one community has the same number of users, these communities should be sorted by label in ascending order.

Note: set 5 as the number of iterations to be performed when running LPA.

Example output:

community count
1 4
3 2
10 2
def communities(graph):
    """Detect communities with the label propagation algorithm (LPA).

    LPA is run on the given graph for 5 iterations.

    param graph: GraphFrame containing social data.

    return: DataFrame with columns "community" (the label assigned by LPA)
    and "count" (the number of users carrying that label), sorted by "count"
    in descending order and then by "community" in ascending order.
    """
    labelled = graph.labelPropagation(maxIter=5)
    sizes = labelled.groupBy("label").count().withColumnRenamed("label", "community")
    # Index by name: the attribute sizes.count is the DataFrame method, not the column
    return sizes.orderBy(sizes["count"].desc(), sizes["community"].asc())
# example print

communities(graph).show()
'''communities tests'''
# Expected (community, count) pairs for the test graph, already in result order
correct = [Row(community=label, count=size) for label, size in [(2, 4), (8, 3), (10, 1)]]

equalArray(communities(testGraph).collect(), correct)

0.7 Highest Page Rank

highestPageRank finds which user has the highest PageRank.

param graph: GraphFrame containing social data.

return: id of user with the highest PageRank.

Set tolerance “tol” as 0.0001 when using the pageRank algorithm.

def highestPageRank(graph):
    """Find the user with the highest PageRank.

    PageRank is run with tolerance tol=0.0001.

    param graph: GraphFrame containing social data.

    return: id of the user with the highest PageRank; the smallest id wins a
    tie. Returns None when the ranked vertex table is empty.
    """
    ranked = graph.pageRank(tol=0.0001).vertices
    # first() fetches only the top row instead of collecting every vertex;
    # the ascending id makes the result deterministic when ranks are equal.
    topRow = ranked.orderBy(ranked.pagerank.desc(), ranked.id.asc()).first()
    return None if topRow is None else topRow.id
# example print

highestPageRank(graph)
'''highestPageRank tests'''

# Run PageRank once and reuse the result, so the failure message reports
# the value that was actually compared instead of re-running the algorithm.
topUser = highestPageRank(testGraph)
assert topUser == 10, "the highest page rank was expected to be 10 but it was %s" % topUser
# All tests are done: drop the cached data and shut the Spark session down
spark.catalog.clearCache()
spark.stop()