diff --git a/python/cugraph/cugraph/dask/structure/mg_property_graph.py b/python/cugraph/cugraph/dask/structure/mg_property_graph.py index 780c6212797..53f7757eea3 100644 --- a/python/cugraph/cugraph/dask/structure/mg_property_graph.py +++ b/python/cugraph/cugraph/dask/structure/mg_property_graph.py @@ -787,7 +787,7 @@ def add_edge_data( self.dst_col_name: dataframe[vertex_col_names[1]].dtype, }, ) - temp_dataframe.index.name = self.edge_id_col_name + temp_dataframe.index = temp_dataframe.index.rename(self.edge_id_col_name) if edge_id_col_name is not None: temp_dataframe.index = temp_dataframe.index.astype( dataframe[edge_id_col_name].dtype @@ -1331,16 +1331,16 @@ def renumber_vertices_by_type(self, prev_id_column=None): df = self.__vertex_prop_dataframe index_dtype = df.index.dtype - if self.__edge_prop_dataframe is not None: - # FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795 - cat_dtype = df.dtypes[self.type_col_name] - df[self.type_col_name] = df[self.type_col_name].astype(str) - - df = df.reset_index().sort_values(by=TCN) - # FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795 - df[self.type_col_name] = df[self.type_col_name].astype(cat_dtype) + # FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795 + cat_dtype = df.dtypes[TCN] + df[TCN] = df[TCN].astype(str) + # Include self.vertex_col_name when sorting by values to ensure we can + # evenly distribute the data across workers. + df = df.reset_index().persist() + df = df.sort_values(by=[TCN, self.vertex_col_name], ignore_index=True).persist() + if self.__edge_prop_dataframe is not None: new_name = f"new_{self.vertex_col_name}" df[new_name] = 1 df[new_name] = df[new_name].cumsum() - 1 @@ -1360,6 +1360,9 @@ def renumber_vertices_by_type(self, prev_id_column=None): self.__edge_prop_dataframe.index = self.__edge_prop_dataframe.index.astype( edge_index_dtype ) + self.__edge_prop_dataframe.index = self.__edge_prop_dataframe.index.rename( + self.edge_id_col_name + ) if prev_id_column is None: df[self.vertex_col_name] = df[new_name] del df[new_name] @@ -1371,14 +1374,14 @@ def renumber_vertices_by_type(self, prev_id_column=None): } ) else: - if prev_id_column is None: - df = df.sort_values(by=self.type_col_name, ignore_index=True) - else: - df.index.name = prev_id_column - df = df.sort_values(by=self.type_col_name).reset_index() + if prev_id_column is not None: + df[prev_id_column] = df[self.vertex_col_name] df[self.vertex_col_name] = 1 df[self.vertex_col_name] = df[self.vertex_col_name].cumsum() - 1 + # FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795 + df[TCN] = df[TCN].astype(cat_dtype) + df[self.vertex_col_name] = df[self.vertex_col_name].astype(index_dtype) self.__vertex_prop_dataframe = ( df.persist().set_index(self.vertex_col_name, sorted=True).persist() @@ -1424,11 +1427,14 @@ def renumber_edges_by_type(self, prev_id_column=None): cat_dtype = df.dtypes[self.type_col_name] df[self.type_col_name] = df[self.type_col_name].astype(str) - if prev_id_column is None: - df = df.sort_values(by=self.type_col_name, ignore_index=True) - else: - df.index = df.index.rename(prev_id_column) - df = df.sort_values(by=self.type_col_name).reset_index() + # Include self.edge_id_col_name when sorting by values to ensure we can + # evenly distribute the data across workers. + df = df.reset_index().persist() + df = df.sort_values( + by=[self.type_col_name, self.edge_id_col_name], ignore_index=True + ).persist() + if prev_id_column is not None: + df[prev_id_column] = df[self.edge_id_col_name] # FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795 df[self.type_col_name] = df[self.type_col_name].astype(cat_dtype) diff --git a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py index 8978a2a1e5d..88290163ae7 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py @@ -865,12 +865,13 @@ def test_renumber_vertices_by_type(dataset1_MGPropertyGraph, prev_id_column): assert df_id_ranges.loc[key, "start"] == start assert df_id_ranges.loc[key, "stop"] == stop df = pG.get_vertex_data(types=[key]).compute().to_pandas() + df = df.reset_index(drop=True) assert len(df) == stop - start + 1 - assert (df["_VERTEX_"] == cudf.Series(range(start, stop + 1))).all() + assert (df["_VERTEX_"] == pd.Series(range(start, stop + 1))).all() if prev_id_column is not None: cur = df[prev_id_column].sort_values() expected = sorted(x for x, *args in data[key][1]) - assert (cur == cudf.Series(expected, index=cur.index)).all() + assert (cur == pd.Series(expected, index=cur.index)).all() # Make sure we renumber vertex IDs in edge data too df = pG.get_edge_data().compute() @@ -905,8 +906,9 @@ def test_renumber_edges_by_type(dataset1_MGPropertyGraph, prev_id_column): assert df_id_ranges.loc[key, "start"] == start assert df_id_ranges.loc[key, "stop"] == stop df = pG.get_edge_data(types=[key]).compute().to_pandas() + df = df.reset_index(drop=True) assert len(df) == stop - start + 1 - assert (df[pG.edge_id_col_name] == cudf.Series(range(start, stop + 1))).all() + assert (df[pG.edge_id_col_name] == pd.Series(range(start, stop + 1))).all() if prev_id_column is not None: assert prev_id_column in df.columns