diff --git a/python/cugraph_service/README.md b/python/cugraph_service/README.md new file mode 100644 index 00000000000..2e9ed06c361 --- /dev/null +++ b/python/cugraph_service/README.md @@ -0,0 +1,40 @@ +# cugraph_service + +## Description +[RAPIDS](https://rapids.ai) cugraph_service provides an RPC interface to a remote [RAPIDS cuGraph](https://github.com/rapidsai/cugraph) session, allowing users to perform GPU accelerated graph analytics from a remote process. cugraph_service uses cuGraph, cuDF, and other libraries on the server to execute graph data prep and analysis on server-side GPUs. Multiple clients can connect to the server allowing different users and processes the ability to access large graph data that may not otherwise be possible using the client resources. + +##
+ +----- + +## Server +(description) +### Installing `cugraph_service_server` conda package + + TBD + +### Example +Starting the server +``` +$> PYTHONPATH=./python/cugraph_service python -m cugraph_service_server.server +``` + +## Client +(description) +### Installing the `cugraph_service_client` conda package + + TBD + +### Example +Creating a client +``` +>>> from cugraph_service_client import CugraphServiceClient +>>> client = CugraphServiceClient() +>>> client.load_csv_as_vertex_data(...) +``` + +------ + +##
Open GPU Data Science + +The RAPIDS suite of open source software libraries aims to enable execution of end-to-end data science and analytics pipelines entirely on GPUs. It relies on NVIDIA® CUDA® primitives for low-level compute optimization but exposing that GPU parallelism and high-bandwidth memory speed through user-friendly Python interfaces. diff --git a/python/cugraph_service/cugraph_service_client/__init__.py b/python/cugraph_service/cugraph_service_client/__init__.py new file mode 100644 index 00000000000..c7479163894 --- /dev/null +++ b/python/cugraph_service/cugraph_service_client/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from cugraph_service_client.client import CugraphServiceClient diff --git a/python/cugraph_service/cugraph_service_client/client.py b/python/cugraph_service/cugraph_service_client/client.py new file mode 100644 index 00000000000..1292ab06048 --- /dev/null +++ b/python/cugraph_service/cugraph_service_client/client.py @@ -0,0 +1,955 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from functools import wraps +from collections.abc import Sequence +import pickle + +from cugraph_service_client import defaults +from cugraph_service_client.types import ValueWrapper, GraphVertexEdgeID +from cugraph_service_client.cugraph_service_thrift import create_client + + +class CugraphServiceClient: + """ + Client object for cugraph_service, which defines the API that clients can + use to access the cugraph_service server. + """ + def __init__(self, host=defaults.host, port=defaults.port): + """ + Creates a connection to a cugraph_service server running on host/port. + + Parameters + ---------- + host : string, defaults to 127.0.0.1 + Hostname where the cugraph_service server is running + + port : int, defaults to 9090 + Port number where the cugraph_service server is listening + + Returns + ------- + CugraphServiceClient object + + Examples + -------- + >>> from cugraph_service_client import CugraphServiceClient + >>> client = CugraphServiceClient() + """ + self.host = host + self.port = port + self.__client = None + + # If True, do not automatically close a server connection upon + # completion or error of a server API call. This requires the caller to + # manually call close() when done. + self.hold_open = False + + def __del__(self): + self.close() + + def __server_connection(method): + """ + Decorator for methods that require a connection to the server to be + created prior to calling a server function, then closed upon completion + or error. 
If self.hold_open is True, the automatic call to close() will + not take place, allowing for multiple subsequent server calls to be + made using the same connection. self.hold_open therefore requires the + caller to manually call close() in order to allow other clients to + connect. + """ + @wraps(method) + def wrapped_method(self, *args, **kwargs): + self.open() + try: + ret_val = method(self, *args, **kwargs) + finally: + if not self.hold_open: + self.close() + return ret_val + return wrapped_method + + def open(self, call_timeout=900000): + """ + Opens a connection to the server at self.host/self.port if one is not + already established. close() must be called in order to allow other + connections from other clients to be made. + + This call does nothing if a connection to the server is already open. + + Note: all APIs that access the server will call this method + automatically, followed automatically by a call to close(), so calling + this method should not be necessary. close() is not automatically + called if self.hold_open is True. + + Parameters + ---------- + call_timeout : int (default is 900000) + Time in milliseconds that calls to the server using this open + connection must return by. + + Returns + ------- + None + + Examples + -------- + >>> from cugraph_service_client import CugraphServiceClient + >>> client = CugraphServiceClient() + >>> # Manually open a connection. The connection is held open and other + >>> # clients cannot connect until a client API call completes or + >>> # close() is manually called. + >>> client.open() + + """ + if self.__client is None: + self.__client = create_client(self.host, self.port, + call_timeout=call_timeout) + + def close(self): + """ + Closes a connection to the server if one has been established, allowing + other clients to access the server. This method is called automatically + for all APIs that access the server if self.hold_open is False. 
+ + Parameters + ---------- + None + + Returns + ------- + None + + Examples + -------- + >>> from cugraph_service_client import CugraphServiceClient + >>> client = CugraphServiceClient() + >>> # Have the client hold open the connect automatically opened as + >>> # part of a server API call until close() is called. This is + >>> # normally not necessary and shown here for demonstration purposes. + >>> client.hold_open = True + >>> client.node2vec([0,1], 2) + >>> # close the connection so other clients can connect + >>> client.close() + >>> # go back to automatic open/close mode (safer) + >>> client.hold_open = False + """ + if self.__client is not None: + self.__client.close() + self.__client = None + + ########################################################################### + # Environment management + @__server_connection + def uptime(self): + """ + Return the server uptime in seconds. This is often used as a "ping". + + Parameters + ---------- + None + + Returns + ------- + uptime : int + The time in seconds the server has been running. + + Examples + -------- + >>> from cugraph_service_client import CugraphServiceClient + >>> client = CugraphServiceClient() + >>> client.uptime() + >>> 32 + """ + return self.__client.uptime() + + @__server_connection + def get_server_info(self): + """ + Return a dictionary of information about the server. + + Parameters + ---------- + None + + Returns + ------- + server_info : dict + + Dictionary containing environment and state information about the + server. + + Examples + -------- + >>> from cugraph_service_client import CugraphServiceClient + >>> client = CugraphServiceClient() + >>> client.get_server_info() + >>> {'num_gpus': 2} + """ + server_info = self.__client.get_server_info() + # server_info is a dictionary of Value objects ("union" types returned + # from the server), so convert them to simple py types. 
+ return dict((k, ValueWrapper(server_info[k]).get_py_obj()) + for k in server_info) + + @__server_connection + def load_graph_creation_extensions(self, extension_dir_path): + """ + Loads the extensions for graph creation present in the directory + specified by extension_dir_path. + + Parameters + ---------- + extension_dir_path : string + Path to the directory containing the extension files (.py source + files). This directory must be readable by the server. + + Returns + ------- + num_files_read : int + Number of extension files read in the extension_dir_path directory. + + Examples + -------- + >>> from cugraph_service_client import CugraphServiceClient + >>> client = CugraphServiceClient() + >>> num_files_read = client.load_graph_creation_extensions( + ... "/some/server/side/directory") + >>> + """ + return self.__client.load_graph_creation_extensions(extension_dir_path) + + @__server_connection + def unload_graph_creation_extensions(self): + """ + Removes all extensions for graph creation previously loaded. + + Parameters + ---------- + None + + Returns + ------- + None + + Examples + -------- + >>> from cugraph_service_client import CugraphServiceClient + >>> client = CugraphServiceClient() + >>> client.unload_graph_creation_extensions() + >>> + """ + return self.__client.unload_graph_creation_extensions() + + @__server_connection + def call_graph_creation_extension(self, func_name, + *func_args, **func_kwargs): + """ + Calls a graph creation extension on the server that was previously + loaded by a prior call to load_graph_creation_extensions(), then + returns the graph ID of the graph created by the extension. + + Parameters + ---------- + func_name : string + The name of the server-side extension function loaded by a prior + call to load_graph_creation_extensions(). All graph creation + extension functions are expected to return a new graph. + + *func_args : string, int, list, dictionary (optional) + The positional args to pass to func_name. 
Note that func_args are + converted to their string representation using repr() on the + client, then restored to python objects on the server using eval(), + and therefore only objects that can be restored server-side with + eval() are supported. + + **func_kwargs : string, int, list, dictionary + The keyword args to pass to func_name. Note that func_kwargs are + converted to their string representation using repr() on the + client, then restored to python objects on the server using eval(), + and therefore only objects that can be restored server-side with + eval() are supported. + + Returns + ------- + graph_id : int + unique graph ID + + Examples + -------- + >>> from cugraph_service_client import CugraphServiceClient + >>> client = CugraphServiceClient() + >>> # Load the extension file containing "my_complex_create_graph()" + >>> client.load_graph_creation_extensions("/some/server/side/dir") + >>> new_graph_id = client.call_graph_creation_extension( + ... "my_complex_create_graph", + ... "/path/to/csv/on/server/graph.csv", + ... clean_data=True) + >>> + """ + func_args_repr = repr(func_args) + func_kwargs_repr = repr(func_kwargs) + return self.__client.call_graph_creation_extension( + func_name, func_args_repr, func_kwargs_repr) + + ########################################################################### + # Graph management + @__server_connection + def create_graph(self): + """ + Create a new graph associated with a new (non-default) unique graph ID, + return the new graph ID. + + Parameters + ---------- + None + + Returns + ------- + graph_id : int + unique graph ID + + Examples + -------- + >>> from cugraph_service_client import CugraphServiceClient + >>> client = CugraphServiceClient() + >>> my_graph_id = client.create_graph() + >>> # Load a CSV to the new graph + >>> client.load_csv_as_edge_data( + ... "edges.csv", ["int32", "int32", "float32"], + ... 
vertex_col_names=["src", "dst"], graph_id=my_graph_id) + >>> + """ + return self.__client.create_graph() + + @__server_connection + def delete_graph(self, graph_id): + """ + Deletes the graph referenced by graph_id. + + Parameters + ---------- + graph_id : int + The graph ID to delete. If the ID passed is not valid on the + server, CugraphServiceError is raised. + + Returns + ------- + None + + Examples + -------- + >>> from cugraph_service_client import CugraphServiceClient + >>> client = CugraphServiceClient() + >>> my_graph_id = client.create_graph() + >>> # Load a CSV to the new graph + >>> client.load_csv_as_edge_data( + ... "edges.csv", ["int32", "int32", "float32"], + ... vertex_col_names=["src", "dst"], graph_id=my_graph_id) + >>> # Remove the graph instance on the server and reclaim the memory + >>> client.delete_graph(my_graph_id) + """ + return self.__client.delete_graph(graph_id) + + @__server_connection + def get_graph_ids(self): + """ + Returns a list of all graph IDs the server is currently maintaining. + + Parameters + ---------- + None + + Returns + ------- + graph_id_list : list of unique int graph IDs + + Examples + -------- + >>> from cugraph_service_client import CugraphServiceClient + >>> client = CugraphServiceClient() + >>> # This server already has graphs loaded from other sessions + >>> client.get_graph_ids() + [0, 26] + >>> + """ + return self.__client.get_graph_ids() + + @__server_connection + def get_graph_info(self, keys=None, graph_id=defaults.graph_id): + """ + Returns a dictionary containing meta-data about the graph referenced by + graph_id (or the default graph if not specified). + + Parameters + ---------- + graph_id : int, default is defaults.graph_id + The graph ID to apply the properties in the CSV to. If not provided + the default graph ID is used. 
+ + Returns + ------- + None + + Examples + -------- + >>> from cugraph_service_client import CugraphServiceClient + >>> client = CugraphServiceClient() + >>> client.load_csv_as_vertex_data( + ... "/server/path/to/vertex_data.csv", + ... dtypes=["int32", "string", "int32"], + ... vertex_col_name="vertex_id", + ... header="infer") + >>> client.get_graph_info() + {'num_edges': 3, 'num_vertices': 4} + """ + # Ensure keys is a list of strings when passing to RPC API + if keys is None: + keys = [] + elif isinstance(keys, str): + keys = [keys] + elif isinstance(keys, list): + if False in [isinstance(k, str) for k in keys]: + raise TypeError(f"keys must be a list of strings, got {keys}") + else: + raise TypeError("keys must be a string or list of strings, got " + f"{type(keys)}") + + graph_info = self.__client.get_graph_info(keys, graph_id) + + # special case: if only one key was specified, return only the single + # value + if len(keys) == 1: + return ValueWrapper(graph_info[keys[0]]).get_py_obj() + + # graph_info is a dictionary of Value objects ("union" types returned + # from the graph), so convert them to simple py types. + return dict((k, ValueWrapper(graph_info[k]).get_py_obj()) + for k in graph_info) + + @__server_connection + def load_csv_as_vertex_data(self, + csv_file_name, + dtypes, + vertex_col_name, + delimiter=" ", + header=None, + type_name="", + property_columns=None, + graph_id=defaults.graph_id, + names=None, + ): + + """ + Reads csv_file_name and applies it as vertex data to the graph + identified as graph_id (or the default graph if not specified). + + Parameters + ---------- + csv_file_name : string + Path to CSV file on the server + + dtypes : list of strings + Types for the columns in the CSV file + + vertex_col_name : string + Name of the column to use as the vertex ID + + delimiter : string, default is " " + Character that serves as the delimiter between columns in the CSV + + header : int, default is None + Row number to use as the column names. 
Default behavior is to + assume column names are explicitely provided (header=None). + header="infer" if the column names are to be inferred. If no names + are passed, header=0. See also cudf.read_csv + + type_name : string, default is "" + The vertex property "type" the CSV data is describing. For + instance, CSV data describing properties for "users" might pass + type_name as "user". A vertex property type is optional. + + property_columns : list of strings, default is None + The column names in the CSV to add as vertex properties. If None, + all columns will be added as properties. + + graph_id : int, default is defaults.graph_id + The graph ID to apply the properties in the CSV to. If not provided + the default graph ID is used. + + names: list of strings, default is None + The names to be used to reference the CSV columns, in lieu of a + header. + + Returns + ------- + None + + Examples + -------- + >>> from cugraph_service_client import CugraphServiceClient + >>> client = CugraphServiceClient() + >>> client.load_csv_as_vertex_data( + ... "/server/path/to/vertex_data.csv", + ... dtypes=["int32", "string", "int32"], + ... vertex_col_name="vertex_id", + ... header="infer") + >>> + """ + # Map all int arg types that also have string options to ints + # FIXME: check for invalid header arg values + if header == "infer": + header = -1 + elif header is None: + header = -2 + return self.__client.load_csv_as_vertex_data(csv_file_name, + delimiter, + dtypes, + header, + vertex_col_name, + type_name, + property_columns or [], + graph_id, + names or []) + + @__server_connection + def load_csv_as_edge_data(self, + csv_file_name, + dtypes, + vertex_col_names, + delimiter=" ", + header=None, + type_name="", + property_columns=None, + graph_id=defaults.graph_id, + names=None + ): + """ + Reads csv_file_name and applies it as edge data to the graph identified + as graph_id (or the default graph if not specified). 
+ + Parameters + ---------- + csv_file_name : string + Path to CSV file on the server + + dtypes : list of strings + Types for the columns in the CSV file + + vertex_col_names : tuple of strings + Names of the columns to use as the source and destination vertex + IDs defining the edges + + delimiter : string, default is " " + Character that serves as the delimiter between columns in the CSV + + header : int, default is None + Row number to use as the column names. Default behavior is to + assume column names are explicitely provided (header=None). + header="infer" if the column names are to be inferred. If no names + are passed, header=0. See also cudf.read_csv + + type_name : string, default is "" + The edge property "type" the CSV data is describing. For instance, + CSV data describing properties for "transactions" might pass + type_name as "transaction". An edge property type is optional. + + property_columns : list of strings, default is None + The column names in the CSV to add as edge properties. If None, all + columns will be added as properties. + + graph_id : int, default is defaults.graph_id + The graph ID to apply the properties in the CSV to. If not provided + the default graph ID is used. + + names: list of strings, default is None + The names to be used to reference the CSV columns, in lieu of a + header. + + Returns + ------- + None + + Examples + -------- + >>> from cugraph_service_client import CugraphServiceClient + >>> client = CugraphServiceClient() + >>> client.load_csv_as_edge_data( + ... "/server/path/to/edge_data.csv", + ... dtypes=["int32", "int32", "string", "int32"], + ... vertex_col_names=("src", "dst"), + ... 
header="infer") + >>> + """ + # Map all int arg types that also have string options to ints + # FIXME: check for invalid header arg values + if header == "infer": + header = -1 + elif header is None: + header = -2 + return self.__client.load_csv_as_edge_data(csv_file_name, + delimiter, + dtypes, + header, + vertex_col_names, + type_name, + property_columns or [], + graph_id, + names or []) + + @__server_connection + def get_edge_IDs_for_vertices(self, src_vert_IDs, dst_vert_IDs, + graph_id=defaults.graph_id): + """ + """ + # FIXME: finish docstring above + # FIXME: add type checking + return self.__client.get_edge_IDs_for_vertices(src_vert_IDs, + dst_vert_IDs, + graph_id) + + @__server_connection + def extract_subgraph(self, + create_using=None, + selection=None, + edge_weight_property="", + default_edge_weight=1.0, + allow_multi_edges=False, + renumber_graph=True, + add_edge_data=True, + graph_id=defaults.graph_id + ): + """ + Return a graph ID for a subgraph of the graph referenced by graph_id + that containing vertices and edges that match a selection. + + Parameters + ---------- + create_using : string, default is None + String describing the type of Graph object to create from the + selected subgraph of vertices and edges. The default (None) results + in a cugraph.Graph object. + + selection : int, default is None + A PropertySelection ID returned from one or more calls to + select_vertices() and/or select_edges(), used for creating a Graph + with only the selected properties. If not speciied the resulting + Graph will have all properties. Note, this could result in a Graph + with multiple edges, which may not be supported based on the value + of create_using. + + edge_weight_property : string, default is "" + The name of the property whose values will be used as weights on + the returned Graph. If not specified, the returned Graph will be + unweighted. 
+ + default_edge_weight : float, default is 1.0 + The value to use when an edge property is specified but not present + on an edge. + + allow_multi_edges : bool + If True, multiple edges should be used to create the resulting + Graph, otherwise multiple edges will be detected and an exception + raised. + + graph_id : int, default is defaults.graph_id + The graph ID to extract the subgraph from. If the ID passed is not + valid on the server, CugraphServiceError is raised. + + Returns + ------- + A graph ID for a new Graph instance of the same type as create_using + containing only the vertices and edges resulting from applying the + selection to the set of vertex and edge property data. + + Examples + -------- + >>> + """ + # FIXME: finish docstring above + + # FIXME: convert defaults to type needed by the Thrift API. These will + # be changing to different types. + create_using = create_using or "" + selection = selection or "" + + return self.__client.extract_subgraph(create_using, + selection, + edge_weight_property, + default_edge_weight, + allow_multi_edges, + renumber_graph, + add_edge_data, + graph_id) + + @__server_connection + def get_graph_vertex_data(self, + id_or_ids=-1, + null_replacement_value=0, + graph_id=defaults.graph_id, + property_keys=None + ): + """ + Returns ... + + Parameters + ---------- + id_or_ids : int or list of ints (default -1) + + null_replacement_value : number or string (default 0) + + graph_id : int, default is defaults.graph_id + The graph ID to extract the subgraph from. If the ID passed is not + valid on the server, CugraphServiceError is raised. + + property_keys : list of strings (default []) + The keys (names) of properties to retrieve. If omitted, returns + the whole dataframe. 
+ + Returns + ------- + + Examples + -------- + >>> + """ + # FIXME: finish docstring above + + vertex_edge_id_obj = self.__get_vertex_edge_id_obj(id_or_ids) + null_replacement_value_obj = ValueWrapper( + null_replacement_value, + val_name="null_replacement_value").union + + ndarray_bytes = \ + self.__client.get_graph_vertex_data( + vertex_edge_id_obj, + null_replacement_value_obj, + graph_id, + property_keys or [] + ) + + return pickle.loads(ndarray_bytes) + + @__server_connection + def get_graph_edge_data(self, + id_or_ids=-1, + null_replacement_value=0, + graph_id=defaults.graph_id, + property_keys=None + ): + """ + Returns ... + + Parameters + ---------- + id_or_ids : int or list of ints (default -1) + + null_replacement_value : number or string (default 0) + + graph_id : int, default is defaults.graph_id + The graph ID to extract the subgraph from. If the ID passed is not + valid on the server, CugraphServiceError is raised. + + property_keys : list of strings (default []) + The keys (names) of properties to retrieve. If omitted, returns + the whole dataframe. 
+ + Returns + ------- + + Examples + -------- + >>> + """ + # FIXME: finish docstring above + + vertex_edge_id_obj = self.__get_vertex_edge_id_obj(id_or_ids) + null_replacement_value_obj = ValueWrapper( + null_replacement_value, + val_name="null_replacement_value").union + + ndarray_bytes = \ + self.__client.get_graph_edge_data( + vertex_edge_id_obj, + null_replacement_value_obj, + graph_id, + property_keys or [] + ) + + return pickle.loads(ndarray_bytes) + + @__server_connection + def is_vertex_property(self, property_key, graph_id=defaults.graph_id): + """ + Returns True if the given property key is for a valid vertex property + in the given graph, false otherwise.e + + Parameters + ---------- + property_key: string + The key (name) of the vertex property to check + graph_id: int + The id of the graph of interest + """ + return self.__client.is_vertex_property(property_key, graph_id) + + @__server_connection + def is_edge_property(self, property_key, graph_id=defaults.graph_id): + """ + Returns True if the given property key is for a valid vertex property + in the given graph, false otherwise.e + + Parameters + ---------- + property_key: string + The key (name) of the vertex property to check + graph_id: int + The id of the graph of interest + """ + return self.__client.is_edge_property(property_key, graph_id) + + ########################################################################### + # Algos + @__server_connection + def batched_ego_graphs(self, seeds, radius=1, graph_id=defaults.graph_id): + """ + Parameters + ---------- + + Returns + ------- + + Examples + -------- + >>> + """ + # FIXME: finish docstring above + + if not isinstance(seeds, list): + seeds = [seeds] + batched_ego_graphs_result = self.__client.batched_ego_graphs(seeds, + radius, + graph_id) + + # FIXME: ensure dtypes are correct for values returned from + # cugraph.batched_ego_graphs() in cugraph_handler.py + # return (numpy.frombuffer(batched_ego_graphs_result.src_verts, + # dtype="int32"), 
+ # numpy.frombuffer(batched_ego_graphs_result.dst_verts, + # dtype="int32"), + # numpy.frombuffer(batched_ego_graphs_result.edge_weights, + # dtype="float64"), + # numpy.frombuffer(batched_ego_graphs_result.seeds_offsets, + # dtype="int64")) + return (batched_ego_graphs_result.src_verts, + batched_ego_graphs_result.dst_verts, + batched_ego_graphs_result.edge_weights, + batched_ego_graphs_result.seeds_offsets) + + @__server_connection + def node2vec(self, start_vertices, max_depth, graph_id=defaults.graph_id): + """ + Computes random walks for each node in 'start_vertices', under the + node2vec sampling framework. + + Parameters + ---------- + start_vertices: int or list or cudf.Series or cudf.DataFrame + A single node or a list or a cudf.Series of nodes from which to run + the random walks. In case of multi-column vertices it should be + a cudf.DataFrame. Only supports int32 currently. + + max_depth: int + The maximum depth of the random walks + + Returns + ------- + + Examples + -------- + >>> + """ + # FIXME: finish docstring above + + # start_vertices must be a list (cannot just be an iterable), and + # assume return value is tuple of python lists on host. + if not isinstance(start_vertices, list): + start_vertices = [start_vertices] + # FIXME: ensure list is a list of int32, since Thrift interface + # specifies that? + node2vec_result = self.__client.node2vec(start_vertices, + max_depth, + graph_id) + return (node2vec_result.vertex_paths, + node2vec_result.edge_weights, + node2vec_result.path_sizes) + + @__server_connection + def uniform_neighbor_sample(self, + start_list, + fanout_vals, + with_replacement=True, + graph_id=defaults.graph_id): + """ + Samples the graph and returns the graph id of the sampled + graph. + + Parameters: + start_list: list[int] + + fanout_vals: list[int] + + with_replacement: bool + + graph_id: int, default is defaults.graph_id + + Returns + ------- + The graph id of the sampled graph. 
+ + """ + + return self.__client.uniform_neighbor_sample( + start_list, + fanout_vals, + with_replacement, + graph_id, + ) + + @__server_connection + def pagerank(self, graph_id=defaults.graph_id): + """ + pagerank + """ + raise NotImplementedError + + ########################################################################### + # Test/Debug + @__server_connection + def _get_graph_type(self, graph_id=defaults.graph_id): + """ + Test/debug API for returning a string repr of the graph_id instance. + """ + return self.__client.get_graph_type(graph_id) + + ########################################################################### + # Private + @staticmethod + def __get_vertex_edge_id_obj(id_or_ids): + # FIXME: do not assume all values are int32 + if isinstance(id_or_ids, Sequence): + vert_edge_id_obj = GraphVertexEdgeID(int32_ids=id_or_ids) + else: + vert_edge_id_obj = GraphVertexEdgeID(int32_id=id_or_ids) + return vert_edge_id_obj diff --git a/python/cugraph_service/cugraph_service_client/cugraph_service_thrift.py b/python/cugraph_service/cugraph_service_client/cugraph_service_thrift.py new file mode 100644 index 00000000000..48b85c59eb5 --- /dev/null +++ b/python/cugraph_service/cugraph_service_client/cugraph_service_thrift.py @@ -0,0 +1,254 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import io + +import thriftpy2 +from thriftpy2.rpc import make_client +from thriftpy2.protocol import TBinaryProtocolFactory +from thriftpy2.server import TSimpleServer +from thriftpy2.thrift import TProcessor +from thriftpy2.transport import ( + TBufferedTransportFactory, + TServerSocket, + TTransportException, +) + + +# This is the Thrift input file as a string rather than a separate file. This +# allows the Thrift input to be contained within the module that's responsible +# for all Thrift-specific details rather than a separate .thrift file. +# +# thriftpy2 (https://github.com/Thriftpy/thriftpy2) is being used here instead +# of Apache Thrift since it offers an easier-to-use API exclusively for Python +# which is still compatible with servers/cleints using Apache Thrift (Apache +# Thrift can be used from a variety of different languages) while offering +# approximately the same performance. +# +# See the Apache Thrift tutorial for Python for examples: +# https://thrift.apache.org/tutorial/py.html +cugraph_thrift_spec = """ +# FIXME: consider additional, more fine-grained exceptions +exception CugraphServiceError { + 1:string message +} + +struct BatchedEgoGraphsResult { + 1:list src_verts + 2:list dst_verts + 3:list edge_weights + 4:list seeds_offsets +} + +struct Node2vecResult { + 1:list vertex_paths + 2:list edge_weights + 3:list path_sizes +} + +# FIXME: uniform_neighbor_sample may need to return indices as ints +# See: https://github.com/rapidsai/cugraph/issues/2654 +struct UniformNeighborSampleResult { + 1:list sources + 2:list destinations + 3:list indices +} + +union GraphVertexEdgeID { + 1:i32 int32_id + 2:i64 int64_id + 3:list int32_ids + 4:list int64_ids +} + +union Value { + 1:i32 int32_value + 2:i64 int64_value + 3:string string_value + 4:bool bool_value +} + +service CugraphService { + + ############################################################################## + # Environment management + i32 uptime() + + map get_server_info() throws 
(1:CugraphServiceError e), + + i32 load_graph_creation_extensions(1:string extension_dir_path + ) throws (1:CugraphServiceError e), + + void unload_graph_creation_extensions(), + + i32 call_graph_creation_extension(1:string func_name, + 2:string func_args_repr, + 3:string func_kwargs_repr + ) throws (1:CugraphServiceError e), + + + ############################################################################## + # Graph management + i32 create_graph() throws(1:CugraphServiceError e), + + void delete_graph(1:i32 graph_id) throws (1:CugraphServiceError e), + + list get_graph_ids() throws(1:CugraphServiceError e), + + map get_graph_info(1:list keys, + 2:i32 graph_id + ) throws(1:CugraphServiceError e), + + void load_csv_as_vertex_data(1:string csv_file_name, + 2:string delimiter, + 3:list dtypes, + 4:i32 header, + 5:string vertex_col_name, + 6:string type_name, + 7:list property_columns, + 8:i32 graph_id, + 9:list names + ) throws (1:CugraphServiceError e), + + void load_csv_as_edge_data(1:string csv_file_name, + 2:string delimiter, + 3:list dtypes, + 4:i32 header, + 5:list vertex_col_names, + 6:string type_name, + 7:list property_columns, + 8:i32 graph_id, + 9:list names + ) throws (1:CugraphServiceError e), + + list get_edge_IDs_for_vertices(1:list src_vert_IDs, + 2:list dst_vert_IDs, + 3:i32 graph_id + ) throws (1:CugraphServiceError e), + + i32 extract_subgraph(1:string create_using, + 2:string selection, + 3:string edge_weight_property, + 4:double default_edge_weight, + 5:bool allow_multi_edges, + 6:bool renumber_graph, + 7:bool add_edge_data, + 8:i32 graph_id + ) throws (1:CugraphServiceError e), + + binary get_graph_vertex_data(1:GraphVertexEdgeID vertex_id, + 2:Value null_replacement_value, + 3:i32 graph_id, + 4:list property_keys + ) throws (1:CugraphServiceError e), + + binary get_graph_edge_data(1:GraphVertexEdgeID edge_id, + 2:Value null_replacement_value + 3:i32 graph_id, + 4:list property_keys + ) throws (1:CugraphServiceError e), + + bool 
is_vertex_property(1:string property_key, + 2:i32 graph_id) throws (1:CugraphServiceError e), + + bool is_edge_property(1:string property_key, + 2:i32 graph_id) throws (1:CugraphServiceError e), + + ############################################################################## + # Algos + BatchedEgoGraphsResult + batched_ego_graphs(1:list seeds, + 2:i32 radius, + 3:i32 graph_id + ) throws (1:CugraphServiceError e), + + Node2vecResult + node2vec(1:list start_vertices, + 2:i32 max_depth, + 3:i32 graph_id + ) throws (1:CugraphServiceError e), + + UniformNeighborSampleResult + uniform_neighbor_sample(1:list start_list, + 2:list fanout_vals, + 3:bool with_replacement, + 4:i32 graph_id + ) throws (1:CugraphServiceError e), + + ############################################################################## + # Test/Debug + string get_graph_type(1:i32 graph_id) throws(1:CugraphServiceError e), +} +""" + +# Load the cugraph Thrift specification on import. Syntax errors and other +# problems will be apparent immediately on import, and it allows any other +# module to import this and access the various types defined in the Thrift +# specification without being exposed to the thriftpy2 API. +spec = thriftpy2.load_fp(io.StringIO(cugraph_thrift_spec), + module_name="cugraph_thrift") + + +def create_server(handler, host, port, client_timeout=90000): + """ + Return a server object configured to listen on host/port and use the + handler object to handle calls from clients. The handler object must have + an interface compatible with the CugraphService service defined in the + Thrift specification. + + Note: This function is defined here in order to allow it to have easy + access to the Thrift spec loaded here on import, and to keep all thriftpy2 + calls in this module. However, this function is likely only called from the + cugraph_service_server package which depends on the code in this package. 
+ """ + proto_factory = TBinaryProtocolFactory() + trans_factory = TBufferedTransportFactory() + client_timeout = client_timeout + + processor = TProcessor(spec.CugraphService, handler) + server_socket = TServerSocket(host=host, port=port, + client_timeout=client_timeout) + server = TSimpleServer(processor, server_socket, + iprot_factory=proto_factory, + itrans_factory=trans_factory) + return server + + +def create_client(host, port, call_timeout=90000): + """ + Return a client object that will make calls on a server listening on + host/port. + + The call_timeout value defaults to 90 seconds, and is used for setting the + timeout for server API calls when using the client created here - if a call + does not return in call_timeout milliseconds, an exception is raised. + """ + try: + return make_client(spec.CugraphService, host=host, port=port, + timeout=call_timeout) + except TTransportException: + # Raise a CugraphServiceError in order to completely encapsulate all + # Thrift details in this module. If this was not done, callers of this + # function would have to import thriftpy2 in order to catch the + # TTransportException, which then leaks thriftpy2. + # + # NOTE: normally the CugraphServiceError exception is imported from the + # cugraph_service_client.exceptions module, but since + # cugraph_service_client.exceptions.CugraphServiceError is actually + # defined from the spec in this module, just use it directly from spec. 
+ # + # FIXME: may need to have additional thrift exception handlers + # FIXME: this exception being raised could use more detail + raise spec.CugraphServiceError("could not create a client session " + "with a cugraph_service server") diff --git a/python/cugraph_service/cugraph_service_client/defaults.py b/python/cugraph_service/cugraph_service_client/defaults.py new file mode 100644 index 00000000000..e84d52f7f2f --- /dev/null +++ b/python/cugraph_service/cugraph_service_client/defaults.py @@ -0,0 +1,17 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +host = "localhost" +port = 9090 +graph_id = 0 diff --git a/python/cugraph_service/cugraph_service_client/exceptions.py b/python/cugraph_service/cugraph_service_client/exceptions.py new file mode 100644 index 00000000000..abf6c81dc21 --- /dev/null +++ b/python/cugraph_service/cugraph_service_client/exceptions.py @@ -0,0 +1,18 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from cugraph_service_client.cugraph_service_thrift import spec + +# FIXME: add more fine-grained exceptions! +CugraphServiceError = spec.CugraphServiceError diff --git a/python/cugraph_service/cugraph_service_client/types.py b/python/cugraph_service/cugraph_service_client/types.py new file mode 100644 index 00000000000..8cab495f720 --- /dev/null +++ b/python/cugraph_service/cugraph_service_client/types.py @@ -0,0 +1,83 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy + +from cugraph_service_client.cugraph_service_thrift import spec + +Value = spec.Value +GraphVertexEdgeID = spec.GraphVertexEdgeID +BatchedEgoGraphsResult = spec.BatchedEgoGraphsResult +Node2vecResult = spec.Node2vecResult +UniformNeighborSampleResult = spec.UniformNeighborSampleResult + + +class UnionWrapper: + """ + Provides easy conversions between py objs and Thrift "unions". 
+ """ + def get_py_obj(self): + not_members = set(["default_spec", "thrift_spec", "read", "write"]) + attrs = [a for a in dir(self.union) + if not(a.startswith("_")) and a not in not_members] + for a in attrs: + val = getattr(self.union, a) + if val is not None: + return val + + return None + + +class ValueWrapper(UnionWrapper): + def __init__(self, val, val_name="value"): + if isinstance(val, Value): + self.union = val + elif isinstance(val, int): + if val < 4294967296: + self.union = Value(int32_value=val) + else: + self.union = Value(int64_value=val) + elif isinstance(val, numpy.int32): + self.union = Value(int32_value=int(val)) + elif isinstance(val, numpy.int64): + self.union = Value(int64_value=int(val)) + elif isinstance(val, str): + self.union = Value(string_value=val) + elif isinstance(val, bool): + self.union = Value(bool_value=val) + else: + raise TypeError(f"{val_name} must be one of the " + "following types: [int, str, bool], got " + f"{type(val)}") + + +class GraphVertexEdgeIDWrapper(UnionWrapper): + def __init__(self, val, val_name="id"): + if isinstance(val, GraphVertexEdgeID): + self.union = val + elif isinstance(val, int): + if val >= 4294967296: + self.union = GraphVertexEdgeID(int64_id=val) + else: + self.union = GraphVertexEdgeID(int32_id=val) + elif isinstance(val, list): + # FIXME: this only check the first item, others could be larger + if val[0] >= 4294967296: + self.union = GraphVertexEdgeID(int64_ids=val) + else: + self.union = GraphVertexEdgeID(int32_ids=val) + else: + raise TypeError(f"{val_name} must be one of the " + "following types: [int, list], got " + f"{type(val)}") diff --git a/python/cugraph_service/cugraph_service_server/__init__.py b/python/cugraph_service/cugraph_service_server/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/python/cugraph_service/cugraph_service_server/cugraph_handler.py b/python/cugraph_service/cugraph_service_server/cugraph_handler.py new file mode 100644 index 
00000000000..8352ff5c3ec --- /dev/null +++ b/python/cugraph_service/cugraph_service_server/cugraph_handler.py @@ -0,0 +1,820 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +import importlib +import time +import traceback +from inspect import signature + +import numpy as np +import cudf +import dask_cudf +import cugraph +from dask.distributed import Client +from dask_cuda.initialize import initialize as dask_initialize +from cugraph.experimental import PropertyGraph, MGPropertyGraph +from cugraph.dask.comms import comms as Comms +from cugraph import uniform_neighbor_sample +from cugraph.dask import uniform_neighbor_sample as mg_uniform_neighbor_sample +from cugraph.structure.graph_implementation.simpleDistributedGraph import ( + simpleDistributedGraphImpl, +) + +from cugraph_service_client import defaults +from cugraph_service_client.exceptions import CugraphServiceError +from cugraph_service_client.types import ( + BatchedEgoGraphsResult, + Node2vecResult, + UniformNeighborSampleResult, + ValueWrapper, + GraphVertexEdgeIDWrapper, +) + + +def call_algo(sg_algo_func, G, **kwargs): + """ + Calls the appropriate algo function based on the graph G being MG or SG. If + G is SG, sg_algo_func will be called and passed kwargs, otherwise the MG + version of sg_algo_func will be called with kwargs. 
+ """ + is_mg_graph = isinstance(G._Impl, simpleDistributedGraphImpl) + + if sg_algo_func is uniform_neighbor_sample: + if is_mg_graph: + possible_args = ["start_list", "fanout_vals", "with_replacement"] + kwargs_to_pass = {a: kwargs[a] for a in possible_args + if a in kwargs} + data = mg_uniform_neighbor_sample(G, **kwargs_to_pass) + data = data.compute() + else: + possible_args = ["start_list", "fanout_vals", "with_replacement", + "is_edge_ids"] + kwargs_to_pass = {a: kwargs[a] for a in possible_args + if a in kwargs} + data = uniform_neighbor_sample(G, **kwargs_to_pass) + + return UniformNeighborSampleResult( + sources=data.sources.values_host, + destinations=data.destinations.values_host, + indices=data.indices.values_host + ) + + else: + raise RuntimeError(f"internal error: {sg_algo_func} is not supported") + + +class ExtensionServerFacade: + """ + Instances of this class are passed to server extension functions to be used + to access various aspects of the cugraph_service_client server from within + the extension. This provides a means to insulate the CugraphHandler + (considered here to be the "server") from direct access by end user + extensions, allowing extension code to query/access the server as needed + without giving extensions the ability to call potentially unsafe methods + directly on the CugraphHandler. + + An example is using an instance of an ExtensionServerFacade to allow a Graph + creation extension to query the SG/MG state the server is using in order to + determine how to create a Graph instance. + """ + def __init__(self, cugraph_handler): + self.__handler = cugraph_handler + + @property + def is_mg(self): + return self.__handler.is_mg + + def get_server_info(self): + # The handler returns objects suitable for serialization over RPC so + # convert them to regular py objs since this call is originating + # server-side. 
+ return {k: ValueWrapper(v).get_py_obj() for (k, v) + in self.__handler.get_server_info().items()} + + +class CugraphHandler: + """ + Class which handles RPC requests for a cugraph_service server. + """ + + # The name of the param that should be set to a ExtensionServerFacade + # instance for server extension functions. + __server_facade_extension_param_name = "server" + + def __init__(self): + self.__next_graph_id = defaults.graph_id + 1 + self.__graph_objs = {} + self.__graph_creation_extensions = {} + self.__dask_client = None + self.__dask_cluster = None + self.__start_time = int(time.time()) + + def __del__(self): + self.shutdown_dask_client() + + ########################################################################### + # Environment management + @property + def is_mg(self): + """ + True if the CugraphHandler has multiple GPUs available via a dask + cluster. + """ + return self.__dask_client is not None + + def uptime(self): + """ + Return the server uptime in seconds. This is often used as a "ping". + """ + return int(time.time()) - self.__start_time + + def get_server_info(self): + """ + Returns a dictionary of meta-data about the server. + + Dictionary items are string:union_objs, where union_objs are Value + "unions" used for RPC serialization. + """ + # FIXME: expose self.__dask_client.scheduler_info() as needed + if self.__dask_client is not None: + num_gpus = len(self.__dask_client.scheduler_info()["workers"]) + else: + # The assumption is that cugraph_service server requires at least 1 + # GPU (ie. currently there is no CPU-only version of + # cugraph_service server) + num_gpus = 1 + + return {"num_gpus": ValueWrapper(num_gpus).union} + + def load_graph_creation_extensions(self, extension_dir_path): + """ + Loads ("imports") all modules matching the pattern *_extension.py in + the directory specified by extension_dir_path. 
+ + The modules are searched and their functions are called (if a match is + found) when call_graph_creation_extension() is called. + + """ + extension_dir = Path(extension_dir_path) + + if (not extension_dir.exists()) or (not extension_dir.is_dir()): + raise CugraphServiceError(f"bad directory: {extension_dir}") + + num_files_read = 0 + + for ext_file in extension_dir.glob("*_extension.py"): + module_name = ext_file.stem + spec = importlib.util.spec_from_file_location(module_name, + ext_file) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + self.__graph_creation_extensions[module_name] = module + num_files_read += 1 + + return num_files_read + + def unload_graph_creation_extensions(self): + """ + Removes all graph creation extensions. + """ + self.__graph_creation_extensions.clear() + + def call_graph_creation_extension(self, func_name, + func_args_repr, func_kwargs_repr): + """ + Calls the graph creation extension function func_name and passes it the + eval'd func_args_repr and func_kwargs_repr objects. + + The arg/kwarg reprs are eval'd prior to calling in order to pass actual + python objects to func_name (this is needed to allow arbitrary arg + objects to be serialized as part of the RPC call from the + client). + + func_name cannot be a private name (name starting with __). + + All loaded extension modules are checked when searching for func_name, + and the first extension module that contains it will have its function + called. 
+ """ + if not(func_name.startswith("__")): + for module in self.__graph_creation_extensions.values(): + # Ignore private functions + func = getattr(module, func_name, None) + if func is not None: + func_args = eval(func_args_repr) + func_kwargs = eval(func_kwargs_repr) + func_sig = signature(func) + func_params = list(func_sig.parameters.keys()) + facade_param = self.__server_facade_extension_param_name + + # Graph creation extensions that have the last arg named + # self.__server_facade_extension_param_name are passed a + # ExtensionServerFacade instance to allow them to query the + # "server" in a safe way, if needed. + if (facade_param in func_params): + if func_params[-1] == facade_param: + func_kwargs[facade_param] = \ + ExtensionServerFacade(self) + else: + raise CugraphServiceError( + f"{facade_param}, if specified, must be the " + "last param.") + try: + graph_obj = func(*func_args, **func_kwargs) + except Exception: + # FIXME: raise a more detailed error + raise CugraphServiceError( + f"error running {func_name} : " + f"{traceback.format_exc()}") + return self.__add_graph(graph_obj) + + raise CugraphServiceError( + f"{func_name} is not a graph creation extension") + + def initialize_dask_client(self, dask_scheduler_file=None): + """ + Initialize a dask client to be used for MG operations. + """ + if dask_scheduler_file is not None: + # Env var UCX_MAX_RNDV_RAILS=1 must be set too. + dask_initialize(enable_tcp_over_ucx=True, + enable_nvlink=True, + enable_infiniband=True, + enable_rdmacm=True, + # net_devices="mlx5_0:1", + ) + self.__dask_client = Client(scheduler_file=dask_scheduler_file) + else: + # FIXME: LocalCUDACluster init. Implement when tests are in place. + raise NotImplementedError + + if not Comms.is_initialized(): + Comms.initialize(p2p=True) + + def shutdown_dask_client(self): + """ + Shutdown/cleanup the dask client for this handler instance. 
+ """ + if self.__dask_client is not None: + Comms.destroy() + self.__dask_client.close() + + if self.__dask_cluster is not None: + self.__dask_cluster.close() + self.__dask_cluster = None + + self.__dask_client = None + + ########################################################################### + # Graph management + def create_graph(self): + """ + Create a new graph associated with a new unique graph ID, return the + new graph ID. + """ + pG = self.__create_graph() + return self.__add_graph(pG) + + def delete_graph(self, graph_id): + """ + Remove the graph identified by graph_id from the server. + """ + dG = self.__graph_objs.pop(graph_id, None) + if dG is None: + raise CugraphServiceError(f"invalid graph_id {graph_id}") + + del dG + print(f'deleted graph with id {graph_id}') + + def get_graph_ids(self): + """ + Returns a list of the graph IDs currently in use. + """ + return list(self.__graph_objs.keys()) + + def get_graph_info(self, keys, graph_id): + """ + Returns a dictionary of meta-data about the graph identified by + graph_id. If keys passed, only returns the values in keys. + + Dictionary items are string:union_objs, where union_objs are Value + "unions" used for RPC serialization. 
+ """ + valid_keys = set(["num_vertices", + "num_vertices_from_vertex_data", + "num_edges", + "num_vertex_properties", + "num_edge_properties", + ]) + if len(keys) == 0: + keys = valid_keys + else: + invalid_keys = set(keys) - valid_keys + if len(invalid_keys) != 0: + raise CugraphServiceError(f"got invalid keys: {invalid_keys}") + + G = self._get_graph(graph_id) + info = {} + if isinstance(G, (PropertyGraph, MGPropertyGraph)): + for k in keys: + if k == "num_vertices": + info[k] = G.get_num_vertices() + elif k == "num_vertices_from_vertex_data": + info[k] = G.get_num_vertices(include_edge_data=False) + elif k == "num_edges": + info[k] = G.get_num_edges() + elif k == "num_vertex_properties": + info[k] = len(G.vertex_property_names) + elif k == "num_edge_properties": + info[k] = len(G.edge_property_names) + else: + for k in keys: + if k == "num_vertices": + info[k] = G.number_of_vertices() + elif k == "num_vertices_from_vertex_data": + info[k] = 0 + elif k == "num_edges": + info[k] = G.number_of_edges() + elif k == "num_vertex_properties": + info[k] = 0 + elif k == "num_edge_properties": + info[k] = 0 + + return {key: ValueWrapper(value).union + for (key, value) in info.items()} + + def get_graph_type(self, graph_id): + """ + Returns a string repr of the graph type associated with graph_id. + """ + return repr(type(self._get_graph(graph_id))) + + def load_csv_as_vertex_data(self, + csv_file_name, + delimiter, + dtypes, + header, + vertex_col_name, + type_name, + property_columns, + graph_id, + names + ): + """ + Given a CSV csv_file_name present on the server's file system, read it + and apply it as vertex data to the graph specified by graph_id, or the + default graph if not specified. 
+ """ + pG = self._get_graph(graph_id) + if header == -1: + header = "infer" + elif header == -2: + header = None + + if len(names) == 0: + names = None + + # FIXME: error check that file exists + # FIXME: error check that edgelist was read correctly + try: + gdf = self.__get_dataframe_from_csv(csv_file_name, + delimiter=delimiter, + dtypes=dtypes, + header=header, + names=names) + pG.add_vertex_data(gdf, + type_name=type_name, + vertex_col_name=vertex_col_name, + property_columns=property_columns) + except Exception: + raise CugraphServiceError(f"{traceback.format_exc()}") + + def load_csv_as_edge_data(self, + csv_file_name, + delimiter, + dtypes, + header, + vertex_col_names, + type_name, + property_columns, + graph_id, + names + ): + """ + Given a CSV csv_file_name present on the server's file system, read it + and apply it as edge data to the graph specified by graph_id, or the + default graph if not specified. + """ + pG = self._get_graph(graph_id) + # FIXME: error check that file exists + # FIXME: error check that edgelist read correctly + if header == -1: + header = "infer" + elif header == -2: + header = None + + if len(names) == 0: + names = None + + try: + gdf = self.__get_dataframe_from_csv(csv_file_name, + delimiter=delimiter, + dtypes=dtypes, + header=header, + names=names) + pG.add_edge_data(gdf, + type_name=type_name, + vertex_col_names=vertex_col_names, + property_columns=property_columns) + except Exception: + raise CugraphServiceError(f"{traceback.format_exc()}") + + # FIXME: ensure edge IDs can also be filtered by edge type + # See: https://github.com/rapidsai/cugraph/issues/2655 + def get_edge_IDs_for_vertices(self, src_vert_IDs, dst_vert_IDs, graph_id): + """ + Return a list of edge IDs corresponding to the vertex IDs in each of + src_vert_IDs and dst_vert_IDs that, when combined, define an edge in + the graph associated with graph_id. 
+ + For example, if src_vert_IDs is [0, 1, 2] and dst_vert_IDs is [7, 8, 9] + return the edge IDs for edges (0, 7), (1, 8), and (2, 9). + + graph_id must be associated with a Graph extracted from a PropertyGraph + (MG or SG). + """ + G = self._get_graph(graph_id) + if isinstance(G, (PropertyGraph, MGPropertyGraph)): + raise CugraphServiceError("get_edge_IDs_for_vertices() only " + "accepts an extracted subgraph ID, got " + f"an ID for a {type(G)}.") + + return self.__get_edge_IDs_from_graph_edge_data(G, + src_vert_IDs, + dst_vert_IDs) + + def extract_subgraph(self, + create_using, + selection, + edge_weight_property, + default_edge_weight, + allow_multi_edges, + renumber_graph, + add_edge_data, + graph_id + ): + """ + Extract a subgraph, return a new graph ID + """ + pG = self._get_graph(graph_id) + if not(isinstance(pG, (PropertyGraph, MGPropertyGraph))): + raise CugraphServiceError("extract_subgraph() can only be called " + "on a graph with properties.") + # Convert defaults needed for the RPC API into defaults used by + # PropertyGraph.extract_subgraph() + create_using = create_using or cugraph.Graph + selection = selection or None + edge_weight_property = edge_weight_property or None + + # FIXME: create_using and selection should not be strings at this point + + try: + G = pG.extract_subgraph(create_using, + selection, + edge_weight_property, + default_edge_weight, + allow_multi_edges, + renumber_graph, + add_edge_data) + except Exception: + raise CugraphServiceError(f"{traceback.format_exc()}") + + return self.__add_graph(G) + + def get_graph_vertex_data(self, + id_or_ids, + null_replacement_value, + graph_id, + property_keys): + """ + Returns the vertex data as a serialized numpy array for the given + id_or_ids. null_replacement_value must be provided if the data + contains NA values, since NA values cannot be serialized. 
+ """ + pG = self._get_graph(graph_id) + ids = GraphVertexEdgeIDWrapper(id_or_ids).get_py_obj() + if ids == -1: + ids = None + elif not isinstance(ids, list): + ids = [ids] + if property_keys == []: + columns = None + else: + columns = property_keys + df = pG.get_vertex_data(vertex_ids=ids, columns=columns) + return self.__get_graph_data_as_numpy_bytes(df, null_replacement_value) + + def get_graph_edge_data(self, + id_or_ids, + null_replacement_value, + graph_id, + property_keys): + """ + Returns the edge data as a serialized numpy array for the given + id_or_ids. null_replacement_value must be provided if the data + contains NA values, since NA values cannot be serialized. + """ + pG = self._get_graph(graph_id) + ids = GraphVertexEdgeIDWrapper(id_or_ids).get_py_obj() + if ids == -1: + ids = None + elif not isinstance(ids, list): + ids = [ids] + if property_keys == []: + columns = None + else: + columns = property_keys + df = pG.get_edge_data(edge_ids=ids, columns=columns) + return self.__get_graph_data_as_numpy_bytes(df, null_replacement_value) + + def is_vertex_property(self, property_key, graph_id): + G = self._get_graph(graph_id) + if isinstance(G, (PropertyGraph, MGPropertyGraph)): + return property_key in G.vertex_property_names + + raise CugraphServiceError('Graph does not contain properties') + + def is_edge_property(self, property_key, graph_id): + G = self._get_graph(graph_id) + if isinstance(G, (PropertyGraph, MGPropertyGraph)): + return property_key in G.edge_property_names + + raise CugraphServiceError('Graph does not contain properties') + + ########################################################################### + # Algos + def batched_ego_graphs(self, seeds, radius, graph_id): + """ + """ + # FIXME: finish docstring above + # FIXME: exception handling + G = self._get_graph(graph_id) + # FIXME: write test to catch an MGPropertyGraph being passed in + if isinstance(G, PropertyGraph): + raise CugraphServiceError("batched_ego_graphs() cannot operate 
" + "directly on a graph with properties, " + "call extract_subgraph() then call " + "batched_ego_graphs() on the extracted " + "subgraph instead.") + try: + # FIXME: update this to use call_algo() + # FIXME: this should not be needed, need to update + # cugraph.batched_ego_graphs to also accept a list + seeds = cudf.Series(seeds, dtype="int32") + (ego_edge_list, seeds_offsets) = \ + cugraph.batched_ego_graphs(G, seeds, radius) + + # batched_ego_graphs_result = BatchedEgoGraphsResult( + # src_verts=ego_edge_list["src"].values_host.tobytes(), #i32 + # dst_verts=ego_edge_list["dst"].values_host.tobytes(), #i32 + # edge_weights=ego_edge_list["weight"].values_host.tobytes(), + # #f64 + # seeds_offsets=seeds_offsets.values_host.tobytes() #i64 + # ) + batched_ego_graphs_result = BatchedEgoGraphsResult( + src_verts=ego_edge_list["src"].values_host, + dst_verts=ego_edge_list["dst"].values_host, + edge_weights=ego_edge_list["weight"].values_host, + seeds_offsets=seeds_offsets.values_host + ) + return batched_ego_graphs_result + except Exception: + raise CugraphServiceError(f"{traceback.format_exc()}") + + return batched_ego_graphs_result + + def node2vec(self, start_vertices, max_depth, graph_id): + """ + """ + # FIXME: finish docstring above + # FIXME: exception handling + G = self._get_graph(graph_id) + # FIXME: write test to catch an MGPropertyGraph being passed in + if isinstance(G, PropertyGraph): + raise CugraphServiceError("node2vec() cannot operate directly on " + "a graph with properties, call " + "extract_subgraph() then call " + "node2vec() on the extracted subgraph " + "instead.") + + try: + # FIXME: update this to use call_algo() + # FIXME: this should not be needed, need to update cugraph.node2vec + # to also accept a list + start_vertices = cudf.Series(start_vertices, dtype="int32") + + (paths, weights, path_sizes) = \ + cugraph.node2vec(G, start_vertices, max_depth) + + node2vec_result = Node2vecResult( + vertex_paths=paths.values_host, + 
edge_weights=weights.values_host, + path_sizes=path_sizes.values_host, + ) + except Exception: + raise CugraphServiceError(f"{traceback.format_exc()}") + + return node2vec_result + + def uniform_neighbor_sample(self, + start_list, + fanout_vals, + with_replacement, + graph_id, + ): + G = self._get_graph(graph_id) + if isinstance(G, (MGPropertyGraph, PropertyGraph)): + raise CugraphServiceError("uniform_neighbor_sample() cannot " + "operate directly on a graph with " + "properties, call extract_subgraph() " + "then call uniform_neighbor_sample() " + "on the extracted subgraph instead.") + + try: + return call_algo( + uniform_neighbor_sample, + G, + start_list=start_list, + fanout_vals=fanout_vals, + with_replacement=with_replacement + ) + except Exception: + raise CugraphServiceError(f"{traceback.format_exc()}") + + def pagerank(self, graph_id): + """ + """ + raise NotImplementedError + + ########################################################################### + # "Protected" interface - used for both implementation and test/debug. Will + # not be exposed to a cugraph_service client. + def _get_graph(self, graph_id): + """ + Return the cuGraph Graph object associated with graph_id. + + If the graph_id is the default graph ID and the default graph has not + been created, then instantiate a new PropertyGraph as the default graph + and return it. + """ + pG = self.__graph_objs.get(graph_id) + + # Always create the default graph if it does not exist + if pG is None: + if graph_id == defaults.graph_id: + pG = self.__create_graph() + self.__graph_objs[graph_id] = pG + else: + raise CugraphServiceError(f"invalid graph_id {graph_id}") + + return pG + + ########################################################################### + # Private + def __get_dataframe_from_csv(self, + csv_file_name, + delimiter, + dtypes, + header, + names): + """ + Read a CSV into a DataFrame and return it. 
This will use either a cuDF + DataFrame or a dask_cudf DataFrame based on if the handler is + configured to use a dask cluster or not. + """ + gdf = cudf.read_csv(csv_file_name, + delimiter=delimiter, + dtype=dtypes, + header=header, + names=names) + if self.is_mg: + num_gpus = len(self.__dask_client.scheduler_info()["workers"]) + return dask_cudf.from_cudf(gdf, npartitions=num_gpus) + + return gdf + + def __add_graph(self, G): + """ + Create a new graph ID for G and add G to the internal mapping of + graph ID:graph instance. + """ + gid = self.__next_graph_id + self.__graph_objs[gid] = G + self.__next_graph_id += 1 + return gid + + def __create_graph(self): + """ + Instantiate a graph object using a type appropriate for the handler ( + either SG or MG) + """ + return MGPropertyGraph() if self.is_mg else PropertyGraph() + + # FIXME: consider adding this to PropertyGraph + def __remove_internal_columns(self, pg_column_names): + """ + Removes all column names from pg_column_names that are "internal" (ie. + used for PropertyGraph bookkeeping purposes only) + """ + internal_column_names = [PropertyGraph.vertex_col_name, + PropertyGraph.src_col_name, + PropertyGraph.dst_col_name, + PropertyGraph.type_col_name, + PropertyGraph.edge_id_col_name, + PropertyGraph.vertex_id_col_name, + PropertyGraph.weight_col_name] + + # Create a list of user-visible columns by removing the internals while + # preserving order + user_visible_column_names = list(pg_column_names) + for internal_column_name in internal_column_names: + if internal_column_name in user_visible_column_names: + user_visible_column_names.remove(internal_column_name) + + return user_visible_column_names + + # FIXME: consider adding this to PropertyGraph + def __get_edge_IDs_from_graph_edge_data(self, + G, + src_vert_IDs, + dst_vert_IDs): + """ + Return a list of edge IDs corresponding to the vertex IDs in each of + src_vert_IDs and dst_vert_IDs that, when combined, define an edge in G. 
+ + For example, if src_vert_IDs is [0, 1, 2] and dst_vert_IDs is [7, 8, 9] + return the edge IDs for edges (0, 7), (1, 8), and (2, 9). + + G must have an "edge_data" attribute. + """ + edge_IDs = [] + num_edges = len(src_vert_IDs) + + for i in range(num_edges): + src_mask = G.edge_data[PropertyGraph.src_col_name] == \ + src_vert_IDs[i] + dst_mask = G.edge_data[PropertyGraph.dst_col_name] == \ + dst_vert_IDs[i] + value = (G.edge_data[src_mask & dst_mask] + [PropertyGraph.edge_id_col_name] + ) + + # FIXME: This will compute the result (if using dask) then transfer + # to host memory for each iteration - is there a more efficient + # way? + if self.is_mg: + value = value.compute() + edge_IDs.append(value.values_host[0]) + + return edge_IDs + + def __get_graph_data_as_numpy_bytes(self, + dataframe, + null_replacement_value): + """ + Returns a byte array repr of the vertex or edge graph data. Since the + byte array cannot represent NA values, null_replacement_value must be + provided to be used in place of NAs. + """ + try: + if dataframe is None: + return np.ndarray(shape=(0, 0)).dumps() + elif isinstance(dataframe, dask_cudf.DataFrame): + df = dataframe.compute() + else: + df = dataframe + + # null_replacement_value is a Value "union" + n = ValueWrapper(null_replacement_value).get_py_obj() + + # This needs to be a copy of the df data to replace NA values + # FIXME: should something other than a numpy type be serialized to + # prevent a copy? 
(note: any other type required to be de-serialzed + # on the client end could add dependencies on the client) + df_numpy = df.to_numpy(na_value=n) + return df_numpy.dumps() + + except Exception: + raise CugraphServiceError(f"{traceback.format_exc()}") diff --git a/python/cugraph_service/cugraph_service_server/server.py b/python/cugraph_service/cugraph_service_server/server.py new file mode 100644 index 00000000000..27d15d57d09 --- /dev/null +++ b/python/cugraph_service/cugraph_service_server/server.py @@ -0,0 +1,74 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +from pathlib import Path + +from cugraph_service_client import defaults +from cugraph_service_client.cugraph_service_thrift import create_server +from cugraph_service_server.cugraph_handler import CugraphHandler + + +def create_handler(graph_creation_extension_dir=None, + dask_scheduler_file=None): + """ + Create and return a CugraphHandler instance initialized with + options. Setting graph_creation_extension_dir to a valid dir results in the + handler loading graph creation extensions from that dir. + """ + handler = CugraphHandler() + if graph_creation_extension_dir is not None: + handler.load_graph_creation_extensions(graph_creation_extension_dir) + if dask_scheduler_file is not None: + # FIXME: if initialize_dask_client(None) is called, it creates a + # LocalCUDACluster. Add support for this via a different CLI option? 
+ handler.initialize_dask_client(dask_scheduler_file) + return handler + + +def start_server_blocking(handler, host, port): + """ + Start the cugraph_service server on host/port, using handler as the request + handler instance. This call blocks indefinitely until Ctrl-C. + """ + server = create_server(handler, host=host, port=port) + server.serve() # blocks until Ctrl-C (kill -2) + + +if __name__ == "__main__": + arg_parser = argparse.ArgumentParser() + arg_parser.add_argument("--host", + type=str, + default=defaults.host, + help="hostname the server should use, default is " + f"{defaults.host}") + arg_parser.add_argument("--port", + type=int, + default=defaults.port, + help="port the server should listen on, default " + f"is {defaults.port}") + arg_parser.add_argument("--graph-creation-extension-dir", + type=Path, + help="dir to load graph creation extension " + "functions from") + arg_parser.add_argument("--dask-scheduler-file", + type=Path, + help="file generated by a dask scheduler, used " + "for connecting to a dask cluster for MG support") + args = arg_parser.parse_args() + handler = create_handler(args.graph_creation_extension_dir, + args.dask_scheduler_file) + print("Starting the cugraph_service server...", flush=True) + start_server_blocking(handler, args.host, args.port) + print("done.") diff --git a/python/cugraph_service/img/cugraph_service_pict.png b/python/cugraph_service/img/cugraph_service_pict.png new file mode 100644 index 00000000000..20fcace215a Binary files /dev/null and b/python/cugraph_service/img/cugraph_service_pict.png differ diff --git a/python/cugraph_service/img/rapids_logo.png b/python/cugraph_service/img/rapids_logo.png new file mode 100644 index 00000000000..405040836a0 Binary files /dev/null and b/python/cugraph_service/img/rapids_logo.png differ diff --git a/python/cugraph_service/scripts/README.md b/python/cugraph_service/scripts/README.md new file mode 100644 index 00000000000..c9e6014f34b --- /dev/null +++ 
b/python/cugraph_service/scripts/README.md @@ -0,0 +1,6 @@ +This directory contains various scripts helpful for cugraph_service users and developers. + +The following scripts were copied from https://github.com/rapidsai/multi-gpu-tools and are useful for starting a dask cluster, which is needed by cugraph_service for multi-GPU support. +* `run-dask-process.sh` +* `functions.sh` +* `default-config.sh` diff --git a/python/cugraph_service/scripts/default-config.sh b/python/cugraph_service/scripts/default-config.sh new file mode 100644 index 00000000000..3ed045fc058 --- /dev/null +++ b/python/cugraph_service/scripts/default-config.sh @@ -0,0 +1,38 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +THIS_DIR=$(cd $(dirname ${BASH_SOURCE[0]}) && pwd) + +# Most are defined using the bash := or :- syntax, which means they +# will be set only if they were previously unset. The project config +# is loaded first, which gives it the opportunity to override anything +# in this file that uses that syntax. If there are variables in this +# file that should not be overridded by a project, then they will +# simply not use that syntax and override, since these variables are +# read last. +SCRIPTS_DIR=$THIS_DIR + +# These really should be oerridden by the project config! 
+CONDA_ENV=${CONDA_ENV:-rapids} + +GPUS_PER_NODE=${GPUS_PER_NODE:-8} +WORKER_RMM_POOL_SIZE=${WORKER_RMM_POOL_SIZE:-12G} +DASK_CUDA_INTERFACE=${DASK_CUDA_INTERFACE:-ib0} +DASK_SCHEDULER_PORT=${DASK_SCHEDULER_PORT:-8792} +DASK_DEVICE_MEMORY_LIMIT=${DASK_DEVICE_MEMORY_LIMIT:-auto} +DASK_HOST_MEMORY_LIMIT=${DASK_HOST_MEMORY_LIMIT:-auto} + +BUILD_LOG_FILE=${BUILD_LOG_FILE:-${RESULTS_DIR}/build_log.txt} +SCHEDULER_FILE=${SCHEDULER_FILE:-${WORKSPACE}/dask-scheduler.json} +DATE=${DATE:-$(date --utc "+%Y-%m-%d_%H:%M:%S")_UTC} +ENV_EXPORT_FILE=${ENV_EXPORT_FILE:-${WORKSPACE}/$(basename ${CONDA_ENV})-${DATE}.txt} diff --git a/python/cugraph_service/scripts/functions.sh b/python/cugraph_service/scripts/functions.sh new file mode 100644 index 00000000000..7b5fc7bf305 --- /dev/null +++ b/python/cugraph_service/scripts/functions.sh @@ -0,0 +1,66 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file is source'd from script-env.sh to add functions to the +# calling environment, hence no #!/bin/bash as the first line. This +# also assumes the variables used in this file have been defined +# elsewhere. + +NUMARGS=$# +ARGS=$* +function hasArg { + (( ${NUMARGS} != 0 )) && (echo " ${ARGS} " | grep -q " $1 ") +} + +function logger { + echo -e ">>>> $@" +} + +# Calling "setTee outfile" will cause all stdout and stderr of the +# current script to be output to "tee", which outputs to stdout and +# "outfile" simultaneously. 
This is useful by allowing a script to +# "tee" itself at any point without being called with tee. +_origFileDescriptorsSaved=0 +function setTee { + if [[ $_origFileDescriptorsSaved == 0 ]]; then + # Save off the original file descr 1 and 2 as 3 and 4 + exec 3>&1 4>&2 + _origFileDescriptorsSaved=1 + fi + teeFile=$1 + # Create a named pipe. + pipeName=$(mktemp -u) + mkfifo $pipeName + # Close the currnet 1 and 2 and restore to original (3, 4) in the + # event this function is called repeatedly. + exec 1>&- 2>&- + exec 1>&3 2>&4 + # Start a tee process reading from the named pipe. Redirect stdout + # and stderr to the named pipe which goes to the tee process. The + # named pipe "file" can be removed and the tee process stays alive + # until the fd is closed. + tee -a < $pipeName $teeFile & + exec > $pipeName 2>&1 + rm $pipeName +} + +# Call this to stop script output from going to "tee" after a prior +# call to setTee. +function unsetTee { + if [[ $_origFileDescriptorsSaved == 1 ]]; then + # Close the current fd 1 and 2 which should stop the tee + # process, then restore 1 and 2 to original (saved as 3, 4). + exec 1>&- 2>&- + exec 1>&3 2>&4 + fi +} diff --git a/python/cugraph_service/scripts/run-dask-process.sh b/python/cugraph_service/scripts/run-dask-process.sh new file mode 100755 index 00000000000..ed5133390ce --- /dev/null +++ b/python/cugraph_service/scripts/run-dask-process.sh @@ -0,0 +1,249 @@ +#!/bin/bash +# Copyright (c) 2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +THIS_DIR=$(cd $(dirname ${BASH_SOURCE[0]}) && pwd) + +source ${THIS_DIR}/default-config.sh +source ${THIS_DIR}/functions.sh + +# Logs can be written to a specific location by setting the LOGS_DIR +# env var. +LOGS_DIR=${LOGS_DIR:-dask_logs-$$} + +######################################## +NUMARGS=$# +ARGS=$* +function hasArg { + (( ${NUMARGS} != 0 )) && (echo " ${ARGS} " | grep -q " $1 ") +} +VALIDARGS="-h --help scheduler workers --tcp --ucx --ucxib --ucx-ib" +HELP="$0 [ ...] [ ...] + where is: + scheduler - start dask scheduler + workers - start dask workers + and is: + --tcp - initalize a tcp cluster (default) + --ucx - initialize a ucx cluster with NVLink + --ucxib | --ucx-ib - initialize a ucx cluster with IB+NVLink + -h | --help - print this text + + The cluster config order of precedence is any specification on the + command line (--tcp, --ucx, etc.) if provided, then the value of the + env var CLUSTER_CONFIG_TYPE if set, then the default value of tcp. + +" + +# CLUSTER_CONFIG_TYPE defaults to the env var value if set, else TCP +CLUSTER_CONFIG_TYPE=${CLUSTER_CONFIG_TYPE:-TCP} +START_SCHEDULER=0 +START_WORKERS=0 + +if (( ${NUMARGS} == 0 )); then + echo "${HELP}" + exit 0 +else + if hasArg -h || hasArg --help; then + echo "${HELP}" + exit 0 + fi + for a in ${ARGS}; do + if ! 
(echo " ${VALIDARGS} " | grep -q " ${a} "); then + echo "Invalid option: ${a}" + exit 1 + fi + done +fi + +if hasArg scheduler; then + START_SCHEDULER=1 +fi +if hasArg workers; then + START_WORKERS=1 +fi +# Allow the command line to take precedence +if hasArg --tcp; then + CLUSTER_CONFIG_TYPE=TCP +elif hasArg --ucx; then + CLUSTER_CONFIG_TYPE=UCX +elif hasArg --ucxib || hasArg --ucx-ib; then + CLUSTER_CONFIG_TYPE=UCXIB +fi + +######################################## + +#export DASK_LOGGING__DISTRIBUTED="DEBUG" + +#ulimit -n 100000 + +SCHEDULER_LOG=${LOGS_DIR}/scheduler_log.txt +WORKERS_LOG=${LOGS_DIR}/worker-${HOSTNAME}_log.txt + + +function buildTcpArgs { + export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="100s" + export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP="600s" + export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MIN="1s" + export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MAX="60s" + export DASK_DISTRIBUTED__WORKER__MEMORY__Terminate="False" + + SCHEDULER_ARGS="--protocol=tcp + --port=$DASK_SCHEDULER_PORT + --scheduler-file $SCHEDULER_FILE + " + + WORKER_ARGS="--rmm-pool-size=$WORKER_RMM_POOL_SIZE + --local-directory=/tmp/$LOGNAME + --scheduler-file=$SCHEDULER_FILE + --memory-limit=$DASK_HOST_MEMORY_LIMIT + --device-memory-limit=$DASK_DEVICE_MEMORY_LIMIT + " + +} + +function buildUCXWithInfinibandArgs { + + export UCX_MAX_RNDV_RAILS=1 + export UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda + export DASK_RMM__POOL_SIZE=0.5GB + export DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True + + SCHEDULER_ARGS="--protocol=ucx + --port=$DASK_SCHEDULER_PORT + --interface=$DASK_CUDA_INTERFACE + --scheduler-file $SCHEDULER_FILE + " + + WORKER_ARGS="--interface=$DASK_CUDA_INTERFACE + --rmm-pool-size=$WORKER_RMM_POOL_SIZE + --rmm-maximum-pool-size=$WORKER_RMM_POOL_SIZE + --local-directory=/tmp/$LOGNAME + --scheduler-file=$SCHEDULER_FILE + --memory-limit=$DASK_HOST_MEMORY_LIMIT + --device-memory-limit=$DASK_DEVICE_MEMORY_LIMIT + --jit-unspill + " +} + + +function 
buildUCXwithoutInfinibandArgs { + + export UCX_TCP_CM_REUSEADDR=y + export UCX_MAX_RNDV_RAILS=1 + export UCX_TCP_TX_SEG_SIZE=8M + export UCX_TCP_RX_SEG_SIZE=8M + + export DASK_DISTRIBUTED__COMM__UCX__CUDA_COPY=True + export DASK_DISTRIBUTED__COMM__UCX__TCP=True + export DASK_DISTRIBUTED__COMM__UCX__NVLINK=True + export DASK_DISTRIBUTED__COMM__UCX__INFINIBAND=False + export DASK_DISTRIBUTED__COMM__UCX__RDMACM=False + export DASK_RMM__POOL_SIZE=0.5GB + + + SCHEDULER_ARGS="--protocol=ucx + --port=$DASK_SCHEDULER_PORT + --scheduler-file $SCHEDULER_FILE + " + + WORKER_ARGS="--enable-tcp-over-ucx + --enable-nvlink + --disable-infiniband + --disable-rdmacm + --rmm-pool-size=$WORKER_RMM_POOL_SIZE + --rmm-maximum-pool-size=$WORKER_RMM_POOL_SIZE + --local-directory=/tmp/$LOGNAME + --scheduler-file=$SCHEDULER_FILE + --memory-limit=$DASK_HOST_MEMORY_LIMIT + --device-memory-limit=$DASK_DEVICE_MEMORY_LIMIT + --jit-unspill + " +} + +if [[ "$CLUSTER_CONFIG_TYPE" == "UCX" ]]; then + logger "Using cluster configurtion for UCX" + buildUCXwithoutInfinibandArgs +elif [[ "$CLUSTER_CONFIG_TYPE" == "UCXIB" ]]; then + logger "Using cluster configurtion for UCX with Infiniband" + buildUCXWithInfinibandArgs +else + logger "Using cluster configurtion for TCP" + buildTcpArgs +fi + + +######################################## + +scheduler_pid="" +worker_pid="" +num_scheduler_tries=0 + +function startScheduler { + mkdir -p $(dirname $SCHEDULER_FILE) + echo "RUNNING: \"python -m distributed.cli.dask_scheduler $SCHEDULER_ARGS\"" > $SCHEDULER_LOG + dask-scheduler $SCHEDULER_ARGS >> $SCHEDULER_LOG 2>&1 & + scheduler_pid=$! 
+} + +mkdir -p $LOGS_DIR +logger "Logs written to: $LOGS_DIR" + +if [[ $START_SCHEDULER == 1 ]]; then + rm -f $SCHEDULER_FILE $SCHEDULER_LOG $WORKERS_LOG + + startScheduler + sleep 6 + num_scheduler_tries=$(python -c "print($num_scheduler_tries+1)") + + # Wait for the scheduler to start first before proceeding, since + # it may require several retries (if prior run left ports open + # that need time to close, etc.) + while [ ! -f "$SCHEDULER_FILE" ]; do + scheduler_alive=$(ps -p $scheduler_pid > /dev/null ; echo $?) + if [[ $scheduler_alive != 0 ]]; then + if [[ $num_scheduler_tries != 30 ]]; then + echo "scheduler failed to start, retry #$num_scheduler_tries" + startScheduler + sleep 6 + num_scheduler_tries=$(echo $num_scheduler_tries+1 | bc) + else + echo "could not start scheduler, exiting." + exit 1 + fi + fi + done + echo "scheduler started." +fi + +if [[ $START_WORKERS == 1 ]]; then + rm -f $WORKERS_LOG + while [ ! -f "$SCHEDULER_FILE" ]; do + echo "run-dask-process.sh: $SCHEDULER_FILE not present - waiting to start workers..." + sleep 2 + done + echo "RUNNING: \"python -m dask_cuda.cli.dask_cuda_worker $WORKER_ARGS\"" > $WORKERS_LOG + dask-cuda-worker $WORKER_ARGS >> $WORKERS_LOG 2>&1 & + worker_pid=$! + echo "worker(s) started." +fi + +# This script will not return until the following background process +# have been completed/killed. +if [[ $worker_pid != "" ]]; then + echo "waiting for worker pid $worker_pid to finish before exiting script..." + wait $worker_pid +fi +if [[ $scheduler_pid != "" ]]; then + echo "waiting for scheduler pid $scheduler_pid to finish before exiting script..." 
+ wait $scheduler_pid +fi diff --git a/python/cugraph_service/tests/__init__.py b/python/cugraph_service/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/python/cugraph_service/tests/client1_script.py b/python/cugraph_service/tests/client1_script.py new file mode 100644 index 00000000000..0abedb646b8 --- /dev/null +++ b/python/cugraph_service/tests/client1_script.py @@ -0,0 +1,52 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Script to be used to simulate a cugraph_service client. 
+""" + +import random +import time +from pathlib import Path + +from cugraph_service_client import CugraphServiceClient + + +_data_dir = (Path(__file__).parent)/"data" + +edgelist_csv_data = { + "karate": {"csv_file_name": + (_data_dir/"karate.csv").absolute().as_posix(), + "dtypes": ["int32", "int32", "float32"], + "num_edges": 156, + }, +} + +client = CugraphServiceClient() + +test_data = edgelist_csv_data["karate"] +client.load_csv_as_edge_data(test_data["csv_file_name"], + dtypes=test_data["dtypes"], + vertex_col_names=["0", "1"], + type_name="") +time.sleep(10) +n = int(random.random() * 1000) + +# print(f"---> starting {n}", flush=True) + +for i in range(1000000): + extracted_gid = client.extract_subgraph(allow_multi_edges=False) + # client.delete_graph(extracted_gid) + # print(f"---> {n}: extracted {extracted_gid}", flush=True) + +# print(f"---> done {n}", flush=True) diff --git a/python/cugraph_service/tests/client2_script.py b/python/cugraph_service/tests/client2_script.py new file mode 100644 index 00000000000..f6538bec2b2 --- /dev/null +++ b/python/cugraph_service/tests/client2_script.py @@ -0,0 +1,34 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Script to be used to simulate a cugraph_service client. 
+""" +import time +import random + +from cugraph_service_client import CugraphServiceClient + +client = CugraphServiceClient() + +time.sleep(10) +n = int(random.random() * 1000) + +# print(f"---> starting {n}", flush=True) + +for i in range(1000000): + extracted_gid = client.extract_subgraph(allow_multi_edges=False) + # client.delete_graph(extracted_gid) + # print(f"---> {n}: extracted {extracted_gid}", flush=True) + +# print(f"---> done {n}", flush=True) diff --git a/python/cugraph_service/tests/conftest.py b/python/cugraph_service/tests/conftest.py new file mode 100644 index 00000000000..58af3a33a09 --- /dev/null +++ b/python/cugraph_service/tests/conftest.py @@ -0,0 +1,234 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from pathlib import Path +from tempfile import TemporaryDirectory + +import pytest + +graph_creation_extension1_file_contents = """ +import cudf +from cugraph.experimental import PropertyGraph + +def custom_graph_creation_function(server): + edgelist = cudf.DataFrame(columns=['src', 'dst'], + data=[(0, 77), (1, 88), (2, 99)]) + pG = PropertyGraph() + pG.add_edge_data(edgelist, vertex_col_names=('src', 'dst')) + + # smoke test the server object by accesing the "mg" attr + server.is_mg + + return pG +""" + +graph_creation_extension2_file_contents = """ +import cudf +from cugraph.experimental import PropertyGraph + +def __my_private_function(): + pass + +def my_graph_creation_function(arg1:str, arg2:str, arg3:str, server): + edgelist = cudf.DataFrame(columns=[arg1, arg2, arg3], + data=[(0, 1, 2), (88, 99, 77)]) + pG = PropertyGraph() + pG.add_edge_data(edgelist, vertex_col_names=(arg1, arg2)) + return pG +""" + +graph_creation_extension_long_running_file_contents = """ +import time +import cudf +from cugraph.experimental import PropertyGraph + +def long_running_graph_creation_function(server): + time.sleep(10) + pG = PropertyGraph() + return pG +""" + +graph_creation_extension_no_facade_arg_file_contents = """ +import time +import cudf +from cugraph.experimental import PropertyGraph + +def graph_creation_function(arg1, arg2): + time.sleep(10) + pG = PropertyGraph() + return pG +""" + +graph_creation_extension_bad_arg_order_file_contents = """ +import time +import cudf +from cugraph.experimental import PropertyGraph + +def graph_creation_function(server, arg1, arg2): + pG = PropertyGraph() + return pG +""" + +graph_creation_extension_empty_graph_file_contents = """ +import time +import cudf +from cugraph.experimental import PropertyGraph, MGPropertyGraph + +def graph_creation_function(server): + if server.is_mg: + pG = MGPropertyGraph() + else: + pG = PropertyGraph() + return pG +""" + +graph_creation_extension_big_vertex_ids_file_contents = """ +import time +import 
cudf +import cupy +import dask_cudf +from cugraph.experimental import PropertyGraph, MGPropertyGraph + +def graph_creation_function_vert_and_edge_data_big_vertex_ids(server): + if server.is_mg: + pG = MGPropertyGraph() + else: + pG = PropertyGraph() + big_num = (2**32)+1 + df = cudf.DataFrame({"vert_id":cupy.arange(big_num, big_num+10, + dtype="int64"), + "vert_prop":cupy.arange(big_num+100, big_num+110, + dtype="int64")}) + if server.is_mg: + df = dask_cudf.from_cudf(df, npartitions=2) + pG.add_vertex_data(df, vertex_col_name="vert_id") + + df = cudf.DataFrame({"src":cupy.arange(big_num, big_num+10, dtype="int64"), + "dst":cupy.arange(big_num+1,big_num+11, dtype="int64"), + "edge_prop":cupy.arange(big_num+100, big_num+110, + dtype="int64")}) + if server.is_mg: + df = dask_cudf.from_cudf(df, npartitions=2) + pG.add_edge_data(df, vertex_col_names=["src", "dst"]) + + return pG +""" + + +############################################################################### +# module scope fixtures + +@pytest.fixture(scope="module") +def graph_creation_extension1(): + with TemporaryDirectory() as tmp_extension_dir: + # write graph creation extension .py file + graph_creation_extension_file = open( + Path(tmp_extension_dir) / + "custom_graph_creation_extension.py", + "w") + print(graph_creation_extension1_file_contents, + file=graph_creation_extension_file, + flush=True) + + yield tmp_extension_dir + + +@pytest.fixture(scope="module") +def graph_creation_extension2(): + with TemporaryDirectory() as tmp_extension_dir: + # write graph creation extension .py file + graph_creation_extension_file = open( + Path(tmp_extension_dir) / + "my_graph_creation_extension.py", + "w") + print(graph_creation_extension2_file_contents, + file=graph_creation_extension_file, + flush=True) + + yield tmp_extension_dir + + +@pytest.fixture(scope="module") +def graph_creation_extension_long_running(): + with TemporaryDirectory() as tmp_extension_dir: + # write graph creation extension .py file + 
graph_creation_extension_file = open( + Path(tmp_extension_dir) / + "long_running_graph_creation_extension.py", + "w") + print(graph_creation_extension_long_running_file_contents, + file=graph_creation_extension_file, + flush=True) + + yield tmp_extension_dir + + +@pytest.fixture(scope="module") +def graph_creation_extension_no_facade_arg(): + with TemporaryDirectory() as tmp_extension_dir: + # write graph creation extension .py file + graph_creation_extension_file = open( + Path(tmp_extension_dir) / + "graph_creation_no_facade_arg_extension.py", + "w") + print(graph_creation_extension_no_facade_arg_file_contents, + file=graph_creation_extension_file, + flush=True) + + yield tmp_extension_dir + + +@pytest.fixture(scope="module") +def graph_creation_extension_bad_arg_order(): + with TemporaryDirectory() as tmp_extension_dir: + # write graph creation extension .py file + graph_creation_extension_file = open( + Path(tmp_extension_dir) / + "graph_creation_bad_arg_order_extension.py", + "w") + print(graph_creation_extension_bad_arg_order_file_contents, + file=graph_creation_extension_file, + flush=True) + + yield tmp_extension_dir + + +@pytest.fixture(scope="module") +def graph_creation_extension_big_vertex_ids(): + with TemporaryDirectory() as tmp_extension_dir: + # write graph creation extension .py file + graph_creation_extension_file = open( + Path(tmp_extension_dir) / + "graph_creation_big_vertex_ids_extension.py", + "w") + print(graph_creation_extension_big_vertex_ids_file_contents, + file=graph_creation_extension_file, + flush=True) + + yield tmp_extension_dir + + +@pytest.fixture(scope="module") +def graph_creation_extension_empty_graph(): + with TemporaryDirectory() as tmp_extension_dir: + # write graph creation extension .py file + graph_creation_extension_file = open( + Path(tmp_extension_dir) / + "graph_creation_empty_graph_extension.py", + "w") + print(graph_creation_extension_empty_graph_file_contents, + file=graph_creation_extension_file, + flush=True) + 
+ yield tmp_extension_dir diff --git a/python/cugraph_service/tests/data.py b/python/cugraph_service/tests/data.py new file mode 100644 index 00000000000..51cb378d92e --- /dev/null +++ b/python/cugraph_service/tests/data.py @@ -0,0 +1,59 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path + +_data_dir = (Path(__file__).parent)/"data" + +edgelist_csv_data = { + "karate": {"csv_file_name": + (_data_dir/"karate.csv").absolute().as_posix(), + "dtypes": ["int32", "int32", "float32"], + "num_edges": 156, + }, +} + +property_csv_data = { + "merchants": {"csv_file_name": + (_data_dir/"merchants.csv").absolute().as_posix(), + "dtypes": ["int32", "int32", "int32", "float32", "int32", + "string"], + "vert_col_name": "merchant_id", + }, + + "users": {"csv_file_name": + (_data_dir/"users.csv").absolute().as_posix(), + "dtypes": ["int32", "int32", "int32"], + "vert_col_name": "user_id", + }, + + "transactions": {"csv_file_name": + (_data_dir/"transactions.csv").absolute().as_posix(), + "dtypes": ["int32", "int32", "float32", "float32", + "int32", "string"], + "vert_col_names": ("user_id", "merchant_id"), + }, + + "relationships": {"csv_file_name": + (_data_dir/"relationships.csv").absolute().as_posix(), + "dtypes": ["int32", "int32", "int32"], + "vert_col_names": ("user_id_1", "user_id_2"), + }, + + "referrals": {"csv_file_name": + (_data_dir/"referrals.csv").absolute().as_posix(), + "dtypes": ["int32", "int32", 
"int32", "int32"], + "vert_col_names": ("user_id_1", "user_id_2"), + }, +} diff --git a/python/cugraph_service/tests/data/karate.csv b/python/cugraph_service/tests/data/karate.csv new file mode 100644 index 00000000000..4ed9f4356a9 --- /dev/null +++ b/python/cugraph_service/tests/data/karate.csv @@ -0,0 +1,156 @@ +1 0 1.0 +2 0 1.0 +3 0 1.0 +4 0 1.0 +5 0 1.0 +6 0 1.0 +7 0 1.0 +8 0 1.0 +10 0 1.0 +11 0 1.0 +12 0 1.0 +13 0 1.0 +17 0 1.0 +19 0 1.0 +21 0 1.0 +31 0 1.0 +2 1 1.0 +3 1 1.0 +7 1 1.0 +13 1 1.0 +17 1 1.0 +19 1 1.0 +21 1 1.0 +30 1 1.0 +3 2 1.0 +7 2 1.0 +8 2 1.0 +9 2 1.0 +13 2 1.0 +27 2 1.0 +28 2 1.0 +32 2 1.0 +7 3 1.0 +12 3 1.0 +13 3 1.0 +6 4 1.0 +10 4 1.0 +6 5 1.0 +10 5 1.0 +16 5 1.0 +16 6 1.0 +30 8 1.0 +32 8 1.0 +33 8 1.0 +33 9 1.0 +33 13 1.0 +32 14 1.0 +33 14 1.0 +32 15 1.0 +33 15 1.0 +32 18 1.0 +33 18 1.0 +33 19 1.0 +32 20 1.0 +33 20 1.0 +32 22 1.0 +33 22 1.0 +25 23 1.0 +27 23 1.0 +29 23 1.0 +32 23 1.0 +33 23 1.0 +25 24 1.0 +27 24 1.0 +31 24 1.0 +31 25 1.0 +29 26 1.0 +33 26 1.0 +33 27 1.0 +31 28 1.0 +33 28 1.0 +32 29 1.0 +33 29 1.0 +32 30 1.0 +33 30 1.0 +32 31 1.0 +33 31 1.0 +33 32 1.0 +0 1 1.0 +0 2 1.0 +0 3 1.0 +0 4 1.0 +0 5 1.0 +0 6 1.0 +0 7 1.0 +0 8 1.0 +0 10 1.0 +0 11 1.0 +0 12 1.0 +0 13 1.0 +0 17 1.0 +0 19 1.0 +0 21 1.0 +0 31 1.0 +1 2 1.0 +1 3 1.0 +1 7 1.0 +1 13 1.0 +1 17 1.0 +1 19 1.0 +1 21 1.0 +1 30 1.0 +2 3 1.0 +2 7 1.0 +2 8 1.0 +2 9 1.0 +2 13 1.0 +2 27 1.0 +2 28 1.0 +2 32 1.0 +3 7 1.0 +3 12 1.0 +3 13 1.0 +4 6 1.0 +4 10 1.0 +5 6 1.0 +5 10 1.0 +5 16 1.0 +6 16 1.0 +8 30 1.0 +8 32 1.0 +8 33 1.0 +9 33 1.0 +13 33 1.0 +14 32 1.0 +14 33 1.0 +15 32 1.0 +15 33 1.0 +18 32 1.0 +18 33 1.0 +19 33 1.0 +20 32 1.0 +20 33 1.0 +22 32 1.0 +22 33 1.0 +23 25 1.0 +23 27 1.0 +23 29 1.0 +23 32 1.0 +23 33 1.0 +24 25 1.0 +24 27 1.0 +24 31 1.0 +25 31 1.0 +26 29 1.0 +26 33 1.0 +27 33 1.0 +28 31 1.0 +28 33 1.0 +29 32 1.0 +29 33 1.0 +30 32 1.0 +30 33 1.0 +31 32 1.0 +31 33 1.0 +32 33 1.0 diff --git a/python/cugraph_service/tests/data/merchants.csv 
b/python/cugraph_service/tests/data/merchants.csv new file mode 100644 index 00000000000..f7e3cea890f --- /dev/null +++ b/python/cugraph_service/tests/data/merchants.csv @@ -0,0 +1,6 @@ +"merchant_id" "merchant_location" "merchant_size" "merchant_sales" "merchant_num_employees" "merchant_name" +11 78750 44 123.2 12 "north" +4 78757 112 234.99 18 "south" +21 44145 83 992.1 27 "east" +16 47906 92 32.43 5 "west" +86 47906 192 2.43 51 "west" diff --git a/python/cugraph_service/tests/data/referrals.csv b/python/cugraph_service/tests/data/referrals.csv new file mode 100644 index 00000000000..66a84642bbd --- /dev/null +++ b/python/cugraph_service/tests/data/referrals.csv @@ -0,0 +1,9 @@ +"user_id_1" "user_id_2" "merchant_id" "stars" +89216 78634 11 5 +89021 89216 4 4 +89021 89216 21 3 +89021 89216 11 3 +89021 78634 21 4 +78634 32431 11 4 +78634 89216 21 3 +78634 89216 21 4 diff --git a/python/cugraph_service/tests/data/relationships.csv b/python/cugraph_service/tests/data/relationships.csv new file mode 100644 index 00000000000..b5a58301740 --- /dev/null +++ b/python/cugraph_service/tests/data/relationships.csv @@ -0,0 +1,6 @@ +"user_id_1" "user_id_2" "relationship_type" +89216 89021 9 +89216 32431 9 +32431 78634 8 +78634 89216 8 +78634 89216 9 diff --git a/python/cugraph_service/tests/data/transactions.csv b/python/cugraph_service/tests/data/transactions.csv new file mode 100644 index 00000000000..591dbeec813 --- /dev/null +++ b/python/cugraph_service/tests/data/transactions.csv @@ -0,0 +1,5 @@ +"user_id" "merchant_id" "volume" "time" "card_num" "card_type" +89021 11 33.2 1639084966.5513437 123456 "MC" +89216 4 0 1639085163.481217 8832 "CASH" +78634 16 72.0 1639084912.567394 4321 "DEBIT" +32431 4 103.2 1639084721.354346 98124 "V" diff --git a/python/cugraph_service/tests/data/users.csv b/python/cugraph_service/tests/data/users.csv new file mode 100644 index 00000000000..4b9de59b5d8 --- /dev/null +++ b/python/cugraph_service/tests/data/users.csv @@ -0,0 +1,5 @@ +"user_id" 
"user_location" "vertical" +89021 78757 0 +32431 78750 1 +89216 78757 1 +78634 47906 0 diff --git a/python/cugraph_service/tests/demo1.py b/python/cugraph_service/tests/demo1.py new file mode 100644 index 00000000000..6e189a4b7b3 --- /dev/null +++ b/python/cugraph_service/tests/demo1.py @@ -0,0 +1,71 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path + +from cugraph_service_client import CugraphServiceClient + +# Use the location of this file for finding various data files +this_dir = Path(__file__).parent + +# Use the defaults for host and port (localhost, 9090) +# Assume the server is running and using the same defaults! +client = CugraphServiceClient() + +# Remove any graphs from a previous session! +for gid in client.get_graph_ids(): + client.delete_graph(gid) + +# Add vertex and edge data to the default graph instance (the default graph +# does not require a graph ID to access) The file names specified must be +# visible to the server. 
+ +client.load_csv_as_vertex_data( + (this_dir/"vertex_data.csv").absolute().as_posix(), + dtypes=["int32", "string", "int32"], + vertex_col_name="vertex_id", + header="infer") +client.load_csv_as_edge_data( + (this_dir/"edge_data.csv").absolute().as_posix(), + dtypes=["int32", "int32", "string", "int32"], + vertex_col_names=("src", "dst"), + header="infer") + +# Verify the number of edges +assert client.get_num_edges() == 10000 + +# Run sampling and get a path, need to extract a subgraph first +extracted_gid = client.extract_subgraph(allow_multi_edges=True) +start_vertices = 11 +max_depth = 2 +(vertex_paths, edge_weights, path_sizes) = \ + client.node2vec(start_vertices, max_depth, extracted_gid) + +# Create another graph on the server +graph2 = client.create_graph() + +# Verify that both the default and new graph are present on the server +assert len(client.get_graph_ids()) == 3 + +# Add edge data to the new graph +client.load_csv_as_vertex_data( + (this_dir/"vertex_data.csv").absolute().as_posix(), + dtypes=["int32", "string", "int32"], + vertex_col_name="vertex_id", + header="infer", + graph_id=graph2) + +# Remove the new graph from the server and verify +client.delete_graph(graph2) +assert len(client.get_graph_ids()) == 2 diff --git a/python/cugraph_service/tests/gen_demo_data.py b/python/cugraph_service/tests/gen_demo_data.py new file mode 100644 index 00000000000..c4db2e9b32e --- /dev/null +++ b/python/cugraph_service/tests/gen_demo_data.py @@ -0,0 +1,47 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import random + +############################################################################### +# vertex CSV +colors = ["red", "white", "blue", "green", + "yellow", "orange", "black", "purple"] + +with open("vertex_data.csv", "w") as vertex_out: + print("vertex_id color num_stars", file=vertex_out) + + for i in range(1000): + print(f"{i} {random.choice(colors)} {int(random.random() * 10000)}", + file=vertex_out) + + +############################################################################### +# edge CSV +relationship = ["friend", "coworker", "reviewer"] +ids = range(1000) + +with open("edge_data.csv", "w") as edge_out: + print("src dst relationship_type num_interactions", file=edge_out) + + for i in range(10000): + src = random.choice(ids) + dst = random.choice(ids) + while(src == dst): + dst = random.choice(ids) + + print(f"{src} {dst} " + f"{random.choice(relationship)} " + f"{int((random.random() + 1) * 10)}", + file=edge_out) diff --git a/python/cugraph_service/tests/multi_client_test_runner.sh b/python/cugraph_service/tests/multi_client_test_runner.sh new file mode 100644 index 00000000000..70433de0970 --- /dev/null +++ b/python/cugraph_service/tests/multi_client_test_runner.sh @@ -0,0 +1,24 @@ +# Copyright (c) 2019-2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# source this script (ie. 
do not run it) for easier job control from the shell +# FIXME: change this and/or cugraph_service so PYTHONPATH is not needed +PYTHONPATH=/Projects/cugraph/python/cugraph_service python client1_script.py & +sleep 1 +PYTHONPATH=/Projects/cugraph/python/cugraph_service python client2_script.py & +PYTHONPATH=/Projects/cugraph/python/cugraph_service python client2_script.py & +PYTHONPATH=/Projects/cugraph/python/cugraph_service python client2_script.py & +PYTHONPATH=/Projects/cugraph/python/cugraph_service python client2_script.py & +PYTHONPATH=/Projects/cugraph/python/cugraph_service python client2_script.py & +PYTHONPATH=/Projects/cugraph/python/cugraph_service python client2_script.py & +PYTHONPATH=/Projects/cugraph/python/cugraph_service python client2_script.py diff --git a/python/cugraph_service/tests/test_cugraph_handler.py b/python/cugraph_service/tests/test_cugraph_handler.py new file mode 100644 index 00000000000..ad2cc3d92fe --- /dev/null +++ b/python/cugraph_service/tests/test_cugraph_handler.py @@ -0,0 +1,243 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pickle + +import pytest + + +############################################################################### +# fixtures +# The fixtures used in these tests are defined in conftest.py + + +############################################################################### +# tests + +def test_load_and_call_graph_creation_extension(graph_creation_extension2): + """ + Ensures load_extensions reads the extensions and makes the new APIs they + add available. + """ + from cugraph_service_server.cugraph_handler import CugraphHandler + from cugraph_service_client.exceptions import CugraphServiceError + + handler = CugraphHandler() + + extension_dir = graph_creation_extension2 + + # DNE + with pytest.raises(CugraphServiceError): + handler.load_graph_creation_extensions("/path/that/does/not/exist") + + # Exists, but is a file + with pytest.raises(CugraphServiceError): + handler.load_graph_creation_extensions(__file__) + + # Load the extension and call the function defined in it + num_files_read = handler.load_graph_creation_extensions(extension_dir) + assert num_files_read == 1 + + # Private function should not be callable + with pytest.raises(CugraphServiceError): + handler.call_graph_creation_extension("__my_private_function", + "()", "{}") + + # Function which DNE in the extension + with pytest.raises(CugraphServiceError): + handler.call_graph_creation_extension("bad_function_name", + "()", "{}") + + # Wrong number of args + with pytest.raises(CugraphServiceError): + handler.call_graph_creation_extension("my_graph_creation_function", + "('a',)", "{}") + + # This call should succeed and should result in a new PropertyGraph present + # in the handler instance. 
+ new_graph_ID = handler.call_graph_creation_extension( + "my_graph_creation_function", "('a', 'b', 'c')", "{}") + + assert new_graph_ID in handler.get_graph_ids() + + # Inspect the PG and ensure it was created from my_graph_creation_function + pG = handler._get_graph(new_graph_ID) + edge_props = pG.edge_property_names + assert ("c" in edge_props) + + +def test_load_and_unload_graph_creation_extension(graph_creation_extension2): + """ + Ensure extensions can be unloaded. + """ + from cugraph_service_server.cugraph_handler import CugraphHandler + from cugraph_service_client.exceptions import CugraphServiceError + + handler = CugraphHandler() + + extension_dir = graph_creation_extension2 + + # Load the extensions and ensure it can be called. + handler.load_graph_creation_extensions(extension_dir) + new_graph_ID = handler.call_graph_creation_extension( + "my_graph_creation_function", "('a', 'b', 'c')", "{}") + assert new_graph_ID in handler.get_graph_ids() + + # Unload then try to run the same call again, which should fail + handler.unload_graph_creation_extensions() + + with pytest.raises(CugraphServiceError): + handler.call_graph_creation_extension( + "my_graph_creation_function", "('a', 'b', 'c')", "{}") + + +def test_load_and_unload_graph_creation_extension_no_args( + graph_creation_extension1): + """ + Test graph_creation_extension1 which contains an extension with no args. + """ + from cugraph_service_server.cugraph_handler import CugraphHandler + handler = CugraphHandler() + + extension_dir = graph_creation_extension1 + + # Load the extensions and ensure it can be called. + handler.load_graph_creation_extensions(extension_dir) + new_graph_ID = handler.call_graph_creation_extension( + "custom_graph_creation_function", "()", "{}") + assert new_graph_ID in handler.get_graph_ids() + + +def test_load_and_unload_graph_creation_extension_no_facade_arg( + graph_creation_extension_no_facade_arg): + """ + Test an extension that has no facade arg. 
+ """ + from cugraph_service_server.cugraph_handler import CugraphHandler + handler = CugraphHandler() + + extension_dir = graph_creation_extension_no_facade_arg + + # Load the extensions and ensure it can be called. + handler.load_graph_creation_extensions(extension_dir) + new_graph_ID = handler.call_graph_creation_extension( + "graph_creation_function", "('a')", "{'arg2':33}") + assert new_graph_ID in handler.get_graph_ids() + + +def test_load_and_unload_graph_creation_extension_bad_arg_order( + graph_creation_extension_bad_arg_order): + """ + Test an extension that has the facade arg in the wrong position. + """ + from cugraph_service_server.cugraph_handler import CugraphHandler + from cugraph_service_client.exceptions import CugraphServiceError + + handler = CugraphHandler() + + extension_dir = graph_creation_extension_bad_arg_order + + # Load the extensions and ensure it can be called. + handler.load_graph_creation_extensions(extension_dir) + with pytest.raises(CugraphServiceError): + handler.call_graph_creation_extension( + "graph_creation_function", "('a', 'b')", "{}") + + +def test_get_graph_data_large_vertex_ids( + graph_creation_extension_big_vertex_ids): + """ + Test that graphs with large vertex ID values (>int32) are handled. + """ + from cugraph_service_server.cugraph_handler import CugraphHandler + + handler = CugraphHandler() + + extension_dir = graph_creation_extension_big_vertex_ids + + # Load the extension and ensure it can be called. 
+ handler.load_graph_creation_extensions(extension_dir) + new_graph_id = handler.call_graph_creation_extension( + "graph_creation_function_vert_and_edge_data_big_vertex_ids", + "()", "{}") + + invalid_vert_id = 2 + vert_data = handler.get_graph_vertex_data( + id_or_ids=invalid_vert_id, + null_replacement_value=0, + graph_id=new_graph_id, + property_keys=None) + + assert len(pickle.loads(vert_data)) == 0 + + large_vert_id = (2**32)+1 + vert_data = handler.get_graph_vertex_data( + id_or_ids=large_vert_id, + null_replacement_value=0, + graph_id=new_graph_id, + property_keys=None) + + assert len(pickle.loads(vert_data)) == 1 + + invalid_edge_id = (2**32)+1 + edge_data = handler.get_graph_edge_data( + id_or_ids=invalid_edge_id, + null_replacement_value=0, + graph_id=new_graph_id, + property_keys=None) + + assert len(pickle.loads(edge_data)) == 0 + + small_edge_id = 2 + edge_data = handler.get_graph_edge_data( + id_or_ids=small_edge_id, + null_replacement_value=0, + graph_id=new_graph_id, + property_keys=None) + + assert len(pickle.loads(edge_data)) == 1 + + +def test_get_graph_data_empty_graph(graph_creation_extension_empty_graph): + """ + Tests that get_graph_*_data() handles empty graphs correctly. + """ + from cugraph_service_server.cugraph_handler import CugraphHandler + + handler = CugraphHandler() + + extension_dir = graph_creation_extension_empty_graph + + # Load the extension and ensure it can be called. 
+ handler.load_graph_creation_extensions(extension_dir) + new_graph_id = handler.call_graph_creation_extension( + "graph_creation_function", "()", "{}") + + invalid_vert_id = 2 + vert_data = handler.get_graph_vertex_data( + id_or_ids=invalid_vert_id, + null_replacement_value=0, + graph_id=new_graph_id, + property_keys=None) + + assert len(pickle.loads(vert_data)) == 0 + + invalid_edge_id = 2 + edge_data = handler.get_graph_edge_data( + id_or_ids=invalid_edge_id, + null_replacement_value=0, + graph_id=new_graph_id, + property_keys=None) + + assert len(pickle.loads(edge_data)) == 0 diff --git a/python/cugraph_service/tests/test_e2e.py b/python/cugraph_service/tests/test_e2e.py new file mode 100644 index 00000000000..86bffd121dc --- /dev/null +++ b/python/cugraph_service/tests/test_e2e.py @@ -0,0 +1,458 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import subprocess +import time +from collections.abc import Sequence + +import pytest + +from . import data + + +############################################################################### +# fixtures + +@pytest.fixture(scope="module") +def server(graph_creation_extension1): + """ + Start a cugraph_service server, stop it when done with the fixture. This + also uses graph_creation_extension1 to preload a graph creation extension. 
+ """ + from cugraph_service_server import server + from cugraph_service_client import CugraphServiceClient + from cugraph_service_client.exceptions import CugraphServiceError + + server_file = server.__file__ + server_process = None + host = "localhost" + port = 9090 + graph_creation_extension_dir = graph_creation_extension1 + client = CugraphServiceClient(host, port) + + try: + client.uptime() + print("FOUND RUNNING SERVER, ASSUMING IT SHOULD BE USED FOR TESTING!") + yield + + except CugraphServiceError: + # A server was not found, so start one for testing then stop it when + # testing is done. + + # pytest will update sys.path based on the tests it discovers, and for + # this source tree, an entry for the parent of this "tests" directory + # will be added. The parent to this "tests" directory also allows + # imports to find the cugraph_service sources, so in oder to ensure the + # server that's started is also using the same sources, the PYTHONPATH + # env should be set to the sys.path being used in this process. 
+ env_dict = os.environ.copy() + env_dict["PYTHONPATH"] = ":".join(sys.path) + + with subprocess.Popen( + [sys.executable, server_file, + "--host", host, + "--port", str(port), + "--graph-creation-extension-dir", + graph_creation_extension_dir], + env=env_dict, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True) as server_process: + try: + print("\nLaunched cugraph_service server, waiting for it to " + "start...", + end="", flush=True) + max_retries = 10 + retries = 0 + while retries < max_retries: + try: + client.uptime() + print("started.") + break + except CugraphServiceError: + time.sleep(1) + retries += 1 + if retries >= max_retries: + raise RuntimeError("error starting server") + except Exception: + if server_process.poll() is None: + server_process.terminate() + raise + + # yield control to the tests + yield + + # tests are done, now stop the server + print("\nTerminating server...", end="", flush=True) + server_process.terminate() + print("done.", flush=True) + + +@pytest.fixture(scope="function") +def client(server): + """ + Creates a client instance to the running server, closes the client when the + fixture is no longer used by tests. + """ + from cugraph_service_client import CugraphServiceClient, defaults + + client = CugraphServiceClient(defaults.host, defaults.port) + + for gid in client.get_graph_ids(): + client.delete_graph(gid) + + # FIXME: should this fixture always unconditionally unload all extensions? + # client.unload_graph_creation_extensions() + + # yield control to the tests + yield client + + # tests are done, now stop the server + client.close() + + +@pytest.fixture(scope="function") +def client_with_edgelist_csv_loaded(client): + """ + Loads the karate CSV into the default graph on the server. 
+ """ + test_data = data.edgelist_csv_data["karate"] + client.load_csv_as_edge_data(test_data["csv_file_name"], + dtypes=test_data["dtypes"], + vertex_col_names=["0", "1"], + type_name="") + assert client.get_graph_ids() == [0] + return (client, test_data) + + +@pytest.fixture(scope="function") +def client_with_property_csvs_loaded(client): + """ + Loads each of the vertex and edge property CSVs into the default graph on + the server. + """ + merchants = data.property_csv_data["merchants"] + users = data.property_csv_data["users"] + transactions = data.property_csv_data["transactions"] + relationships = data.property_csv_data["relationships"] + referrals = data.property_csv_data["referrals"] + + client.load_csv_as_vertex_data(merchants["csv_file_name"], + dtypes=merchants["dtypes"], + vertex_col_name=merchants["vert_col_name"], + header=0, + type_name="merchants") + client.load_csv_as_vertex_data(users["csv_file_name"], + dtypes=users["dtypes"], + vertex_col_name=users["vert_col_name"], + header=0, + type_name="users") + + client.load_csv_as_edge_data(transactions["csv_file_name"], + dtypes=transactions["dtypes"], + vertex_col_names=transactions[ + "vert_col_names"], + header=0, + type_name="transactions") + client.load_csv_as_edge_data(relationships["csv_file_name"], + dtypes=relationships["dtypes"], + vertex_col_names=relationships[ + "vert_col_names"], + header=0, + type_name="relationships") + client.load_csv_as_edge_data(referrals["csv_file_name"], + dtypes=referrals["dtypes"], + vertex_col_names=referrals["vert_col_names"], + header=0, + type_name="referrals") + + assert client.get_graph_ids() == [0] + return (client, data.property_csv_data) + + +############################################################################### +# tests +def test_get_graph_info_key_types(client_with_property_csvs_loaded): + """ + Tests error handling for info keys passed in. 
+ """ + from cugraph_service_client.exceptions import CugraphServiceError + + (client, test_data) = client_with_property_csvs_loaded + + with pytest.raises(TypeError): + client.get_graph_info(21) # bad key type + with pytest.raises(TypeError): + client.get_graph_info([21, "num_edges"]) # bad key type + with pytest.raises(CugraphServiceError): + client.get_graph_info("21") # bad key value + with pytest.raises(CugraphServiceError): + client.get_graph_info(["21"]) # bad key value + with pytest.raises(CugraphServiceError): + client.get_graph_info(["num_edges", "21"]) # bad key value + + client.get_graph_info() # valid + + +def test_get_num_edges_default_graph(client_with_edgelist_csv_loaded): + (client, test_data) = client_with_edgelist_csv_loaded + assert client.get_graph_info("num_edges") == test_data["num_edges"] + + +def test_load_csv_as_edge_data_nondefault_graph(client): + from cugraph_service_client.exceptions import CugraphServiceError + + test_data = data.edgelist_csv_data["karate"] + + with pytest.raises(CugraphServiceError): + client.load_csv_as_edge_data(test_data["csv_file_name"], + dtypes=test_data["dtypes"], + vertex_col_names=["0", "1"], + type_name="", + graph_id=9999) + + +def test_get_num_edges_nondefault_graph(client_with_edgelist_csv_loaded): + from cugraph_service_client.exceptions import CugraphServiceError + + (client, test_data) = client_with_edgelist_csv_loaded + # Bad graph ID + with pytest.raises(CugraphServiceError): + client.get_graph_info("num_edges", graph_id=9999) + + new_graph_id = client.create_graph() + client.load_csv_as_edge_data(test_data["csv_file_name"], + dtypes=test_data["dtypes"], + vertex_col_names=["0", "1"], + type_name="", + graph_id=new_graph_id) + + assert client.get_graph_info("num_edges") == test_data["num_edges"] + assert client.get_graph_info("num_edges", graph_id=new_graph_id) \ + == test_data["num_edges"] + + +def test_node2vec(client_with_edgelist_csv_loaded): + (client, test_data) = 
client_with_edgelist_csv_loaded + extracted_gid = client.extract_subgraph() + start_vertices = 11 + max_depth = 2 + (vertex_paths, edge_weights, path_sizes) = \ + client.node2vec(start_vertices, max_depth, extracted_gid) + # FIXME: consider a more thorough test + assert isinstance(vertex_paths, list) and len(vertex_paths) + assert isinstance(edge_weights, list) and len(edge_weights) + assert isinstance(path_sizes, list) and len(path_sizes) + + +def test_extract_subgraph(client_with_edgelist_csv_loaded): + (client, test_data) = client_with_edgelist_csv_loaded + Gid = client.extract_subgraph(create_using=None, + selection=None, + edge_weight_property="2", + default_edge_weight=None, + allow_multi_edges=False) + # FIXME: consider a more thorough test + assert Gid in client.get_graph_ids() + + +def test_load_and_call_graph_creation_extension(client, + graph_creation_extension2): + """ + Tests calling a user-defined server-side graph creation extension from the + cugraph_service client. + """ + # The graph_creation_extension returns the tmp dir created which contains + # the extension + extension_dir = graph_creation_extension2 + + num_files_loaded = client.load_graph_creation_extensions(extension_dir) + assert num_files_loaded == 1 + + new_graph_ID = client.call_graph_creation_extension( + "my_graph_creation_function", "a", "b", "c") + + assert new_graph_ID in client.get_graph_ids() + + # Inspect the PG and ensure it was created from my_graph_creation_function + # FIXME: add client APIs to allow for a more thorough test of the graph + assert client.get_graph_info(["num_edges"], new_graph_ID) == 2 + + +def test_load_and_call_graph_creation_long_running_extension( + client, + graph_creation_extension_long_running): + """ + Tests calling a user-defined server-side graph creation extension from the + cugraph_service client. 
+ """ + # The graph_creation_extension returns the tmp dir created which contains + # the extension + extension_dir = graph_creation_extension_long_running + + num_files_loaded = client.load_graph_creation_extensions(extension_dir) + assert num_files_loaded == 1 + + new_graph_ID = client.call_graph_creation_extension( + "long_running_graph_creation_function") + + assert new_graph_ID in client.get_graph_ids() + + # Inspect the PG and ensure it was created from my_graph_creation_function + # FIXME: add client APIs to allow for a more thorough test of the graph + assert client.get_graph_info(["num_edges"], new_graph_ID) == 0 + + +def test_call_graph_creation_extension(client): + """ + Ensure the graph creation extension preloaded by the server fixture is + callable. + """ + new_graph_ID = client.call_graph_creation_extension( + "custom_graph_creation_function") + + assert new_graph_ID in client.get_graph_ids() + + # Inspect the PG and ensure it was created from + # custom_graph_creation_function + # FIXME: add client APIs to allow for a more thorough test of the graph + assert client.get_graph_info(["num_edges"], new_graph_ID) == 3 + + +def test_get_graph_vertex_data(client_with_property_csvs_loaded): + (client, test_data) = client_with_property_csvs_loaded + + # FIXME: do not hardcode the shape values, get them from the input data. + np_array_all_vertex_data = client.get_graph_vertex_data() + assert np_array_all_vertex_data.shape == (9, 9) + + # The remaining tests get individual vertex data - compare those to the + # all_vertex_data retrieved earlier. 
+ vert_ids = [11, 86, 89021] + np_array = client.get_graph_vertex_data(vert_ids) + assert np_array.shape == (3, 9) + # The 1st element is the vert ID + for (i, vid) in enumerate(vert_ids): + assert np_array[i][0] == vid + + np_array = client.get_graph_vertex_data(11) + assert np_array.shape == (1, 9) + assert np_array[0][0] == 11 + + np_array = client.get_graph_vertex_data(86) + assert np_array.shape == (1, 9) + assert np_array[0][0] == 86 + + +def test_get_graph_edge_data(client_with_property_csvs_loaded): + (client, test_data) = client_with_property_csvs_loaded + + # FIXME: do not hardcode the shape values, get them from the input data. + np_array_all_rows = client.get_graph_edge_data() + assert np_array_all_rows.shape == (17, 11) + + # The remaining tests get individual edge data - compare those to the + # all_edge_data retrieved earlier. + edge_ids = [0, 1, 2] + np_array = client.get_graph_edge_data(edge_ids) + assert np_array.shape == (3, 11) + # The 3rd element is the edge ID + for (i, eid) in enumerate(edge_ids): + assert np_array[i][2] == eid + + np_array = client.get_graph_edge_data(0) + assert np_array.shape == (1, 11) + assert np_array[0][2] == 0 + + np_array = client.get_graph_edge_data(1) + assert np_array.shape == (1, 11) + assert np_array[0][2] == 1 + + +def test_get_graph_info(client_with_property_csvs_loaded): + (client, test_data) = client_with_property_csvs_loaded + + info = client.get_graph_info(["num_vertices", + "num_vertex_properties"]) + data = (info["num_vertices"], + info["num_vertex_properties"]) + # FIXME: do not hardcode values, get them from the input data. + assert data == (9, 7) + + info = client.get_graph_info(["num_edges", "num_edge_properties"]) + data = (info["num_edges"], info["num_edge_properties"]) + # FIXME: do not hardcode values, get them from the input data. 
+ assert data == (17, 7) + + +def test_batched_ego_graphs(client_with_edgelist_csv_loaded): + (client, test_data) = client_with_edgelist_csv_loaded + + extracted_gid = client.extract_subgraph() + + # These are known vertex IDs in the default graph loaded + seeds = [0, 1, 2] + results_lists = client.batched_ego_graphs( + seeds, radius=1, graph_id=extracted_gid) + + (srcs, dsts, weights, seeds_offsets) = results_lists + + assert isinstance(srcs, Sequence) + assert isinstance(dsts, Sequence) + assert isinstance(weights, Sequence) + assert len(srcs) == len(dsts) == len(weights) + + assert isinstance(seeds_offsets, Sequence) + assert len(srcs) == seeds_offsets[-1] + + +def test_get_edge_IDs_for_vertices(client_with_edgelist_csv_loaded): + (client, test_data) = client_with_edgelist_csv_loaded + + extracted_gid = client.extract_subgraph() + + srcs = [1, 2, 3] + dsts = [0, 0, 0] + + edge_IDs = client.get_edge_IDs_for_vertices(srcs, dsts, + graph_id=extracted_gid) + + assert len(edge_IDs) == len(srcs) + + +def test_uniform_neighbor_sampling(client_with_edgelist_csv_loaded): + from cugraph_service_client.exceptions import CugraphServiceError + from cugraph_service_client import defaults + + (client, test_data) = client_with_edgelist_csv_loaded + + start_list = [1, 2, 3] + fanout_vals = [2, 2, 2] + with_replacement = True + + # invalid graph type - default graph is a PG, needs an extracted subgraph + with pytest.raises(CugraphServiceError): + client.uniform_neighbor_sample(start_list=start_list, + fanout_vals=fanout_vals, + with_replacement=with_replacement, + graph_id=defaults.graph_id) + + extracted_gid = client.extract_subgraph(renumber_graph=True) + # Ensure call can be made, assume results verified in other tests + client.uniform_neighbor_sample(start_list=start_list, + fanout_vals=fanout_vals, + with_replacement=with_replacement, + graph_id=extracted_gid) diff --git a/python/cugraph_service/tests/test_mg_cugraph_handler.py 
b/python/cugraph_service/tests/test_mg_cugraph_handler.py new file mode 100644 index 00000000000..3da3fa82f65 --- /dev/null +++ b/python/cugraph_service/tests/test_mg_cugraph_handler.py @@ -0,0 +1,285 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from pathlib import Path +import pickle + +import pytest + +from . import data + + +############################################################################### +# fixtures + +@pytest.fixture(scope="module") +def mg_handler(): + """ + Creates a cugraph_service handler that uses a dask client. + """ + from cugraph_service_server.cugraph_handler import CugraphHandler + + dask_scheduler_file = os.environ.get("SCHEDULER_FILE") + if dask_scheduler_file is None: + raise EnvironmentError("Environment variable SCHEDULER_FILE must be " + "set to the path to a dask scheduler json file") + dask_scheduler_file = Path(dask_scheduler_file) + if not dask_scheduler_file.exists(): + raise FileNotFoundError("env var SCHEDULER_FILE is set to " + f"{dask_scheduler_file}, which does not " + "exist.") + + handler = CugraphHandler() + handler.initialize_dask_client(dask_scheduler_file) + return handler + + +# Make this a function-level fixture so it cleans up the mg_handler after each +# test, allowing other tests to use mg_handler without graphs loaded. 
@pytest.fixture(scope="function")
def handler_with_karate_edgelist_loaded(mg_handler):
    """
    Loads the karate CSV into the default graph in the handler.

    Function-scoped so each test starts from a freshly-loaded default graph
    and all graphs are deleted from the handler afterwards.
    """
    from cugraph_service_client import defaults

    test_data = data.edgelist_csv_data["karate"]

    # Ensure the handler starts with no graphs in memory
    for gid in mg_handler.get_graph_ids():
        mg_handler.delete_graph(gid)

    mg_handler.load_csv_as_edge_data(test_data["csv_file_name"],
                                     delimiter=" ",
                                     dtypes=test_data["dtypes"],
                                     header=None,
                                     vertex_col_names=["0", "1"],
                                     type_name="",
                                     property_columns=[],
                                     names=[],
                                     graph_id=defaults.graph_id,
                                     )
    assert mg_handler.get_graph_ids() == [0]

    yield (mg_handler, test_data)

    # teardown: remove every graph created during the test
    for gid in mg_handler.get_graph_ids():
        mg_handler.delete_graph(gid)


###############################################################################
# tests

# FIXME: consolidate this with the SG version of this test.
def test_get_graph_data_large_vertex_ids(
        mg_handler,
        graph_creation_extension_big_vertex_ids,
        ):
    """
    Test that graphs with large vertex ID values (>int32) are handled.
    """
    handler = mg_handler
    extension_dir = graph_creation_extension_big_vertex_ids

    # Load the extension and ensure it can be called.
    handler.load_graph_creation_extensions(extension_dir)
    new_graph_id = handler.call_graph_creation_extension(
        "graph_creation_function_vert_and_edge_data_big_vertex_ids",
        "()", "{}")

    # A vertex ID not present in the graph must return no data.
    invalid_vert_id = 2
    vert_data = handler.get_graph_vertex_data(
        id_or_ids=invalid_vert_id,
        null_replacement_value=0,
        graph_id=new_graph_id,
        property_keys=None)

    assert len(pickle.loads(vert_data)) == 0

    # A >int32 vertex ID present in the graph must return exactly one row.
    large_vert_id = (2**32)+1
    vert_data = handler.get_graph_vertex_data(
        id_or_ids=large_vert_id,
        null_replacement_value=0,
        graph_id=new_graph_id,
        property_keys=None)

    assert len(pickle.loads(vert_data)) == 1

    # A >int32 edge ID not present in the graph must return no data.
    invalid_edge_id = (2**32)+1
    edge_data = handler.get_graph_edge_data(
        id_or_ids=invalid_edge_id,
        null_replacement_value=0,
        graph_id=new_graph_id,
        property_keys=None)

    assert len(pickle.loads(edge_data)) == 0

    # A valid small edge ID must return exactly one row.
    small_edge_id = 2
    edge_data = handler.get_graph_edge_data(
        id_or_ids=small_edge_id,
        null_replacement_value=0,
        graph_id=new_graph_id,
        property_keys=None)

    assert len(pickle.loads(edge_data)) == 1


# FIXME: consolidate this with the SG version of this test.
def test_get_graph_data_empty_graph(
        mg_handler,
        graph_creation_extension_empty_graph,
        ):
    """
    Tests that get_graph_*_data() handles empty graphs correctly.
    """
    handler = mg_handler
    extension_dir = graph_creation_extension_empty_graph

    # Load the extension and ensure it can be called.
    handler.load_graph_creation_extensions(extension_dir)
    new_graph_id = handler.call_graph_creation_extension(
        "graph_creation_function", "()", "{}")

    # Any lookup against an empty graph must return no data.
    invalid_vert_id = 2
    vert_data = handler.get_graph_vertex_data(
        id_or_ids=invalid_vert_id,
        null_replacement_value=0,
        graph_id=new_graph_id,
        property_keys=None)

    assert len(pickle.loads(vert_data)) == 0

    invalid_edge_id = 2
    edge_data = handler.get_graph_edge_data(
        id_or_ids=invalid_edge_id,
        null_replacement_value=0,
        graph_id=new_graph_id,
        property_keys=None)

    assert len(pickle.loads(edge_data)) == 0


def test_get_edge_IDs_for_vertices(handler_with_karate_edgelist_loaded):
    """
    Extract a renumbered subgraph and look up edge IDs for known vertex pairs.
    """
    from cugraph_service_client import defaults

    (handler, test_data) = handler_with_karate_edgelist_loaded

    # Use the test/debug API to ensure the correct type was created
    assert "MG" in handler.get_graph_type(defaults.graph_id)

    extracted_graph_id = handler.extract_subgraph(create_using=None,
                                                  selection=None,
                                                  edge_weight_property=None,
                                                  default_edge_weight=1.0,
                                                  allow_multi_edges=True,
                                                  renumber_graph=True,
                                                  add_edge_data=True,
                                                  graph_id=defaults.graph_id)

    # FIXME: this assumes these are always the first 3 edges in karate, which
    # may not be a safe assumption.
    eIDs = handler.get_edge_IDs_for_vertices([1, 2, 3],
                                             [0, 0, 0],
                                             extracted_graph_id)
    assert eIDs == [0, 1, 2]


def test_get_graph_info(handler_with_karate_edgelist_loaded):
    """
    get_graph_info() for specific args.
    """
    from cugraph_service_client import defaults
    from cugraph_service_client.types import ValueWrapper

    (handler, test_data) = handler_with_karate_edgelist_loaded

    # A common use of get_graph_info() is to get the "shape" of the data,
    # meaning the number of vertices/edges by the number of properties per
    # edge/vertex.
    info = handler.get_graph_info(["num_edges", "num_edge_properties"],
                                  defaults.graph_id)
    # info is a dictionary containing cugraph_service_client.types.Value objs,
    # so access the int32 member directly for easy comparison.
    shape = (ValueWrapper(info["num_edges"]).get_py_obj(),
             ValueWrapper(info["num_edge_properties"]).get_py_obj())
    assert shape == (156, 1)  # The single edge property is the weight

    # Only edge data was loaded by the fixture, so vertex-data counts are 0.
    info = handler.get_graph_info(["num_vertices_from_vertex_data",
                                   "num_vertex_properties"],
                                  defaults.graph_id)
    shape = (ValueWrapper(info["num_vertices_from_vertex_data"]).get_py_obj(),
             ValueWrapper(info["num_vertex_properties"]).get_py_obj())
    assert shape == (0, 0)


def test_get_graph_info_defaults(mg_handler):
    """
    Ensure calling get_graph_info() with no args returns the info dict with all
    keys present for an empty default graph.
    """
    from cugraph_service_client import defaults
    from cugraph_service_client.types import ValueWrapper

    handler = mg_handler

    info = handler.get_graph_info([], graph_id=defaults.graph_id)

    expected = {"num_vertices": 0,
                "num_vertices_from_vertex_data": 0,
                "num_edges": 0,
                "num_vertex_properties": 0,
                "num_edge_properties": 0,
                }
    actual = {key: ValueWrapper(val).get_py_obj()
              for (key, val) in info.items()}

    assert expected == actual


def test_uniform_neighbor_sampling(handler_with_karate_edgelist_loaded):
    """
    uniform_neighbor_sample() must reject the default PropertyGraph and accept
    an extracted subgraph.
    """
    from cugraph_service_client.exceptions import CugraphServiceError
    from cugraph_service_client import defaults

    (handler, test_data) = handler_with_karate_edgelist_loaded

    start_list = [1, 2, 3]
    fanout_vals = [2, 2, 2]
    with_replacement = True

    # invalid graph type - default graph is a PG, needs an extracted subgraph
    with pytest.raises(CugraphServiceError):
        handler.uniform_neighbor_sample(start_list=start_list,
                                        fanout_vals=fanout_vals,
                                        with_replacement=with_replacement,
                                        graph_id=defaults.graph_id)

    # FIXME: add test coverage for specifying the edge ID as the
    # edge_weight_property, then ensuring the edge ID is returned properly with
    # the uniform_neighbor_sample results.
    # See: https://github.com/rapidsai/cugraph/issues/2654
    extracted_gid = handler.extract_subgraph(create_using=None,
                                             selection=None,
                                             edge_weight_property=None,
                                             default_edge_weight=1.0,
                                             allow_multi_edges=True,
                                             renumber_graph=True,
                                             add_edge_data=True,
                                             graph_id=defaults.graph_id)

    # Ensure call can be made, assume results verified in other tests
    handler.uniform_neighbor_sample(start_list=start_list,
                                    fanout_vals=fanout_vals,
                                    with_replacement=with_replacement,
                                    graph_id=extracted_gid)

# --- (original diff continues with a new file:
#      python/cugraph_service/tests/test_mg_e2e.py)

# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import subprocess
import time
from pathlib import Path

import pytest

from . import data


###############################################################################
# fixtures

@pytest.fixture(scope="module")
def mg_server():
    """
    Start a cugraph_service server that uses multiple GPUs via a dask
    configuration, then stop it when done with the fixture.

    If a server is already listening on the default host/port it is reused
    as-is and NOT stopped afterwards.
    """
    from cugraph_service_server import server
    from cugraph_service_client import CugraphServiceClient
    from cugraph_service_client.exceptions import CugraphServiceError

    server_file = server.__file__
    server_process = None
    host = "localhost"
    port = 9090
    client = CugraphServiceClient(host, port)

    try:
        client.uptime()
        print("FOUND RUNNING SERVER, ASSUMING IT SHOULD BE USED FOR TESTING!")
        yield

    except CugraphServiceError:
        # A server was not found, so start one for testing then stop it when
        # testing is done.

        dask_scheduler_file = os.environ.get("SCHEDULER_FILE")
        if dask_scheduler_file is None:
            raise EnvironmentError("Environment variable SCHEDULER_FILE must "
                                   "be set to the path to a dask scheduler "
                                   "json file")
        dask_scheduler_file = Path(dask_scheduler_file)
        if not dask_scheduler_file.exists():
            raise FileNotFoundError("env var SCHEDULER_FILE is set to "
                                    f"{dask_scheduler_file}, which does not "
                                    "exist.")

        # pytest will update sys.path based on the tests it discovers, and for
        # this source tree, an entry for the parent of this "tests" directory
        # will be added. The parent to this "tests" directory also allows
        # imports to find the cugraph_service sources, so in order to ensure
        # the server that's started is also using the same sources, the
        # PYTHONPATH env should be set to the sys.path being used in this
        # process.
        env_dict = os.environ.copy()
        env_dict["PYTHONPATH"] = ":".join(sys.path)

        with subprocess.Popen(
                [sys.executable, server_file,
                 "--host", host,
                 "--port", str(port),
                 "--dask-scheduler-file",
                 dask_scheduler_file],
                env=env_dict,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True) as server_process:
            try:
                print("\nLaunched cugraph_service server, waiting for it to "
                      "start...",
                      end="", flush=True)
                max_retries = 10
                retries = 0
                # Poll uptime() until the server responds or retries run out.
                while retries < max_retries:
                    try:
                        client.uptime()
                        print("started.")
                        break
                    except CugraphServiceError:
                        time.sleep(1)
                        retries += 1
                if retries >= max_retries:
                    raise RuntimeError("error starting server")
            except Exception:
                # Dump the server's combined stdout/stderr to aid debugging,
                # then make sure the process is not left running.
                print(server_process.stdout.read())
                if server_process.poll() is None:
                    server_process.terminate()
                raise

            # yield control to the tests
            yield

            # tests are done, now stop the server
            print("\nTerminating server...", end="", flush=True)
            server_process.terminate()
            print("done.", flush=True)


@pytest.fixture(scope="function")
def client(mg_server):
    """
    Creates a client instance to the running server, closes the client when the
    fixture is no longer used by tests.
    """
    from cugraph_service_client import CugraphServiceClient, defaults

    client = CugraphServiceClient(defaults.host, defaults.port)

    # Start each test with no graphs on the server.
    for gid in client.get_graph_ids():
        client.delete_graph(gid)

    # FIXME: should this fixture always unconditionally unload all extensions?
    # client.unload_graph_creation_extensions()

    # yield control to the tests
    yield client

    # tests are done, close the client connection (the server itself is
    # managed by the mg_server fixture)
    client.close()


@pytest.fixture(scope="function")
def client_with_edgelist_csv_loaded(client):
    """
    Loads the karate CSV into the default graph on the server.
    """
    test_data = data.edgelist_csv_data["karate"]
    client.load_csv_as_edge_data(test_data["csv_file_name"],
                                 dtypes=test_data["dtypes"],
                                 vertex_col_names=["0", "1"],
                                 type_name="")
    assert client.get_graph_ids() == [0]
    return (client, test_data)


###############################################################################
# tests

def test_get_default_graph_info(client_with_edgelist_csv_loaded):
    """
    Test to ensure various info on the default graph loaded from the specified
    fixture is correct.
    """
    (client, test_data) = client_with_edgelist_csv_loaded

    # get_graph_type() is a test/debug API which returns a string repr of the
    # graph type. Ideally, users should not need to know the graph type.
    assert "MG" in client._get_graph_type()

    assert client.get_graph_info(["num_edges"]) == test_data["num_edges"]
    assert client.get_server_info()["num_gpus"] > 1


def test_get_edge_IDs_for_vertices(client_with_edgelist_csv_loaded):
    """
    Smoke test: get_edge_IDs_for_vertices() can be called on an extracted
    subgraph of an MG graph.
    """
    (client, test_data) = client_with_edgelist_csv_loaded

    # get_graph_type() is a test/debug API which returns a string repr of the
    # graph type. Ideally, users should not need to know the graph type.
    assert "MG" in client._get_graph_type()

    graph_id = client.extract_subgraph(allow_multi_edges=True)
    client.get_edge_IDs_for_vertices([1, 2, 3], [0, 0, 0], graph_id)