PySpark GraphFrames
0.2 Create Graph
`createGraph` creates a GraphFrame. Use the user id as both the vertex id and the vertex attribute (`name`). Use the number of connections from one user to another as the weight (`relationship`) of the directed edge between them.
Example: suppose we have the data shown below:
| first_user_id | second_user_id |
|---|---|
| 1 | 2 |
| 1 | 2 |
| 2 | 1 |
| 1 | 3 |
| 2 | 3 |
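The graph should have vertices 1, 2 and 3 and the directed, weighted edges 1 → 2 (weight 2), 2 → 1 (weight 1), 1 → 3 (weight 1) and 2 → 3 (weight 1).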

param path: path of file whose data should be used to create the GraphFrame
return: GraphFrame
Load the file into an RDD, then use RDD transformations to extract and parse the data.
Use a regex for fine-grained selection.
def createGraph(path):
    """Build a GraphFrame of user-to-user connections from a text file.

    Every line is scanned with the regex ``<digits> | <digits>`` (optional
    leading spaces, one space before the pipe). Lines with no match are
    skipped; for a matching line only the first match is used, giving a
    directed (first_user_id, second_user_id) pair.

    Vertices: one row per distinct user id, with columns ``id`` and
    ``name`` (both set to the user id).
    Edges: one row per distinct directed pair, with columns ``src``,
    ``dst`` and ``relationship``. ``relationship`` is the number of lines
    containing that pair.

    :param path: path of the file whose data should be used.
    :return: GraphFrame
    """
    pair_regex = r" *([0-9]+) \| *([0-9]+)"

    # Parse each line once and keep only lines that contain a user pair.
    # Ids are converted to int *before* any deduplication so that the
    # vertex and edge keys agree with the integer ids stored in the frames.
    pairs = (
        sc.textFile(path)
        .map(lambda line: re.findall(pair_regex, line))
        .filter(lambda matches: len(matches) != 0)
        .map(lambda matches: (int(matches[0][0]), int(matches[0][1])))
    )

    # Distinct user ids, deduplicated on the cluster rather than the driver.
    vertices = (
        pairs.flatMap(lambda pair: pair)
        .distinct()
        .map(lambda user_id: (user_id, user_id))
    )

    # Edge weight = number of occurrences of each directed pair.
    edges = (
        pairs.map(lambda pair: (pair, 1))
        .reduceByKey(lambda a, b: a + b)
        .map(lambda kv: (kv[0][0], kv[0][1], kv[1]))
    )

    # Explicit schemas, so an input with no matching lines gives an empty
    # graph instead of failing schema inference on an empty dataset.
    return GraphFrame(
        spark.createDataFrame(vertices, "id: long, name: long"),
        spark.createDataFrame(edges, "src: long, dst: long, relationship: long"),
    )
# example print
# NOTE: `sampleFile` must already be defined (path to the sample data);
# the recorded output below is a NameError because it was not.
graph = createGraph(sampleFile).persist()
graph.vertices.show()
graph.edges.show()
NameError Traceback (most recent call last)
<ipython-input-9-4ec2ad282db8> in <module>
1 # example print
2
----> 3 graph = createGraph(sampleFile).persist()
4 graph.vertices.show()
5 graph.edges.show()
NameError: name 'sampleFile' is not defined
'''createGraph tests'''
import random

# Expected vertices: every user id, with the id doubling as the name.
correctVertices = sc.parallelize(
    [Row(id=v, name=v) for v in (2, 10, 8, 3, 7, 4, 1, 9)]
).toDF()

# Expected edges, listed as (src, dst, relationship) triples.
correctEdges = sc.parallelize([
    Row(src=s, dst=t, relationship=w)
    for s, t, w in (
        (2, 10, 1),
        (2, 8, 1),
        (3, 7, 1),
        (3, 10, 1),
        (2, 3, 1),
        (10, 4, 1),
        (4, 10, 1),
        (4, 2, 1),
        (1, 9, 1),
        (1, 10, 2),
        (7, 9, 1),
        (1, 3, 1),
        (10, 1, 1),
    )
]).toDF()

testGraph = createGraph(testFile).persist()
testVertices, testEdges = testGraph.vertices, testGraph.edges

# Row counts first, then full content comparison.
assert testVertices.count() == correctVertices.count(), (
    "the vertices count was expected to be %s but it was %s"
    % (correctVertices.count(), testVertices.count())
)
assert testEdges.count() == correctEdges.count(), (
    "the edges count was expected to be %s but it was %s"
    % (correctEdges.count(), testEdges.count())
)
equalDF(testVertices, correctVertices, "id")
equalDF(testEdges, correctEdges, "src", "dst", "relationship")
0.3 Both Directions
bothDirections finds pairs of users who are connected by an edge in both directions.
param graph: GraphFrame containing social data (created by createGraph).
return: DataFrame with columns “start”, “end” and “connections”, holding the starting user id, the ending user id and the number of connections from the starting user to the ending user.
Example: suppose we have the test graph whose edges are listed in the createGraph tests above.
The result is
| start | end | connections |
|---|---|---|
| 10 | 4 | 1 |
| 4 | 10 | 1 |
| 1 | 10 | 2 |
| 10 | 1 | 1 |
Hint: use the `find` (motif finding) function from GraphFrames.
def bothDirections(graph):
    """Find pairs of users connected by an edge in both directions.

    :param graph: GraphFrame whose edges carry a ``relationship`` weight
        (as built by ``createGraph``).
    :return: DataFrame with columns ``start``, ``end`` and ``connections``.
        There is one row per edge start -> end whose reverse edge
        end -> start also exists. ``connections`` is the weight of the
        start -> end edge, so each reciprocal pair yields two rows, one per
        direction.
    """
    # Motif: an edge a->b together with a reverse edge b->a.
    # NOTE(review): a self-loop a->a presumably matches this motif as well;
    # add a start != end filter if self-connections must be excluded.
    motifs = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")
    return motifs.select(
        motifs.e.src.alias("start"),
        motifs.e.dst.alias("end"),
        motifs.e.relationship.alias("connections"),
    )
# example print
bothDirections(graph).show()
'''bothDirections tests'''
# Each reciprocal pair appears once per direction.
correctEdges = sc.parallelize([Row(start=10, end=4, connections=1),
                               Row(start=4, end=10, connections=1),
                               Row(start=1, end=10, connections=2),
                               Row(start=10, end=1, connections=1)]).toDF()
equalDF(bothDirections(testGraph), correctEdges, "start", "end", "connections")
# 0.4 Most Active User
mostActiveUser finds which user has the most outward connections.
param graph: GraphFrame containing social data.
return: id of the user who has the most outward connections. If more than one user has the same number of outward connections, return the smallest id.
def mostActiveUser(graph):
    """Return the id of the user with the most outward connections.

    Outward connections are the number of outgoing edges per vertex
    (``graph.outDegrees``); edge weights are not summed. Ties are broken by
    returning the smallest id.

    :param graph: GraphFrame containing social data.
    :return: the winning user id, or None if the graph has no edges
        (``outDegrees`` is then empty).
    """
    out_degrees = graph.outDegrees
    top = (
        out_degrees.orderBy(out_degrees.outDegree.desc(), out_degrees.id.asc())
        .select("id")
        .first()
    )
    return None if top is None else top.id
# example print
mostActiveUser(graph)
# 0.5 Connection Ratio
connectionRatio computes, for each user, the ratio of inward connections to outward connections, so users with many inward but few outward connections rank highest.
param graph: GraphFrame containing social data.
return: DataFrame which has columns “id” and “connectionRatio”, where “id” is the id of a user and “connectionRatio” = number of inward connections / number of outward connections. Users without inward or outward connections are filtered out.
The DataFrame is sorted by connectionRatio in descending order. If more than one user has the same connection ratio, these users are sorted by their id in ascending order.
Example output:
| id | connectionRatio |
|---|---|
| 10 | 2.0 |
| 3 | 1.0 |
| 7 | 1.0 |
| 4 | 0.5 |
| 1 | 0.3333333333333333 |
| 2 | 0.3333333333333333 |
def connectionRatio(graph):
    """Compute each user's ratio of inward to outward connections.

    An inner join of ``graph.inDegrees`` and ``graph.outDegrees`` on ``id``
    keeps only users with at least one incoming and at least one outgoing
    edge, which filters out everyone else.

    :param graph: GraphFrame containing social data.
    :return: DataFrame with columns ``id`` and ``connectionRatio``
        (inDegree / outDegree, a double), sorted by ``connectionRatio``
        descending, then ``id`` ascending.
    """
    # Joining on the column name yields a single ``id`` column.
    joined = graph.inDegrees.join(graph.outDegrees, on="id", how="inner")
    ratios = joined.select(
        joined.id,
        (joined.inDegree / joined.outDegree).alias("connectionRatio"),
    )
    return ratios.orderBy(ratios.connectionRatio.desc(), ratios.id.asc())
# example print
connectionRatio(graph).show()
'''connectionRatio tests'''
# Expected rows in the exact order connectionRatio must return them.
correct = [Row(id=10, connectionRatio=2.0),
           Row(id=3, connectionRatio=1.0),
           Row(id=7, connectionRatio=1.0),
           Row(id=4, connectionRatio=0.5),
           Row(id=1, connectionRatio=1/3),
           Row(id=2, connectionRatio=1/3)]
test = connectionRatio(testGraph)
equalArray(test.collect(), correct)
# 0.6 Communities
communities uses label propagation algorithm (LPA) to detect communities for a graph.
param graph: GraphFrame containing social data.
return: DataFrame containing columns “community” and “count”. “community” is the label assigned by LPA and “count” is the number of users who belong to the community. The DataFrame should be sorted by “count” in descending order. If more than one community has the same number of users, these communities should be sorted by label in ascending order.
Note: set 5 as the number of iterations to be performed when running LPA.
Example output:
| community | count |
|---|---|
| 1 | 4 |
| 3 | 2 |
| 10 | 2 |
def communities(graph, maxIter=5):
    """Detect communities with label propagation (LPA) and count members.

    :param graph: GraphFrame containing social data.
    :param maxIter: number of LPA iterations (default 5, as the task
        requires).
    :return: DataFrame with columns ``community`` (the LPA label) and
        ``count`` (number of vertices carrying that label), sorted by
        ``count`` descending, then ``community`` ascending.
    """
    labelled = graph.labelPropagation(maxIter=maxIter)
    sizes = labelled.groupBy(labelled.label.alias("community")).count()
    # ``sizes.count`` is the DataFrame method, so index the column by name.
    return sizes.orderBy(sizes["count"].desc(), sizes["community"].asc())
# example print
communities(graph).show()
'''communities tests'''
# Expected (community, count) rows in the required sort order.
correct = [Row(community=2, count=4),
           Row(community=8, count=3),
           Row(community=10, count=1)]
equalArray(communities(testGraph).collect(), correct)
# 0.7 Highest Page Rank
highestPageRank finds which user has the highest PageRank.
param graph: GraphFrame containing social data.
return: id of user with the highest PageRank.
Set the tolerance “tol” to 0.0001 when running the PageRank algorithm.
def highestPageRank(graph, tol=0.0001):
    """Return the id of the user with the highest PageRank.

    :param graph: GraphFrame containing social data.
    :param tol: convergence tolerance passed to ``pageRank``
        (default 0.0001, as the task requires).
    :return: id of the vertex with the largest ``pagerank``. Ties go to the
        smallest id so the result is deterministic. Returns None if the
        graph has no vertices.
    """
    ranked = graph.pageRank(tol=tol).vertices
    top = (
        ranked.orderBy(ranked.pagerank.desc(), ranked.id.asc())
        .select("id")
        .first()
    )
    return None if top is None else top.id
# example print
highestPageRank(graph)
'''highestPageRank tests'''
# Run PageRank once and reuse the result in the failure message.
topId = highestPageRank(testGraph)
assert topId == 10, "the highest page rank was expected to be 10 but it was %s" % topId

# Release cached graphs and shut the session down.
spark.catalog.clearCache()
spark.stop()