PySpark GraphFrames
0.2 Create Graph
createGraph creates a GraphFrame. Use user_id as both the vertex id and the vertex attribute, and use the number of unique connections between two users as the edge weight.
Example: Suppose we have the data shown below:
first_user_id | second_user_id |
---|---|
1 | 2 |
1 | 2 |
2 | 1 |
1 | 3 |
2 | 3 |
The graph should be:
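(The original graph figure is not reproduced here. As a hypothetical illustration only, and assuming repeated rows add to an edge's weight, as the reference edge src=1, dst=10, relationship=2 in the tests further below suggests, the sample data would give roughly the following vertices and weighted edges.)

# Hypothetical illustration only (not part of the assignment data):
# expected contents of the GraphFrame built from the sample rows above,
# assuming the duplicate "1 | 2" row adds to that edge's weight.
expected_vertices = [(1, 1), (2, 2), (3, 3)]                   # (id, name)
expected_edges = [(1, 2, 2), (2, 1, 1), (1, 3, 1), (2, 3, 1)]  # (src, dst, relationship)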
param path: path of the file whose data should be used to create the GraphFrame
return: GraphFrame
Load the file into an RDD, then use RDD transformations to extract and parse the data. Use a regex for fine-grained selection.
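As a quick standalone check of the regex used below (just an illustration; the input line here is hypothetical):

import re

line = " 1 | 2 |"   # a hypothetical data line in the format shown above
print(re.findall(r" *([0-9]+) \| *([0-9]+)", line))   # [('1', '2')]
# Header lines such as "first_user_id | second_user_id |" contain no digits,
# so findall returns [] and the filter step drops them.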
import re
from graphframes import GraphFrame   # sc and spark come from the notebook's Spark setup


def createGraph(path):
    # load the raw text file and keep only lines that look like "<id> | <id> |"
    GraphRDD = sc.textFile(path)
    FilteredRDD = GraphRDD.filter(lambda x: len(re.findall(r" *([0-9]+) \| *([0-9]+)", x)) != 0)
    # parse each kept line into a (first_user_id, second_user_id) tuple
    ProcessedRDD = FilteredRDD.map(lambda x: re.findall(r" *([0-9]+) \| *([0-9]+)", x))
    PairsRDD = ProcessedRDD.map(lambda x: (x[0], 1))
    # vertices: every user id that appears on either side of a connection
    PreVertex = PairsRDD.flatMap(lambda x: [x[0][0], x[0][1]])
    Vertex = list(set(PreVertex.collect()))
    # edges: count how often each (src, dst) pair occurs and use the count as the weight
    EdgeCountRDD = PairsRDD.reduceByKey(lambda x, y: x + y)
    Edge = EdgeCountRDD.map(lambda x: (int(x[0][0]), int(x[0][1]), x[1]))
    # user_id serves as both the vertex id and the vertex attribute ("name")
    GraphVertex = spark.createDataFrame([(int(i), int(i)) for i in Vertex], ["id", "name"])
    GraphEdge = spark.createDataFrame(Edge, ["src", "dst", "relationship"])
    return GraphFrame(GraphVertex, GraphEdge)
# example print
graph = createGraph(sampleFile).persist()
graph.vertices.show()
graph.edges.show()
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
<ipython-input-9-4ec2ad282db8> in <module>
1 # example print
2
----> 3 graph = createGraph(sampleFile).persist()
4 graph.vertices.show()
5 graph.edges.show()
NameError: name 'sampleFile' is not defined
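The NameError above simply means sampleFile was never assigned in this notebook. A minimal fix is to point it at an input file before calling createGraph; the path below is only a placeholder, not the dataset the assignment actually uses:

sampleFile = "data/sample_connections.txt"   # placeholder path, adjust to the actual data file

graph = createGraph(sampleFile).persist()
graph.vertices.show()
graph.edges.show()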
'''createGraph tests'''
import random
correctVertices = sc.parallelize([Row(id=2, name=2),
                                  Row(id=10, name=10),
                                  Row(id=8, name=8),
                                  Row(id=3, name=3),
                                  Row(id=7, name=7),
                                  Row(id=4, name=4),
                                  Row(id=1, name=1),
                                  Row(id=9, name=9)]).toDF()

correctEdges = sc.parallelize([Row(src=2, dst=10, relationship=1),
                               Row(src=2, dst=8, relationship=1),
                               Row(src=3, dst=7, relationship=1),
                               Row(src=3, dst=10, relationship=1),
                               Row(src=2, dst=3, relationship=1),
                               Row(src=10, dst=4, relationship=1),
                               Row(src=4, dst=10, relationship=1),
                               Row(src=4, dst=2, relationship=1),
                               Row(src=1, dst=9, relationship=1),
                               Row(src=1, dst=10, relationship=2),
                               Row(src=7, dst=9, relationship=1),
                               Row(src=1, dst=3, relationship=1),
                               Row(src=10, dst=1, relationship=1)]).toDF()

testGraph = createGraph(testFile).persist()
testVertices = testGraph.vertices
testEdges = testGraph.edges

assert testVertices.count() == correctVertices.count(), "the vertices count was expected to be %s but it was %s" % (correctVertices.count(), testVertices.count())
assert testEdges.count() == correctEdges.count(), "the edges count was expected to be %s but it was %s" % (correctEdges.count(), testEdges.count())

equalDF(testGraph.vertices, correctVertices, "id")
equalDF(testGraph.edges, correctEdges, "src", "dst", "relationship")
0.3 Both Directions
bothDirections finds pairs of users who are connected by an edge in both directions.
param graph: GraphFrame containing social data (created by createGraph).
return: DataFrame which has columns “start”, “end” and “connections”, corresponding to the starting user id, the ending user id and the number of connections between the two users.
Example: Suppose we have a graph as below:
The result is:
start | end | connections |
---|---|---|
10 | 4 | 1 |
4 | 10 | 1 |
1 | 10 | 2 |
10 | 1 | 1 |
Hint: use the find (motif finding) function from GraphFrames.
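As a small self-contained illustration of the motif syntax (assuming the spark session and the graphframes package from the notebook setup), the pattern used below matches every ordered pair connected in both directions, so each mutual connection appears twice, once per direction:

from graphframes import GraphFrame

v = spark.createDataFrame([(1, 1), (2, 2), (3, 3)], ["id", "name"])
e = spark.createDataFrame([(1, 2, 1), (2, 1, 1), (1, 3, 1)], ["src", "dst", "relationship"])
g = GraphFrame(v, e)

# Only the mutual pair 1<->2 matches; it shows up as both (1, 2) and (2, 1).
g.find("(a)-[e]->(b); (b)-[e2]->(a)").select("a.id", "b.id").show()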
def bothDirections(graph):
    # match every ordered pair (a, b) that has both an edge a->b and an edge b->a
    motifs = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")
    # keep the a->b edge and rename its columns to the required output schema
    output = motifs.select(motifs.e.src.alias("start"),
                           motifs.e.dst.alias("end"),
                           motifs.e.relationship.alias("connections"))
    return output
# example print
bothDirections(graph).show()
'''bothDirections tests'''
correctEdges = sc.parallelize([Row(start=10, end=4, connections=1),
                               Row(start=4, end=10, connections=1),
                               Row(start=1, end=10, connections=2),
                               Row(start=10, end=1, connections=1)]).toDF()

equalDF(bothDirections(testGraph), correctEdges, "start", "end", "connections")
0.4 Most Active User
mostActiveUser finds which user has the most outward connections.
param graph: GraphFrame containing social data.
return: id of the user who has the most outward connections. Return the smallest id if more than one user has the same number of outward connections.
def mostActiveUser(graph):
    # outDegrees has one row per user with at least one outgoing edge: (id, outDegree)
    newDF = graph.outDegrees
    # order by out-degree (descending), breaking ties on the smaller id
    outDF = newDF.orderBy(newDF.outDegree.desc(), newDF.id.asc())
    return outDF.first().id
# example print
mostActiveUser(graph)
0.5 Connection Ratio
connectionRatio computes, for each user, the ratio of inward to outward connections, so users with many inward connections but few outward connections rank highest.
param graph: GraphFrame containing social data.
return: DataFrame which has columns “id” and “connectionRatio”, where “id” is the id of a user and “connectionRatio” = number of inward connections / number of outward connections. Users without inward or outward connections are filtered out.
The DataFrame is sorted by connectionRatio in descending order. If more than one user has the same connection ratio, these users are sorted by their id in ascending order.
Example output:
id | connectionRatio |
---|---|
10 | 2.0 |
3 | 1.0 |
7 | 1.0 |
4 | 0.5 |
1 | 0.3333333333333333 |
2 | 0.3333333333333333 |
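One detail worth noting before the implementation: GraphFrames' inDegrees and outDegrees only list vertices whose degree is at least one, so an inner join on id already removes users without inward or outward connections. A compact sketch of that idea (not the graded cell below):

from pyspark.sql.functions import col

def connection_ratio_sketch(graph):
    # the inner join keeps only ids present in both inDegrees and outDegrees
    return (graph.inDegrees.join(graph.outDegrees, "id")
            .select("id", (col("inDegree") / col("outDegree")).alias("connectionRatio"))
            .orderBy(col("connectionRatio").desc(), col("id").asc()))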
def connectionRatio(graph):
    # inDegrees/outDegrees only contain users with at least one edge in that direction,
    # so the inner join drops users without inward or outward connections
    OUTdf = graph.outDegrees
    INdf = graph.inDegrees
    JOINdf = OUTdf.join(INdf, OUTdf.id == INdf.id, "INNER").drop(OUTdf.id)
    # connectionRatio = number of inward connections / number of outward connections
    RATIOdf = JOINdf.select(JOINdf.id, (JOINdf.inDegree / JOINdf.outDegree).alias("connectionRatio"))
    # sort by ratio (descending), then by id (ascending)
    ORDERdf = RATIOdf.orderBy(RATIOdf.connectionRatio.desc(), RATIOdf.id.asc())
    return ORDERdf
# example print
connectionRatio(graph).show()
'''connectionRatio tests'''
correct = [Row(id=10, connectionRatio=2.0),
           Row(id=3, connectionRatio=1.0),
           Row(id=7, connectionRatio=1.0),
           Row(id=4, connectionRatio=0.5),
           Row(id=1, connectionRatio=1/3),
           Row(id=2, connectionRatio=1/3)]

test = connectionRatio(testGraph)
equalArray(test.collect(), correct)
0.6 Communities
communities uses the label propagation algorithm (LPA) to detect communities in a graph.
param graph: GraphFrame containing social data.
return: DataFrame containing columns “community” and “count”. “community” is the label assigned by LPA and “count” is the number of users who belong to that community. The DataFrame should be sorted by “count” in descending order. If more than one community has the same number of users, these communities should be sorted by label in ascending order.
Note: set 5 as the number of iterations to be performed when running LPA.
Example output:
community | count |
---|---|
1 | 4 |
3 | 2 |
10 | 2 |
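For reference, the same community counting can be written with the DataFrame API instead of Spark SQL; a sketch (communities_sketch is just an illustrative name, not the graded function below):

from pyspark.sql.functions import col

def communities_sketch(graph):
    result = graph.labelPropagation(maxIter=5)           # adds a "label" column to the vertices
    return (result.groupBy(col("label").alias("community"))
            .count()                                     # one row per community with its size
            .orderBy(col("count").desc(), col("community").asc()))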
def communities(graph):
    # run label propagation for 5 iterations; the result has a "label" column
    result = graph.labelPropagation(maxIter=5)
    result.createOrReplaceTempView("ctable")
    # count the members of each community, ordered by size (desc) and label (asc)
    newdf = spark.sql("""
    SELECT label AS community, COUNT(label) AS count FROM ctable
    GROUP BY label
    ORDER BY count DESC, community ASC
    """)
    return newdf
# example print
communities(graph).show()
'''communities tests'''
correct = [Row(community=2, count=4),
           Row(community=8, count=3),
           Row(community=10, count=1)]

equalArray(communities(testGraph).collect(), correct)
0.7 Highest Page Rank
highestPageRank finds which user has the highest PageRank.
param graph: GraphFrame containing social data.
return: id of the user with the highest PageRank.
Set the tolerance “tol” to 0.0001 when using the pageRank algorithm.
def highestPageRank(graph):
    # pageRank returns a GraphFrame whose vertices carry a "pagerank" column
    results = graph.pageRank(tol=0.0001)
    vertexRank = results.vertices
    # the user with the highest PageRank comes first after sorting
    maxid = vertexRank.orderBy(vertexRank.pagerank.desc())
    return maxid.first().id
# example print
highestPageRank(graph)
'''highestPageRank tests'''
#graph = createGraph(testFile)
assert highestPageRank(testGraph) == 10, "the highest page rank was expected to be 10 but it was %s" % highestPageRank(testGraph)
spark.catalog.clearCache()
spark.stop()