Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes bug in NumberMap preventing use of string vertex IDs for MG graphs #2688

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/cugraph/cugraph/structure/number_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,6 @@ def renumber_and_segment(
renumber_map.set_renumbered_col_names(
src_col_names, dst_col_names, df.columns)

id_type = df[src_col_names[0]].dtype
if isinstance(df, cudf.DataFrame):
renumber_map.implementation = NumberMap.SingleGPU(
df, src_col_names, dst_col_names, renumber_map.id_type,
Expand Down Expand Up @@ -615,6 +614,7 @@ def get_renumbered_df(id_type, data):
.astype(id_type)
return data[2]

id_type = df[renumber_map.renumbered_src_col_name].dtype
renumbering_map = dask_cudf.from_delayed(
[client.submit(get_renumber_map,
id_type,
Expand Down
19 changes: 10 additions & 9 deletions python/cugraph/cugraph/testing/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,24 +447,25 @@ def genFixtureParamsProduct(*args):
paramLists = []
ids = []
paramType = pytest.param().__class__
for (paramList, id) in args:
for (paramList, paramId) in args:
paramListCopy = paramList[:] # do not modify the incoming lists!
for i in range(len(paramList)):
if not isinstance(paramList[i], paramType):
paramList[i] = pytest.param(paramList[i])
paramLists.append(paramList)
ids.append(id)
paramListCopy[i] = pytest.param(paramList[i])
paramLists.append(paramListCopy)
ids.append(paramId)

retList = []
for paramCombo in product(*paramLists):
values = [p.values[0] for p in paramCombo]
marks = [m for p in paramCombo for m in p.marks]
id_strings = []
for (p, id) in zip(paramCombo, ids):
# Assume id is either a string or a callable
if isinstance(id, str):
id_strings.append("%s=%s" % (id, p.values[0]))
for (p, paramId) in zip(paramCombo, ids):
# Assume paramId is either a string or a callable
if isinstance(paramId, str):
id_strings.append("%s=%s" % (paramId, p.values[0]))
else:
id_strings.append(id(p.values[0]))
id_strings.append(paramId(p.values[0]))
comboid = ",".join(id_strings)
retList.append(pytest.param(values, marks=marks, id=comboid))
return retList
Expand Down
26 changes: 8 additions & 18 deletions python/cugraph/cugraph/tests/mg/test_mg_doctests.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,8 @@
import cudf
from cugraph.testing import utils

from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import cugraph.dask.comms.comms as Comms

datasets = utils.RAPIDS_DATASET_ROOT_DIR_PATH

cluster = LocalCUDACluster()
client = Client(cluster)
Comms.initialize(p2p=True)


def _is_public_name(name):
return not name.startswith("_")
Expand Down Expand Up @@ -95,6 +87,13 @@ def _fetch_doctests():
_is_public_name)


@pytest.fixture(
    scope="module",
    params=_fetch_doctests(),
    ids=lambda docstring: docstring.name,
)
def docstring(request):
    """
    Module-scoped, parametrized fixture: one value per entry returned by
    _fetch_doctests(), with each entry's ``name`` attribute used as the
    test ID.
    """
    current_doctest = request.param
    return current_doctest


class TestDoctests:
abs_datasets_path = datasets.absolute()

Expand All @@ -107,10 +106,7 @@ def chdir_to_tmp_path(cls, tmp_path):
finally:
os.chdir(original_directory)

@pytest.mark.parametrize(
"docstring", _fetch_doctests(), ids=lambda docstring: docstring.name,
)
def test_docstring(self, docstring):
def test_docstring(self, dask_client, docstring):
# We ignore differences in whitespace in the doctest output, and enable
# the use of an ellipsis "..." to match any string in the doctest
# output. An ellipsis is useful for, e.g., memory addresses or
Expand All @@ -132,9 +128,3 @@ def test_docstring(self, docstring):
f"{results.failed} of {results.attempted} doctests failed for "
f"{docstring.name}:\n{doctest_stdout.getvalue()}"
)

def close_comms():
    """
    Destroy the comms, close the dask client, and close the cluster when
    one is set.
    """
    Comms.destroy()
    client.close()
    if not cluster:
        return
    cluster.close()
42 changes: 41 additions & 1 deletion python/cugraph/cugraph/tests/mg/test_mg_renumber.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import dask_cudf
import dask
import cudf
from cudf.testing import assert_series_equal
from cudf.testing import assert_frame_equal, assert_series_equal

import cugraph.dask as dcg
import cugraph
Expand Down Expand Up @@ -287,3 +287,43 @@ def test_mg_renumber_common_col_names(graph_file, dask_client):
assert renumber_map.renumbered_dst_col_name != "dst"
assert renumber_map.renumbered_src_col_name in renumbered_df.columns
assert renumber_map.renumbered_dst_col_name in renumbered_df.columns


@pytest.mark.skipif(
    is_single_gpu(), reason="skipping MG testing on Single GPU system"
)
def test_pagerank_string_vertex_ids(dask_client):
    """
    Ensures string vertex IDs can be used.

    Runs pagerank on the same small directed graph through both the SG and
    the MG code paths and checks that the two result frames match.

    Note: the dask_client fixture sets up and tears down a LocalCUDACluster.
    See ../conftest.py
    """
    # Build the edgelist in memory with string vertex IDs. The same cudf
    # DataFrame feeds the SG graph directly and the MG graph via dask_cudf.
    df = cudf.DataFrame({"src": ['a1', 'a1', 'a2', 'a3'],
                         "dst": ['a2', 'a3', 'a4', 'a4'],
                         })

    # SG
    G = cugraph.Graph(directed=True)
    G.from_cudf_edgelist(df, source="src", destination="dst")

    sg_results = cugraph.pagerank(G)
    # Order rows by vertex ID rather than by score: a2 and a3 are symmetric
    # in this graph (each has one in-edge from a1 and one out-edge to a4), so
    # their scores tie and a sort on "pagerank" would leave their relative
    # order up to the sort's tie-breaking.
    sg_results = sg_results.sort_values("vertex").reset_index(drop=True)

    # MG
    ddf = dask_cudf.from_cudf(df, npartitions=2)
    G_dask = cugraph.Graph(directed=True)
    G_dask.from_dask_cudf_edgelist(ddf, source="src", destination="dst")

    mg_results = dcg.pagerank(G_dask)
    # Organize results for easy comparison, this does not change the values.
    # Use the same vertex-ID ordering as the SG results so rows line up.
    mg_results = (mg_results.compute().
                  sort_values("vertex").
                  reset_index(drop=True)
                  )
    # MG Pagerank defaults to float64, so convert to float32 when comparing
    # to SG.
    mg_results["pagerank"] = mg_results["pagerank"].astype("float32")

    assert_frame_equal(sg_results, mg_results)