Skip to content

Commit

Permalink
Cugraph-Service Remote Graphs and Algorithm Dispatch (#2832)
Browse files Browse the repository at this point in the history
Resolves rapidsai/graph_dl#80

This PR combines `RemoteGraph` and `RemotePropertyGraph`, adds support for remote API calls to cugraph.Graph, adds subgraph extraction to the property graph wrappers, adds the ability to dispatch cuGraph algorithms (though currently only `uniform_neighbor_sample` is supported), better supports multigraphs for subgraph extraction, properly handles renumbering on the server, and supports implicit subgraph extraction for remote graphs.

Authors:
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Erik Welch (https://github.com/eriknw)

Approvers:
  - Rick Ratzel (https://github.com/rlratzel)
  - Vibhu Jawa (https://github.com/VibhuJawa)

URL: #2832
  • Loading branch information
alexbarghi-nv authored Nov 10, 2022
1 parent fd0a2be commit ffa4a6d
Show file tree
Hide file tree
Showing 10 changed files with 803 additions and 263 deletions.
71 changes: 62 additions & 9 deletions python/cugraph/cugraph/gnn/pyg_extensions/loader/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,76 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from cugraph.structure.graph_implementation import (
simpleDistributedGraphImpl,
simpleGraphImpl,
)
# cuGraph or cuGraph-Service is required; each has its own version of
# import_optional and we need to select the correct one.
try:
from cugraph_service.client.remote_graph_utils import import_optional
except ModuleNotFoundError:
try:
from cugraph.utilities.utils import import_optional
except ModuleNotFoundError:
raise ModuleNotFoundError(
"cuGraph extensions for PyG require cuGraph"
"or cuGraph-Service to be installed."
)

_transform_to_backend_dtype_1d = import_optional("_transform_to_backend_dtype_1d")
cudf = import_optional("cudf")
pandas = import_optional("pandas")


def call_cugraph_algorithm(name, graph, *args, backend="numpy", **kwargs):
"""
Calls a cugraph algorithm for a remote, sg, or mg graph.
Requires either cuGraph or cuGraph-Service to be installed.
name : string
The name of the cuGraph algorithm to run (i.e. uniform_neighbor_sample)
graph : Graph (cuGraph) or RemoteGraph (cuGraph-Service)
The graph to call the algorithm on.
backend : ('cudf', 'pandas', 'cupy', 'numpy', 'torch', 'torch:<device>')
[default = 'numpy']
The backend where the algorithm results will be stored. Only used
if the graph is a remote graph.
"""

if graph.is_remote():
# If the graph is remote, cuGraph-Service must be installed
# Therefore we do not explicitly check that it is available
if name != "uniform_neighbor_sample":
raise ValueError(
f"cuGraph algorithm {name} is not yet supported for RemoteGraph"
)
else:
# TODO eventually replace this with a "call_algorithm call"
sample_result = graph._client.uniform_neighbor_sample(
*args, **kwargs, graph_id=graph._graph_id
)

if backend == "cudf":
df = cudf.DataFrame()
elif backend == "pandas":
df = pandas.DataFrame()
else:
# handle cupy, numpy, torch as dict of arrays/tensors
df = {}

# _transform_to_backend_dtype_1d handles array/Series conversion
for k, v in sample_result.__dict__.items():
df[k] = _transform_to_backend_dtype_1d(
v, series_name=k, backend=backend
)

return df

def call_cugraph_algorithm(name, graph, *args, **kwargs):
# TODO check using graph property in a future PR
if isinstance(graph._Impl, simpleDistributedGraphImpl):
elif graph.is_multi_gpu():
import cugraph.dask

return getattr(cugraph.dask, name)(graph, *args, **kwargs)

# TODO check using graph property in a future PR
elif isinstance(graph._Impl, simpleGraphImpl):
else:
import cugraph

return getattr(cugraph, name)(graph, *args, **kwargs)

# TODO Properly dispatch for cugraph-service.
7 changes: 7 additions & 0 deletions python/cugraph/cugraph/structure/graph_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,13 @@ def is_remote(self):
"""
return False

def is_multi_gpu(self):
"""
Returns True if the graph is a multi-gpu graph; otherwise
returns False.
"""
return isinstance(self._Impl, simpleDistributedGraphImpl)

def to_directed(self):
"""
Return a directed representation of the graph.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ class simpleGraphImpl:
edgeWeightCol = "weights"
edgeIdCol = "edge_id"
edgeTypeCol = "edge_type"
srcCol = "src"
dstCol = "dst"

class EdgeList:
def __init__(self, source, destination, edge_attr=None):
self.edgelist_df = cudf.DataFrame()
self.edgelist_df["src"] = source
self.edgelist_df["dst"] = destination
self.edgelist_df[simpleGraphImpl.srcCol] = source
self.edgelist_df[simpleGraphImpl.dstCol] = destination
self.weights = False
if edge_attr is not None:
self.weights = True
Expand Down Expand Up @@ -245,7 +247,12 @@ def __from_edgelist(
value_col=value_col, store_transposed=store_transposed, renumber=renumber
)

def to_pandas_edgelist(self, source="src", destination="dst", weight="weights"):
def to_pandas_edgelist(
self,
source="src",
destination="dst",
weight="weights",
):
"""
Returns the graph edge list as a Pandas DataFrame.
Expand All @@ -266,11 +273,21 @@ def to_pandas_edgelist(self, source="src", destination="dst", weight="weights"):
gdf = self.view_edge_list()
if self.properties.weighted:
gdf.rename(
columns={"src": source, "dst": destination, "weight": weight},
columns={
simpleGraphImpl.srcCol: source,
simpleGraphImpl.dstCol: destination,
"weight": weight,
},
inplace=True,
)
else:
gdf.rename(columns={"src": source, "dst": destination}, inplace=True)
gdf.rename(
columns={
simpleGraphImpl.srcCol: source,
simpleGraphImpl.dstCol: destination,
},
inplace=True,
)
return gdf.to_pandas()

def to_pandas_adjacency(self):
Expand All @@ -296,9 +313,9 @@ def to_numpy_array(self):
df = self.edgelist.edgelist_df
np_array = np.full((nlen, nlen), 0.0)
for i in range(0, elen):
np_array[df["src"].iloc[i], df["dst"].iloc[i]] = df[
self.edgeWeightCol
].iloc[i]
np_array[
df[simpleGraphImpl.srcCol].iloc[i], df[simpleGraphImpl.dstCol].iloc[i]
] = df[self.edgeWeightCol].iloc[i]
return np_array

def to_numpy_matrix(self):
Expand Down Expand Up @@ -345,11 +362,18 @@ def view_edge_list(self):
edgelist_df = self.edgelist.edgelist_df

if self.properties.renumbered:
edgelist_df = self.renumber_map.unrenumber(edgelist_df, "src")
edgelist_df = self.renumber_map.unrenumber(edgelist_df, "dst")
edgelist_df = self.renumber_map.unrenumber(
edgelist_df, simpleGraphImpl.srcCol
)
edgelist_df = self.renumber_map.unrenumber(
edgelist_df, simpleGraphImpl.dstCol
)

if not self.properties.directed:
edgelist_df = edgelist_df[edgelist_df["src"] <= edgelist_df["dst"]]
edgelist_df = edgelist_df[
edgelist_df[simpleGraphImpl.srcCol]
<= edgelist_df[simpleGraphImpl.dstCol]
]
edgelist_df = edgelist_df.reset_index(drop=True)
self.properties.edge_count = len(edgelist_df)

Expand Down Expand Up @@ -576,7 +600,9 @@ def number_of_vertices(self):
elif self.transposedadjlist is not None:
self.properties.node_count = len(self.transposedadjlist.offsets) - 1
elif self.edgelist is not None:
df = self.edgelist.edgelist_df[["src", "dst"]]
df = self.edgelist.edgelist_df[
[simpleGraphImpl.srcCol, simpleGraphImpl.dstCol]
]
self.properties.node_count = df.max().max() + 1
else:
raise RuntimeError("Graph is Empty")
Expand All @@ -601,8 +627,8 @@ def number_of_edges(self, directed_edges=False):
if self.properties.directed is False:
self.properties.edge_count = len(
self.edgelist.edgelist_df[
self.edgelist.edgelist_df["src"]
>= self.edgelist.edgelist_df["dst"]
self.edgelist.edgelist_df[simpleGraphImpl.srcCol]
>= self.edgelist.edgelist_df[simpleGraphImpl.dstCol]
]
)
else:
Expand Down Expand Up @@ -852,8 +878,8 @@ def _make_plc_graph(self, value_col=None, store_transposed=False, renumber=True)
self._plc_graph = SGGraph(
resource_handle=ResourceHandle(),
graph_properties=graph_props,
src_array=self.edgelist.edgelist_df["src"],
dst_array=self.edgelist.edgelist_df["dst"],
src_array=self.edgelist.edgelist_df[simpleGraphImpl.srcCol],
dst_array=self.edgelist.edgelist_df[simpleGraphImpl.dstCol],
weight_array=weight_col,
edge_id_array=id_col,
edge_type_array=type_col,
Expand Down Expand Up @@ -901,10 +927,15 @@ def to_undirected(self, G, store_transposed=False):
df = self.edgelist.edgelist_df
if self.edgelist.weights:
source_col, dest_col, value_col = symmetrize(
df, "src", "dst", simpleGraphImpl.edgeWeightCol
df,
simpleGraphImpl.srcCol,
simpleGraphImpl.dstCol,
simpleGraphImpl.edgeWeightCol,
)
else:
source_col, dest_col = symmetrize(df, "src", "dst")
source_col, dest_col = symmetrize(
df, simpleGraphImpl.srcCol, simpleGraphImpl.dstCol
)
value_col = None
G.edgelist = simpleGraphImpl.EdgeList(source_col, dest_col, value_col)

Expand All @@ -923,25 +954,29 @@ def has_node(self, n):
tmp = self.renumber_map.to_internal_vertex_id(cudf.Series([n]))
return tmp[0] is not cudf.NA and tmp[0] >= 0
else:
df = self.edgelist.edgelist_df[["src", "dst"]]
df = self.edgelist.edgelist_df[
[simpleGraphImpl.srcCol, simpleGraphImpl.dstCol]
]
return (df == n).any().any()

def has_edge(self, u, v):
"""
Returns True if the graph contains the edge (u,v).
"""
if self.properties.renumbered:
tmp = cudf.DataFrame({"src": [u, v]})
tmp = tmp.astype({"src": "int"})
tmp = cudf.DataFrame({simpleGraphImpl.srcCol: [u, v]})
tmp = tmp.astype({simpleGraphImpl.srcCol: "int"})
tmp = self.renumber_map.add_internal_vertex_id(
tmp, "id", "src", preserve_order=True
tmp, "id", simpleGraphImpl.srcCol, preserve_order=True
)

u = tmp["id"][0]
v = tmp["id"][1]

df = self.edgelist.edgelist_df
return ((df["src"] == u) & (df["dst"] == v)).any()
return (
(df[simpleGraphImpl.srcCol] == u) & (df[simpleGraphImpl.dstCol] == v)
).any()

def has_self_loop(self):
"""
Expand All @@ -950,7 +985,7 @@ def has_self_loop(self):
# Detect self loop
if self.properties.self_loop is None:
elist = self.edgelist.edgelist_df
if (elist["src"] == elist["dst"]).any():
if (elist[simpleGraphImpl.srcCol] == elist[simpleGraphImpl.dstCol]).any():
self.properties.self_loop = True
else:
self.properties.self_loop = False
Expand All @@ -962,7 +997,7 @@ def edges(self):
sources and destinations. It does not return the edge weights.
For viewing edges with weights use view_edge_list()
"""
return self.view_edge_list()[["src", "dst"]]
return self.view_edge_list()[[simpleGraphImpl.srcCol, simpleGraphImpl.dstCol]]

def nodes(self):
"""
Expand All @@ -981,7 +1016,9 @@ def nodes(self):
else:
return df[df.columns[0]]
else:
return cudf.concat([df["src"], df["dst"]]).unique()
return cudf.concat(
[df[simpleGraphImpl.srcCol], df[simpleGraphImpl.dstCol]]
).unique()
if self.adjlist is not None:
return cudf.Series(np.arange(0, self.number_of_nodes()))

Expand All @@ -995,7 +1032,9 @@ def neighbors(self, n):
n = node[0]

df = self.edgelist.edgelist_df
neighbors = df[df["src"] == n]["dst"].reset_index(drop=True)
neighbors = df[df[simpleGraphImpl.srcCol] == n][
simpleGraphImpl.dstCol
].reset_index(drop=True)
if self.properties.renumbered:
# FIXME: Multi-column vertices
return self.renumber_map.from_internal_vertex_id(neighbors)["0"]
Expand Down
1 change: 0 additions & 1 deletion python/cugraph_service/cugraph_service_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,3 @@

from cugraph_service_client.client import CugraphServiceClient
from cugraph_service_client.remote_graph import RemoteGraph
from cugraph_service_client.remote_graph import RemotePropertyGraph
22 changes: 11 additions & 11 deletions python/cugraph_service/cugraph_service_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import cupy as cp

from cugraph_service_client import defaults
from cugraph_service_client.remote_graph import RemoteGraph
from cugraph_service_client import extension_return_dtype_map
from cugraph_service_client.remote_graph import RemotePropertyGraph
from cugraph_service_client.types import (
ValueWrapper,
GraphVertexEdgeID,
Expand Down Expand Up @@ -515,9 +515,9 @@ def delete_graph(self, graph_id):

def graph(self):
"""
Constructs an empty RemotePropertyGraph object.
Constructs a new RemoteGraph object wrapping a remote PropertyGraph.
"""
return RemotePropertyGraph(self, self.create_graph())
return RemoteGraph(self, self.create_graph())

@__server_connection
def get_graph_ids(self):
Expand Down Expand Up @@ -797,7 +797,7 @@ def extract_subgraph(
selection=None,
edge_weight_property="",
default_edge_weight=1.0,
allow_multi_edges=False,
check_multi_edges=True,
renumber_graph=True,
add_edge_data=True,
graph_id=defaults.graph_id,
Expand All @@ -811,7 +811,7 @@ def extract_subgraph(
create_using : string, default is None
String describing the type of Graph object to create from the
selected subgraph of vertices and edges. The default (None) results
in a cugraph.Graph object.
in a directed cugraph.MultiGraph object.
selection : int, default is None
A PropertySelection ID returned from one or more calls to
Expand All @@ -830,10 +830,10 @@ def extract_subgraph(
The value to use when an edge property is specified but not present
on an edge.
allow_multi_edges : bool
If True, multiple edges should be used to create the resulting
Graph, otherwise multiple edges will be detected and an exception
raised.
check_multi_edges : bool (default is True)
When True and create_using argument is given and not a MultiGraph,
this will perform an expensive check to verify that the edges in
the edge dataframe do not form a multigraph with duplicate edges.
graph_id : int, default is defaults.graph_id
The graph ID to extract the subgraph from. If the ID passed is not
Expand Down Expand Up @@ -861,7 +861,7 @@ def extract_subgraph(
selection,
edge_weight_property,
default_edge_weight,
allow_multi_edges,
check_multi_edges,
renumber_graph,
add_edge_data,
graph_id,
Expand Down Expand Up @@ -979,7 +979,7 @@ def get_graph_edge_data(
def is_vertex_property(self, property_key, graph_id=defaults.graph_id):
"""
Returns True if the given property key is for a valid vertex property
in the given graph, false otherwise.e
in the given graph, False otherwise.
Parameters
----------
Expand Down
Loading

0 comments on commit ffa4a6d

Please sign in to comment.