diff --git a/python/cugraph/cugraph/dask/structure/mg_property_graph.py b/python/cugraph/cugraph/dask/structure/mg_property_graph.py index 04bb4f9b035..dee41001058 100644 --- a/python/cugraph/cugraph/dask/structure/mg_property_graph.py +++ b/python/cugraph/cugraph/dask/structure/mg_property_graph.py @@ -456,7 +456,7 @@ def add_vertex_data( self.__vertex_prop_dtypes.update(new_col_info) # Join on shared columns and the indices - tmp_df = tmp_df.set_index(self.vertex_col_name) + tmp_df = tmp_df.persist().set_index(self.vertex_col_name).persist() cols = self.__vertex_prop_dataframe.columns.intersection( tmp_df.columns ).to_list() @@ -465,7 +465,9 @@ def add_vertex_data( self.__vertex_prop_dataframe = ( self.__vertex_prop_dataframe.reset_index() .merge(tmp_df.reset_index(), on=cols, how="outer") + .persist() .set_index(self.vertex_col_name) + .persist() ) # self.__vertex_prop_dataframe = \ # self.__vertex_prop_dataframe.merge(tmp_df, on=cols, how="outer") @@ -479,8 +481,6 @@ def add_vertex_data( self.__vertex_prop_eval_dict[ self.vertex_col_name ] = self.__vertex_prop_dataframe.index - # Should we persist? - # self.__vertex_prop_dataframe = self.__vertex_prop_dataframe.persist() def get_vertex_data(self, vertex_ids=None, types=None, columns=None): """ @@ -666,16 +666,15 @@ def add_edge_data( tmp_df[self.edge_id_col_name] = ( tmp_df[self.edge_id_col_name].cumsum() + starting_eid ) - tmp_df = tmp_df.persist() - tmp_df = tmp_df.set_index(self.edge_id_col_name) - tmp_df = tmp_df.persist() + tmp_df = tmp_df.persist().set_index(self.edge_id_col_name).persist() self.__last_edge_id = starting_eid + len(tmp_df) else: - tmp_df = tmp_df.persist() - tmp_df = tmp_df.rename( - columns={edge_id_col_name: self.edge_id_col_name} - ).set_index(self.edge_id_col_name) - tmp_df = tmp_df.persist() + tmp_df = ( + tmp_df.rename(columns={edge_id_col_name: self.edge_id_col_name}) + .persist() + .set_index(self.edge_id_col_name) + .persist() + ) if property_columns: # all columns @@ -701,7 +700,9 @@ def add_edge_data( self.__edge_prop_dataframe = ( self.__edge_prop_dataframe.reset_index() .merge(tmp_df.reset_index(), on=cols, how="outer") + .persist() .set_index(self.edge_id_col_name) + .persist() ) # self.__edge_prop_dataframe = \ # self.__edge_prop_dataframe.merge(tmp_df, on=cols, how="outer") @@ -717,8 +718,6 @@ def add_edge_data( self.__edge_prop_eval_dict[ self.edge_id_col_name ] = self.__edge_prop_dataframe.index - # Should we persist? - # self.__edge_prop_dataframe = self.__edge_prop_dataframe.persist() def get_edge_data(self, edge_ids=None, types=None, columns=None): """ @@ -1118,9 +1117,9 @@ def renumber_vertices_by_type(self): df[self.vertex_col_name] = 1 df[self.vertex_col_name] = df[self.vertex_col_name].cumsum() - 1 - self.__vertex_prop_dataframe = df.set_index( - self.vertex_col_name, sorted=True - ).persist() + self.__vertex_prop_dataframe = ( + df.persist().set_index(self.vertex_col_name, sorted=True).persist() + ) # FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795 df = self._vertex_type_value_counts @@ -1163,9 +1162,9 @@ def renumber_edges_by_type(self): df[self.edge_id_col_name] = 1 df[self.edge_id_col_name] = df[self.edge_id_col_name].cumsum() - 1 - self.__edge_prop_dataframe = df.set_index( - self.edge_id_col_name, sorted=True - ).persist() + self.__edge_prop_dataframe = ( + df.persist().set_index(self.edge_id_col_name, sorted=True).persist() + ) # FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795 df = self._edge_type_value_counts diff --git a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py index d5242aeada5..ad1e21ea027 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py @@ -20,6 +20,7 @@ from cudf.testing import assert_frame_equal, assert_series_equal import cugraph.dask as dcg +from cugraph.experimental.datasets import cyber from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH from cugraph.testing import utils @@ -28,6 +29,12 @@ # by trying to import rapids_pytest_benchmark, and if that fails, set # "gpubenchmark" to the standard "benchmark" fixture provided by # pytest-benchmark. +try: + import rapids_pytest_benchmark # noqa: F401 +except ImportError: + import pytest_benchmark + + gpubenchmark = pytest_benchmark.plugin.benchmark import cugraph @@ -942,3 +949,31 @@ def test_add_data_noncontiguous(): cur_df["edge_type"], check_names=False, ) + + +# ============================================================================= +# Benchmarks +# ============================================================================= + + +@pytest.mark.slow +@pytest.mark.parametrize("N", [1, 3, 10, 30]) +def bench_add_edges_cyber(gpubenchmark, dask_client, N): + from cugraph.experimental import MGPropertyGraph + + # Partition the dataframe to add in chunks + cyber_df = cyber.get_edgelist() + chunk = (len(cyber_df) + N - 1) // N + dfs = [ + dask_cudf.from_cudf(cyber_df.iloc[i * chunk : (i + 1) * chunk], npartitions=2) + for i in range(N) + ] + + def func(): + mpG = MGPropertyGraph() + for df in dfs: + mpG.add_edge_data(df, ("srcip", "dstip")) + df = mpG.get_edge_data().compute() + assert len(df) == len(cyber_df) + + gpubenchmark(func) diff --git a/python/cugraph/cugraph/tests/test_property_graph.py b/python/cugraph/cugraph/tests/test_property_graph.py index 18acf43e7e3..f9df0e9d5a0 100644 --- a/python/cugraph/cugraph/tests/test_property_graph.py +++ b/python/cugraph/cugraph/tests/test_property_graph.py @@ -1962,3 +1962,23 @@ def func(): ) gpubenchmark(func) + + +@pytest.mark.slow +@pytest.mark.parametrize("N", [1, 3, 10, 30]) +def bench_add_edges_cyber(gpubenchmark, N): + from cugraph.experimental import PropertyGraph + + # Partition the dataframe to add in chunks + cyber_df = cyber.get_edgelist() + chunk = (len(cyber_df) + N - 1) // N + dfs = [cyber_df.iloc[i * chunk : (i + 1) * chunk] for i in range(N)] + + def func(): + pG = PropertyGraph() + for df in dfs: + pG.add_edge_data(df, ("srcip", "dstip")) + df = pG.get_edge_data() + assert len(df) == len(cyber_df) + + gpubenchmark(func)