Skip to content

Commit

Permalink
MGPropertyGraph: fix OOM when renumbering by type (#3123)
Browse files Browse the repository at this point in the history
Fixes #3110

Previously, all data for a single type would be collected in a single partition, which could easily overwhelm a worker or GPU.

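We now sort by multiple columns (the type column plus the vertex or edge ID column) so that the rows of a single type can be spread across partitions and the data distributed evenly across workers.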

Authors:
  - Erik Welch (https://github.com/eriknw)

Approvers:
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Vibhu Jawa (https://github.com/VibhuJawa)
  - Rick Ratzel (https://github.com/rlratzel)

URL: #3123
  • Loading branch information
eriknw authored Jan 19, 2023
1 parent f7d99ff commit f8f075f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 22 deletions.
44 changes: 25 additions & 19 deletions python/cugraph/cugraph/dask/structure/mg_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ def add_edge_data(
self.dst_col_name: dataframe[vertex_col_names[1]].dtype,
},
)
temp_dataframe.index.name = self.edge_id_col_name
temp_dataframe.index = temp_dataframe.index.rename(self.edge_id_col_name)
if edge_id_col_name is not None:
temp_dataframe.index = temp_dataframe.index.astype(
dataframe[edge_id_col_name].dtype
Expand Down Expand Up @@ -1331,16 +1331,16 @@ def renumber_vertices_by_type(self, prev_id_column=None):

df = self.__vertex_prop_dataframe
index_dtype = df.index.dtype
if self.__edge_prop_dataframe is not None:
# FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795
cat_dtype = df.dtypes[self.type_col_name]
df[self.type_col_name] = df[self.type_col_name].astype(str)

df = df.reset_index().sort_values(by=TCN)

# FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795
df[self.type_col_name] = df[self.type_col_name].astype(cat_dtype)
# FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795
cat_dtype = df.dtypes[TCN]
df[TCN] = df[TCN].astype(str)

# Include self.vertex_col_name when sorting by values to ensure we can
# evenly distribute the data across workers.
df = df.reset_index().persist()
df = df.sort_values(by=[TCN, self.vertex_col_name], ignore_index=True).persist()
if self.__edge_prop_dataframe is not None:
new_name = f"new_{self.vertex_col_name}"
df[new_name] = 1
df[new_name] = df[new_name].cumsum() - 1
Expand All @@ -1360,6 +1360,9 @@ def renumber_vertices_by_type(self, prev_id_column=None):
self.__edge_prop_dataframe.index = self.__edge_prop_dataframe.index.astype(
edge_index_dtype
)
self.__edge_prop_dataframe.index = self.__edge_prop_dataframe.index.rename(
self.edge_id_col_name
)
if prev_id_column is None:
df[self.vertex_col_name] = df[new_name]
del df[new_name]
Expand All @@ -1371,14 +1374,14 @@ def renumber_vertices_by_type(self, prev_id_column=None):
}
)
else:
if prev_id_column is None:
df = df.sort_values(by=self.type_col_name, ignore_index=True)
else:
df.index.name = prev_id_column
df = df.sort_values(by=self.type_col_name).reset_index()
if prev_id_column is not None:
df[prev_id_column] = df[self.vertex_col_name]
df[self.vertex_col_name] = 1
df[self.vertex_col_name] = df[self.vertex_col_name].cumsum() - 1

# FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795
df[TCN] = df[TCN].astype(cat_dtype)

df[self.vertex_col_name] = df[self.vertex_col_name].astype(index_dtype)
self.__vertex_prop_dataframe = (
df.persist().set_index(self.vertex_col_name, sorted=True).persist()
Expand Down Expand Up @@ -1424,11 +1427,14 @@ def renumber_edges_by_type(self, prev_id_column=None):
cat_dtype = df.dtypes[self.type_col_name]
df[self.type_col_name] = df[self.type_col_name].astype(str)

if prev_id_column is None:
df = df.sort_values(by=self.type_col_name, ignore_index=True)
else:
df.index = df.index.rename(prev_id_column)
df = df.sort_values(by=self.type_col_name).reset_index()
# Include self.edge_id_col_name when sorting by values to ensure we can
# evenly distribute the data across workers.
df = df.reset_index().persist()
df = df.sort_values(
by=[self.type_col_name, self.edge_id_col_name], ignore_index=True
).persist()
if prev_id_column is not None:
df[prev_id_column] = df[self.edge_id_col_name]

# FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795
df[self.type_col_name] = df[self.type_col_name].astype(cat_dtype)
Expand Down
8 changes: 5 additions & 3 deletions python/cugraph/cugraph/tests/mg/test_mg_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,12 +865,13 @@ def test_renumber_vertices_by_type(dataset1_MGPropertyGraph, prev_id_column):
assert df_id_ranges.loc[key, "start"] == start
assert df_id_ranges.loc[key, "stop"] == stop
df = pG.get_vertex_data(types=[key]).compute().to_pandas()
df = df.reset_index(drop=True)
assert len(df) == stop - start + 1
assert (df["_VERTEX_"] == cudf.Series(range(start, stop + 1))).all()
assert (df["_VERTEX_"] == pd.Series(range(start, stop + 1))).all()
if prev_id_column is not None:
cur = df[prev_id_column].sort_values()
expected = sorted(x for x, *args in data[key][1])
assert (cur == cudf.Series(expected, index=cur.index)).all()
assert (cur == pd.Series(expected, index=cur.index)).all()

# Make sure we renumber vertex IDs in edge data too
df = pG.get_edge_data().compute()
Expand Down Expand Up @@ -905,8 +906,9 @@ def test_renumber_edges_by_type(dataset1_MGPropertyGraph, prev_id_column):
assert df_id_ranges.loc[key, "start"] == start
assert df_id_ranges.loc[key, "stop"] == stop
df = pG.get_edge_data(types=[key]).compute().to_pandas()
df = df.reset_index(drop=True)
assert len(df) == stop - start + 1
assert (df[pG.edge_id_col_name] == cudf.Series(range(start, stop + 1))).all()
assert (df[pG.edge_id_col_name] == pd.Series(range(start, stop + 1))).all()

if prev_id_column is not None:
assert prev_id_column in df.columns
Expand Down

0 comments on commit f8f075f

Please sign in to comment.