diff --git a/benchmarks/cugraph-service/pytest-based/bench_cgs_uniform_neighbor_sample.py b/benchmarks/cugraph-service/pytest-based/bench_cgs_uniform_neighbor_sample.py new file mode 100644 index 00000000000..0cc64a2af39 --- /dev/null +++ b/benchmarks/cugraph-service/pytest-based/bench_cgs_uniform_neighbor_sample.py @@ -0,0 +1,264 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import os +from pathlib import Path + +import pytest +import numpy as np + +# If the rapids-pytest-benchmark plugin is installed, the "gpubenchmark" +# fixture will be available automatically. Check that this fixture is available +# by trying to import rapids_pytest_benchmark, and if that fails, set +# "gpubenchmark" to the standard "benchmark" fixture provided by +# pytest-benchmark. +try: + import rapids_pytest_benchmark # noqa: F401 +except ImportError: + import pytest_benchmark + + gpubenchmark = pytest_benchmark.plugin.benchmark + +from cugraph_service_client import CugraphServiceClient +from cugraph_service_client.exceptions import CugraphServiceError +from cugraph_service_client import RemoteGraph +from cugraph_service_server.testing import utils + +from cugraph_benchmarking import params + +_seed = 42 + + +def create_remote_graph(graph_data, is_mg, client): + """ + Create a remote graph instance based on the data to be loaded/generated, + relying on server-side graph creation extensions. + + The server extension is part of the + "cugraph_service_server.testing.benchmark_server_extension" and is loaded + in the ensure_running_service_for_sampling() helper. + """ + # Assume strings are names of datasets in the datasets package + if isinstance(graph_data, str): + gid = client.call_graph_creation_extension( + "create_graph_from_builtin_dataset", graph_data + ) + + # Assume dictionary contains RMAT params + elif isinstance(graph_data, dict): + scale = graph_data["scale"] + num_edges = (2**scale) * graph_data["edgefactor"] + seed = _seed + gid = client.call_graph_creation_extension( + "create_graph_from_rmat_generator", + scale=scale, + num_edges=num_edges, + seed=seed, + mg=is_mg, + ) + else: + raise TypeError(f"graph_data can only be str or dict, got {type(graph_data)}") + + G = RemoteGraph(client, gid) + return G + + +def get_uniform_neighbor_sample_args( + G, seed, batch_size, fanout, with_replacement +): + """ + Return a dictionary containing the args for uniform_neighbor_sample based + on the graph and desired args passed in. For example, if a large start list + and small fanout list is desired, a "large" (based on graph size) list of + valid vert IDs for the graph passed in and a "small" list of fanout values + will be returned. + + The dictionary return value allows for easily supporting other args without + having to maintain an order of values in a return tuple, for example. 
+ """ + if with_replacement not in [True, False]: + raise ValueError(f"got unexpected value {with_replacement=}") + + num_verts = G.number_of_vertices() + + if batch_size > num_verts: + num_start_verts = int(num_verts * 0.25) + else: + num_start_verts = batch_size + + # Create the start_list on the server, since generating a list of actual + # IDs requires unrenumbering steps that cannot be easily done remotely. + start_list = G._client.call_extension( + "gen_vertex_list", G._graph_id, num_start_verts, seed + ) + + return { + "start_list": list(start_list), + "fanout": fanout, + "with_replacement": with_replacement, + } + + +def ensure_running_service_for_sampling(dask_scheduler_file=None, + start_local_cuda_cluster=False): + """ + Returns a tuple containing a Popen object for the running cugraph-service + server subprocess, and a client object connected to it. If a server was + detected already running, the Popen object will be None. + """ + host = "localhost" + port = 9090 + client = CugraphServiceClient(host, port) + server_process = None + + try: + client.uptime() + print("FOUND RUNNING SERVER, ASSUMING IT SHOULD BE USED FOR TESTING!") + + except CugraphServiceError: + # A server was not found, so start one for testing then stop it when + # testing is done. + server_process = utils.start_server_subprocess( + host=host, + port=port, + start_local_cuda_cluster=start_local_cuda_cluster, + dask_scheduler_file=dask_scheduler_file, + ) + + # Ensure the extensions needed for these benchmarks are loaded + required_graph_creation_extension_module = "benchmark_server_extension" + server_data = client.get_server_info() + # .stem excludes .py extensions, so it can match a python module name + loaded_graph_creation_extension_modules = [ + Path(m).stem for m in server_data["graph_creation_extensions"] + ] + if ( + required_graph_creation_extension_module + not in loaded_graph_creation_extension_modules + ): + modules_loaded = client.load_graph_creation_extensions( + "cugraph_service_server.testing.benchmark_server_extension" + ) + if len(modules_loaded) < 1: + raise RuntimeError( + "failed to load graph creation extension " + f"{required_graph_creation_extension_module}" + ) + + loaded_extension_modules = [Path(m).stem for m in server_data["extensions"]] + if required_graph_creation_extension_module not in loaded_extension_modules: + modules_loaded = client.load_extensions( + "cugraph_service_server.testing.benchmark_server_extension" + ) + if len(modules_loaded) < 1: + raise RuntimeError( + "failed to load extension " + f"{required_graph_creation_extension_module}" + ) + + return (server_process, client) + + +def remote_uniform_neighbor_sample(G, start_list, fanout_vals, with_replacement=True): + """ + Calls uniform_neighbor_sample() on the server using the client assigned to + the RemoteGraph instance G. + """ + assert G.is_remote() + result = G._client.uniform_neighbor_sample( + start_list, fanout_vals, with_replacement, graph_id=G._graph_id, result_device=1 + ) + return result + + +@pytest.fixture(scope="module", params=params.graph_obj_fixture_params) +def remote_graph_objs(request): + """ + Fixture that returns a RemoteGraph object populated with graph data and + algo callable based on the parameters. This also ensures a cugraph-service + server if not. 
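(For orientation, a rough sketch of the client-side flow these helpers wrap is shown below; the host, port, dataset name, start-list size, and result_device value are illustrative and simply mirror the calls defined above.)

```python
# Illustrative sketch only -- mirrors the client calls used by this benchmark.
from cugraph_service_client import CugraphServiceClient, RemoteGraph

client = CugraphServiceClient("localhost", 9090)  # assumes a reachable server

# Make the benchmark extensions available server-side.
client.load_graph_creation_extensions(
    "cugraph_service_server.testing.benchmark_server_extension"
)
client.load_extensions(
    "cugraph_service_server.testing.benchmark_server_extension"
)

# Create a graph on the server from a built-in dataset and wrap it remotely.
gid = client.call_graph_creation_extension(
    "create_graph_from_builtin_dataset", "karate"
)
G = RemoteGraph(client, gid)

# Generate a valid start list server-side, then sample.
start_list = client.call_extension("gen_vertex_list", gid, 100, 42)
result = client.uniform_neighbor_sample(
    list(start_list), [10, 25], False, graph_id=gid, result_device=1
)
```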
+ """ + (gpu_config, graph_data) = request.param + server_process = None + + if gpu_config not in ["SG", "SNMG", "MNMG"]: + raise RuntimeError(f"got unexpected gpu_config value: {gpu_config}") + + # Ensure the appropriate server is running + if gpu_config == "SG": + (server_process, cgs_client) = ensure_running_service_for_sampling() + is_mg = False + + elif gpu_config == "SNMG": + dask_scheduler_file = os.environ.get("SCHEDULER_FILE") + if dask_scheduler_file is None: + (server_process, cgs_client) = ensure_running_service_for_sampling( + start_local_cuda_cluster=True + ) + else: + assert Path(dask_scheduler_file).exists() + (server_process, cgs_client) = ensure_running_service_for_sampling( + dask_scheduler_file=dask_scheduler_file + ) + is_mg = True + + else: + raise NotImplementedError(f"{gpu_config=}") + + print("creating graph...") + st = time.perf_counter_ns() + G = create_remote_graph(graph_data, is_mg, cgs_client) + print(f"done creating graph, took {((time.perf_counter_ns() - st) / 1e9)}s") + + uns_func = remote_uniform_neighbor_sample + + yield (G, uns_func) + + del G # is this necessary? + if server_process is not None: + print("\nTerminating server...", end="", flush=True) + server_process.terminate() + server_process.wait(timeout=60) + print("done.", flush=True) + + +################################################################################ +# Benchmarks +@pytest.mark.parametrize("batch_size", params.batch_sizes.values()) +@pytest.mark.parametrize("fanout", [params.fanout_10_25, params.fanout_5_10_15]) +@pytest.mark.parametrize( + "with_replacement", [False], ids=lambda v: f"with_replacement={v}" +) +def bench_cgs_uniform_neighbor_sample( + gpubenchmark, remote_graph_objs, batch_size, fanout, with_replacement +): + (G, uniform_neighbor_sample_func) = remote_graph_objs + + uns_args = get_uniform_neighbor_sample_args( + G, _seed, batch_size, fanout, with_replacement + ) + # print(f"\n{uns_args}") + # FIXME: uniform_neighbor_sample cannot take a np.ndarray for start_list + result = gpubenchmark( + uniform_neighbor_sample_func, + G, + start_list=uns_args["start_list"], + fanout_vals=uns_args["fanout"], + with_replacement=uns_args["with_replacement"], + ) + dtmap = {"int32": 32 // 8, "int64": 64 // 8} + dt = str(result.sources.dtype) + llen = len(result.sources) + print(f"\nresult list len: {llen} (x3), dtype={dt}, total bytes={3*llen*dtmap[dt]}") diff --git a/benchmarks/python_pytest_based/README.md b/benchmarks/cugraph/pytest-based/README.md similarity index 100% rename from benchmarks/python_pytest_based/README.md rename to benchmarks/cugraph/pytest-based/README.md diff --git a/benchmarks/python_pytest_based/bench_algos.py b/benchmarks/cugraph/pytest-based/bench_algos.py similarity index 84% rename from benchmarks/python_pytest_based/bench_algos.py rename to benchmarks/cugraph/pytest-based/bench_algos.py index 6d8758e2230..bdfbbfef0dc 100644 --- a/benchmarks/python_pytest_based/bench_algos.py +++ b/benchmarks/cugraph/pytest-based/bench_algos.py @@ -32,27 +32,39 @@ def setFixtureParamNames(*args, **kwargs): import cugraph from cugraph.structure.number_map import NumberMap from cugraph.testing import utils +from pylibcugraph.testing import gen_fixture_params_product from cugraph.utilities.utils import is_device_version_less_than import rmm -from .params import FIXTURE_PARAMS +from cugraph_benchmarking.params import ( + directed_datasets, + undirected_datasets, + managed_memory, + pool_allocator, +) + +fixture_params = gen_fixture_params_product( + (directed_datasets + 
undirected_datasets, "ds"), + (managed_memory, "mm"), + (pool_allocator, "pa")) ############################################################################### # Helpers def createGraph(csvFileName, graphType=None): """ - Helper function to create a Graph or DiGraph based on csvFileName. + Helper function to create a Graph (directed or undirected) based on + csvFileName. """ if graphType is None: - # There's potential value in verifying that a DiGraph can be created - # from a undirected dataset, and a Graph from a directed. (For now?) do - # not include those combinations to keep benchmark runtime and - # complexity lower, and assume tests have coverage to verify - # correctness for those combinations. + # There's potential value in verifying that a directed graph can be + # created from a undirected dataset, and an undirected from a directed + # dataset. (For now?) do not include those combinations to keep + # benchmark runtime and complexity lower, and assume tests have + # coverage to verify correctness for those combinations. if "directed" in csvFileName.parts: - graphType = cugraph.structure.graph_classes.DiGraph + graphType = cugraph.Graph(directed=True) else: - graphType = cugraph.structure.graph_classes.Graph + graphType = cugraph.Graph() return cugraph.from_cudf_edgelist( utils.read_csv_file(csvFileName), @@ -93,7 +105,7 @@ def reinitRMM(managed_mem, pool_alloc): # For benchmarks, the operations performed in fixtures are not measured as part # of the benchmark. @pytest.fixture(scope="module", - params=FIXTURE_PARAMS) + params=fixture_params) def edgelistCreated(request): """ Returns a new edgelist created from a CSV, which is specified as part of @@ -111,7 +123,7 @@ def edgelistCreated(request): @pytest.fixture(scope="module", - params=FIXTURE_PARAMS) + params=fixture_params) def graphWithAdjListComputed(request): """ Create a Graph obj from the CSV file in param, compute the adjacency list @@ -127,11 +139,11 @@ def graphWithAdjListComputed(request): @pytest.fixture(scope="module", - params=FIXTURE_PARAMS) + params=fixture_params) def anyGraphWithAdjListComputed(request): """ - Create a Graph (or DiGraph) obj based on the param, compute the adjacency - list and return it. + Create a Graph (directed or undirected) obj based on the param, compute the + adjacency list and return it. """ setFixtureParamNames(request, ["dataset", "managed_mem", "pool_allocator"]) csvFileName = request.param[0] @@ -143,11 +155,11 @@ def anyGraphWithAdjListComputed(request): @pytest.fixture(scope="module", - params=FIXTURE_PARAMS) + params=fixture_params) def anyGraphWithTransposedAdjListComputed(request): """ - Create a Graph (or DiGraph) obj based on the param, compute the transposed - adjacency list and return it. + Create a Graph (directed or undirected) obj based on the param, compute the + transposed adjacency list and return it. """ setFixtureParamNames(request, ["dataset", "managed_mem", "pool_allocator"]) csvFileName = request.param[0] @@ -169,9 +181,9 @@ def bench_create_graph(gpubenchmark, edgelistCreated): renumber=False) -# Creating DiGraphs on small datasets runs in micro-seconds, which results in -# thousands of rounds before the default threshold is met, so lower the -# max_time for this benchmark. +# Creating directed Graphs on small datasets runs in micro-seconds, which +# results in thousands of rounds before the default threshold is met, so lower +# the max_time for this benchmark. 
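(As a brief aside on the change above: the legacy DiGraph class is replaced throughout these benchmarks by the directed flag on Graph; a minimal sketch of the two graph flavors used here:)

```python
import cugraph

# Directed and undirected graphs are both created via cugraph.Graph; the
# cugraph.structure.graph_classes.DiGraph spelling removed in this diff is
# equivalent to passing directed=True.
G_directed = cugraph.Graph(directed=True)
G_undirected = cugraph.Graph()
```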
@pytest.mark.ETL @pytest.mark.benchmark( warmup=True, @@ -182,7 +194,7 @@ def bench_create_digraph(gpubenchmark, edgelistCreated): gpubenchmark(cugraph.from_cudf_edgelist, edgelistCreated, source="0", destination="1", - create_using=cugraph.structure.graph_classes.DiGraph, + create_using=cugraph.Graph(directed=True), renumber=False) diff --git a/benchmarks/cugraph/pytest-based/bench_cugraph_uniform_neighbor_sample.py b/benchmarks/cugraph/pytest-based/bench_cugraph_uniform_neighbor_sample.py new file mode 100644 index 00000000000..5be7f042c24 --- /dev/null +++ b/benchmarks/cugraph/pytest-based/bench_cugraph_uniform_neighbor_sample.py @@ -0,0 +1,289 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import pytest +import numpy as np +import cupy as cp +from cugraph.testing.mg_utils import start_dask_client, stop_dask_client +import cudf +import dask_cudf + +# If the rapids-pytest-benchmark plugin is installed, the "gpubenchmark" +# fixture will be available automatically. Check that this fixture is available +# by trying to import rapids_pytest_benchmark, and if that fails, set +# "gpubenchmark" to the standard "benchmark" fixture provided by +# pytest-benchmark. +try: + import rapids_pytest_benchmark # noqa: F401 +except ImportError: + import pytest_benchmark + + gpubenchmark = pytest_benchmark.plugin.benchmark + +from cugraph import ( + MultiGraph, + uniform_neighbor_sample, +) +from cugraph.generators import rmat +from cugraph.experimental import datasets +from cugraph.dask import uniform_neighbor_sample as uniform_neighbor_sample_mg + +from cugraph_benchmarking import params + +_seed = 42 + + +def create_graph(graph_data): + """ + Create a graph instance based on the data to be loaded/generated. + """ + # FIXME: need to consider directed/undirected? 
+ G = MultiGraph(directed=True) + + # Assume strings are names of datasets in the datasets package + if isinstance(graph_data, str): + ds = getattr(datasets, graph_data) + edgelist_df = ds.get_edgelist() + # FIXME: edgelist_df should have column names that match the defaults + # for G.from_cudf_edgelist() + G.from_cudf_edgelist( + edgelist_df, source="src", destination="dst", edge_attr="wgt", renumber=True + ) + + # Assume dictionary contains RMAT params + elif isinstance(graph_data, dict): + scale = graph_data["scale"] + num_edges = (2**scale) * graph_data["edgefactor"] + seed = _seed + edgelist_df = rmat( + scale, + num_edges, + 0.57, # from Graph500 + 0.19, # from Graph500 + 0.19, # from Graph500 + seed, + clip_and_flip=False, + scramble_vertex_ids=False, # FIXME: need to understand relevance of this + create_using=None, # None == return edgelist + mg=False, + ) + edgelist_df["weight"] = cp.float32(1) + + G.from_cudf_edgelist( + edgelist_df, + source="src", + destination="dst", + edge_attr="weight", + renumber=True, + ) + + else: + raise TypeError(f"graph_data can only be str or dict, got {type(graph_data)}") + + return G + + +def create_mg_graph(graph_data): + """ + Create a graph instance based on the data to be loaded/generated. + """ + (client, cluster) = start_dask_client( + enable_tcp_over_ucx=False, + enable_infiniband=False, + enable_nvlink=False, + enable_rdmacm=False, + net_devices=None, + ) + # FIXME: need to consider directed/undirected? + G = MultiGraph(directed=True) + + # Assume strings are names of datasets in the datasets package + if isinstance(graph_data, str): + ds = getattr(datasets, graph_data) + edgelist_df = ds.get_edgelist() + # FIXME: edgelist_df should have column names that match the defaults + # for G.from_cudf_edgelist() + edgelist_df = dask_cudf.from_cudf(edgelist_df) + G.from_dask_cudf_edgelist( + edgelist_df, source="src", destination="dst", edge_attr="wgt", renumber=True + ) + + # Assume dictionary contains RMAT params + elif isinstance(graph_data, dict): + scale = graph_data["scale"] + num_edges = (2**scale) * graph_data["edgefactor"] + seed = _seed + edgelist_df = rmat( + scale, + num_edges, + 0.57, # from Graph500 + 0.19, # from Graph500 + 0.19, # from Graph500 + seed, + clip_and_flip=False, + scramble_vertex_ids=False, # FIXME: need to understand relevance of this + create_using=None, # None == return edgelist + mg=True, + ) + edgelist_df["weight"] = np.float32(1) + + G.from_dask_cudf_edgelist( + edgelist_df, + source="src", + destination="dst", + edge_attr="weight", + renumber=True, + ) + + else: + raise TypeError(f"graph_data can only be str or dict, got {type(graph_data)}") + + return (G, client, cluster) + + +def get_uniform_neighbor_sample_args( + G, seed, batch_size, fanout, with_replacement +): + """ + Return a dictionary containing the args for uniform_neighbor_sample based + on the graph and desired args passed in. For example, if a large start list + and small fanout list is desired, a "large" (based on graph size) list of + valid vert IDs for the graph passed in and a "small" list of fanout values + will be returned. + + The dictionary return value allows for easily supporting other args without + having to maintain an order of values in a return tuple, for example. 
+ """ + if with_replacement not in [True, False]: + raise ValueError(f"got unexpected value {with_replacement=}") + + rng = np.random.default_rng(seed) + num_verts = G.number_of_vertices() + + if batch_size > num_verts: + num_start_verts = int(num_verts * 0.25) + else: + num_start_verts = batch_size + + # Create the list of starting vertices by picking num_start_verts random + # ints between 0 and num_verts, then map those to actual vertex IDs. Since + # the randomly-chosen IDs may not map to actual IDs, keep trying until + # num_start_verts have been picked, or max_tries is reached. + assert G.renumbered + start_list_set = set() + max_tries = 10000 + try_num = 0 + while (len(start_list_set) < num_start_verts) and (try_num < max_tries): + internal_vertex_ids_start_list = rng.choice( + num_verts, size=num_start_verts, replace=False + ) + start_list_df = cudf.DataFrame({"vid": internal_vertex_ids_start_list}) + start_list_df = G.unrenumber(start_list_df, "vid") + + if G.is_multi_gpu(): + start_list_series = start_list_df.compute()["vid"] + else: + start_list_series = start_list_df["vid"] + + start_list_series.dropna(inplace=True) + start_list_set.update(set(start_list_series.values_host.tolist())) + try_num += 1 + + start_list = list(start_list_set) + start_list = start_list[:num_start_verts] + assert len(start_list) == num_start_verts + + return { + "start_list": list(start_list), + "fanout": fanout, + "with_replacement": with_replacement, + } + + +@pytest.fixture(scope="module", params=params.graph_obj_fixture_params) +def graph_objs(request): + """ + Fixture that returns a Graph object and algo callable (SG or MG) based on + the parameters. This handles instantiating the correct type (SG or MG) and + populating it with graph data. + """ + (gpu_config, graph_data) = request.param + dask_client = None + dask_cluster = None + + if gpu_config not in ["SG", "SNMG", "MNMG"]: + raise RuntimeError(f"got unexpected gpu_config value: {gpu_config}") + + print("creating graph...") + st = time.perf_counter_ns() + if gpu_config == "SG": + G = create_graph(graph_data) + uns_func = uniform_neighbor_sample + else: + (G, dask_client, dask_cluster) = create_mg_graph(graph_data) + uns_func = uniform_neighbor_sample_mg + def uns_func(*args, **kwargs): + print("running sampling...") + st = time.perf_counter_ns() + result_ddf = uniform_neighbor_sample_mg(*args, **kwargs) + print(f"done running sampling, took {((time.perf_counter_ns() - st) / 1e9)}s") + print("dask compute() results...") + st = time.perf_counter_ns() + sources = result_ddf["sources"].compute().to_cupy() + destinations = result_ddf["destinations"].compute().to_cupy() + indices = result_ddf["indices"].compute().to_cupy() + print(f"done dask compute() results, took {((time.perf_counter_ns() - st) / 1e9)}s") + return (sources, destinations, indices) + + print(f"done creating graph, took {((time.perf_counter_ns() - st) / 1e9)}s") + + yield (G, uns_func) + + if dask_client is not None: + stop_dask_client(dask_client, dask_cluster) + + +################################################################################ +# Benchmarks +@pytest.mark.parametrize("batch_size", params.batch_sizes.values()) +@pytest.mark.parametrize("fanout", [params.fanout_10_25, params.fanout_5_10_15]) +@pytest.mark.parametrize( + "with_replacement", [False], ids=lambda v: f"with_replacement={v}" +) +def bench_cugraph_uniform_neighbor_sample( + gpubenchmark, graph_objs, batch_size, fanout, with_replacement +): + (G, uniform_neighbor_sample_func) = graph_objs + + uns_args = 
get_uniform_neighbor_sample_args( + G, _seed, batch_size, fanout, with_replacement + ) + # print(f"\n{uns_args}") + # FIXME: uniform_neighbor_sample cannot take a np.ndarray for start_list + result = gpubenchmark( + uniform_neighbor_sample_func, + G, + start_list=uns_args["start_list"], + fanout_vals=uns_args["fanout"], + with_replacement=uns_args["with_replacement"], + ) + dtmap = {"int32": 32 // 8, "int64": 64 // 8} + if isinstance(result, tuple): + dt = str(result[0].dtype) + llen = len(result[0]) + else: + dt = str(result.sources.dtype) + llen = len(result.sources) + print(f"\nresult list len: {llen} (x3), dtype={dt}, total bytes={3*llen*dtmap[dt]}") diff --git a/benchmarks/python_pytest_based/conftest.py b/benchmarks/cugraph/pytest-based/conftest.py similarity index 94% rename from benchmarks/python_pytest_based/conftest.py rename to benchmarks/cugraph/pytest-based/conftest.py index 488fb5c2064..312afb5f824 100644 --- a/benchmarks/python_pytest_based/conftest.py +++ b/benchmarks/cugraph/pytest-based/conftest.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -10,7 +10,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# pytest customizations specific to these benchmarks def pytest_addoption(parser): parser.addoption("--no-rmm-reinit", action="store_true", default=False, diff --git a/benchmarks/python_e2e/README.md b/benchmarks/cugraph/standalone/README.md similarity index 100% rename from benchmarks/python_e2e/README.md rename to benchmarks/cugraph/standalone/README.md diff --git a/benchmarks/python_e2e/benchmark.py b/benchmarks/cugraph/standalone/benchmark.py similarity index 100% rename from benchmarks/python_e2e/benchmark.py rename to benchmarks/cugraph/standalone/benchmark.py diff --git a/benchmarks/python_e2e/cugraph_dask_funcs.py b/benchmarks/cugraph/standalone/cugraph_dask_funcs.py similarity index 100% rename from benchmarks/python_e2e/cugraph_dask_funcs.py rename to benchmarks/cugraph/standalone/cugraph_dask_funcs.py diff --git a/benchmarks/python_e2e/cugraph_funcs.py b/benchmarks/cugraph/standalone/cugraph_funcs.py similarity index 100% rename from benchmarks/python_e2e/cugraph_funcs.py rename to benchmarks/cugraph/standalone/cugraph_funcs.py diff --git a/benchmarks/python_e2e/main.py b/benchmarks/cugraph/standalone/main.py similarity index 100% rename from benchmarks/python_e2e/main.py rename to benchmarks/cugraph/standalone/main.py diff --git a/benchmarks/cugraph/standalone/pylibcugraph_bench.py b/benchmarks/cugraph/standalone/pylibcugraph_bench.py new file mode 100644 index 00000000000..eff3e2e8b7d --- /dev/null +++ b/benchmarks/cugraph/standalone/pylibcugraph_bench.py @@ -0,0 +1,77 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +from cugraph_funcs import generate_edgelist + +# Call RMAT +edgelist_df = generate_edgelist(scale=23, edgefactor=16) +srcs = edgelist_df["src"] +dsts = edgelist_df["dst"] +weights = edgelist_df["weight"] +weights = weights.astype("float32") + +print(f"num edges: {len(weights)}") +print() + +######## +import cugraph + +st = time.time() +G2 = cugraph.Graph(directed=True) +print(f"cugraph Graph create time: {time.time()-st}") +G2.from_cudf_edgelist(edgelist_df, source="src", destination="dst", + edge_attr="weight", renumber=True) +st = time.time() +result = cugraph.pagerank(G2, alpha=0.85, tol=1.0e-6, max_iter=500) +print(f"cugraph time: {time.time()-st}") + +######## +import pylibcugraph + +resource_handle = pylibcugraph.experimental.ResourceHandle() +graph_props = pylibcugraph.experimental.GraphProperties( + is_symmetric=False, is_multigraph=False) +st = time.time() +G = pylibcugraph.experimental.SGGraph( + resource_handle, graph_props, srcs, dsts, weights, + store_transposed=True, renumber=True, do_expensive_check=False) +print(f"pylibcugraph Graph create time: {time.time()-st}") +st = time.time() +(vertices, pageranks) = pylibcugraph.experimental.pagerank( + resource_handle, G, None, alpha=0.85, epsilon=1.0e-6, max_iterations=500, + has_initial_guess=False, do_expensive_check=True) +print(f"pylibcugraph time: {time.time()-st} (expensive check)") +st = time.time() +(vertices, pageranks) = pylibcugraph.experimental.pagerank( + resource_handle, G, None, alpha=0.85, epsilon=1.0e-6, max_iterations=500, + has_initial_guess=False, do_expensive_check=False) +print(f"pylibcugraph time: {time.time()-st}") + +######## +print() +vert_to_check = 4800348 +p = result['pagerank'][result['vertex'] == vert_to_check] +print(f"cugraph pagerank for vert: {vert_to_check}: {p.iloc[0]}") + +host_verts = vertices.tolist() +index = host_verts.index(vert_to_check) +print(f"pylibcugraph pagerank for vert: {vert_to_check}: {pageranks[index]}") + +vert_to_check = 268434647 +p = result['pagerank'][result['vertex'] == vert_to_check] +print(f"cugraph pagerank for vert: {vert_to_check}: {p.iloc[0]}") + +index = host_verts.index(vert_to_check) +print(f"pylibcugraph pagerank for vert: {vert_to_check}: {pageranks[index]}") diff --git a/benchmarks/python_e2e/reporting.py b/benchmarks/cugraph/standalone/reporting.py similarity index 100% rename from benchmarks/python_e2e/reporting.py rename to benchmarks/cugraph/standalone/reporting.py diff --git a/benchmarks/python_e2e/run_all_nightly_benches.sh b/benchmarks/cugraph/standalone/run_all_nightly_benches.sh similarity index 97% rename from benchmarks/python_e2e/run_all_nightly_benches.sh rename to benchmarks/cugraph/standalone/run_all_nightly_benches.sh index 0f7299fc00d..38d1c496991 100644 --- a/benchmarks/python_e2e/run_all_nightly_benches.sh +++ b/benchmarks/cugraph/standalone/run_all_nightly_benches.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at diff --git a/benchmarks/pytest.ini b/benchmarks/pytest.ini index 06a67a06040..5c2290fae9d 100644 --- a/benchmarks/pytest.ini +++ b/benchmarks/pytest.ini @@ -1,9 +1,13 @@ [pytest] +pythonpath = + shared/python + +testpaths = + cugraph/pytest_based + cugraph-service/pytest_based + addopts = - --benchmark-warmup=on - --benchmark-warmup-iterations=1 - --benchmark-min-rounds=3 - --benchmark-columns="min, max, mean, stddev, outliers, gpu_mem, rounds" + --benchmark-columns="min, max, mean, stddev, outliers" markers = managedmem_on: RMM managed memory enabled @@ -15,6 +19,32 @@ markers = tiny: tiny datasets directed: directed datasets undirected: undirected datasets + matrix_types: inputs are matrices + nx_types: inputs are NetowrkX Graph objects + cugraph_types: inputs are cuGraph Graph objects + sg: single-GPU + mg: multi-GPU + snmg: single-node multi-GPU + mnmg: multi-node multi-GPU + local: local cugraph + remote: cugraph-service + batch_size_100: batch size of 100 for sampling algos + batch_size_500: batch size of 500 for sampling algos + batch_size_1000: batch size of 1000 for sampling algos + batch_size_2500: batch size of 2500 for sampling algos + batch_size_5000: batch size of 5000 for sampling algos + batch_size_10000: batch size of 10000 for sampling algos + batch_size_20000: batch size of 20000 for sampling algos + batch_size_30000: batch size of 30000 for sampling algos + batch_size_40000: batch size of 40000 for sampling algos + batch_size_50000: batch size of 50000 for sampling algos + batch_size_60000: batch size of 60000 for sampling algos + batch_size_70000: batch size of 70000 for sampling algos + batch_size_80000: batch size of 80000 for sampling algos + batch_size_90000: batch size of 90000 for sampling algos + batch_size_100000: batch size of 100000 for sampling algos + fanout_10_25: fanout [10, 25] for sampling algos + fanout_5_10_15: fanout [5, 10, 15] for sampling algos python_classes = Bench* diff --git a/benchmarks/python_pytest_based/params.py b/benchmarks/python_pytest_based/params.py deleted file mode 100644 index f073e37a824..00000000000 --- a/benchmarks/python_pytest_based/params.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright (c) 2020-2022, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import pytest - -from cugraph.testing import utils -from pathlib import PurePath - -# FIXME: write and use mechanism described here for specifying datasets: -# https://docs.rapids.ai/maintainers/datasets -# FIXME: rlr: soc-twitter-2010.csv crashes with OOM error on my RTX-8000 -UNDIRECTED_DATASETS = [ - pytest.param(PurePath(utils.RAPIDS_DATASET_ROOT_DIR)/"karate.csv", - marks=[pytest.mark.tiny, pytest.mark.undirected]), - pytest.param(PurePath(utils.RAPIDS_DATASET_ROOT_DIR)/"csv/undirected/hollywood.csv", - marks=[pytest.mark.small, pytest.mark.undirected]), - pytest.param(PurePath(utils.RAPIDS_DATASET_ROOT_DIR)/"csv/undirected/europe_osm.csv", - marks=[pytest.mark.undirected]), - # pytest.param("../datasets/csv/undirected/soc-twitter-2010.csv", - # marks=[pytest.mark.undirected]), -] -DIRECTED_DATASETS = [ - pytest.param(PurePath(utils.RAPIDS_DATASET_ROOT_DIR)/"csv/directed/cit-Patents.csv", - marks=[pytest.mark.small, pytest.mark.directed]), - pytest.param(PurePath( - utils.RAPIDS_DATASET_ROOT_DIR)/"csv/directed/soc-LiveJournal1.csv", - marks=[pytest.mark.directed]), -] - -MANAGED_MEMORY = [ - pytest.param(True, - marks=[pytest.mark.managedmem_on]), - pytest.param(False, - marks=[pytest.mark.managedmem_off]), -] - -POOL_ALLOCATOR = [ - pytest.param(True, - marks=[pytest.mark.poolallocator_on]), - pytest.param(False, - marks=[pytest.mark.poolallocator_off]), -] - -FIXTURE_PARAMS = utils.genFixtureParamsProduct( - (DIRECTED_DATASETS + UNDIRECTED_DATASETS, "ds"), - (MANAGED_MEMORY, "mm"), - (POOL_ALLOCATOR, "pa")) diff --git a/benchmarks/python_pytest_based/__init__.py b/benchmarks/shared/python/cugraph_benchmarking/__init__.py similarity index 100% rename from benchmarks/python_pytest_based/__init__.py rename to benchmarks/shared/python/cugraph_benchmarking/__init__.py diff --git a/benchmarks/shared/python/cugraph_benchmarking/params.py b/benchmarks/shared/python/cugraph_benchmarking/params.py new file mode 100644 index 00000000000..74af905c836 --- /dev/null +++ b/benchmarks/shared/python/cugraph_benchmarking/params.py @@ -0,0 +1,142 @@ +# Copyright (c) 2020-2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path + +import pytest + +from cugraph.testing import utils +from pylibcugraph.testing.utils import gen_fixture_params + + +# FIXME: omitting soc-twitter-2010.csv due to OOM error on some workstations. 
+undirected_datasets = [ + pytest.param(Path(utils.RAPIDS_DATASET_ROOT_DIR) / "karate.csv", + marks=[pytest.mark.tiny, pytest.mark.undirected]), + pytest.param(Path(utils.RAPIDS_DATASET_ROOT_DIR) / "csv/undirected/hollywood.csv", + marks=[pytest.mark.small, pytest.mark.undirected]), + pytest.param(Path(utils.RAPIDS_DATASET_ROOT_DIR) / "csv/undirected/europe_osm.csv", + marks=[pytest.mark.undirected]), + # pytest.param("../datasets/csv/undirected/soc-twitter-2010.csv", + # marks=[pytest.mark.undirected]), +] + +directed_datasets = [ + pytest.param(Path(utils.RAPIDS_DATASET_ROOT_DIR) / "csv/directed/cit-Patents.csv", + marks=[pytest.mark.small, pytest.mark.directed]), + pytest.param(Path( + utils.RAPIDS_DATASET_ROOT_DIR) / "csv/directed/soc-LiveJournal1.csv", + marks=[pytest.mark.directed]), +] + +managed_memory = [ + pytest.param(True, + marks=[pytest.mark.managedmem_on]), + pytest.param(False, + marks=[pytest.mark.managedmem_off]), +] + +pool_allocator = [ + pytest.param(True, + marks=[pytest.mark.poolallocator_on]), + pytest.param(False, + marks=[pytest.mark.poolallocator_off]), +] + +sg = pytest.param( + "SG", + marks=[pytest.mark.sg], + id="gpu_config=SG", +) +snmg = pytest.param( + "SNMG", + marks=[pytest.mark.snmg, pytest.mark.mg], + id="gpu_config=SNMG", +) +mnmg = pytest.param( + "MNMG", + marks=[pytest.mark.mnmg, pytest.mark.mg], + id="gpu_config=MNMG", +) + +karate = pytest.param( + "karate", + id="dataset=karate", +) + +# RMAT-generated graph options +_rmat_scales = range(16, 31) +_rmat_edgefactors = [4, 16, 32] +rmat = {} +for scale in _rmat_scales: + for edgefactor in _rmat_edgefactors: + rmat[f"{scale}_{edgefactor}"] = pytest.param( + {"scale": scale, + "edgefactor": edgefactor, + }, + id=f"dataset=rmat_{scale}_{edgefactor}", + ) + +# sampling algos length of start list +_batch_sizes = [100, 500, 1000, 2500, 5000, + 10000, 20000, 30000, 40000, + 50000, 60000, 70000, 80000, + 90000, 100000] +batch_sizes = {} +for bs in _batch_sizes: + batch_sizes[bs] = pytest.param( + bs, + id=f"batch_size={bs}", + marks=[getattr(pytest.mark, f"batch_size_{bs}")], + ) + +# sampling algos fanout size +fanout_10_25 = pytest.param( + [10, 25], + marks=[pytest.mark.fanout_10_25], + id="fanout=10_25", +) +fanout_5_10_15 = pytest.param( + [5, 10, 15], + marks=[pytest.mark.fanout_5_10_15], + id="fanout=5_10_15", +) + +# Parameters for Graph generation fixture +# graph_obj_fixture_params = gen_fixture_params( +# (sg, karate), +# (sg, rmat["16_16"]), +# (sg, rmat["18_16"]), +# (sg, rmat["20_16"]), +# (sg, rmat["25_16"]), +# (sg, rmat["26_16"]), +# (snmg, rmat["26_16"]), +# (snmg, rmat["27_16"]), +# (snmg, rmat["28_16"]), +# (mnmg, rmat["29_16"]), +# (mnmg, rmat["30_16"]), +# ) +graph_obj_fixture_params = gen_fixture_params( + (sg, karate), + (sg, rmat["16_4"]), + (sg, rmat["18_4"]), + (sg, rmat["20_4"]), + (sg, rmat["24_4"]), + (sg, rmat["25_4"]), + (sg, rmat["26_4"]), + (snmg, rmat["26_4"]), + (snmg, rmat["27_4"]), + (snmg, rmat["28_4"]), + (mnmg, rmat["29_4"]), + (mnmg, rmat["30_4"]), +) diff --git a/ci/test.sh b/ci/test.sh index 3b51b51cbfa..6fc060367d5 100755 --- a/ci/test.sh +++ b/ci/test.sh @@ -98,7 +98,7 @@ if hasArg "--run-python-tests"; then echo "Ran Python pytest for cugraph : return code was: $?, test script exit code is now: $EXITCODE" echo "Python benchmarks for cuGraph (running as tests)..." 
- cd ${CUGRAPH_ROOT}/benchmarks + cd ${CUGRAPH_ROOT}/benchmarks/cugraph pytest -sv -m "managedmem_on and poolallocator_on and tiny" --benchmark-disable echo "Ran Python benchmarks for cuGraph (running as tests) : return code was: $?, test script exit code is now: $EXITCODE" diff --git a/python/cugraph-service/client/cugraph_service_client/remote_graph.py b/python/cugraph-service/client/cugraph_service_client/remote_graph.py index 858ebd14b38..2f3cec21c22 100644 --- a/python/cugraph-service/client/cugraph_service_client/remote_graph.py +++ b/python/cugraph-service/client/cugraph_service_client/remote_graph.py @@ -15,6 +15,7 @@ import numpy as np import importlib +from cugraph_service_client.exceptions import CugraphServiceError from cugraph_service_client.remote_graph_utils import ( _transform_to_backend_dtype, _transform_to_backend_dtype_1d, @@ -65,6 +66,12 @@ def __init__( self.__edge_categorical_dtype = None def __del__(self): + # Assume if a connection cannot be opened that the service is already + # stopped and the delete call can be skipped. + try: + self.__client.open() + except CugraphServiceError: + return self.__client.delete_graph(self.__graph_id) def is_remote(self): @@ -144,10 +151,10 @@ def edges(self, backend=("cudf" if cudf_installed else "numpy")): destination vertex, and edge type. """ + # default edge props include src, dst, edge ID, and edge type np_edges = self.__client.get_graph_edge_data( -1, graph_id=self.__graph_id, - property_keys=[self.src_col_name, self.dst_col_name], ) # Convert edge type to numeric if necessary @@ -219,6 +226,9 @@ def get_num_vertices(self, type=None, *, include_edge_data=True): """ return self.__client.get_num_vertices(type, include_edge_data, self.__graph_id) + def number_of_vertices(self): + return self.get_num_vertices(type=None, include_edge_data=True) + def get_num_edges(self, type=None): """Return the number of all edges or edges of a given type. @@ -234,6 +244,9 @@ def get_num_edges(self, type=None): """ return self.__client.get_num_edges(type, self.__graph_id) + def number_of_edges(self): + return self.get_num_edges(type=None) + def get_vertices(self, selection=None, backend="cudf"): """ Parameters diff --git a/python/cugraph-service/pytest.ini b/python/cugraph-service/pytest.ini index 70f62f96dac..6a0dd36ecec 100644 --- a/python/cugraph-service/pytest.ini +++ b/python/cugraph-service/pytest.ini @@ -12,13 +12,29 @@ # limitations under the License. 
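(The marker definitions in this file and in benchmarks/pytest.ini above allow subsets of benchmarks to be selected with -m expressions; the ci/test.sh change earlier in this diff does exactly that. A rough programmatic equivalent, assuming it is run from benchmarks/cugraph:)

```python
# Sketch: run only the tiny, RMM managed-memory + pool-allocator benchmarks
# with benchmarking disabled, the same selection ci/test.sh now uses.
import pytest

pytest.main(
    ["-sv", "-m", "managedmem_on and poolallocator_on and tiny", "--benchmark-disable"]
)
```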
[pytest] -addopts = - --benchmark-warmup=off - --benchmark-max-time=0 - --benchmark-min-rounds=1 - --benchmark-columns="min, max, mean, rounds" - ## for use with rapids-pytest-benchmark plugin - #--benchmark-gpu-disable - ## for use with pytest-cov plugin - #--cov=cugraph - #--cov-report term-missing:skip-covered +addopts = --benchmark-warmup=off + --benchmark-max-time=0 + --benchmark-min-rounds=1 + --benchmark-columns="min, max, mean, rounds" + ## for use with rapids-pytest-benchmark plugin + #--benchmark-gpu-disable + ## for use with pytest-cov plugin + #--cov=cugraph + #--cov-report term-missing:skip-covered + +markers = sg: single-GPU + mg: multi-GPU + snmg: single-node multi-GPU + mnmg: multi-node multi-GPU + local: local cugraph + remote: cugraph-service + start_list_small: use a "small" start list length for sampling algos + start_list_large: use a "large" start list length for sampling algos + fanout_list_small: use a "small" fanout list length for sampling algos + fanout_list_large: use a "large" fanout list length for sampling algos + +python_files = bench_* + test_* + +python_functions = bench_* + test_* diff --git a/python/cugraph-service/server/cugraph_service_server/__init__.py b/python/cugraph-service/server/cugraph_service_server/__init__.py index 5d13adabdbb..2005f69b3d3 100644 --- a/python/cugraph-service/server/cugraph_service_server/__init__.py +++ b/python/cugraph-service/server/cugraph_service_server/__init__.py @@ -18,24 +18,35 @@ from cugraph_service_server.cugraph_handler import CugraphHandler -def create_handler(graph_creation_extension_dir=None, dask_scheduler_file=None): +def create_handler( + graph_creation_extension_dir=None, + start_local_cuda_cluster=False, + dask_scheduler_file=None, +): """ Create and return a CugraphHandler instance initialized with options. Setting graph_creation_extension_dir to a valid dir results in the handler loading graph creation extensions from that dir. """ handler = CugraphHandler() + if start_local_cuda_cluster and (dask_scheduler_file is not None): + raise ValueError( + "dask_scheduler_file cannot be set if start_local_cuda_cluster is True" + ) + if graph_creation_extension_dir is not None: handler.load_graph_creation_extensions(graph_creation_extension_dir) if dask_scheduler_file is not None: - # FIXME: if initialize_dask_client(None) is called, it creates a - # LocalCUDACluster. Add support for this via a different CLI option? - handler.initialize_dask_client(dask_scheduler_file) + handler.initialize_dask_client(dask_scheduler_file=dask_scheduler_file) + elif start_local_cuda_cluster: + handler.initialize_dask_client() + return handler def start_server_blocking( graph_creation_extension_dir=None, + start_local_cuda_cluster=False, dask_scheduler_file=None, host=defaults.host, port=defaults.port, @@ -50,7 +61,9 @@ def start_server_blocking( starts listening for connections. This call blocks indefinitely until Ctrl-C. 
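(A minimal sketch of driving this entry point programmatically with the new option; start_local_cuda_cluster and dask_scheduler_file are mutually exclusive, as enforced by create_handler above.)

```python
# Sketch: start the service with a LocalCUDACluster for single-node multi-GPU.
from cugraph_service_server import start_server_blocking

start_server_blocking(
    graph_creation_extension_dir=None,
    start_local_cuda_cluster=True,
    dask_scheduler_file=None,  # must remain None when using a local cluster
)
```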
""" - handler = create_handler(graph_creation_extension_dir, dask_scheduler_file) + handler = create_handler( + graph_creation_extension_dir, start_local_cuda_cluster, dask_scheduler_file + ) if console_message != "": print(console_message, flush=True) server = create_server(handler, host=host, port=port) diff --git a/python/cugraph-service/server/cugraph_service_server/__main__.py b/python/cugraph-service/server/cugraph_service_server/__main__.py index 6cf71ec69b7..0fe98c9be9c 100644 --- a/python/cugraph-service/server/cugraph_service_server/__main__.py +++ b/python/cugraph-service/server/cugraph_service_server/__main__.py @@ -46,14 +46,34 @@ def main(): help="file generated by a dask scheduler, used " "for connecting to a dask cluster for MG support", ) + arg_parser.add_argument( + "--start-local-cuda-cluster", + action="store_true", + help="use a LocalCUDACluster for multi-GPU", + ) args = arg_parser.parse_args() + msg = "Starting the cugraph_service server " + if (args.dask_scheduler_file is None) and (args.start_local_cuda_cluster is False): + msg += "(single-GPU)..." + elif (args.dask_scheduler_file is not None) and ( + args.start_local_cuda_cluster is True + ): + raise RuntimeError( + "dask-scheduler-file cannot be set if start-local-cuda-cluster is specified" + ) + elif args.dask_scheduler_file is not None: + msg += f"(multi-GPU, scheduler file: {args.dask_scheduler_file})..." + elif args.start_local_cuda_cluster is True: + msg += "(multi-GPU, LocalCUDACluster)..." + start_server_blocking( args.graph_creation_extension_dir, + args.start_local_cuda_cluster, args.dask_scheduler_file, args.host, args.port, - console_message="Starting the cugraph_service server...", + console_message=msg, ) print("done.") diff --git a/python/cugraph-service/server/cugraph_service_server/cugraph_handler.py b/python/cugraph-service/server/cugraph_service_server/cugraph_handler.py index f18f531c28f..c0619dc60f9 100644 --- a/python/cugraph-service/server/cugraph_service_server/cugraph_handler.py +++ b/python/cugraph-service/server/cugraph_service_server/cugraph_handler.py @@ -14,35 +14,41 @@ from functools import cached_property from pathlib import Path - -# FIXME This optional import is required to support graph creation -# extensions that use OGB. It should be removed when a better -# workaround is found. -from cugraph.utilities.utils import import_optional - import importlib import time import traceback import re from inspect import signature import asyncio +import tempfile +# FIXME This optional import is required to support graph creation +# extensions that use OGB. It should be removed when a better +# workaround is found. 
+from cugraph.utilities.utils import import_optional import numpy as np import cupy as cp import ucp import cudf import dask_cudf -import cugraph +from cugraph import ( + batched_ego_graphs, + uniform_neighbor_sample, + node2vec, + Graph, + MultiGraph, +) from dask.distributed import Client +from dask_cuda import LocalCUDACluster from dask_cuda.initialize import initialize as dask_initialize from cugraph.experimental import PropertyGraph, MGPropertyGraph from cugraph.dask.comms import comms as Comms -from cugraph import uniform_neighbor_sample from cugraph.dask import uniform_neighbor_sample as mg_uniform_neighbor_sample from cugraph.structure.graph_implementation.simpleDistributedGraph import ( simpleDistributedGraphImpl, ) +from cugraph.dask.common.mg_utils import get_visible_devices from cugraph_service_client import defaults from cugraph_service_client import ( @@ -203,70 +209,80 @@ def get_server_info(self): """ # FIXME: expose self.__dask_client.scheduler_info() as needed - return {"num_gpus": ValueWrapper(self.num_gpus).union} + return { + "num_gpus": ValueWrapper(self.num_gpus).union, + "extensions": ValueWrapper(list(self.__extensions.keys())).union, + "graph_creation_extensions": ValueWrapper( + list(self.__graph_creation_extensions.keys()) + ).union, + } - def load_graph_creation_extensions(self, extension_dir_path): + def load_graph_creation_extensions(self, extension_dir_or_mod_path): """ - Loads ("imports") all modules matching the pattern *_extension.py in - the directory specified by extension_dir_path. + Loads ("imports") all modules matching the pattern *_extension.py in the + directory specified by extension_dir_or_mod_path. extension_dir_or_mod_path + can be either a path to a directory on disk, or a python import path to a + package. The modules are searched and their functions are called (if a match is found) when call_graph_creation_extension() is called. + + The extensions loaded are to be used for graph creation, and the server assumes + the return value of the extension functions is a Graph-like object which is + registered and assigned a unique graph ID. """ - extension_dir = Path(extension_dir_path) + modules_loaded = [] + try: + extension_files = self.__get_extension_files_from_path( + extension_dir_or_mod_path + ) - if (not extension_dir.exists()) or (not extension_dir.is_dir()): - raise CugraphServiceError(f"bad directory: {extension_dir}") + for ext_file in extension_files: + module_file_path = ext_file.absolute().as_posix() + spec = importlib.util.spec_from_file_location( + module_file_path, ext_file + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + self.__graph_creation_extensions[module_file_path] = module + modules_loaded.append(module_file_path) - modules_loaded = [] - for ext_file in extension_dir.glob("*_extension.py"): - module_file_path = ext_file.absolute().as_posix() - spec = importlib.util.spec_from_file_location(module_file_path, ext_file) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - self.__graph_creation_extensions[module_file_path] = module - modules_loaded.append(module_file_path) + return modules_loaded - return modules_loaded + except Exception: + raise CugraphServiceError(f"{traceback.format_exc()}") def load_extensions(self, extension_dir_or_mod_path): """ - Loads ("imports") all modules matching the pattern *_extension.py in - the directory specified by extension_dir_path. 
+ Loads ("imports") all modules matching the pattern *_extension.py in the + directory specified by extension_dir_or_mod_path. extension_dir_or_mod_path + can be either a path to a directory on disk, or a python import path to a + package. The modules are searched and their functions are called (if a match is - found) when call_extension() is called. + found) when call_graph_creation_extension() is called. """ modules_loaded = [] - extension_path = Path(extension_dir_or_mod_path) - # extension_dir_path is either a path on disk or an importable module path - # (eg. import foo.bar.module) - if (not extension_path.exists()) or (not extension_path.is_dir()): - try: - mod = importlib.import_module(str(extension_path)) - except ModuleNotFoundError: - raise CugraphServiceError(f"bad path: {extension_dir_or_mod_path}") - - mod_file_path = Path(mod.__file__).absolute() + try: + extension_files = self.__get_extension_files_from_path( + extension_dir_or_mod_path + ) - # If mod is a package, find all the .py files in it - if mod_file_path.name == "__init__.py": - extension_files = mod_file_path.parent.glob("*.py") - else: - extension_files = [mod_file_path] - else: - extension_files = extension_path.glob("*_extension.py") + for ext_file in extension_files: + module_file_path = ext_file.absolute().as_posix() + spec = importlib.util.spec_from_file_location( + module_file_path, ext_file + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + self.__extensions[module_file_path] = module + modules_loaded.append(module_file_path) - for ext_file in extension_files: - module_file_path = ext_file.absolute().as_posix() - spec = importlib.util.spec_from_file_location(module_file_path, ext_file) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - self.__extensions[module_file_path] = module - modules_loaded.append(module_file_path) + return modules_loaded - return modules_loaded + except Exception: + raise CugraphServiceError(f"{traceback.format_exc()}") def unload_extension_module(self, modname): """ @@ -364,7 +380,15 @@ def call_extension( except Exception: raise CugraphServiceError(f"{traceback.format_exc()}") - def initialize_dask_client(self, dask_scheduler_file=None): + def initialize_dask_client( + self, + dask_scheduler_file=None, + enable_tcp_over_ucx=False, + enable_infiniband=False, + enable_nvlink=False, + enable_rdmacm=False, + net_devices=None, + ): """ Initialize a dask client to be used for MG operations. """ @@ -372,8 +396,18 @@ def initialize_dask_client(self, dask_scheduler_file=None): dask_initialize() self.__dask_client = Client(scheduler_file=dask_scheduler_file) else: - # FIXME: LocalCUDACluster init. Implement when tests are in place. - raise NotImplementedError + # The tempdir created by tempdir_object should be cleaned up once + # tempdir_object goes out-of-scope and is deleted. 
+ tempdir_object = tempfile.TemporaryDirectory() + cluster = LocalCUDACluster( + local_directory=tempdir_object.name, + enable_tcp_over_ucx=enable_tcp_over_ucx, + enable_infiniband=enable_infiniband, + enable_nvlink=enable_nvlink, + enable_rdmacm=enable_rdmacm, + ) + self.__dask_client = Client(cluster) + self.__dask_client.wait_for_workers(len(get_visible_devices())) if not Comms.is_initialized(): Comms.initialize(p2p=True) @@ -903,9 +937,7 @@ def batched_ego_graphs(self, seeds, radius, graph_id): # FIXME: this should not be needed, need to update # cugraph.batched_ego_graphs to also accept a list seeds = cudf.Series(seeds, dtype="int32") - (ego_edge_list, seeds_offsets) = cugraph.batched_ego_graphs( - G, seeds, radius - ) + (ego_edge_list, seeds_offsets) = batched_ego_graphs(G, seeds, radius) # batched_ego_graphs_result = BatchedEgoGraphsResult( # src_verts=ego_edge_list["src"].values_host.tobytes(), #i32 @@ -947,9 +979,7 @@ def node2vec(self, start_vertices, max_depth, graph_id): # to also accept a list start_vertices = cudf.Series(start_vertices, dtype="int32") - (paths, weights, path_sizes) = cugraph.node2vec( - G, start_vertices, max_depth - ) + (paths, weights, path_sizes) = node2vec(G, start_vertices, max_depth) node2vec_result = Node2vecResult( vertex_paths=paths.values_host, @@ -970,15 +1000,18 @@ def uniform_neighbor_sample( result_host, result_port, ): - G = self._get_graph(graph_id) - if isinstance(G, (MGPropertyGraph, PropertyGraph)): - # Implicitly extract a subgraph containing the entire multigraph. - # G will be garbage collected when this function returns. - G = G.extract_subgraph( - create_using=cugraph.MultiGraph(directed=True), default_edge_weight=1.0 - ) - try: + G = self._get_graph(graph_id) + if isinstance(G, (MGPropertyGraph, PropertyGraph)): + # Implicitly extract a subgraph containing the entire multigraph. + # G will be garbage collected when this function returns. + G = G.extract_subgraph( + create_using=MultiGraph(directed=True), + default_edge_weight=1.0, + ) + + print("SERVER: starting sampling...") + st = time.perf_counter_ns() uns_result = call_algo( uniform_neighbor_sample, G, @@ -986,7 +1019,13 @@ def uniform_neighbor_sample( fanout_vals=fanout_vals, with_replacement=with_replacement, ) + print( + f"SERVER: done sampling, took {((time.perf_counter_ns() - st) / 1e9)}s" + ) + if self.__check_host_port_args(result_host, result_port): + print("SERVER: calling ucx_send_results...") + st = time.perf_counter_ns() asyncio.run( self.__ucx_send_results( result_host, @@ -996,6 +1035,10 @@ def uniform_neighbor_sample( uns_result.indices, ) ) + print( + "SERVER: done ucx_send_results, took " + f"{((time.perf_counter_ns() - st) / 1e9)}s" + ) # FIXME: Thrift still expects something of the expected type to # be returned to be serialized and sent. Look into a separate # API that uses the Thrift "oneway" modifier when returning @@ -1125,9 +1168,9 @@ def __parse_create_using_string(self, create_using): raise ValueError(f"Could not parse argument {arg}", e) if graph_type == "Graph": - graph_type = cugraph.Graph + graph_type = Graph else: - graph_type = cugraph.MultiGraph + graph_type = MultiGraph return graph_type(**args_dict) @@ -1146,6 +1189,29 @@ def __check_host_port_args(result_host, result_port): return True return False + @staticmethod + def __get_extension_files_from_path(extension_dir_or_mod_path): + extension_path = Path(extension_dir_or_mod_path) + # extension_dir_path is either a path on disk or an importable module path + # (eg. 
import foo.bar.module) + if (not extension_path.exists()) or (not extension_path.is_dir()): + try: + mod = importlib.import_module(str(extension_path)) + except ModuleNotFoundError: + raise CugraphServiceError(f"bad path: {extension_dir_or_mod_path}") + + mod_file_path = Path(mod.__file__).absolute() + + # If mod is a package, find all the .py files in it + if mod_file_path.name == "__init__.py": + extension_files = mod_file_path.parent.glob("*.py") + else: + extension_files = [mod_file_path] + else: + extension_files = extension_path.glob("*_extension.py") + + return extension_files + async def __ucx_send_results(self, result_host, result_port, *results): # The cugraph_service_client should have set up a UCX listener waiting # for the result. Create an endpoint, send results, and close. @@ -1293,14 +1359,14 @@ def __call_extension( func_kwargs[facade_param] = ExtensionServerFacade(self) else: raise CugraphServiceError( - f"{facade_param}, if specified, must be the " "last param." + f"{facade_param}, if specified, must be the last param." ) try: return func(*func_args, **func_kwargs) except Exception: # FIXME: raise a more detailed error raise CugraphServiceError( - f"error running {func_name} : " f"{traceback.format_exc()}" + f"error running {func_name} : {traceback.format_exc()}" ) raise CugraphServiceError(f"extension {func_name} was not found") diff --git a/python/cugraph-service/server/cugraph_service_server/testing/__init__.py b/python/cugraph-service/server/cugraph_service_server/testing/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/python/cugraph-service/server/cugraph_service_server/testing/benchmark_server_extension.py b/python/cugraph-service/server/cugraph_service_server/testing/benchmark_server_extension.py new file mode 100644 index 00000000000..8b739b71be9 --- /dev/null +++ b/python/cugraph-service/server/cugraph_service_server/testing/benchmark_server_extension.py @@ -0,0 +1,135 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import numpy as np +import cudf +import dask_cudf + +import cugraph +from cugraph.experimental import PropertyGraph, MGPropertyGraph +from cugraph.experimental import datasets +from cugraph.generators import rmat + + +# Graph creation extensions (these are assumed to return a Graph object) +def create_graph_from_builtin_dataset(dataset_name, mg=False, server=None): + dataset_obj = getattr(datasets, dataset_name) + return dataset_obj.get_graph(fetch=True) + + +def create_property_graph_from_builtin_dataset(dataset_name, mg=False, server=None): + dataset_obj = getattr(datasets, dataset_name) + edgelist_df = dataset_obj.get_edgelist(fetch=True) + + if mg and (server is not None) and server.is_multi_gpu(): + G = MGPropertyGraph() + edgelist_df = dask_cudf.from_cudf(edgelist_df) + else: + G = PropertyGraph() + + G.add_edge_data(edgelist_df, vertex_col_names=["src", "dst"]) + return G + + +def create_graph_from_rmat_generator( + scale, + num_edges, + a=0.57, # from Graph500 + b=0.19, # from Graph500 + c=0.19, # from Graph500 + seed=42, + clip_and_flip=False, + scramble_vertex_ids=False, # FIXME: need to understand relevance of this + mg=False, + server=None, +): + if mg and (server is not None) and server.is_multi_gpu: + is_mg = True + else: + is_mg = False + + edgelist_df = rmat( + scale, + num_edges, + a, + b, + c, + seed, + clip_and_flip, + scramble_vertex_ids, + create_using=None, # None == return edgelist + mg=is_mg, + ) + edgelist_df["weight"] = np.float32(1) + + # For PropertyGraph, uncomment: + # if is_mg: + # G = MGPropertyGraph() + # else: + # G = PropertyGraph() + # + # G.add_edge_data(edgelist_df, vertex_col_names=["src", "dst"]) + + # For Graph, uncomment: + G = cugraph.Graph(directed=True) + if is_mg: + G.from_dask_cudf_edgelist( + edgelist_df, source="src", destination="dst", edge_attr="weight" + ) + else: + G.from_cudf_edgelist( + edgelist_df, source="src", destination="dst", edge_attr="weight" + ) + + return G + + +# General-purpose extensions +def gen_vertex_list(graph_id, num_start_verts, seed, server=None): + """ + Create the list of starting vertices by picking num_start_verts random ints + between 0 and num_verts, then map those to actual vertex IDs. Since the + randomly-chosen IDs may not map to actual IDs, keep trying until + num_start_verts have been picked, or max_tries is reached. 
+ """ + rng = np.random.default_rng(seed) + + G = server.get_graph(graph_id) + assert G.renumbered + num_verts = G.number_of_vertices() + + start_list_set = set() + max_tries = 10000 + try_num = 0 + while (len(start_list_set) < num_start_verts) and (try_num < max_tries): + internal_vertex_ids_start_list = rng.choice( + num_verts, size=num_start_verts, replace=False + ) + start_list_df = cudf.DataFrame({"vid": internal_vertex_ids_start_list}) + start_list_df = G.unrenumber(start_list_df, "vid") + + if G.is_multi_gpu(): + start_list_series = start_list_df.compute()["vid"] + else: + start_list_series = start_list_df["vid"] + + start_list_series.dropna(inplace=True) + start_list_set.update(set(start_list_series.values_host.tolist())) + try_num += 1 + + start_list = list(start_list_set) + start_list = start_list[:num_start_verts] + assert len(start_list) == num_start_verts + + return start_list diff --git a/python/cugraph-service/tests/utils.py b/python/cugraph-service/server/cugraph_service_server/testing/utils.py similarity index 78% rename from python/cugraph-service/tests/utils.py rename to python/cugraph-service/server/cugraph_service_server/testing/utils.py index f965403c92b..2f2af40fc55 100644 --- a/python/cugraph-service/tests/utils.py +++ b/python/cugraph-service/server/cugraph_service_server/testing/utils.py @@ -38,6 +38,7 @@ def start_server_subprocess( host="localhost", port=9090, graph_creation_extension_dir=None, + start_local_cuda_cluster=False, dask_scheduler_file=None, env_additions=None, ): @@ -57,14 +58,18 @@ def start_server_subprocess( if env_additions is not None: env_dict.update(env_additions) - # pytest will update sys.path based on the tests it discovers, and for this - # source tree, an entry for the parent of this "tests" directory will be - # added. The parent to this "tests" directory also allows imports to find - # the cugraph_service sources, so in oder to ensure the server that's - # started is also using the same sources, the PYTHONPATH env should be set - # to the sys.path being used in this process. + # pytest will update sys.path based on the tests it discovers and optional + # settings in pytest.ini. Make sure any path settings are passed on the the + # server so modules are properly found. env_dict["PYTHONPATH"] = ":".join(sys.path) + # special case: some projects organize their tests/benchmarks by package + # name, such as "cugraph". Unfortunately, these can collide with installed + # package names since python will treat them as a namespace package if this + # is run from a directory with a "cugraph" or similar subdir. Simply change + # to a temp dir prior to running the server to avoid collisions. + tempdir_object = TemporaryDirectory() + args = [ sys.executable, "-m", @@ -84,6 +89,8 @@ def start_server_subprocess( "--dask-scheduler-file", dask_scheduler_file, ] + if start_local_cuda_cluster: + args += ["--start-local-cuda-cluster"] try: server_process = subprocess.Popen( @@ -92,15 +99,21 @@ def start_server_subprocess( stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, + cwd=tempdir_object.name, ) + # Attach tempdir_object to server_process so it is not deleted and + # removed when it goes out of scope. 
Instead, it should get deleted + # when server_process is GC'd + server_process.tempdir = tempdir_object + print( "\nLaunched cugraph_service server, waiting for it to start...", end="", flush=True, ) client = CugraphServiceClient(host, port) - max_retries = 20 + max_retries = 60 retries = 0 while retries < max_retries: try: diff --git a/python/cugraph-service/server/setup.py b/python/cugraph-service/server/setup.py index f9cc2f6c9c8..a9451229c7b 100644 --- a/python/cugraph-service/server/setup.py +++ b/python/cugraph-service/server/setup.py @@ -37,7 +37,9 @@ ], author="NVIDIA Corporation", url="https://github.com/rapidsai/cugraph", - packages=find_packages(include=["cugraph_service_server"]), + packages=find_packages( + include=["cugraph_service_server", "cugraph_service_server.*"] + ), entry_points={ "console_scripts": [ "cugraph-service-server=cugraph_service_server.__main__:main" diff --git a/python/cugraph-service/tests/conftest.py b/python/cugraph-service/tests/conftest.py index e2cb1917499..73b21aa16c8 100644 --- a/python/cugraph-service/tests/conftest.py +++ b/python/cugraph-service/tests/conftest.py @@ -14,7 +14,7 @@ import pytest -from . import utils +from cugraph_service_server.testing import utils graph_creation_extension1_file_contents = """ import cudf @@ -229,6 +229,38 @@ def my_extension(arg1, arg2, server): # module scope fixtures +@pytest.fixture(scope="module") +def server(): + """ + Start a cugraph_service server, stop it when done with the fixture. + """ + from cugraph_service_client import CugraphServiceClient + from cugraph_service_client.exceptions import CugraphServiceError + + host = "localhost" + port = 9090 + client = CugraphServiceClient(host, port) + + try: + client.uptime() + print("FOUND RUNNING SERVER, ASSUMING IT SHOULD BE USED FOR TESTING!") + yield + + except CugraphServiceError: + # A server was not found, so start one for testing then stop it when + # testing is done. + server_process = utils.start_server_subprocess(host=host, port=port) + + # yield control to the tests, cleanup on return + yield + + # tests are done, now stop the server + print("\nTerminating server...", end="", flush=True) + server_process.terminate() + server_process.wait(timeout=60) + print("done.", flush=True) + + @pytest.fixture(scope="module") def graph_creation_extension1(): tmp_extension_dir = utils.create_tmp_extension_dir( @@ -336,3 +368,29 @@ def extension_adds_graph(): ) yield tmp_extension_dir.name + + +############################################################################### +# function scope fixtures + + +@pytest.fixture(scope="function") +def client(server): + """ + Creates a client instance to the running server, closes the client when the + fixture is no longer used by tests. + """ + from cugraph_service_client import CugraphServiceClient, defaults + + client = CugraphServiceClient(defaults.host, defaults.port) + + for gid in client.get_graph_ids(): + client.delete_graph(gid) + + # FIXME: should this fixture always unconditionally unload all extensions? 
+    # client.unload_graph_creation_extensions()
+
+    # yield control to the tests, cleanup on return
+    yield client
+
+    client.close()
diff --git a/python/cugraph-service/tests/test_cugraph_handler.py b/python/cugraph-service/tests/test_cugraph_handler.py
index e592b643deb..c2af279e157 100644
--- a/python/cugraph-service/tests/test_cugraph_handler.py
+++ b/python/cugraph-service/tests/test_cugraph_handler.py
@@ -82,7 +82,7 @@ def test_load_and_call_graph_creation_extension(graph_creation_extension2):
     assert "c" in edge_props
 
 
-def test_load_and_unload_extensions(graph_creation_extension2, extension1):
+def test_load_call_unload_extensions(graph_creation_extension2, extension1):
     """
     Ensure extensions can be loaded, run, and unloaded.
     """
@@ -181,10 +181,45 @@ def test_extension_with_facade_graph_access(
     assert results.list_value[2].double_value == 2 + val1 + val2
 
 
-def test_load_and_unload_extensions_python_module_path(extension1):
+def test_load_call_unload_testing_extensions():
+    """Ensure the testing/benchmark extensions can be loaded and called."""
+    from cugraph_service_server.cugraph_handler import CugraphHandler
+
+    handler = CugraphHandler()
+    num_loaded = handler.load_graph_creation_extensions(
+        "cugraph_service_server.testing.benchmark_server_extension"
+    )
+    assert len(num_loaded) == 1
+
+    gid1 = handler.call_graph_creation_extension(
+        "create_graph_from_builtin_dataset", "('karate',)", "{}"
+    )
+    scale = 2
+    edgefactor = 2
+    gid2 = handler.call_graph_creation_extension(
+        "create_graph_from_rmat_generator",
+        "()",
+        f"{{'scale': {scale}, 'num_edges': {(2**scale) * edgefactor}, "
+        "'seed': 42, 'mg': False}",
+    )
+    assert gid1 != gid2
+
+    graph_info1 = handler.get_graph_info(keys=[], graph_id=gid1)
+    # since the handler returns a dictionary of objs used by the serialization
+    # code, convert each item to a native python type for easy checking.
+    graph_info1 = {k: v.get_py_obj() for (k, v) in graph_info1.items()}
+    assert graph_info1["num_vertices"] == 34
+    assert graph_info1["num_edges"] == 78
+    graph_info2 = handler.get_graph_info(keys=[], graph_id=gid2)
+    graph_info2 = {k: v.get_py_obj() for (k, v) in graph_info2.items()}
+    assert graph_info2["num_vertices"] <= 4
+    assert graph_info2["num_edges"] <= 8
+
+
+def test_load_call_unload_extensions_python_module_path(extension1):
     """
-    Load, run, unload an extension that was loaded using a python module path
-    (as would be used by an import statement) instead of a file path.
+    Load, run, unload an extension that was loaded using a python module
+    path (as would be used by an import statement) instead of a file path.
     """
     from cugraph_service_client.exceptions import CugraphServiceError
     from cugraph_service_server.cugraph_handler import CugraphHandler
@@ -256,7 +291,7 @@ def test_load_and_unload_extensions_python_module_path(extension1):
     )
 
 
-def test_load_and_unload_graph_creation_extension_no_args(graph_creation_extension1):
+def test_load_call_unload_graph_creation_extension_no_args(graph_creation_extension1):
     """
     Test graph_creation_extension1 which contains an extension with no args.
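# A minimal, hedged sketch (assumes a cugraph-service server is already running
# on localhost:9090): the same benchmark extensions exercised by the
# handler-level test above are normally driven through the client API, passing
# keyword args directly instead of the string-encoded args the handler expects.
from cugraph_service_client import CugraphServiceClient

client = CugraphServiceClient("localhost", 9090)
client.load_graph_creation_extensions(
    "cugraph_service_server.testing.benchmark_server_extension"
)
scale = 2
edgefactor = 2
gid = client.call_graph_creation_extension(
    "create_graph_from_rmat_generator",
    scale=scale,
    num_edges=(2**scale) * edgefactor,
    seed=42,
    mg=False,
)
# gen_vertex_list is the general-purpose extension the benchmarks use to pick
# valid start vertices for sampling
start_list = client.call_extension("gen_vertex_list", gid, 4, 42)
client.delete_graph(gid)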
@@ -275,7 +310,7 @@ def test_load_and_unload_graph_creation_extension_no_args(graph_creation_extensi assert new_graph_ID in handler.get_graph_ids() -def test_load_and_unload_graph_creation_extension_no_facade_arg( +def test_load_call_unload_graph_creation_extension_no_facade_arg( graph_creation_extension_no_facade_arg, ): """ @@ -295,7 +330,7 @@ def test_load_and_unload_graph_creation_extension_no_facade_arg( assert new_graph_ID in handler.get_graph_ids() -def test_load_and_unload_graph_creation_extension_bad_arg_order( +def test_load_call_unload_graph_creation_extension_bad_arg_order( graph_creation_extension_bad_arg_order, ): """ @@ -414,3 +449,32 @@ def test_get_graph_data_empty_graph(graph_creation_extension_empty_graph): ) assert len(pickle.loads(edge_data)) == 0 + + +def test_get_server_info(graph_creation_extension1, extension1): + """ + Ensures the server meta-data from get_server_info() is correct. This + includes information about loaded extensions, so fixtures that provide + extensions to be loaded are used. + """ + from cugraph_service_server.cugraph_handler import CugraphHandler + + handler = CugraphHandler() + + handler.load_graph_creation_extensions(graph_creation_extension1) + handler.load_extensions(extension1) + + meta_data = handler.get_server_info() + assert meta_data["num_gpus"].int32_value is not None + assert ( + str( + Path( + meta_data["graph_creation_extensions"].list_value[0].get_py_obj() + ).parent + ) + == graph_creation_extension1 + ) + assert ( + str(Path(meta_data["extensions"].list_value[0].get_py_obj()).parent) + == extension1 + ) diff --git a/python/cugraph-service/tests/test_e2e.py b/python/cugraph-service/tests/test_e2e.py index 15605378ca4..99b80404fe4 100644 --- a/python/cugraph-service/tests/test_e2e.py +++ b/python/cugraph-service/tests/test_e2e.py @@ -18,65 +18,11 @@ import pytest from . import data -from . import utils ############################################################################### # fixtures - - -@pytest.fixture(scope="module") -def server(): - """ - Start a cugraph_service server, stop it when done with the fixture. - """ - from cugraph_service_client import CugraphServiceClient - from cugraph_service_client.exceptions import CugraphServiceError - - host = "localhost" - port = 9090 - client = CugraphServiceClient(host, port) - - try: - client.uptime() - print("FOUND RUNNING SERVER, ASSUMING IT SHOULD BE USED FOR TESTING!") - yield - - except CugraphServiceError: - # A server was not found, so start one for testing then stop it when - # testing is done. - server_process = utils.start_server_subprocess(host=host, port=port) - - # yield control to the tests, cleanup on return - yield - - # tests are done, now stop the server - print("\nTerminating server...", end="", flush=True) - server_process.terminate() - server_process.wait(timeout=60) - print("done.", flush=True) - - -@pytest.fixture(scope="function") -def client(server): - """ - Creates a client instance to the running server, closes the client when the - fixture is no longer used by tests. - """ - from cugraph_service_client import CugraphServiceClient, defaults - - client = CugraphServiceClient(defaults.host, defaults.port) - - for gid in client.get_graph_ids(): - client.delete_graph(gid) - - # FIXME: should this fixture always unconditionally unload all extensions? 
- # client.unload_graph_creation_extensions() - - # yield control to the tests, cleanup on return - yield client - - client.close() +# The fixtures used in these tests are defined here and in conftest.py @pytest.fixture(scope="function") @@ -526,3 +472,15 @@ def test_create_property_graph(client): del pG assert set(client.get_graph_ids()) == old_ids + + +def test_get_server_info(client_with_graph_creation_extension_loaded): + """ + Ensures the server meta-data from get_server_info() is correct. This + includes information about loaded extensions, so the fixture which + pre-loads extensions into the server is used. + """ + client = client_with_graph_creation_extension_loaded + meta_data = client.get_server_info() + assert isinstance(meta_data["num_gpus"], int) + assert Path(meta_data["graph_creation_extensions"][0]).exists() diff --git a/python/cugraph-service/tests/test_mg_e2e.py b/python/cugraph-service/tests/test_mg_e2e.py index 6505c264f39..90d06a740a6 100644 --- a/python/cugraph-service/tests/test_mg_e2e.py +++ b/python/cugraph-service/tests/test_mg_e2e.py @@ -18,8 +18,8 @@ import pytest import cupy as cp +from cugraph_service_server.testing import utils from . import data -from . import utils ############################################################################### diff --git a/python/cugraph-service/tests/test_remote_graph.py b/python/cugraph-service/tests/test_remote_graph.py index c2735da0b66..580c3d7b290 100644 --- a/python/cugraph-service/tests/test_remote_graph.py +++ b/python/cugraph-service/tests/test_remote_graph.py @@ -15,77 +15,20 @@ import importlib import random - import pytest - -from . import data, utils - -import cudf -import cupy import pandas as pd import numpy as np - +import cupy +import cudf import cugraph from cugraph.experimental import PropertyGraph + from cugraph_service_client import RemoteGraph +from . import data ############################################################################### # fixtures - - -@pytest.fixture(scope="module") -def server(): - """ - Start a cugraph_service server, stop it when done with the fixture. - """ - from cugraph_service_client import CugraphServiceClient - from cugraph_service_client.exceptions import CugraphServiceError - - host = "localhost" - port = 9090 - client = CugraphServiceClient(host, port) - - try: - client.uptime() - print("FOUND RUNNING SERVER, ASSUMING IT SHOULD BE USED FOR TESTING!") - yield - - except CugraphServiceError: - # A server was not found, so start one for testing then stop it when - # testing is done. - server_process = utils.start_server_subprocess(host=host, port=port) - - # yield control to the tests, cleanup on return - yield - - # tests are done, now stop the server - print("\nTerminating server...", end="", flush=True) - server_process.terminate() - server_process.wait(timeout=60) - print("done.", flush=True) - - -@pytest.fixture(scope="function") -def client(server): - """ - Creates a client instance to the running server, closes the client when the - fixture is no longer used by tests. - """ - from cugraph_service_client import CugraphServiceClient, defaults - - client = CugraphServiceClient(defaults.host, defaults.port) - - for gid in client.get_graph_ids(): - client.delete_graph(gid) - - # FIXME: should this fixture always unconditionally unload all extensions? 
- # client.unload_graph_creation_extensions() - - # yield control to the tests - yield client - - # tests are done, now stop the server - client.close() +# The fixtures used in these tests are defined here and in conftest.py @pytest.fixture(scope="function") diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index 1d78b2c89de..cef40588879 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -334,7 +334,19 @@ def number_of_vertices(self): """ if self.properties.node_count is None: if self.edgelist is not None: - ddf = self.edgelist.edgelist_df[["src", "dst"]] + if self.renumbered is True: + src_col_name = self.renumber_map.renumbered_src_col_name + dst_col_name = self.renumber_map.renumbered_dst_col_name + # FIXME: from_dask_cudf_edgelist() currently requires + # renumber=True for MG, so this else block will not be + # used. Should this else block be removed and added back when + # the restriction is removed? + else: + src_col_name = "src" + dst_col_name = "dst" + + ddf = self.edgelist.edgelist_df[[src_col_name, dst_col_name]] + # ddf = self.edgelist.edgelist_df[["src", "dst"]] self.properties.node_count = ddf.max().max().compute() + 1 else: raise RuntimeError("Graph is Empty") @@ -851,7 +863,19 @@ def edges(self): sources and destinations. It does not return the edge weights. For viewing edges with weights use view_edge_list() """ - return self.view_edge_list()[["src", "dst"]] + if self.renumbered is True: + src_col_name = self.renumber_map.renumbered_src_col_name + dst_col_name = self.renumber_map.renumbered_dst_col_name + # FIXME: from_dask_cudf_edgelist() currently requires + # renumber=True for MG, so this else block will not be + # used. Should this else block be removed and added back when + # the restriction is removed? + else: + src_col_name = "src" + dst_col_name = "dst" + + # return self.view_edge_list()[["src", "dst"]] + return self.view_edge_list()[[src_col_name, dst_col_name]] def nodes(self): """ diff --git a/python/cugraph/cugraph/testing/mg_utils.py b/python/cugraph/cugraph/testing/mg_utils.py new file mode 100644 index 00000000000..4b76bccf3ee --- /dev/null +++ b/python/cugraph/cugraph/testing/mg_utils.py @@ -0,0 +1,73 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import tempfile + +from dask.distributed import Client +from dask_cuda import LocalCUDACluster +from dask_cuda.initialize import initialize + +from cugraph.dask.comms import comms as Comms +from cugraph.dask.common.mg_utils import get_visible_devices + + +def start_dask_client( + enable_tcp_over_ucx=True, + enable_nvlink=True, + enable_infiniband=False, + enable_rdmacm=False, + net_devices=None, +): + dask_scheduler_file = os.environ.get("SCHEDULER_FILE") + cluster = None + client = None + tempdir_object = None + + if dask_scheduler_file: + # Env var UCX_MAX_RNDV_RAILS=1 must be set too. + initialize( + enable_tcp_over_ucx=enable_tcp_over_ucx, + enable_nvlink=enable_nvlink, + enable_infiniband=enable_infiniband, + enable_rdmacm=enable_rdmacm, + # net_devices="mlx5_0:1", + ) + client = Client(scheduler_file=dask_scheduler_file) + print("\ndask_client created using " f"{dask_scheduler_file}") + else: + # The tempdir created by tempdir_object should be cleaned up once + # tempdir_object goes out-of-scope and is deleted. + tempdir_object = tempfile.TemporaryDirectory() + cluster = LocalCUDACluster( + local_directory=tempdir_object.name, + enable_tcp_over_ucx=enable_tcp_over_ucx, + enable_infiniband=enable_infiniband, + enable_nvlink=enable_nvlink, + enable_rdmacm=enable_rdmacm, + ) + client = Client(cluster) + client.wait_for_workers(len(get_visible_devices())) + print("\ndask_client created using LocalCUDACluster") + + Comms.initialize(p2p=True) + + return (client, cluster) + + +def stop_dask_client(client, cluster=None): + Comms.destroy() + client.close() + if cluster: + cluster.close() + print("\ndask_client closed.") diff --git a/python/cugraph/cugraph/testing/utils.py b/python/cugraph/cugraph/testing/utils.py index b77bbe62a6a..443fdcb9092 100644 --- a/python/cugraph/cugraph/testing/utils.py +++ b/python/cugraph/cugraph/testing/utils.py @@ -12,7 +12,6 @@ # limitations under the License. import os -from itertools import product # Assume test environment has the following dependencies installed import pytest @@ -391,67 +390,6 @@ def make_float(v, e, rstate): make = {float: make_float, np.int32: make_int32, np.int64: make_int64} -def genFixtureParamsProduct(*args): - """ - Returns the cartesian product of the param lists passed in. The lists must - be flat lists of pytest.param objects, and the result will be a flat list - of pytest.param objects with values and meta-data combined accordingly. A - flat list of pytest.param objects is required for pytest fixtures to - properly recognize the params. The combinations also include ids generated - from the param values and id names associated with each list. For example: - - genFixtureParamsProduct( ([pytest.param(True, marks=[pytest.mark.A_good]), - pytest.param(False, marks=[pytest.mark.A_bad])], - "A"), - ([pytest.param(True, marks=[pytest.mark.B_good]), - pytest.param(False, marks=[pytest.mark.B_bad])], - "B") ) - - results in fixture param combinations: - - True, True - marks=[A_good, B_good] - id="A=True,B=True" - True, False - marks=[A_good, B_bad] - id="A=True,B=False" - False, True - marks=[A_bad, B_good] - id="A=False,B=True" - False, False - marks=[A_bad, B_bad] - id="A=False,B=False" - - Simply using itertools.product on the lists would result in a list of - sublists of individual param objects (ie. not "merged"), which would not be - recognized properly as params for a fixture by pytest. - - NOTE: This function is only needed for parameterized fixtures. 
- Tests/benchmarks will automatically get this behavior when specifying - multiple @pytest.mark.parameterize(param_name, param_value_list) - decorators. - """ - # Ensure each arg is a list of pytest.param objs, then separate the params - # and IDs. - paramLists = [] - ids = [] - paramType = pytest.param().__class__ - for (paramList, paramId) in args: - paramListCopy = paramList[:] # do not modify the incoming lists! - for i in range(len(paramList)): - if not isinstance(paramList[i], paramType): - paramListCopy[i] = pytest.param(paramList[i]) - paramLists.append(paramListCopy) - ids.append(paramId) - - retList = [] - for paramCombo in product(*paramLists): - values = [p.values[0] for p in paramCombo] - marks = [m for p in paramCombo for m in p.marks] - id_strings = [] - for (p, paramId) in zip(paramCombo, ids): - # Assume paramId is either a string or a callable - if isinstance(paramId, str): - id_strings.append("%s=%s" % (paramId, p.values[0])) - else: - id_strings.append(paramId(p.values[0])) - comboid = ",".join(id_strings) - retList.append(pytest.param(values, marks=marks, id=comboid)) - return retList - - # shared between min and max spanning tree tests def compare_mst(mst_cugraph, mst_nx): mst_nx_df = nx.to_pandas_edgelist(mst_nx) diff --git a/python/cugraph/cugraph/tests/conftest.py b/python/cugraph/cugraph/tests/conftest.py index 3359b5c3d14..a4a577e2e81 100644 --- a/python/cugraph/cugraph/tests/conftest.py +++ b/python/cugraph/cugraph/tests/conftest.py @@ -11,18 +11,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import tempfile - import pytest -from dask.distributed import Client -from dask_cuda import LocalCUDACluster -from dask_cuda.initialize import initialize - -from cugraph.dask.comms import comms as Comms -from cugraph.dask.common.mg_utils import get_visible_devices - +from cugraph.testing.mg_utils import start_dask_client, stop_dask_client # module-wide fixtures @@ -41,37 +32,15 @@ def gpubenchmark(): @pytest.fixture(scope="module") def dask_client(): - dask_scheduler_file = os.environ.get("SCHEDULER_FILE") - cluster = None - client = None - tempdir_object = None - - if dask_scheduler_file: - # Env var UCX_MAX_RNDV_RAILS=1 must be set too. - initialize( - enable_tcp_over_ucx=True, - enable_nvlink=True, - enable_infiniband=True, - enable_rdmacm=True, - # net_devices="mlx5_0:1", - ) - client = Client(scheduler_file=dask_scheduler_file) - print("\ndask_client fixture: client created using " f"{dask_scheduler_file}") - else: - # The tempdir created by tempdir_object should be cleaned up once - # tempdir_object goes out-of-scope and is deleted. 
-        tempdir_object = tempfile.TemporaryDirectory()
-        cluster = LocalCUDACluster(local_directory=tempdir_object.name)
-        client = Client(cluster)
-        client.wait_for_workers(len(get_visible_devices()))
-        print("\ndask_client fixture: client created using LocalCUDACluster")
-
-    Comms.initialize(p2p=True)
+    (client, cluster) = start_dask_client(
+        enable_tcp_over_ucx=True,
+        enable_nvlink=True,
+        enable_infiniband=True,
+        enable_rdmacm=True,
+        # net_devices="mlx5_0:1",
+    )
 
     yield client
 
-    Comms.destroy()
-    client.close()
-    if cluster:
-        cluster.close()
+    stop_dask_client(client, cluster)
     print("\ndask_client fixture: client.close() called")
diff --git a/python/cugraph/cugraph/tests/mg/test_mg_core_number.py b/python/cugraph/cugraph/tests/mg/test_mg_core_number.py
index ef2c43b5cdc..bf4fb006d32 100644
--- a/python/cugraph/cugraph/tests/mg/test_mg_core_number.py
+++ b/python/cugraph/cugraph/tests/mg/test_mg_core_number.py
@@ -14,11 +14,12 @@
 import gc
 
 import pytest
+import dask_cudf
+from pylibcugraph.testing.utils import gen_fixture_params_product
 
 import cugraph
 from cugraph.testing import utils
 import cugraph.dask as dcg
-import dask_cudf
 
 
 # =============================================================================
@@ -34,7 +35,7 @@ def setup_function():
 datasets = utils.DATASETS_UNDIRECTED
 degree_type = ["incoming", "outgoing", "bidirectional"]
 
-fixture_params = utils.genFixtureParamsProduct(
+fixture_params = gen_fixture_params_product(
     (datasets, "graph_file"),
     (degree_type, "degree_type"),
 )
diff --git a/python/cugraph/cugraph/tests/mg/test_mg_egonet.py b/python/cugraph/cugraph/tests/mg/test_mg_egonet.py
index b816acbce69..246b277a77c 100644
--- a/python/cugraph/cugraph/tests/mg/test_mg_egonet.py
+++ b/python/cugraph/cugraph/tests/mg/test_mg_egonet.py
@@ -11,16 +11,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import cugraph.dask as dcg
 import gc
 
 import pytest
-import cugraph
+
 import dask_cudf
-from cugraph.dask.common.mg_utils import is_single_gpu
+from cudf.testing.testing import assert_frame_equal, assert_series_equal
+from pylibcugraph.testing import gen_fixture_params_product
 
-# from cugraph.dask.common.mg_utils import is_single_gpu
+import cugraph
+import cugraph.dask as dcg
 from cugraph.testing import utils
-from cudf.testing.testing import assert_frame_equal, assert_series_equal
+from cugraph.dask.common.mg_utils import is_single_gpu
 
 
 # =============================================================================
@@ -45,7 +46,7 @@ def setup_function():
     utils.RAPIDS_DATASET_ROOT_DIR_PATH / "email-Eu-core.csv"
 ]
 
-fixture_params = utils.genFixtureParamsProduct(
+fixture_params = gen_fixture_params_product(
     (datasets, "graph_file"),
     (IS_DIRECTED, "directed"),
     (SEEDS, "seeds"),
diff --git a/python/cugraph/cugraph/tests/mg/test_mg_graph.py b/python/cugraph/cugraph/tests/mg/test_mg_graph.py
index 3170957f0a4..7edb3a06423 100644
--- a/python/cugraph/cugraph/tests/mg/test_mg_graph.py
+++ b/python/cugraph/cugraph/tests/mg/test_mg_graph.py
@@ -11,24 +11,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
import gc +import random import pytest -import cugraph.dask as dcg -import dask_cudf -from cugraph.testing import utils -import cugraph -import random import cupy - +from dask.distributed import wait +import cudf +import dask_cudf from pylibcugraph import bfs as pylibcugraph_bfs from pylibcugraph import ResourceHandle +from pylibcugraph.testing.utils import gen_fixture_params_product +import cugraph +import cugraph.dask as dcg +from cugraph.testing import utils from cugraph.dask.traversal.bfs import convert_to_cudf - import cugraph.dask.comms.comms as Comms from cugraph.dask.common.input_utils import get_distributed_data -from dask.distributed import wait -import cudf # ============================================================================= @@ -45,7 +44,7 @@ def setup_function(): datasets = utils.DATASETS_UNDIRECTED + utils.DATASETS_UNRENUMBERED -fixture_params = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product( (datasets, "graph_file"), (IS_DIRECTED, "directed"), ([True, False], "legacy_renum_only"), diff --git a/python/cugraph/cugraph/tests/mg/test_mg_hits.py b/python/cugraph/cugraph/tests/mg/test_mg_hits.py index 114b74a4544..34bf749e078 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_hits.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_hits.py @@ -11,11 +11,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import cugraph.dask as dcg import gc import pytest -import cugraph + import dask_cudf +from pylibcugraph.testing.utils import gen_fixture_params_product + +import cugraph +import cugraph.dask as dcg # from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.testing import utils @@ -41,7 +44,7 @@ def setup_function(): utils.RAPIDS_DATASET_ROOT_DIR_PATH / "email-Eu-core.csv" ] -fixture_params = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product( (datasets, "graph_file"), ([50], "max_iter"), ([1.0e-6], "tol"), diff --git a/python/cugraph/cugraph/tests/mg/test_mg_jaccard.py b/python/cugraph/cugraph/tests/mg/test_mg_jaccard.py index e7b09ee72fc..6f8514c5b97 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_jaccard.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_jaccard.py @@ -11,14 +11,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import cugraph.dask as dcg import gc +import random + import pytest -import cugraph import dask_cudf -import random +from pylibcugraph.testing import gen_fixture_params_product -# from cugraph.dask.common.mg_utils import is_single_gpu +import cugraph.dask as dcg +import cugraph from cugraph.testing import utils @@ -43,7 +44,7 @@ def setup_function(): utils.RAPIDS_DATASET_ROOT_DIR_PATH / "email-Eu-core.csv" ] -fixture_params = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product( (datasets, "graph_file"), (IS_DIRECTED, "directed"), (HAS_VERTEX_PAIR, "has_vertex_pair"), diff --git a/python/cugraph/cugraph/tests/mg/test_mg_k_core.py b/python/cugraph/cugraph/tests/mg/test_mg_k_core.py index 6c3b6384a53..ec1edfda18c 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_k_core.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_k_core.py @@ -14,13 +14,14 @@ import gc import pytest +import dask_cudf +from cudf.testing.testing import assert_frame_equal +from pylibcugraph.testing import gen_fixture_params_product import cugraph from cugraph.testing import utils import cugraph.dask as dcg -import dask_cudf from cugraph.structure.symmetrize import symmetrize_df -from cudf.testing.testing import assert_frame_equal # ============================================================================= @@ -38,7 +39,7 @@ def setup_function(): core_number = [True, False] degree_type = ["bidirectional", "outgoing", "incoming"] -fixture_params = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product( (datasets, "graph_file"), (core_number, "core_number"), (degree_type, "degree_type") ) diff --git a/python/cugraph/cugraph/tests/mg/test_mg_overlap.py b/python/cugraph/cugraph/tests/mg/test_mg_overlap.py index ade24b31d64..fe33921da8e 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_overlap.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_overlap.py @@ -11,14 +11,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -import cugraph.dask as dcg import gc +import random + import pytest -import cugraph import dask_cudf -import random +from pylibcugraph.testing import gen_fixture_params_product -# from cugraph.dask.common.mg_utils import is_single_gpu +import cugraph +import cugraph.dask as dcg from cugraph.testing import utils @@ -43,7 +44,7 @@ def setup_function(): utils.RAPIDS_DATASET_ROOT_DIR_PATH / "email-Eu-core.csv" ] -fixture_params = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product( (datasets, "graph_file"), (IS_DIRECTED, "directed"), (HAS_VERTEX_PAIR, "has_vertex_pair"), diff --git a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py index c01b6a42c44..1879d99ecca 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py @@ -21,11 +21,11 @@ import numpy as np from cudf.testing import assert_frame_equal, assert_series_equal from cupy.testing import assert_array_equal +from pylibcugraph.testing.utils import gen_fixture_params_product import cugraph.dask as dcg from cugraph.experimental.datasets import cyber -from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH -from cugraph.testing import utils +from cugraph.experimental.datasets import netscience # If the rapids-pytest-benchmark plugin is installed, the "gpubenchmark" # fixture will be available automatically. 
Check that this fixture is available @@ -165,7 +165,7 @@ def df_type_id(dataframe_type): return s + "?" -df_types_fixture_params = utils.genFixtureParamsProduct((df_types, df_type_id)) +df_types_fixture_params = gen_fixture_params_product((df_types, df_type_id)) @pytest.fixture(scope="module", params=df_types_fixture_params) @@ -178,7 +178,7 @@ def net_PropertyGraph(request): from cugraph.experimental import PropertyGraph dataframe_type = request.param[0] - netscience_csv = utils.RAPIDS_DATASET_ROOT_DIR_PATH / "netscience.csv" + netscience_csv = netscience.get_path() source_col_name = "src" dest_col_name = "dst" @@ -368,7 +368,7 @@ def net_MGPropertyGraph(dask_client): """ from cugraph.experimental import MGPropertyGraph - input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "netscience.csv").as_posix() + input_data_path = str(netscience.get_path()) print(f"dataset={input_data_path}") chunksize = dcg.get_chunksize(input_data_path) ddf = dask_cudf.read_csv( diff --git a/python/cugraph/cugraph/tests/mg/test_mg_random_walks.py b/python/cugraph/cugraph/tests/mg/test_mg_random_walks.py index 2da4af92da4..13ed0d738b1 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_random_walks.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_random_walks.py @@ -11,16 +11,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import cugraph.dask as dcg import gc +import random + import pytest -import cugraph import dask_cudf -import random +from pylibcugraph.testing.utils import gen_fixture_params_product -# from cugraph.dask.common.mg_utils import is_single_gpu -from cugraph.testing import utils +import cugraph +# from cugraph.dask.common.mg_utils import is_single_gpu +import cugraph.dask as dcg from cugraph.experimental.datasets import DATASETS_SMALL, karate_asymmetric @@ -42,7 +43,7 @@ def setup_function(): datasets = DATASETS_SMALL + [karate_asymmetric] -fixture_params = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product( (datasets, "graph_file"), (IS_DIRECTED, "directed"), ) diff --git a/python/cugraph/cugraph/tests/mg/test_mg_sorensen.py b/python/cugraph/cugraph/tests/mg/test_mg_sorensen.py index 5b25e5a54ca..28c0c0e6920 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_sorensen.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_sorensen.py @@ -11,14 +11,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import cugraph.dask as dcg import gc +import random import pytest -import cugraph + import dask_cudf -import random +from pylibcugraph.testing import gen_fixture_params_product -# from cugraph.dask.common.mg_utils import is_single_gpu +import cugraph.dask as dcg +import cugraph from cugraph.testing import utils @@ -43,7 +44,7 @@ def setup_function(): utils.RAPIDS_DATASET_ROOT_DIR_PATH / "email-Eu-core.csv" ] -fixture_params = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product( (datasets, "graph_file"), (IS_DIRECTED, "directed"), (HAS_VERTEX_PAIR, "has_vertex_pair"), diff --git a/python/cugraph/cugraph/tests/mg/test_mg_symmetrize.py b/python/cugraph/cugraph/tests/mg/test_mg_symmetrize.py index 6d9355eddf0..5ab75f691d8 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_symmetrize.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_symmetrize.py @@ -14,10 +14,11 @@ import gc import pytest - import pandas as pd -import cugraph import dask_cudf +from pylibcugraph.testing.utils import gen_fixture_params_product + +import cugraph from cugraph.testing import utils @@ -147,7 +148,7 @@ def compare(ddf1, ddf2, src_col_name, dst_col_name, val_col_name): ] + utils.DATASETS_UNDIRECTED datasets = [pytest.param(d.as_posix()) for d in input_data_path] -fixture_params = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product( (datasets, "graph_file"), ([True, False], "edgevals"), ([True, False], "multi_columns"), diff --git a/python/cugraph/cugraph/tests/mg/test_mg_triangle_count.py b/python/cugraph/cugraph/tests/mg/test_mg_triangle_count.py index 2bbfe1cd87e..3907cacb4aa 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_triangle_count.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_triangle_count.py @@ -11,16 +11,17 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import random import gc import pytest - import cudf +import dask_cudf +from pylibcugraph.testing.utils import gen_fixture_params_product + import cugraph from cugraph.testing import utils import cugraph.dask as dcg -import dask_cudf -import random # ============================================================================= @@ -34,7 +35,7 @@ def setup_function(): # Pytest fixtures # ============================================================================= datasets = utils.DATASETS_UNDIRECTED -fixture_params = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product( (datasets, "graph_file"), ([True, False], "start_list"), ) diff --git a/python/cugraph/cugraph/tests/mg/test_mg_uniform_neighbor_sample.py b/python/cugraph/cugraph/tests/mg/test_mg_uniform_neighbor_sample.py index 0382ab324cf..679305f6e25 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_uniform_neighbor_sample.py @@ -16,6 +16,7 @@ import pytest import cudf import dask_cudf +from pylibcugraph.testing.utils import gen_fixture_params_product import cugraph.dask as dcg import cugraph @@ -52,7 +53,7 @@ def setup_function(): utils.RAPIDS_DATASET_ROOT_DIR_PATH / "email-Eu-core.csv" ] -fixture_params = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product( (datasets, "graph_file"), (IS_DIRECTED, "directed"), ([False, True], "with_replacement"), diff --git a/python/cugraph/cugraph/tests/test_bfs.py b/python/cugraph/cugraph/tests/test_bfs.py index 306809b71c5..679876fa57b 100644 --- a/python/cugraph/cugraph/tests/test_bfs.py +++ b/python/cugraph/cugraph/tests/test_bfs.py @@ -12,23 +12,23 @@ # limitations under the License. import gc -import pandas -import cupy -import numpy as np -import cudf -import pytest -import cugraph -from cugraph.testing import utils import random +import pytest import pandas as pd import cupy as cp +import numpy as np from cupyx.scipy.sparse import coo_matrix as cp_coo_matrix from cupyx.scipy.sparse import csr_matrix as cp_csr_matrix from cupyx.scipy.sparse import csc_matrix as cp_csc_matrix from scipy.sparse import coo_matrix as sp_coo_matrix from scipy.sparse import csr_matrix as sp_csr_matrix from scipy.sparse import csc_matrix as sp_csc_matrix +import cudf +from pylibcugraph.testing.utils import gen_fixture_params_product + +import cugraph +from cugraph.testing import utils from cugraph.experimental import datasets # Temporarily suppress warnings till networkX fixes deprecation warnings @@ -295,31 +295,31 @@ def get_cu_graph_nx_results_and_params(seed, depth_limit, G, dataset, directed, DATASETS_SMALL = [pytest.param(d) for d in datasets.DATASETS_SMALL] DEPTH_LIMIT = [pytest.param(d) for d in DEPTH_LIMITS] -# Call genFixtureParamsProduct() to caluculate the cartesian product of +# Call gen_fixture_params_product() to caluculate the cartesian product of # multiple lists of params. This is required since parameterized fixtures do # not do this automatically (unlike multiply-parameterized tests). The 2nd # item in the tuple is a label for the param value used when displaying the # full test name. 
-algo_test_fixture_params = utils.genFixtureParamsProduct( +algo_test_fixture_params = gen_fixture_params_product( (SEEDS, "seed"), (DEPTH_LIMIT, "depth_limit") ) -graph_fixture_params = utils.genFixtureParamsProduct( +graph_fixture_params = gen_fixture_params_product( (DATASETS, "ds"), (DIRECTED, "dirctd") ) -small_graph_fixture_params = utils.genFixtureParamsProduct( +small_graph_fixture_params = gen_fixture_params_product( (DATASETS_SMALL, "ds"), (DIRECTED, "dirctd") ) # The single param list variants are used when only 1 param combination is # needed (eg. testing non-native input types where tests for other combinations # was covered elsewhere). -single_algo_test_fixture_params = utils.genFixtureParamsProduct( +single_algo_test_fixture_params = gen_fixture_params_product( ([SEEDS[0]], "seed"), ([DEPTH_LIMIT[0]], "depth_limit") ) -single_small_graph_fixture_params = utils.genFixtureParamsProduct( +single_small_graph_fixture_params = gen_fixture_params_product( ([DATASETS_SMALL[0]], "ds"), (DIRECTED, "dirctd") ) diff --git a/python/cugraph/cugraph/tests/test_compat_pr.py b/python/cugraph/cugraph/tests/test_compat_pr.py index 4ae81000e25..ecaf0546967 100644 --- a/python/cugraph/cugraph/tests/test_compat_pr.py +++ b/python/cugraph/cugraph/tests/test_compat_pr.py @@ -17,12 +17,16 @@ # 'collections.abc' is deprecated, and in 3.8 it will stop working) for # python 3.7. Also, this import networkx needs to be relocated in the # third-party group once this gets fixed. -import pytest -from cugraph.testing import utils -import numpy as np import gc import importlib +import pytest +import numpy as np +from pylibcugraph.testing.utils import gen_fixture_params_product + +from cugraph.testing import utils +from cugraph.experimental.datasets import karate + MAX_ITERATIONS = [100, 200] TOLERANCE = [1.0e-06] @@ -30,7 +34,7 @@ PERS_PERCENT = [0, 15] HAS_GUESS = [0, 1] -FILES_UNDIRECTED = [utils.RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv"] +FILES_UNDIRECTED = [karate.get_path()] # these are only used in the missing parameter tests. 
KARATE_RANKING = [11, 9, 14, 15, 18, 20, 22, 17, 21, 12, 26, 16, 28, 19] @@ -50,7 +54,7 @@ def setup_function(): datasets = FILES_UNDIRECTED -fixture_params = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product( (datasets, "graph_file"), (MAX_ITERATIONS, "max_iter"), (TOLERANCE, "tol"), diff --git a/python/cugraph/cugraph/tests/test_core_number.py b/python/cugraph/cugraph/tests/test_core_number.py index b7a9175a5bc..89cf421505a 100644 --- a/python/cugraph/cugraph/tests/test_core_number.py +++ b/python/cugraph/cugraph/tests/test_core_number.py @@ -14,10 +14,11 @@ import gc import pytest - import cudf -import cugraph +from pylibcugraph.testing.utils import gen_fixture_params_product import networkx as nx + +import cugraph from cugraph.testing import utils from cugraph.experimental.datasets import DATASETS_UNDIRECTED @@ -35,7 +36,7 @@ def setup_function(): datasets = DATASETS_UNDIRECTED degree_type = ["incoming", "outgoing"] -fixture_params = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product( (datasets, "graph_file"), (degree_type, "degree_type"), ) diff --git a/python/cugraph/cugraph/tests/test_hits.py b/python/cugraph/cugraph/tests/test_hits.py index 54d18c7bf83..5c83c2fd5f4 100644 --- a/python/cugraph/cugraph/tests/test_hits.py +++ b/python/cugraph/cugraph/tests/test_hits.py @@ -17,6 +17,7 @@ import networkx as nx import pandas as pd import cudf +from pylibcugraph.testing.utils import gen_fixture_params_product import cugraph from cugraph.testing import utils @@ -34,7 +35,7 @@ def setup_function(): # Pytest fixtures # ============================================================================= datasets = DATASETS_UNDIRECTED + [email_Eu_core] -fixture_params = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product( (datasets, "graph_file"), ([50], "max_iter"), ([1.0e-6], "tol"), diff --git a/python/cugraph/cugraph/tests/test_property_graph.py b/python/cugraph/cugraph/tests/test_property_graph.py index 4523c70bc9a..ff67fafcbfc 100644 --- a/python/cugraph/cugraph/tests/test_property_graph.py +++ b/python/cugraph/cugraph/tests/test_property_graph.py @@ -20,7 +20,7 @@ import cudf import cupy as cp from cudf.testing import assert_frame_equal, assert_series_equal -from cugraph.experimental.datasets import cyber +from pylibcugraph.testing.utils import gen_fixture_params_product # If the rapids-pytest-benchmark plugin is installed, the "gpubenchmark" # fixture will be available automatically. Check that this fixture is available @@ -46,7 +46,7 @@ import cugraph from cugraph.generators import rmat -from cugraph.testing import utils +from cugraph.experimental.datasets import cyber def type_is_categorical(pG): @@ -228,7 +228,7 @@ def df_type_id(dataframe_type): return s + "?" 
-df_types_fixture_params = utils.genFixtureParamsProduct((df_types, df_type_id)) +df_types_fixture_params = gen_fixture_params_product((df_types, df_type_id)) @pytest.fixture(scope="function", params=df_types_fixture_params) diff --git a/python/cugraph/cugraph/tests/test_sssp.py b/python/cugraph/cugraph/tests/test_sssp.py index 518b0aac622..1e13116daea 100644 --- a/python/cugraph/cugraph/tests/test_sssp.py +++ b/python/cugraph/cugraph/tests/test_sssp.py @@ -24,8 +24,9 @@ from scipy.sparse import coo_matrix as sp_coo_matrix from scipy.sparse import csr_matrix as sp_csr_matrix from scipy.sparse import csc_matrix as sp_csc_matrix - import cudf +from pylibcugraph.testing.utils import gen_fixture_params_product + import cugraph from cugraph.testing import utils from cugraph.experimental import datasets @@ -174,7 +175,7 @@ def networkx_call(graph_file, source, edgevals=True): # Pytest fixtures # ============================================================================= -# Call genFixtureParamsProduct() to caluculate the cartesian product of +# Call gen_fixture_params_product() to caluculate the cartesian product of # multiple lists of params. This is required since parameterized fixtures do # not do this automatically (unlike multiply-parameterized tests). The 2nd # item in the tuple is a label for the param value used when displaying the @@ -184,8 +185,8 @@ def networkx_call(graph_file, source, edgevals=True): # the computation. DATASETS = [pytest.param(d) for d in datasets.DATASETS_SMALL] SOURCES = [pytest.param(1)] -fixture_params = utils.genFixtureParamsProduct((DATASETS, "ds"), (SOURCES, "src")) -fixture_params_single_dataset = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product((DATASETS, "ds"), (SOURCES, "src")) +fixture_params_single_dataset = gen_fixture_params_product( ([DATASETS[0]], "ds"), (SOURCES, "src") ) diff --git a/python/cugraph/cugraph/tests/test_triangle_count.py b/python/cugraph/cugraph/tests/test_triangle_count.py index 290d3dcab52..7ed02ad0146 100644 --- a/python/cugraph/cugraph/tests/test_triangle_count.py +++ b/python/cugraph/cugraph/tests/test_triangle_count.py @@ -12,11 +12,12 @@ # limitations under the License. 
import gc - -import pytest import random +import pytest import cudf +from pylibcugraph.testing.utils import gen_fixture_params_product + import cugraph from cugraph.testing import utils from cugraph.experimental.datasets import DATASETS_UNDIRECTED, karate_asymmetric @@ -45,7 +46,7 @@ def setup_function(): # Pytest fixtures # ============================================================================= datasets = DATASETS_UNDIRECTED -fixture_params = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product( (datasets, "graph_file"), ([True, False], "edgevals"), ([True, False], "start_list"), diff --git a/python/cugraph/cugraph/tests/test_uniform_neighbor_sample.py b/python/cugraph/cugraph/tests/test_uniform_neighbor_sample.py index 2a675be56ad..4e40e00a575 100644 --- a/python/cugraph/cugraph/tests/test_uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/tests/test_uniform_neighbor_sample.py @@ -15,9 +15,9 @@ import pytest import cudf +from pylibcugraph.testing.utils import gen_fixture_params_product import cugraph -from cugraph.testing import utils from cugraph import uniform_neighbor_sample from cugraph.experimental.datasets import DATASETS_UNDIRECTED, email_Eu_core, small_tree @@ -36,7 +36,7 @@ def setup_function(): datasets = DATASETS_UNDIRECTED + [email_Eu_core] -fixture_params = utils.genFixtureParamsProduct( +fixture_params = gen_fixture_params_product( (datasets, "graph_file"), (IS_DIRECTED, "directed"), ([False, True], "with_replacement"), diff --git a/python/pylibcugraph/pylibcugraph/testing/__init__.py b/python/pylibcugraph/pylibcugraph/testing/__init__.py index e69de29bb2d..34335777c58 100644 --- a/python/pylibcugraph/pylibcugraph/testing/__init__.py +++ b/python/pylibcugraph/pylibcugraph/testing/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pylibcugraph.testing.utils import gen_fixture_params_product diff --git a/python/pylibcugraph/pylibcugraph/testing/utils.py b/python/pylibcugraph/pylibcugraph/testing/utils.py index 8929be0c0cf..d5b7b8f2147 100644 --- a/python/pylibcugraph/pylibcugraph/testing/utils.py +++ b/python/pylibcugraph/pylibcugraph/testing/utils.py @@ -23,7 +23,54 @@ RAPIDS_DATASET_ROOT_DIR_PATH = Path(RAPIDS_DATASET_ROOT_DIR) -def genFixtureParamsProduct(*args): +def gen_fixture_params(*param_values): + """ + Returns a list of pytest.param objects suitable for use as fixture + parameters created by merging the values in each tuple into individual + pytest.param objects. + + Each tuple can contain multiple values or pytest.param objects. If pytest.param + objects are given, the marks and ids are also merged. + + If ids is specicified, it must either be a list of string ids for each + combination passed in, or a callable that accepts a list of values and + returns a string. 
+
+    gen_fixture_params( (pytest.param(True, marks=[pytest.mark.A_good], id="A=True"),
+                         pytest.param(False, marks=[pytest.mark.B_bad], id="B=False")),
+                        (pytest.param(False, marks=[pytest.mark.A_bad], id="A=False"),
+                         pytest.param(True, marks=[pytest.mark.B_good], id="B=True")),
+                      )
+
+    results in fixture param combinations:
+
+    True, False   - marks=[A_good, B_bad]  - id="A=True-B=False"
+    False, True   - marks=[A_bad, B_good]  - id="A=False-B=True"
+    """
+    fixture_params = []
+    param_type = pytest.param().__class__  # the type that pytest.param() returns
+
+    for vals in param_values:
+        new_param_values = []
+        new_param_marks = []
+        new_param_ids = []
+        for val in vals:
+            if isinstance(val, param_type):
+                new_param_values += val.values
+                new_param_marks += val.marks
+                new_param_ids.append(val.id)
+            else:
+                new_param_values.append(val)  # plain values are added as-is
+                new_param_ids.append(str(val))
+        fixture_params.append(
+            pytest.param(
+                new_param_values, marks=new_param_marks, id="-".join(new_param_ids)
+            )
+        )
+    return fixture_params
+
+
+def gen_fixture_params_product(*args):
     """
     Returns the cartesian product of the param lists passed in. The lists must
     be flat lists of pytest.param objects, and the result will be a flat list
@@ -32,12 +79,12 @@ def genFixtureParamsProduct(*args):
     properly recognize the params. The combinations also include ids generated
     from the param values and id names associated with each list. For example:
 
-    genFixtureParamsProduct( ([pytest.param(True, marks=[pytest.mark.A_good]),
-                               pytest.param(False, marks=[pytest.mark.A_bad])],
-                              "A"),
-                             ([pytest.param(True, marks=[pytest.mark.B_good]),
-                               pytest.param(False, marks=[pytest.mark.B_bad])],
-                              "B") )
+    gen_fixture_params_product( ([pytest.param(True, marks=[pytest.mark.A_good]),
+                                  pytest.param(False, marks=[pytest.mark.A_bad])],
+                                 "A"),
+                                ([pytest.param(True, marks=[pytest.mark.B_good]),
+                                  pytest.param(False, marks=[pytest.mark.B_bad])],
+                                 "B") )
 
     results in fixture param combinations:
 
@@ -60,19 +107,25 @@ def genFixtureParamsProduct(*args):
     paramLists = []
     ids = []
     paramType = pytest.param().__class__
-    for (paramList, id) in args:
+    for (paramList, paramId) in args:
+        paramListCopy = paramList[:]  # do not modify the incoming lists!
         for i in range(len(paramList)):
             if not isinstance(paramList[i], paramType):
-                paramList[i] = pytest.param(paramList[i])
-        paramLists.append(paramList)
-        ids.append(id)
+                paramListCopy[i] = pytest.param(paramList[i])
+        paramLists.append(paramListCopy)
+        ids.append(paramId)
 
     retList = []
     for paramCombo in product(*paramLists):
         values = [p.values[0] for p in paramCombo]
         marks = [m for p in paramCombo for m in p.marks]
-        comboid = ",".join(
-            ["%s=%s" % (id, p.values[0]) for (p, id) in zip(paramCombo, ids)]
-        )
+        id_strings = []
+        for (p, paramId) in zip(paramCombo, ids):
+            # Assume paramId is either a string or a callable
+            if isinstance(paramId, str):
+                id_strings.append("%s=%s" % (paramId, p.values[0]))
+            else:
+                id_strings.append(paramId(p.values[0]))
+        comboid = ",".join(id_strings)
         retList.append(pytest.param(values, marks=marks, id=comboid))
     return retList
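# A minimal usage sketch (hypothetical test module; the dataset names are only
# illustrative): gen_fixture_params_product() builds the flat cartesian product
# that a parameterized fixture consumes, mirroring how the test modules above
# construct their fixture_params lists.
import pytest
from pylibcugraph.testing import gen_fixture_params_product

datasets = [pytest.param("karate.csv"), pytest.param("dolphins.csv")]
directed = [pytest.param(True), pytest.param(False)]

fixture_params = gen_fixture_params_product(
    (datasets, "graph_file"),
    (directed, "directed"),
)


@pytest.fixture(scope="module", params=fixture_params)
def input_combo(request):
    # request.param is a flat list of values such as ["karate.csv", True],
    # with an id like "graph_file=karate.csv,directed=True"
    return dict(zip(("graph_file", "directed"), request.param))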
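# A minimal sketch (hypothetical conftest.py) of consuming the new
# cugraph.testing.mg_utils helpers added above: start_dask_client() returns a
# (client, cluster) tuple, so the fixture unpacks it and hands both back to
# stop_dask_client() for cleanup.
import pytest
from cugraph.testing.mg_utils import start_dask_client, stop_dask_client


@pytest.fixture(scope="module")
def dask_client():
    client, cluster = start_dask_client()
    yield client
    stop_dask_client(client, cluster)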