-
Notifications
You must be signed in to change notification settings - Fork 1
/
graph.py
142 lines (102 loc) · 4.06 KB
/
graph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from collections import defaultdict

try:
    from xml.etree import cElementTree as ET
except ImportError:  # cElementTree was removed in Python 3.9
    from xml.etree import ElementTree as ET
class Graph:
    """Directed graph stored as a node list plus an adjacency mapping.

    Attributes:
        nodes (list): node objects, in the order they were supplied or parsed.
        edges (dict): maps a source node to the collection of its destination
            nodes. It is a ``defaultdict(set)`` whenever it is built from an
            edge list; a mapping passed to the constructor is stored as is.
    """

    def __init__(self, *args):
        """Initialize graph.

        Supports the following types of initializations:
            g = Graph(graphml)
            g = Graph(nodes, edges)
        where:
            graphml (str): content of a GraphML file.
            nodes (list): list of node objects.
            edges (list): list of edges, each as (node, node).
        `edges` can alternatively be provided as a dict mapping each source
        node to a collection of its destination nodes.

        Raises:
            AssertionError: if the arguments match none of the forms above.
        """
        # isinstance (rather than exact type matching) lets subclasses such as
        # defaultdict or OrderedDict through without listing each of them.
        if len(args) == 1 and isinstance(args[0], str):
            self._load_graphml(args[0])
        elif (len(args) == 2 and isinstance(args[0], list)
              and isinstance(args[1], (list, dict))):
            self._init_nodes_edges(*args)
        else:
            signature = tuple(map(type, args))
            # Raised explicitly so the check survives `python -O`.
            raise AssertionError(
                'Unsupported arguments %s' % str(signature))

    def _init_nodes_edges(self, nodes, edges):
        """Store `nodes` and build or adopt the adjacency mapping.

        Args:
            nodes (list): list of node objects (stored by reference).
            edges (list or dict): an edge list of (node, node) pairs, or a
                ready-made mapping source -> destinations (stored by
                reference, not copied).
        """
        self.nodes = nodes
        if isinstance(edges, list):
            self.set_edge_list(edges)
        else:
            self.edges = edges

    @staticmethod
    def _local_name(tag):
        """Return `tag` without the '{namespace}' prefix ElementTree adds."""
        return tag.rsplit("}", 1)[-1]

    def _load_graphml(self, graphml):
        """Load nodes and edges from a GraphML description.

        Only the first <graph> element directly under the document root is
        read. Elements are matched on their local tag name, so documents
        with or without the GraphML XML namespace are both accepted. When
        the graph declares edgedefault="undirected", every edge is stored
        in both directions.

        Args:
            graphml (str): content of a GraphML file.

        Raises:
            AssertionError: if the document has no <graph> element.
            KeyError: if a node lacks `id` or an edge lacks `source`/`target`.
        """
        local_name = self._local_name
        document = ET.fromstring(graphml)
        graphs = [elem for elem in document if local_name(elem.tag) == "graph"]
        if not graphs:
            raise AssertionError("Could not find <graph> element")
        graph = graphs[0]
        edge_default = graph.attrib.get("edgedefault", "directed")
        self.nodes = [
            elem.attrib["id"]
            for elem in graph if local_name(elem.tag) == "node"
        ]
        edge_list = [
            (elem.attrib["source"], elem.attrib["target"])
            for elem in graph if local_name(elem.tag) == "edge"
        ]
        if edge_default == "undirected":
            # The comprehension is fully built before the list is extended.
            edge_list += [(dst, src) for src, dst in edge_list]
        self.set_edge_list(edge_list)

    def set_edge_list(self, edge_list):
        """Load edges from an edge list, replacing any existing edges.

        Args:
            edge_list ([(node, node)]): iterable of (source, destination).
        """
        self.edges = defaultdict(set)
        for src, dst in edge_list:
            self.edges[src].add(dst)

    def reduce_graph(self, predicate):
        """Return a copy graph with a subset of nodes.

        Each node is part of the returned graph iff predicate(node) is True.
        An edge is kept only when both of its endpoints are kept.

        Args:
            predicate (callable :: node -> bool)

        Returns:
            Graph: graph object.
        """
        # A real list is required: filter() is a one-shot iterator on
        # Python 3 and is also rejected by the constructor's type check.
        nodes = [node for node in self.nodes if predicate(node)]
        kept = set(nodes)
        edge_list = [(src, dst) for src, dst in self.get_edge_list()
                     if src in kept and dst in kept]
        return Graph(nodes, edge_list)

    def get_edge_list(self):
        """Return a list of graph edges, sorted by source then destination."""
        return [(src, dst)
                for src in sorted(self.edges)
                for dst in sorted(self.edges[src])]

    def get_outdegree(self, node):
        """Return outdegree of `node` (0 if it has no outgoing edges)."""
        # .get() avoids inserting an empty entry into a defaultdict on read
        # and avoids a KeyError when `edges` is a plain dict.
        return len(self.edges.get(node, ()))

    def map_node_names(self, name_map):
        """Rename graph nodes based on a name map.

        The node list becomes the values of `name_map`, and every edge is
        rewritten through the map.

        Args:
            name_map (dict (node -> node)): name mapping dictionary.

        Raises:
            KeyError: if an edge endpoint is missing from `name_map`.
        """
        new_edge_list = [(name_map[src], name_map[dst])
                         for src, dst in self.get_edge_list()]
        # Materialize: a dict view is not a list and tracks later changes
        # to `name_map`.
        self.nodes = list(name_map.values())
        self.set_edge_list(new_edge_list)

    def rename_nodes(self, name_format="n%d"):
        """Rename graph nodes using indices.

        Each node is renamed to `name_format % index`, where index is the
        node's position in `self.nodes`.

        Args:
            name_format (str): node renaming format (must contain '%d').
        """
        name_map = {
            node: name_format % ind
            for ind, node in enumerate(self.nodes)
        }
        self.map_node_names(name_map)