From 1d62262447e743bede7c8c343305385ba8c85eb0 Mon Sep 17 00:00:00 2001 From: Rick Ratzel Date: Mon, 12 Sep 2022 22:23:00 -0500 Subject: [PATCH 1/3] Fixed bug in NumberMap where id_type assignment happened too early causing it to use the incoming data type instead of the mapped type, which matters more when strings are passed in since they must be mapped to ints. Added new MG test that uses string vertex IDs. Cleaned up and fixed bugs in various MG test files that prevented pytest from discovering and running all tests in the "mg" test directory. --- .../cugraph/cugraph/structure/number_map.py | 2 +- python/cugraph/cugraph/testing/utils.py | 19 +++--- .../cugraph/tests/mg/test_mg_doctests.py | 25 +++----- .../cugraph/tests/mg/test_mg_renumber.py | 58 ++++++++++++++++++- 4 files changed, 75 insertions(+), 29 deletions(-) diff --git a/python/cugraph/cugraph/structure/number_map.py b/python/cugraph/cugraph/structure/number_map.py index 7f5b71a3762..35cc5bfc99f 100644 --- a/python/cugraph/cugraph/structure/number_map.py +++ b/python/cugraph/cugraph/structure/number_map.py @@ -533,7 +533,6 @@ def renumber_and_segment( renumber_map.set_renumbered_col_names( src_col_names, dst_col_names, df.columns) - id_type = df[src_col_names[0]].dtype if isinstance(df, cudf.DataFrame): renumber_map.implementation = NumberMap.SingleGPU( df, src_col_names, dst_col_names, renumber_map.id_type, @@ -615,6 +614,7 @@ def get_renumbered_df(id_type, data): .astype(id_type) return data[2] + id_type = df[renumber_map.renumbered_src_col_name].dtype renumbering_map = dask_cudf.from_delayed( [client.submit(get_renumber_map, id_type, diff --git a/python/cugraph/cugraph/testing/utils.py b/python/cugraph/cugraph/testing/utils.py index d907fac98db..7d593658282 100644 --- a/python/cugraph/cugraph/testing/utils.py +++ b/python/cugraph/cugraph/testing/utils.py @@ -447,24 +447,25 @@ def genFixtureParamsProduct(*args): paramLists = [] ids = [] paramType = pytest.param().__class__ - for (paramList, id) in args: + for (paramList, paramId) in args: + paramListCopy = paramList[:] # do not modify the incoming lists! for i in range(len(paramList)): if not isinstance(paramList[i], paramType): - paramList[i] = pytest.param(paramList[i]) - paramLists.append(paramList) - ids.append(id) + paramListCopy[i] = pytest.param(paramList[i]) + paramLists.append(paramListCopy) + ids.append(paramId) retList = [] for paramCombo in product(*paramLists): values = [p.values[0] for p in paramCombo] marks = [m for p in paramCombo for m in p.marks] id_strings = [] - for (p, id) in zip(paramCombo, ids): - # Assume id is either a string or a callable - if isinstance(id, str): - id_strings.append("%s=%s" % (id, p.values[0])) + for (p, paramId) in zip(paramCombo, ids): + # Assume paramId is either a string or a callable + if isinstance(paramId, str): + id_strings.append("%s=%s" % (paramId, p.values[0])) else: - id_strings.append(id(p.values[0])) + id_strings.append(paramId(p.values[0])) comboid = ",".join(id_strings) retList.append(pytest.param(values, marks=marks, id=comboid)) return retList diff --git a/python/cugraph/cugraph/tests/mg/test_mg_doctests.py b/python/cugraph/cugraph/tests/mg/test_mg_doctests.py index c0660456f19..e678e90d1a2 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_doctests.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_doctests.py @@ -26,16 +26,8 @@ import cudf from cugraph.testing import utils -from dask.distributed import Client -from dask_cuda import LocalCUDACluster -import cugraph.dask.comms.comms as Comms - datasets = utils.RAPIDS_DATASET_ROOT_DIR_PATH -cluster = LocalCUDACluster() -client = Client(cluster) -Comms.initialize(p2p=True) - def _is_public_name(name): return not name.startswith("_") @@ -94,6 +86,12 @@ def _fetch_doctests(): yield from _find_doctests_in_obj(finder, cugraph.dask, 'dask', _is_public_name) +@pytest.fixture(scope="module", + params=_fetch_doctests(), + ids=lambda docstring: docstring.name) +def docstring(request): + return request.param + class TestDoctests: abs_datasets_path = datasets.absolute() @@ -107,10 +105,7 @@ def chdir_to_tmp_path(cls, tmp_path): finally: os.chdir(original_directory) - @pytest.mark.parametrize( - "docstring", _fetch_doctests(), ids=lambda docstring: docstring.name, - ) - def test_docstring(self, docstring): + def test_docstring(self, dask_client, docstring): # We ignore differences in whitespace in the doctest output, and enable # the use of an ellipsis "..." to match any string in the doctest # output. An ellipsis is useful for, e.g., memory addresses or @@ -132,9 +127,3 @@ def test_docstring(self, docstring): f"{results.failed} of {results.attempted} doctests failed for " f"{docstring.name}:\n{doctest_stdout.getvalue()}" ) - - def close_comms(): - Comms.destroy() - client.close() - if cluster: - cluster.close() diff --git a/python/cugraph/cugraph/tests/mg/test_mg_renumber.py b/python/cugraph/cugraph/tests/mg/test_mg_renumber.py index 75586e54af8..2e016591968 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_renumber.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_renumber.py @@ -15,13 +15,15 @@ import gc import pytest +import tempfile +from pathlib import Path import pandas import numpy as np import dask_cudf import dask import cudf -from cudf.testing import assert_series_equal +from cudf.testing import assert_frame_equal, assert_series_equal import cugraph.dask as dcg import cugraph @@ -287,3 +289,57 @@ def test_mg_renumber_common_col_names(graph_file, dask_client): assert renumber_map.renumbered_dst_col_name != "dst" assert renumber_map.renumbered_src_col_name in renumbered_df.columns assert renumber_map.renumbered_dst_col_name in renumbered_df.columns + + +@pytest.mark.skipif( + is_single_gpu(), reason="skipping MG testing on Single GPU system" +) +def test_pagerank_string_vertex_ids(dask_client): + """ + Ensures string vertex IDs can be used. + + Note: the dask_client fixture sets up and tears down a LocalCUDACluster. + See ../conftest.py + """ + # Use pandas and to_csv() to create a CSV file that can be read in by both + # dask_cudf and cudf. + df = pandas.DataFrame({"src": ['a1', 'a1', 'a2', 'a3'], + "dst": ['a2', 'a3', 'a4', 'a4'], + } + ) + tempdir_object = tempfile.TemporaryDirectory() + input_file = Path(tempdir_object.name) / "graph_input.csv" + df.to_csv(input_file, index=False, header=False, sep="\t") + + # MG + chunksize = dcg.get_chunksize(input_file) + ddf = dask_cudf.read_csv( + input_file, chunksize=chunksize, delimiter="\t", + names=["src", "dst"], + ) + + G_dask = cugraph.Graph(directed=True) + G_dask.from_dask_cudf_edgelist(ddf, source="src", destination="dst") + + mg_results = dcg.pagerank(G_dask) + # Organize results for easy comparison, this does not change the values. MG + # Pagerank defaults to float64, so convert to float32 when comparing to SG + mg_results = (mg_results.compute(). + sort_values("pagerank"). + reset_index(drop=True) + ) + mg_results["pagerank"] = mg_results["pagerank"].astype("float32") + + # SG + df = cudf.read_csv( + input_file, chunksize=chunksize, delimiter="\t", + names=["src", "dst"], + ) + + G = cugraph.Graph(directed=True) + G.from_cudf_edgelist(df, source="src", destination="dst") + + sg_results = cugraph.pagerank(G) + sg_results = sg_results.sort_values("pagerank").reset_index(drop=True) + + assert_frame_equal(sg_results, mg_results) From dbf31a2548839f10cf92029b6190638c5524a022 Mon Sep 17 00:00:00 2001 From: Rick Ratzel Date: Mon, 12 Sep 2022 22:49:26 -0500 Subject: [PATCH 2/3] flake8 --- python/cugraph/cugraph/tests/mg/test_mg_doctests.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/cugraph/cugraph/tests/mg/test_mg_doctests.py b/python/cugraph/cugraph/tests/mg/test_mg_doctests.py index e678e90d1a2..6a7a02f255c 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_doctests.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_doctests.py @@ -86,6 +86,7 @@ def _fetch_doctests(): yield from _find_doctests_in_obj(finder, cugraph.dask, 'dask', _is_public_name) + @pytest.fixture(scope="module", params=_fetch_doctests(), ids=lambda docstring: docstring.name) From c6474f78a023074429af017e7fd1ab08db0bfc7d Mon Sep 17 00:00:00 2001 From: Rick Ratzel Date: Tue, 13 Sep 2022 13:09:37 -0500 Subject: [PATCH 3/3] Simplified test and made more consistent by using `dask_cudf.rom_cudf() instead of a tempfile and `read_csv()`. --- .../cugraph/tests/mg/test_mg_renumber.py | 38 ++++++------------- 1 file changed, 11 insertions(+), 27 deletions(-) diff --git a/python/cugraph/cugraph/tests/mg/test_mg_renumber.py b/python/cugraph/cugraph/tests/mg/test_mg_renumber.py index 2e016591968..53179865a50 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_renumber.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_renumber.py @@ -15,8 +15,6 @@ import gc import pytest -import tempfile -from pathlib import Path import pandas import numpy as np @@ -303,21 +301,19 @@ def test_pagerank_string_vertex_ids(dask_client): """ # Use pandas and to_csv() to create a CSV file that can be read in by both # dask_cudf and cudf. - df = pandas.DataFrame({"src": ['a1', 'a1', 'a2', 'a3'], - "dst": ['a2', 'a3', 'a4', 'a4'], - } - ) - tempdir_object = tempfile.TemporaryDirectory() - input_file = Path(tempdir_object.name) / "graph_input.csv" - df.to_csv(input_file, index=False, header=False, sep="\t") + df = cudf.DataFrame({"src": ['a1', 'a1', 'a2', 'a3'], + "dst": ['a2', 'a3', 'a4', 'a4'], + } + ) + # SG + G = cugraph.Graph(directed=True) + G.from_cudf_edgelist(df, source="src", destination="dst") - # MG - chunksize = dcg.get_chunksize(input_file) - ddf = dask_cudf.read_csv( - input_file, chunksize=chunksize, delimiter="\t", - names=["src", "dst"], - ) + sg_results = cugraph.pagerank(G) + sg_results = sg_results.sort_values("pagerank").reset_index(drop=True) + # MG + ddf = dask_cudf.from_cudf(df, npartitions=2) G_dask = cugraph.Graph(directed=True) G_dask.from_dask_cudf_edgelist(ddf, source="src", destination="dst") @@ -330,16 +326,4 @@ def test_pagerank_string_vertex_ids(dask_client): ) mg_results["pagerank"] = mg_results["pagerank"].astype("float32") - # SG - df = cudf.read_csv( - input_file, chunksize=chunksize, delimiter="\t", - names=["src", "dst"], - ) - - G = cugraph.Graph(directed=True) - G.from_cudf_edgelist(df, source="src", destination="dst") - - sg_results = cugraph.pagerank(G) - sg_results = sg_results.sort_values("pagerank").reset_index(drop=True) - assert_frame_equal(sg_results, mg_results)