diff --git a/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py index 1c73ebb0216..9f0980d4199 100644 --- a/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py @@ -93,6 +93,3 @@ def test_mg_betweenness_centrality( second_key="ref_bc", epsilon=DEFAULT_EPSILON, ) - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py index 4530dd3da86..4764c01f0fc 100644 --- a/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py @@ -84,5 +84,3 @@ def test_mg_edge_betweenness_centrality( second_key="ref_bc", epsilon=DEFAULT_EPSILON, ) - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py index db34c68a054..ff8859a01b1 100644 --- a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py +++ b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION.: +# Copyright (c) 2020-2024, NVIDIA CORPORATION.: # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -111,11 +111,18 @@ def calc_betweenness_centrality( else: edge_attr = None - G = graph_file.get_graph( - download=True, - create_using=cugraph.Graph(directed=directed), - ignore_weights=not edgevals, - ) + G = None + if multi_gpu_batch: + G = graph_file.get_dask_graph( + create_using=cugraph.Graph(directed=directed), ignore_weights=not edgevals + ) + G.enable_batch() + else: + G = graph_file.get_graph( + download=True, + create_using=cugraph.Graph(directed=directed), + ignore_weights=not edgevals, + ) M = G.to_pandas_edgelist().rename( columns={"src": "0", "dst": "1", "wgt": edge_attr} @@ -130,8 +137,6 @@ def calc_betweenness_centrality( ) assert G is not None and Gnx is not None - if multi_gpu_batch: - G.enable_batch() calc_func = None if k is not None and seed is not None: diff --git a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py index c94c2dcaff6..35e199093ce 100644 --- a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py @@ -49,14 +49,12 @@ def setup_function(): def get_sg_graph(dataset, directed): - dataset.unload() G = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) return G def get_mg_graph(dataset, directed): - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( @@ -96,7 +94,6 @@ def test_dask_mg_betweenness_centrality( benchmark, ): g = get_sg_graph(dataset, directed) - dataset.unload() dg = get_mg_graph(dataset, directed) random_state = subset_seed @@ -143,6 +140,3 @@ def test_dask_mg_betweenness_centrality( diff = cupy.isclose(mg_bc_results, sg_bc_results) assert diff.all() - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py index 68daff9238c..8606649c745 100644 --- a/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py @@ -45,14 +45,12 @@ def setup_function(): def get_sg_graph(dataset, directed): - dataset.unload() G = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) return G def get_mg_graph(dataset, directed): - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( @@ -118,6 +116,3 @@ def test_dask_mg_degree(dask_client, dataset, directed): check_names=False, check_dtype=False, ) - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py index 80acfe1c4ad..5b83a05e2a2 100644 --- a/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py @@ -47,7 +47,6 @@ def setup_function(): def get_sg_graph(dataset, directed, edge_ids): - dataset.unload() df = dataset.get_edgelist() if edge_ids: if not directed: @@ -71,7 +70,6 @@ def get_sg_graph(dataset, directed, edge_ids): def get_mg_graph(dataset, directed, edge_ids, weight): - dataset.unload() ddf = dataset.get_dask_edgelist() if weight: @@ -178,6 +176,3 @@ def test_dask_mg_edge_betweenness_centrality( assert len(edge_bc_diffs1) == 0 assert len(edge_bc_diffs2) == 0 - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py index 8cd77fb5e24..3a840c82e95 100644 --- a/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py @@ -52,7 +52,6 @@ def setup_function(): def test_dask_mg_eigenvector_centrality(dask_client, dataset, directed): input_data_path = dataset.get_path() print(f"dataset={input_data_path}") - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True) @@ -89,15 +88,11 @@ def test_dask_mg_eigenvector_centrality(dask_client, dataset, directed): err = err + 1 assert err == 0 - # Clean-up stored dataset edge-lists - dataset.unload() - @pytest.mark.mg def test_dask_mg_eigenvector_centrality_transposed_false(dask_client): dataset = DATASETS[0] - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=False) @@ -110,6 +105,3 @@ def test_dask_mg_eigenvector_centrality_transposed_false(dask_client): with pytest.warns(UserWarning, match=warning_msg): dcg.eigenvector_centrality(dg) - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py index ebbe5974814..5dcbd8173df 100644 --- a/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py @@ -53,7 +53,6 @@ def test_dask_mg_katz_centrality(dask_client, dataset, directed): input_data_path = dataset.get_path() print(f"dataset={input_data_path}") - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True) @@ -95,16 +94,12 @@ def test_dask_mg_katz_centrality(dask_client, dataset, directed): err = err + 1 assert err == 0 - # Clean-up stored dataset edge-lists - dataset.unload() - @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") @pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) def test_dask_mg_katz_centrality_nstart(dask_client, dataset, directed): - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True) @@ -136,14 +131,10 @@ def test_dask_mg_katz_centrality_nstart(dask_client, dataset, directed): err = err + 1 assert err == 0 - # Clean-up stored dataset edge-lists - dataset.unload() - @pytest.mark.mg @pytest.mark.parametrize("dataset", DATASETS) def test_dask_mg_katz_centrality_transposed_false(dask_client, dataset): - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=False) @@ -156,6 +147,3 @@ def test_dask_mg_katz_centrality_transposed_false(dask_client, dataset): with pytest.warns(UserWarning, match=warning_msg): dcg.katz_centrality(dg) - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/comms/test_comms_mg.py b/python/cugraph/cugraph/tests/comms/test_comms_mg.py index 75462924c9d..d096eb7e5c2 100644 --- a/python/cugraph/cugraph/tests/comms/test_comms_mg.py +++ b/python/cugraph/cugraph/tests/comms/test_comms_mg.py @@ -16,10 +16,9 @@ import pytest import cugraph.dask as dcg -import cudf -import dask_cudf import cugraph -from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH +from cugraph.datasets import karate, dolphins + # ============================================================================= # Pytest Setup / Teardown - called for each test function @@ -30,12 +29,36 @@ def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + +DATASETS = [karate, dolphins] IS_DIRECTED = [True, False] -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def get_pagerank_result(dataset, is_mg): + """Return the cugraph.pagerank result for an MG or SG graph""" + + if is_mg: + dg = dataset.get_dask_graph(store_transposed=True) + return dcg.pagerank(dg).compute() + else: + g = dataset.get_graph(store_transposed=True) + return cugraph.pagerank(g) + + +# ============================================================================= +# Tests +# ============================================================================= + + @pytest.mark.mg @pytest.mark.parametrize("directed", IS_DIRECTED) def test_dask_mg_pagerank(dask_client, directed): @@ -43,62 +66,17 @@ def test_dask_mg_pagerank(dask_client, directed): # Initialize and run pagerank on two distributed graphs # with same communicator - input_data_path1 = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix() + input_data_path1 = karate.get_path() print(f"dataset1={input_data_path1}") - chunksize1 = dcg.get_chunksize(input_data_path1) + result_pr1 = get_pagerank_result(karate, is_mg=True) - input_data_path2 = (RAPIDS_DATASET_ROOT_DIR_PATH / "dolphins.csv").as_posix() + input_data_path2 = dolphins.get_path() print(f"dataset2={input_data_path2}") - chunksize2 = dcg.get_chunksize(input_data_path2) - - ddf1 = dask_cudf.read_csv( - input_data_path1, - blocksize=chunksize1, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg1 = cugraph.Graph(directed=directed) - dg1.from_dask_cudf_edgelist(ddf1, "src", "dst") - - result_pr1 = dcg.pagerank(dg1).compute() - - ddf2 = dask_cudf.read_csv( - input_data_path2, - blocksize=chunksize2, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg2 = cugraph.Graph(directed=directed) - dg2.from_dask_cudf_edgelist(ddf2, "src", "dst") - - result_pr2 = dcg.pagerank(dg2).compute() + result_pr2 = get_pagerank_result(dolphins, is_mg=True) # Calculate single GPU pagerank for verification of results - df1 = cudf.read_csv( - input_data_path1, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - g1 = cugraph.Graph(directed=directed) - g1.from_cudf_edgelist(df1, "src", "dst") - expected_pr1 = cugraph.pagerank(g1) - - df2 = cudf.read_csv( - input_data_path2, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - g2 = cugraph.Graph(directed=directed) - g2.from_cudf_edgelist(df2, "src", "dst") - expected_pr2 = cugraph.pagerank(g2) + expected_pr1 = get_pagerank_result(karate, is_mg=False) + expected_pr2 = get_pagerank_result(dolphins, is_mg=False) # Compare and verify pagerank results diff --git a/python/cugraph/cugraph/tests/community/test_induced_subgraph_mg.py b/python/cugraph/cugraph/tests/community/test_induced_subgraph_mg.py index 45ec8eca0e8..311fd7a24bc 100644 --- a/python/cugraph/cugraph/tests/community/test_induced_subgraph_mg.py +++ b/python/cugraph/cugraph/tests/community/test_induced_subgraph_mg.py @@ -17,7 +17,6 @@ import cugraph import cugraph.dask as dcg -import dask_cudf from cudf.testing.testing import assert_frame_equal from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.datasets import karate, dolphins, email_Eu_core @@ -36,11 +35,13 @@ def setup_function(): # Parameters # ============================================================================= + DATASETS = [karate, dolphins, email_Eu_core] IS_DIRECTED = [True, False] NUM_VERTICES = [2, 5, 10, 20] OFFSETS = [None] + # ============================================================================= # Helper functions # ============================================================================= @@ -53,15 +54,7 @@ def get_sg_graph(dataset, directed): def get_mg_graph(dataset, directed): - input_data_path = dataset.get_path() - blocksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=blocksize, - delimiter=dataset.metadata["delim"], - names=dataset.metadata["col_names"], - dtype=dataset.metadata["col_types"], - ) + ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( ddf, @@ -108,7 +101,7 @@ def test_mg_induced_subgraph( # FIXME: This parameter is not yet tested # mg_offsets = mg_offsets.compute().reset_index(drop=True) - mg_df, mg_offsets = result_induced_subgraph + mg_df, _ = result_induced_subgraph if mg_df is not None and sg_induced_subgraph is not None: # FIXME: 'edges()' or 'view_edgelist()' takes half the edges out if diff --git a/python/cugraph/cugraph/tests/community/test_leiden_mg.py b/python/cugraph/cugraph/tests/community/test_leiden_mg.py index b1908ae10a2..2904ecd12a2 100644 --- a/python/cugraph/cugraph/tests/community/test_leiden_mg.py +++ b/python/cugraph/cugraph/tests/community/test_leiden_mg.py @@ -13,123 +13,56 @@ import pytest - -import dask_cudf import cugraph import cugraph.dask as dcg -from cugraph.testing import utils - +from cugraph.datasets import karate_asymmetric, karate, dolphins -try: - from rapids_pytest_benchmark import setFixtureParamNames -except ImportError: - print( - "\n\nWARNING: rapids_pytest_benchmark is not installed, " - "falling back to pytest_benchmark fixtures.\n" - ) - # if rapids_pytest_benchmark is not available, just perfrom time-only - # benchmarking and replace the util functions with nops - import pytest_benchmark +# ============================================================================= +# Parameters +# ============================================================================= - gpubenchmark = pytest_benchmark.plugin.benchmark - def setFixtureParamNames(*args, **kwargs): - pass +DATASETS = [karate, dolphins] +DATASETS_ASYMMETRIC = [karate_asymmetric] # ============================================================================= -# Parameters +# Helper Functions # ============================================================================= -DATASETS_ASYMMETRIC = [utils.RAPIDS_DATASET_ROOT_DIR_PATH / "karate-asymmetric.csv"] - - -############################################################################### -# Fixtures -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) -@pytest.fixture( - scope="module", - params=DATASETS_ASYMMETRIC, - ids=[f"dataset={d.as_posix()}" for d in DATASETS_ASYMMETRIC], -) -def daskGraphFromDataset(request, dask_client): - """ - Returns a new dask dataframe created from the dataset file param. - This creates a directed Graph. - """ - # Since parameterized fixtures do not assign param names to param values, - # manually call the helper to do so. - setFixtureParamNames(request, ["dataset"]) - dataset = request.param - - chunksize = dcg.get_chunksize(dataset) - ddf = dask_cudf.read_csv( - dataset, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=True) - dg.from_dask_cudf_edgelist(ddf, "src", "dst", "value") - return dg -@pytest.fixture( - scope="module", - params=utils.DATASETS_UNDIRECTED, - ids=[f"dataset={d.as_posix()}" for d in utils.DATASETS_UNDIRECTED], -) -def uddaskGraphFromDataset(request, dask_client): - """ - Returns a new dask dataframe created from the dataset file param. - This creates an undirected Graph. - """ - # Since parameterized fixtures do not assign param names to param - # values, manually call the helper to do so. - setFixtureParamNames(request, ["dataset"]) - dataset = request.param - - chunksize = dcg.get_chunksize(dataset) - ddf = dask_cudf.read_csv( - dataset, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist(ddf, "src", "dst", "value") +def get_mg_graph(dataset, directed): + """Returns an MG graph""" + ddf = dataset.get_dask_edgelist() + + dg = cugraph.Graph(directed=directed) + dg.from_dask_cudf_edgelist(ddf, "src", "dst", "wgt") + return dg -############################################################################### +# ============================================================================= # Tests -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) +# ============================================================================= # FIXME: Implement more robust tests + + @pytest.mark.mg -def test_mg_leiden_with_edgevals_directed_graph(daskGraphFromDataset): +@pytest.mark.parametrize("dataset", DATASETS_ASYMMETRIC) +def test_mg_leiden_with_edgevals_directed_graph(dask_client, dataset): + dg = get_mg_graph(dataset, directed=True) # Directed graphs are not supported by Leiden and a ValueError should be # raised with pytest.raises(ValueError): - parts, mod = dcg.leiden(daskGraphFromDataset) + parts, mod = dcg.leiden(dg) -############################################################################### -# Tests -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) -# FIXME: Implement more robust tests @pytest.mark.mg -def test_mg_leiden_with_edgevals_undirected_graph(uddaskGraphFromDataset): - parts, mod = dcg.leiden(uddaskGraphFromDataset) +@pytest.mark.parametrize("dataset", DATASETS) +def test_mg_leiden_with_edgevals_undirected_graph(dask_client, dataset): + dg = get_mg_graph(dataset, directed=False) + parts, mod = dcg.leiden(dg) # FIXME: either call Nx with the same dataset and compare results, or # hardcode golden results to compare to. diff --git a/python/cugraph/cugraph/tests/community/test_louvain_mg.py b/python/cugraph/cugraph/tests/community/test_louvain_mg.py index 19fffe96b5c..0dff7f1c8b0 100644 --- a/python/cugraph/cugraph/tests/community/test_louvain_mg.py +++ b/python/cugraph/cugraph/tests/community/test_louvain_mg.py @@ -14,122 +14,41 @@ import pytest import cugraph.dask as dcg +from cugraph.datasets import karate_asymmetric, karate, dolphins -import cugraph -import dask_cudf -from cugraph.testing import utils - - -try: - from rapids_pytest_benchmark import setFixtureParamNames -except ImportError: - print( - "\n\nWARNING: rapids_pytest_benchmark is not installed, " - "falling back to pytest_benchmark fixtures.\n" - ) - - # if rapids_pytest_benchmark is not available, just perfrom time-only - # benchmarking and replace the util functions with nops - import pytest_benchmark - - gpubenchmark = pytest_benchmark.plugin.benchmark - - def setFixtureParamNames(*args, **kwargs): - pass +from test_leiden_mg import get_mg_graph # ============================================================================= # Parameters # ============================================================================= -DATASETS_ASYMMETRIC = [utils.RAPIDS_DATASET_ROOT_DIR_PATH / "karate-asymmetric.csv"] - - -############################################################################### -# Fixtures -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) -@pytest.fixture( - scope="module", - params=DATASETS_ASYMMETRIC, - ids=[f"dataset={d.as_posix()}" for d in DATASETS_ASYMMETRIC], -) -def daskGraphFromDataset(request, dask_client): - """ - Returns a new dask dataframe created from the dataset file param. - This creates a directed Graph. - """ - # Since parameterized fixtures do not assign param names to param values, - # manually call the helper to do so. - setFixtureParamNames(request, ["dataset"]) - dataset = request.param - - chunksize = dcg.get_chunksize(dataset) - ddf = dask_cudf.read_csv( - dataset, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=True) - dg.from_dask_cudf_edgelist(ddf, "src", "dst", "value") - return dg -@pytest.fixture( - scope="module", - params=utils.DATASETS_UNDIRECTED, - ids=[f"dataset={d.as_posix()}" for d in utils.DATASETS_UNDIRECTED], -) -def uddaskGraphFromDataset(request, dask_client): - """ - Returns a new dask dataframe created from the dataset file param. - This creates an undirected Graph. - """ - # Since parameterized fixtures do not assign param names to param - # values, manually call the helper to do so. - setFixtureParamNames(request, ["dataset"]) - dataset = request.param +DATASETS_ASYMMETRIC = DATASETS_ASYMMETRIC = [karate_asymmetric] +DATASETS = [karate, dolphins] - chunksize = dcg.get_chunksize(dataset) - ddf = dask_cudf.read_csv( - dataset, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist(ddf, "src", "dst", "value") - return dg - - -############################################################################### +# ============================================================================= # Tests -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) +# ============================================================================= # FIXME: Implement more robust tests + + @pytest.mark.mg -def test_mg_louvain_with_edgevals_directed_graph(daskGraphFromDataset): +@pytest.mark.parametrize("dataset", DATASETS_ASYMMETRIC) +def test_mg_louvain_with_edgevals_directed_graph(dask_client, dataset): + dg = get_mg_graph(dataset, directed=True) # Directed graphs are not supported by Louvain and a ValueError should be # raised with pytest.raises(ValueError): - parts, mod = dcg.louvain(daskGraphFromDataset) + parts, mod = dcg.louvain(dg) -############################################################################### -# Tests -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) -# FIXME: Implement more robust tests @pytest.mark.mg -def test_mg_louvain_with_edgevals_undirected_graph(uddaskGraphFromDataset): - parts, mod = dcg.louvain(uddaskGraphFromDataset) +@pytest.mark.parametrize("dataset", DATASETS) +def test_mg_louvain_with_edgevals_undirected_graph(dask_client, dataset): + dg = get_mg_graph(dataset, directed=False) + parts, mod = dcg.louvain(dg) # FIXME: either call Nx with the same dataset and compare results, or # hardcode golden results to compare to. diff --git a/python/cugraph/cugraph/tests/community/test_triangle_count_mg.py b/python/cugraph/cugraph/tests/community/test_triangle_count_mg.py index 0a052845cf8..e2c47af8a1b 100644 --- a/python/cugraph/cugraph/tests/community/test_triangle_count_mg.py +++ b/python/cugraph/cugraph/tests/community/test_triangle_count_mg.py @@ -16,115 +16,81 @@ import random import pytest -import cudf -import dask_cudf import cugraph import cugraph.dask as dcg -from cugraph.testing import utils -from pylibcugraph.testing.utils import gen_fixture_params_product +from cugraph.datasets import karate, dolphins # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() # ============================================================================= -# Pytest fixtures +# Parameters # ============================================================================= -datasets = utils.DATASETS_UNDIRECTED -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), - ([True, False], "start_list"), -) - - -@pytest.fixture(scope="module", params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. - """ - parameters = dict(zip(("graph_file", "start_list", "edgevals"), request.param)) - - return parameters - - -@pytest.fixture(scope="module") -def input_expected_output(dask_client, input_combo): - """ - This fixture returns the inputs and expected results from the triangle - count algo. - """ - start_list = input_combo["start_list"] - input_data_path = input_combo["graph_file"] - G = utils.generate_cugraph_graph_from_file( - input_data_path, directed=False, edgevals=True - ) - input_combo["SGGraph"] = G - if start_list: +DATASETS = [karate, dolphins] +START_LIST = [True, False] + + +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def get_sg_graph(dataset, directed, start): + G = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) + if start: # sample k nodes from the cuGraph graph - k = random.randint(1, 10) - srcs = G.view_edge_list()[G.source_columns] - dsts = G.view_edge_list()[G.destination_columns] - nodes = cudf.concat([srcs, dsts]).drop_duplicates() - start_list = nodes.sample(k) + start = G.select_random_vertices(num_vertices=random.randint(1, 10)) else: - start_list = None + start = None - sg_triangle_results = cugraph.triangle_count(G, start_list) - sg_triangle_results = sg_triangle_results.sort_values("vertex").reset_index( - drop=True - ) + return G, start - input_combo["sg_triangle_results"] = sg_triangle_results - input_combo["start_list"] = start_list - - # Creating an edgelist from a dask cudf dataframe - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - dg = cugraph.Graph(directed=False) +def get_mg_graph(dataset, directed): + ddf = dataset.get_dask_edgelist() + dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( - ddf, source="src", destination="dst", edge_attr="value", renumber=True + ddf, source="src", destination="dst", edge_attr="wgt", renumber=True ) - input_combo["MGGraph"] = dg - - return input_combo + return dg # ============================================================================= # Tests # ============================================================================= + + @pytest.mark.mg -def test_sg_triangles(dask_client, benchmark, input_expected_output): +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("start", START_LIST) +def test_sg_triangles(dask_client, dataset, start, benchmark): # This test is only for benchmark purposes. sg_triangle_results = None - G = input_expected_output["SGGraph"] - start_list = input_expected_output["start_list"] - sg_triangle_results = benchmark(cugraph.triangle_count, G, start_list) + G, start = get_sg_graph(dataset, False, start) + + sg_triangle_results = benchmark(cugraph.triangle_count, G, start) + sg_triangle_results.sort_values("vertex").reset_index(drop=True) assert sg_triangle_results is not None @pytest.mark.mg -def test_triangles(dask_client, benchmark, input_expected_output): - - dg = input_expected_output["MGGraph"] - start_list = input_expected_output["start_list"] - - result_counts = benchmark(dcg.triangle_count, dg, start_list) +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("start", START_LIST) +def test_triangles(dask_client, dataset, start, benchmark): + G, start = get_sg_graph(dataset, False, start) + dg = get_mg_graph(dataset, False) + result_counts = benchmark(dcg.triangle_count, dg, start) result_counts = ( result_counts.drop_duplicates() .compute() @@ -132,8 +98,9 @@ def test_triangles(dask_client, benchmark, input_expected_output): .reset_index(drop=True) .rename(columns={"counts": "mg_counts"}) ) - - expected_output = input_expected_output["sg_triangle_results"] + expected_output = ( + cugraph.triangle_count(G, start).sort_values("vertex").reset_index(drop=True) + ) # Update the mg triangle count with sg triangle count results # for easy comparison using cuDF DataFrame methods. diff --git a/python/cugraph/cugraph/tests/components/test_connectivity_mg.py b/python/cugraph/cugraph/tests/components/test_connectivity_mg.py index 26e8ed17bcb..4ab251c0e29 100644 --- a/python/cugraph/cugraph/tests/components/test_connectivity_mg.py +++ b/python/cugraph/cugraph/tests/components/test_connectivity_mg.py @@ -15,11 +15,9 @@ import pytest -import cudf -import dask_cudf import cugraph import cugraph.dask as dcg -from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH +from cugraph.datasets import netscience # ============================================================================= @@ -31,42 +29,47 @@ def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + +DATASETS = [netscience] # Directed graph is not currently supported IS_DIRECTED = [False, True] -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) +# ============================================================================= +# Helper +# ============================================================================= + + +def get_mg_graph(dataset, directed): + """Returns an MG graph""" + ddf = dataset.get_dask_edgelist() + + dg = cugraph.Graph(directed=directed) + dg.from_dask_cudf_edgelist(ddf, "src", "dst", "wgt") + + return dg + + +# ============================================================================= +# Tests +# ============================================================================= + + @pytest.mark.mg +@pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) -def test_dask_mg_wcc(dask_client, directed): - - input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "netscience.csv").as_posix() +def test_dask_mg_wcc(dask_client, dataset, directed): + input_data_path = dataset.get_path() print(f"dataset={input_data_path}") - chunksize = dcg.get_chunksize(input_data_path) - - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - df = cudf.read_csv( - input_data_path, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - g = cugraph.Graph(directed=directed) - g.from_cudf_edgelist(df, "src", "dst", renumber=True) - dg = cugraph.Graph(directed=directed) - dg.from_dask_cudf_edgelist(ddf, "src", "dst") + g = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) + dg = get_mg_graph(dataset, directed) + # breakpoint() if not directed: expected_dist = cugraph.weakly_connected_components(g) result_dist = dcg.weakly_connected_components(dg) diff --git a/python/cugraph/cugraph/tests/core/test_core_number_mg.py b/python/cugraph/cugraph/tests/core/test_core_number_mg.py index f771ce513eb..3d9a7bef5be 100644 --- a/python/cugraph/cugraph/tests/core/test_core_number_mg.py +++ b/python/cugraph/cugraph/tests/core/test_core_number_mg.py @@ -15,107 +15,64 @@ import pytest -import dask_cudf import cugraph import cugraph.dask as dcg -from cugraph.testing import utils -from pylibcugraph.testing.utils import gen_fixture_params_product +from cugraph.datasets import karate, dolphins, karate_asymmetric # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() # ============================================================================= -# Pytest fixtures +# Parameters # ============================================================================= -datasets = utils.DATASETS_UNDIRECTED -degree_type = ["incoming", "outgoing", "bidirectional"] - -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), - (degree_type, "degree_type"), -) - - -@pytest.fixture(scope="module", params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. - """ - parameters = dict(zip(("graph_file", "degree_type"), request.param)) - - return parameters - - -@pytest.fixture(scope="module") -def input_expected_output(dask_client, input_combo): - """ - This fixture returns the inputs and expected results from the Core number - algo. - """ - degree_type = input_combo["degree_type"] - input_data_path = input_combo["graph_file"] - G = utils.generate_cugraph_graph_from_file( - input_data_path, directed=False, edgevals=True - ) - input_combo["SGGraph"] = G - sg_core_number_results = cugraph.core_number(G, degree_type) - sg_core_number_results = sg_core_number_results.sort_values("vertex").reset_index( - drop=True - ) +DATASETS = [karate, dolphins] +DEGREE_TYPE = ["incoming", "outgoing", "bidirectional"] - input_combo["sg_core_number_results"] = sg_core_number_results - input_combo["degree_type"] = degree_type - - # Creating an edgelist from a dask cudf dataframe - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist( - ddf, source="src", destination="dst", edge_attr="value", renumber=True - ) +# ============================================================================= +# Helper Functions +# ============================================================================= - input_combo["MGGraph"] = dg - return input_combo +def get_sg_results(dataset, degree_type): + G = dataset.get_graph(create_using=cugraph.Graph(directed=False)) + res = cugraph.core_number(G, degree_type) + res = res.sort_values("vertex").reset_index(drop=True) + return res # ============================================================================= # Tests # ============================================================================= + + @pytest.mark.mg -def test_sg_core_number(dask_client, benchmark, input_expected_output): +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("degree_type", DEGREE_TYPE) +def test_sg_core_number(dask_client, dataset, degree_type, benchmark): # This test is only for benchmark purposes. sg_core_number_results = None - G = input_expected_output["SGGraph"] - degree_type = input_expected_output["degree_type"] - + G = dataset.get_graph(create_using=cugraph.Graph(directed=False)) sg_core_number_results = benchmark(cugraph.core_number, G, degree_type) assert sg_core_number_results is not None @pytest.mark.mg -def test_core_number(dask_client, benchmark, input_expected_output): - - dg = input_expected_output["MGGraph"] - degree_type = input_expected_output["degree_type"] +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("degree_type", DEGREE_TYPE) +def test_core_number(dask_client, dataset, degree_type, benchmark): + dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=False)) result_core_number = benchmark(dcg.core_number, dg, degree_type) - result_core_number = ( result_core_number.drop_duplicates() .compute() @@ -124,7 +81,7 @@ def test_core_number(dask_client, benchmark, input_expected_output): .rename(columns={"core_number": "mg_core_number"}) ) - expected_output = input_expected_output["sg_core_number_results"] + expected_output = get_sg_results(dataset, degree_type) # Update the mg core number with sg core number results # for easy comparison using cuDF DataFrame methods. @@ -135,30 +92,10 @@ def test_core_number(dask_client, benchmark, input_expected_output): @pytest.mark.mg -def test_core_number_invalid_input(input_expected_output): - input_data_path = ( - utils.RAPIDS_DATASET_ROOT_DIR_PATH / "karate-asymmetric.csv" - ).as_posix() - - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=True) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - renumber=True, - ) +def test_core_number_invalid_input(): + dg = karate_asymmetric.get_graph(create_using=cugraph.Graph(directed=True)) invalid_degree_type = 3 - dg = input_expected_output["MGGraph"] + with pytest.raises(ValueError): dcg.core_number(dg, invalid_degree_type) diff --git a/python/cugraph/cugraph/tests/core/test_k_core_mg.py b/python/cugraph/cugraph/tests/core/test_k_core_mg.py index b2ac18cf3a9..c7ad6d2d41d 100644 --- a/python/cugraph/cugraph/tests/core/test_k_core_mg.py +++ b/python/cugraph/cugraph/tests/core/test_k_core_mg.py @@ -1,4 +1,5 @@ -# Copyright (c) 2023-2024, NVIDIA CORPORATION. +# Copyright (c) 2024, NVIDIA CORPORATION. + # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -15,58 +16,39 @@ import pytest -import dask_cudf import cugraph import cugraph.dask as dcg -from cugraph.testing import utils +from cugraph.datasets import karate, dolphins from cudf.testing.testing import assert_frame_equal from cugraph.structure.symmetrize import symmetrize_df -from pylibcugraph.testing import gen_fixture_params_product # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() # ============================================================================= -# Pytest fixtures +# Parameters # ============================================================================= -datasets = utils.DATASETS_UNDIRECTED - -core_number = [True, False] -degree_type = ["bidirectional", "outgoing", "incoming"] - -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), (core_number, "core_number"), (degree_type, "degree_type") -) - - -@pytest.fixture(scope="module", params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. - """ - parameters = dict(zip(("graph_file", "core_number", "degree_type"), request.param)) - - return parameters - - -@pytest.fixture(scope="module") -def input_expected_output(dask_client, input_combo): - """ - This fixture returns the inputs and expected results from the Core number - algo. - """ - core_number = input_combo["core_number"] - degree_type = input_combo["degree_type"] - input_data_path = input_combo["graph_file"] - G = utils.generate_cugraph_graph_from_file( - input_data_path, directed=False, edgevals=True - ) + + +DATASETS = [karate, dolphins] +CORE_NUMBER = [True, False] +DEGREE_TYPE = ["bidirectional", "outgoing", "incoming"] + + +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def get_sg_results(dataset, core_number, degree_type): + G = dataset.get_graph(create_using=cugraph.Graph(directed=False)) if core_number: # compute the core_number @@ -74,62 +56,41 @@ def input_expected_output(dask_client, input_combo): else: core_number = None - input_combo["core_number"] = core_number - - input_combo["SGGraph"] = G - sg_k_core_graph = cugraph.k_core( G, core_number=core_number, degree_type=degree_type ) - sg_k_core_results = sg_k_core_graph.view_edge_list() + res = sg_k_core_graph.view_edge_list() # FIXME: The result will come asymetric. Symmetrize the results srcCol = sg_k_core_graph.source_columns dstCol = sg_k_core_graph.destination_columns wgtCol = sg_k_core_graph.weight_column - sg_k_core_results = ( - symmetrize_df(sg_k_core_results, srcCol, dstCol, wgtCol) + res = ( + symmetrize_df(res, srcCol, dstCol, wgtCol) .sort_values([srcCol, dstCol]) .reset_index(drop=True) ) - input_combo["sg_k_core_results"] = sg_k_core_results - - # Creating an edgelist from a dask cudf dataframe - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=False) - # FIXME: False when renumbering (C++ and python renumbering) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - renumber=True, - ) - - input_combo["MGGraph"] = dg - - return input_combo + return res, core_number # ============================================================================= # Tests # ============================================================================= + + @pytest.mark.mg -def test_sg_k_core(dask_client, benchmark, input_expected_output): +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("core_number", CORE_NUMBER) +@pytest.mark.parametrize("degree_type", DEGREE_TYPE) +def test_sg_k_core(dask_client, dataset, core_number, degree_type, benchmark): # This test is only for benchmark purposes. sg_k_core = None - G = input_expected_output["SGGraph"] - core_number = input_expected_output["core_number"] - degree_type = input_expected_output["degree_type"] - + G = dataset.get_graph(create_using=cugraph.Graph(directed=False)) + if core_number: + # compute the core_number + core_number = cugraph.core_number(G, degree_type=degree_type) + else: + core_number = None sg_k_core = benchmark( cugraph.k_core, G, core_number=core_number, degree_type=degree_type ) @@ -137,15 +98,16 @@ def test_sg_k_core(dask_client, benchmark, input_expected_output): @pytest.mark.mg -def test_dask_mg_k_core(dask_client, benchmark, input_expected_output): - - dg = input_expected_output["MGGraph"] - core_number = input_expected_output["core_number"] +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("core_number", CORE_NUMBER) +@pytest.mark.parametrize("degree_type", DEGREE_TYPE) +def test_dask_mg_k_core(dask_client, dataset, core_number, degree_type, benchmark): + expected_k_core_results, core_number = get_sg_results( + dataset, core_number, degree_type + ) + dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=False)) k_core_results = benchmark(dcg.k_core, dg, core_number=core_number) - - expected_k_core_results = input_expected_output["sg_k_core_results"] - k_core_results = ( k_core_results.compute() .sort_values(["src", "dst"]) @@ -160,36 +122,13 @@ def test_dask_mg_k_core(dask_client, benchmark, input_expected_output): @pytest.mark.mg def test_dask_mg_k_core_invalid_input(dask_client): - input_data_path = datasets[0] - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) + dataset = DATASETS[0] + dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=True)) - dg = cugraph.Graph(directed=True) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - renumber=True, - store_transposed=True, - ) with pytest.raises(ValueError): dcg.k_core(dg) - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - store_transposed=True, - ) + dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=False)) degree_type = "invalid" with pytest.raises(ValueError): diff --git a/python/cugraph/cugraph/tests/internals/test_renumber_mg.py b/python/cugraph/cugraph/tests/internals/test_renumber_mg.py index 45a3c46309d..64917d0c747 100644 --- a/python/cugraph/cugraph/tests/internals/test_renumber_mg.py +++ b/python/cugraph/cugraph/tests/internals/test_renumber_mg.py @@ -24,33 +24,61 @@ import dask_cudf import cugraph.dask as dcg import cugraph +from cugraph.datasets import karate, karate_disjoint from cugraph.testing import utils from cugraph.structure.number_map import NumberMap from cugraph.dask.common.mg_utils import is_single_gpu -from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH from cudf.testing import assert_frame_equal, assert_series_equal # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + +DATASETS = [karate] +DATASETS_UNRENUMBERED = [karate_disjoint] IS_DIRECTED = [True, False] +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def get_sg_graph(dataset, directed): + dataset.unload() + g = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) + + return g + + +def get_mg_graph(dataset, directed): + dataset.unload() + dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=directed)) + + return dg + + +# ============================================================================= +# Tests +# ============================================================================= + + @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") -@pytest.mark.parametrize( - "graph_file", - utils.DATASETS_UNRENUMBERED, - ids=[f"dataset={d.as_posix()}" for d in utils.DATASETS_UNRENUMBERED], -) -def test_mg_renumber(graph_file, dask_client): - - M = utils.read_csv_for_nx(graph_file) +@pytest.mark.parametrize("dataset", DATASETS_UNRENUMBERED) +def test_mg_renumber(dataset, dask_client): + M = utils.read_csv_for_nx(dataset.get_path()) sources = cudf.Series(M["0"]) destinations = cudf.Series(M["1"]) @@ -96,13 +124,9 @@ def test_mg_renumber(graph_file, dask_client): @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") -@pytest.mark.parametrize( - "graph_file", - utils.DATASETS_UNRENUMBERED, - ids=[f"dataset={d.as_posix()}" for d in utils.DATASETS_UNRENUMBERED], -) -def test_mg_renumber_add_internal_vertex_id(graph_file, dask_client): - M = utils.read_csv_for_nx(graph_file) +@pytest.mark.parametrize("dataset", DATASETS_UNRENUMBERED) +def test_mg_renumber_add_internal_vertex_id(dataset, dask_client): + M = utils.read_csv_for_nx(dataset.get_path()) sources = cudf.Series(M["0"]) destinations = cudf.Series(M["1"]) @@ -131,33 +155,13 @@ def test_mg_renumber_add_internal_vertex_id(graph_file, dask_client): @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) -def test_dask_mg_pagerank(dask_client, directed): +def test_dask_mg_pagerank(dask_client, dataset, directed): pandas.set_option("display.max_rows", 10000) - input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix() - chunksize = dcg.get_chunksize(input_data_path) - - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - df = cudf.read_csv( - input_data_path, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - g = cugraph.Graph(directed=directed) - g.from_cudf_edgelist(df, "src", "dst") - - dg = cugraph.Graph(directed=directed) - dg.from_dask_cudf_edgelist(ddf, "src", "dst") + g = get_sg_graph(dataset, directed) + dg = get_mg_graph(dataset, directed) expected_pr = cugraph.pagerank(g) result_pr = dcg.pagerank(dg).compute() @@ -178,20 +182,18 @@ def test_dask_mg_pagerank(dask_client, directed): print("Mismatches:", err) assert err == 0 + dataset.unload() + @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") -@pytest.mark.parametrize( - "graph_file", - utils.DATASETS_UNRENUMBERED, - ids=[f"dataset={d.as_posix()}" for d in utils.DATASETS_UNRENUMBERED], -) -def test_mg_renumber_common_col_names(graph_file, dask_client): +@pytest.mark.parametrize("dataset", DATASETS_UNRENUMBERED) +def test_mg_renumber_common_col_names(dataset, dask_client): """ Ensure that commonly-used column names in the input do not conflict with names used internally by NumberMap. """ - M = utils.read_csv_for_nx(graph_file) + M = utils.read_csv_for_nx(dataset.get_path()) sources = cudf.Series(M["0"]) destinations = cudf.Series(M["1"]) diff --git a/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py b/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py index 3bdb5c079ef..09936e954e8 100644 --- a/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py +++ b/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -17,73 +17,54 @@ import dask_cudf import numpy as np -from cugraph.testing import UNDIRECTED_DATASETS, karate_disjoint - +from cugraph.datasets import karate, dolphins, karate_disjoint from cugraph.structure.replicate_edgelist import replicate_edgelist from cudf.testing.testing import assert_frame_equal -from pylibcugraph.testing.utils import gen_fixture_params_product # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + edgeWeightCol = "weights" edgeIdCol = "edge_id" edgeTypeCol = "edge_type" srcCol = "src" dstCol = "dst" - -input_data = UNDIRECTED_DATASETS + [karate_disjoint] -datasets = [pytest.param(d) for d in input_data] - -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), - ([True, False], "distributed"), - ([True, False], "use_weights"), - ([True, False], "use_edge_ids"), - ([True, False], "use_edge_type_ids"), -) - - -@pytest.fixture(scope="module", params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. - """ - return dict( - zip( - ( - "graph_file", - "use_weights", - "use_edge_ids", - "use_edge_type_ids", - "distributed", - ), - request.param, - ) - ) +DATASETS = [karate, dolphins, karate_disjoint] +IS_DISTRIBUTED = [True, False] +USE_WEIGHTS = [True, False] +USE_EDGE_IDS = [True, False] +USE_EDGE_TYPE_IDS = [True, False] # ============================================================================= # Tests # ============================================================================= -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) -@pytest.mark.mg -def test_mg_replicate_edgelist(dask_client, input_combo): - df = input_combo["graph_file"].get_edgelist() - distributed = input_combo["distributed"] - use_weights = input_combo["use_weights"] - use_edge_ids = input_combo["use_edge_ids"] - use_edge_type_ids = input_combo["use_edge_type_ids"] + +@pytest.mark.mg +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("distributed", IS_DISTRIBUTED) +@pytest.mark.parametrize("use_weights", USE_WEIGHTS) +@pytest.mark.parametrize("use_edge_ids", USE_EDGE_IDS) +@pytest.mark.parametrize("use_edge_type_ids", USE_EDGE_TYPE_IDS) +def test_mg_replicate_edgelist( + dask_client, dataset, distributed, use_weights, use_edge_ids, use_edge_type_ids +): + dataset.unload() + df = dataset.get_edgelist() columns = [srcCol, dstCol] weight = None diff --git a/python/cugraph/cugraph/tests/internals/test_symmetrize_mg.py b/python/cugraph/cugraph/tests/internals/test_symmetrize_mg.py index 05cc06e6282..913443fe400 100644 --- a/python/cugraph/cugraph/tests/internals/test_symmetrize_mg.py +++ b/python/cugraph/cugraph/tests/internals/test_symmetrize_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -25,6 +25,8 @@ # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect()