Skip to content

Commit

Permalink
Provide option to keep original vertex/edge IDs when renumbering (#2951)
Browse files Browse the repository at this point in the history
  • Loading branch information
eriknw authored Nov 21, 2022
1 parent 4b7d4fb commit 371aaae
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 32 deletions.
49 changes: 43 additions & 6 deletions python/cugraph/cugraph/dask/structure/mg_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1260,9 +1260,14 @@ def edge_props_to_graph(

return G

def renumber_vertices_by_type(self):
def renumber_vertices_by_type(self, prev_id_column=None):
"""Renumber vertex IDs to be contiguous by type.
Parameters
----------
prev_id_column : str, optional
Column name to save the vertex ID before renumbering.
Returns a DataFrame with the start and stop IDs for each vertex type.
Stop is *inclusive*.
"""
Expand All @@ -1278,6 +1283,13 @@ def renumber_vertices_by_type(self):
)
if self.__vertex_prop_dataframe is None:
return None
if (
prev_id_column is not None
and prev_id_column in self.__vertex_prop_dataframe
):
raise ValueError(
f"Can't save previous IDs to existing column {prev_id_column!r}"
)

# Use categorical dtype for the type column
if self.__series_type is dask_cudf.Series:
Expand Down Expand Up @@ -1318,10 +1330,22 @@ def renumber_vertices_by_type(self):
.drop(columns=[self.dst_col_name])
.rename(columns={new_name: self.dst_col_name})
)
df[self.vertex_col_name] = df[new_name]
del df[new_name]
if prev_id_column is None:
df[self.vertex_col_name] = df[new_name]
del df[new_name]
else:
df = df.rename(
columns={
new_name: self.vertex_col_name,
self.vertex_col_name: prev_id_column,
}
)
else:
df = df.sort_values(by=self.type_col_name, ignore_index=True)
if prev_id_column is None:
df = df.sort_values(by=self.type_col_name, ignore_index=True)
else:
df.index.name = prev_id_column
df = df.sort_values(by=self.type_col_name).reset_index()
df[self.vertex_col_name] = 1
df[self.vertex_col_name] = df[self.vertex_col_name].cumsum() - 1

Expand All @@ -1344,22 +1368,35 @@ def renumber_vertices_by_type(self):
rv["stop"] -= 1 # Make inclusive
return rv[["start", "stop"]]

def renumber_edges_by_type(self):
def renumber_edges_by_type(self, prev_id_column=None):
"""Renumber edge IDs to be contiguous by type.
Parameters
----------
prev_id_column : str, optional
Column name to save the edge ID before renumbering.
Returns a DataFrame with the start and stop IDs for each edge type.
Stop is *inclusive*.
"""
# TODO: keep track of whether edges are already numbered correctly.
if self.__edge_prop_dataframe is None:
return None
if prev_id_column is not None and prev_id_column in self.__edge_prop_dataframe:
raise ValueError(
f"Can't save previous IDs to existing column {prev_id_column!r}"
)
df = self.__edge_prop_dataframe

# FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795
cat_dtype = df.dtypes[self.type_col_name]
df[self.type_col_name] = df[self.type_col_name].astype(str)

df = df.sort_values(by=self.type_col_name, ignore_index=True)
if prev_id_column is None:
df = df.sort_values(by=self.type_col_name, ignore_index=True)
else:
df.index = df.index.rename(prev_id_column)
df = df.sort_values(by=self.type_col_name).reset_index()

# FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795
df[self.type_col_name] = df[self.type_col_name].astype(cat_dtype)
Expand Down
44 changes: 36 additions & 8 deletions python/cugraph/cugraph/structure/property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1816,10 +1816,15 @@ def edge_props_to_graph(

return G

def renumber_vertices_by_type(self):
def renumber_vertices_by_type(self, prev_id_column=None):
"""
Renumber vertex IDs to be contiguous by type.
Parameters
----------
prev_id_column : str, optional
Column name to save the vertex ID before renumbering.
Returns
-------
a DataFrame with the start and stop IDs for each vertex type.
Expand Down Expand Up @@ -1862,6 +1867,13 @@ def renumber_vertices_by_type(self):
)
if self.__vertex_prop_dataframe is None:
return None
if (
prev_id_column is not None
and prev_id_column in self.__vertex_prop_dataframe
):
raise ValueError(
f"Can't save previous IDs to existing column {prev_id_column!r}"
)

# Use categorical dtype for the type column
if self.__series_type is cudf.Series:
Expand All @@ -1885,18 +1897,26 @@ def renumber_vertices_by_type(self):
self.__edge_prop_dataframe[self.dst_col_name] = self.__edge_prop_dataframe[
self.dst_col_name
].map(mapper)
df.drop(columns=[self.vertex_col_name], inplace=True)
if prev_id_column is None:
df.drop(columns=[self.vertex_col_name], inplace=True)
else:
df.rename(columns={self.vertex_col_name: prev_id_column}, inplace=True)
df.index.name = self.vertex_col_name
self.__vertex_prop_dataframe = df
rv = self._vertex_type_value_counts.sort_index().cumsum().to_frame("stop")
rv["start"] = rv["stop"].shift(1, fill_value=0)
rv["stop"] -= 1 # Make inclusive
return rv[["start", "stop"]]

def renumber_edges_by_type(self):
def renumber_edges_by_type(self, prev_id_column=None):
"""
Renumber edge IDs to be contiguous by type.
Parameters
----------
prev_id_column : str, optional
Column name to save the edge ID before renumbering.
Returns
-------
DataFrame
Expand Down Expand Up @@ -1934,9 +1954,12 @@ def renumber_edges_by_type(self):
"""
TCN = self.type_col_name

# TODO: keep track of whether edges are already numbered correctly.
if self.__edge_prop_dataframe is None:
return None
if prev_id_column is not None and prev_id_column in self.__edge_prop_dataframe:
raise ValueError(
f"Can't save previous IDs to existing column {prev_id_column!r}"
)

# Use categorical dtype for the type column
if self.__series_type is cudf.Series:
Expand All @@ -1951,10 +1974,15 @@ def renumber_edges_by_type(self):
cat_dtype
)

self.__edge_prop_dataframe = self.__edge_prop_dataframe.sort_values(
by=TCN, ignore_index=True
)
self.__edge_prop_dataframe.index.name = self.edge_id_col_name
df = self.__edge_prop_dataframe
if prev_id_column is None:
df = df.sort_values(by=TCN, ignore_index=True)
else:
df = df.sort_values(by=TCN)
df.index.name = prev_id_column
df.reset_index(inplace=True)
df.index.name = self.edge_id_col_name
self.__edge_prop_dataframe = df
rv = self._edge_type_value_counts.sort_index().cumsum().to_frame("stop")
rv["start"] = rv["stop"].shift(1, fill_value=0)
rv["stop"] -= 1 # Make inclusive
Expand Down
29 changes: 20 additions & 9 deletions python/cugraph/cugraph/tests/mg/test_mg_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def dataset1_PropertyGraph(request):
return (pG, dataset1)


@pytest.fixture(scope="module")
@pytest.fixture(scope="function")
def dataset1_MGPropertyGraph(dask_client):
"""
Fixture which returns an instance of a PropertyGraph with vertex and edge
Expand Down Expand Up @@ -849,11 +849,14 @@ def test_get_data_empty_graphs(dask_client):
assert pG.get_edge_data([0, 1, 2]) is None


def test_renumber_vertices_by_type(dataset1_MGPropertyGraph):
@pytest.mark.parametrize("prev_id_column", [None, "prev_id"])
def test_renumber_vertices_by_type(dataset1_MGPropertyGraph, prev_id_column):
from cugraph.experimental import MGPropertyGraph

(pG, data) = dataset1_MGPropertyGraph
df_id_ranges = pG.renumber_vertices_by_type()
with pytest.raises(ValueError, match="existing column"):
pG.renumber_vertices_by_type("merchant_size")
df_id_ranges = pG.renumber_vertices_by_type(prev_id_column)
expected = {
"merchants": [0, 4], # stop is inclusive
"users": [5, 8],
Expand All @@ -864,28 +867,34 @@ def test_renumber_vertices_by_type(dataset1_MGPropertyGraph):
df = pG.get_vertex_data(types=[key]).compute()
assert len(df) == stop - start + 1
assert (df["_VERTEX_"] == list(range(start, stop + 1))).all()

if prev_id_column is not None:
cur = df[prev_id_column].sort_values()
expected = sorted(x for x, *args in data[key][1])
assert (cur == expected).all()
# Make sure we renumber vertex IDs in edge data too
df = pG.get_edge_data().compute()
assert 0 <= df[pG.src_col_name].min() < df[pG.src_col_name].max() < 9
assert 0 <= df[pG.dst_col_name].min() < df[pG.dst_col_name].max() < 9

empty_pG = MGPropertyGraph()
assert empty_pG.renumber_vertices_by_type() is None
assert empty_pG.renumber_vertices_by_type(prev_id_column) is None

# Test when vertex IDs only exist in edge data
df = cudf.DataFrame({"src": [99998], "dst": [99999]})
df = dask_cudf.from_cudf(df, npartitions=1)
empty_pG.add_edge_data(df, ["src", "dst"])
with pytest.raises(NotImplementedError, match="only exist in edge"):
empty_pG.renumber_vertices_by_type()
empty_pG.renumber_vertices_by_type(prev_id_column)


def test_renumber_edges_by_type(dataset1_MGPropertyGraph):
@pytest.mark.parametrize("prev_id_column", [None, "prev_id"])
def test_renumber_edges_by_type(dataset1_MGPropertyGraph, prev_id_column):
from cugraph.experimental import MGPropertyGraph

(pG, data) = dataset1_MGPropertyGraph
df_id_ranges = pG.renumber_edges_by_type()
with pytest.raises(ValueError, match="existing column"):
pG.renumber_edges_by_type("time")
df_id_ranges = pG.renumber_edges_by_type(prev_id_column)
expected = {
"referrals": [0, 5], # stop is inclusive
"relationships": [6, 9],
Expand All @@ -897,9 +906,11 @@ def test_renumber_edges_by_type(dataset1_MGPropertyGraph):
df = pG.get_edge_data(types=[key]).compute()
assert len(df) == stop - start + 1
assert (df[pG.edge_id_col_name] == list(range(start, stop + 1))).all()
if prev_id_column is not None:
assert prev_id_column in df.columns

empty_pG = MGPropertyGraph()
assert empty_pG.renumber_edges_by_type() is None
assert empty_pG.renumber_edges_by_type(prev_id_column) is None


def test_add_data_noncontiguous(dask_client):
Expand Down
30 changes: 21 additions & 9 deletions python/cugraph/cugraph/tests/test_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def df_type_id(dataframe_type):
df_types_fixture_params = utils.genFixtureParamsProduct((df_types, df_type_id))


@pytest.fixture(scope="module", params=df_types_fixture_params)
@pytest.fixture(scope="function", params=df_types_fixture_params)
def dataset1_PropertyGraph(request):
"""
Fixture which returns an instance of a PropertyGraph with vertex and edge
Expand Down Expand Up @@ -1733,11 +1733,14 @@ def test_get_data_empty_graphs():
assert pG.get_edge_data([0, 1, 2]) is None


def test_renumber_vertices_by_type(dataset1_PropertyGraph):
@pytest.mark.parametrize("prev_id_column", [None, "prev_id"])
def test_renumber_vertices_by_type(dataset1_PropertyGraph, prev_id_column):
from cugraph.experimental import PropertyGraph

(pG, data) = dataset1_PropertyGraph
df_id_ranges = pG.renumber_vertices_by_type()
with pytest.raises(ValueError, match="existing column"):
pG.renumber_vertices_by_type("merchant_size")
df_id_ranges = pG.renumber_vertices_by_type(prev_id_column)
expected = {
"merchants": [0, 4], # stop is inclusive
"users": [5, 8],
Expand All @@ -1748,27 +1751,33 @@ def test_renumber_vertices_by_type(dataset1_PropertyGraph):
df = pG.get_vertex_data(types=[key])
assert len(df) == stop - start + 1
assert (df["_VERTEX_"] == list(range(start, stop + 1))).all()

if prev_id_column is not None:
cur = df[prev_id_column].sort_values()
expected = sorted(x for x, *args in data[key][1])
assert (cur == expected).all()
# Make sure we renumber vertex IDs in edge data too
df = pG.get_edge_data()
assert 0 <= df[pG.src_col_name].min() < df[pG.src_col_name].max() < 9
assert 0 <= df[pG.dst_col_name].min() < df[pG.dst_col_name].max() < 9

empty_pG = PropertyGraph()
assert empty_pG.renumber_vertices_by_type() is None
assert empty_pG.renumber_vertices_by_type(prev_id_column) is None

# Test when vertex IDs only exist in edge data
df = type(df)({"src": [99998], "dst": [99999]})
empty_pG.add_edge_data(df, ["src", "dst"])
with pytest.raises(NotImplementedError, match="only exist in edge"):
empty_pG.renumber_vertices_by_type()
empty_pG.renumber_vertices_by_type(prev_id_column)


def test_renumber_edges_by_type(dataset1_PropertyGraph):
@pytest.mark.parametrize("prev_id_column", [None, "prev_id"])
def test_renumber_edges_by_type(dataset1_PropertyGraph, prev_id_column):
from cugraph.experimental import PropertyGraph

(pG, data) = dataset1_PropertyGraph
df_id_ranges = pG.renumber_edges_by_type()
with pytest.raises(ValueError, match="existing column"):
pG.renumber_edges_by_type("time")
df_id_ranges = pG.renumber_edges_by_type(prev_id_column)
expected = {
"transactions": [0, 3], # stop is inclusive
"relationships": [4, 7],
Expand All @@ -1784,9 +1793,11 @@ def test_renumber_edges_by_type(dataset1_PropertyGraph):
df = pG.get_edge_data(types=[key])
assert len(df) == stop - start + 1
assert (df[pG.edge_id_col_name] == list(range(start, stop + 1))).all()
if prev_id_column is not None:
assert prev_id_column in df.columns

empty_pG = PropertyGraph()
assert empty_pG.renumber_edges_by_type() is None
assert empty_pG.renumber_edges_by_type(prev_id_column) is None


@pytest.mark.parametrize("df_type", df_types, ids=df_type_id)
Expand Down Expand Up @@ -2302,6 +2313,7 @@ def bench_extract_subgraph_for_rmat(gpubenchmark, rmat_PropertyGraph):
)


@pytest.mark.slow
@pytest.mark.parametrize("n_rows", [15_000_000, 30_000_000, 60_000_000, 120_000_000])
def bench_add_edge_data(gpubenchmark, n_rows):
from cugraph.experimental import PropertyGraph
Expand Down

0 comments on commit 371aaae

Please sign in to comment.