Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide option to keep original vertex/edge IDs when renumbering #2951

Merged
merged 3 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 43 additions & 6 deletions python/cugraph/cugraph/dask/structure/mg_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1078,9 +1078,14 @@ def edge_props_to_graph(

return G

def renumber_vertices_by_type(self):
def renumber_vertices_by_type(self, prev_id_column=None):
"""Renumber vertex IDs to be contiguous by type.

Parameters
----------
prev_id_column : str, optional
Column name to save the vertex ID before renumbering.

Returns a DataFrame with the start and stop IDs for each vertex type.
Stop is *inclusive*.
"""
Expand All @@ -1096,6 +1101,13 @@ def renumber_vertices_by_type(self):
)
if self.__vertex_prop_dataframe is None:
return None
if (
prev_id_column is not None
and prev_id_column in self.__vertex_prop_dataframe
):
raise ValueError(
f"Can't save previous IDs to existing column {prev_id_column!r}"
)

# Use categorical dtype for the type column
if self.__series_type is dask_cudf.Series:
Expand Down Expand Up @@ -1136,10 +1148,22 @@ def renumber_vertices_by_type(self):
.drop(columns=[self.dst_col_name])
.rename(columns={new_name: self.dst_col_name})
)
df[self.vertex_col_name] = df[new_name]
del df[new_name]
if prev_id_column is None:
df[self.vertex_col_name] = df[new_name]
del df[new_name]
else:
df = df.rename(
columns={
new_name: self.vertex_col_name,
self.vertex_col_name: prev_id_column,
}
)
else:
df = df.sort_values(by=self.type_col_name, ignore_index=True)
if prev_id_column is None:
df = df.sort_values(by=self.type_col_name, ignore_index=True)
else:
df.index.name = prev_id_column
df = df.sort_values(by=self.type_col_name).reset_index()
df[self.vertex_col_name] = 1
df[self.vertex_col_name] = df[self.vertex_col_name].cumsum() - 1

Expand All @@ -1162,22 +1186,35 @@ def renumber_vertices_by_type(self):
rv["stop"] -= 1 # Make inclusive
return rv[["start", "stop"]]

def renumber_edges_by_type(self):
def renumber_edges_by_type(self, prev_id_column=None):
"""Renumber edge IDs to be contiguous by type.

Parameters
----------
prev_id_column : str, optional
Column name to save the edge ID before renumbering.

Returns a DataFrame with the start and stop IDs for each edge type.
Stop is *inclusive*.
"""
# TODO: keep track if edges are already numbered correctly.
if self.__edge_prop_dataframe is None:
return None
if prev_id_column is not None and prev_id_column in self.__edge_prop_dataframe:
raise ValueError(
f"Can't save previous IDs to existing column {prev_id_column!r}"
)
df = self.__edge_prop_dataframe

# FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795
cat_dtype = df.dtypes[self.type_col_name]
df[self.type_col_name] = df[self.type_col_name].astype(str)

df = df.sort_values(by=self.type_col_name, ignore_index=True)
if prev_id_column is None:
df = df.sort_values(by=self.type_col_name, ignore_index=True)
else:
df.index = df.index.rename(prev_id_column)
df = df.sort_values(by=self.type_col_name).reset_index()

# FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795
df[self.type_col_name] = df[self.type_col_name].astype(cat_dtype)
Expand Down
44 changes: 36 additions & 8 deletions python/cugraph/cugraph/structure/property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1635,10 +1635,15 @@ def edge_props_to_graph(

return G

def renumber_vertices_by_type(self):
def renumber_vertices_by_type(self, prev_id_column=None):
"""
Renumber vertex IDs to be contiguous by type.

Parameters
----------
prev_id_column : str, optional
Column name to save the vertex ID before renumbering.

Returns
-------
a DataFrame with the start and stop IDs for each vertex type.
Expand Down Expand Up @@ -1681,6 +1686,13 @@ def renumber_vertices_by_type(self):
)
if self.__vertex_prop_dataframe is None:
return None
if (
prev_id_column is not None
and prev_id_column in self.__vertex_prop_dataframe
):
raise ValueError(
f"Can't save previous IDs to existing column {prev_id_column!r}"
)

# Use categorical dtype for the type column
if self.__series_type is cudf.Series:
Expand All @@ -1704,18 +1716,26 @@ def renumber_vertices_by_type(self):
self.__edge_prop_dataframe[self.dst_col_name] = self.__edge_prop_dataframe[
self.dst_col_name
].map(mapper)
df.drop(columns=[self.vertex_col_name], inplace=True)
if prev_id_column is None:
df.drop(columns=[self.vertex_col_name], inplace=True)
else:
df.rename(columns={self.vertex_col_name: prev_id_column}, inplace=True)
df.index.name = self.vertex_col_name
self.__vertex_prop_dataframe = df
rv = self._vertex_type_value_counts.sort_index().cumsum().to_frame("stop")
rv["start"] = rv["stop"].shift(1, fill_value=0)
rv["stop"] -= 1 # Make inclusive
return rv[["start", "stop"]]

def renumber_edges_by_type(self):
def renumber_edges_by_type(self, prev_id_column=None):
"""
Renumber edge IDs to be contiguous by type.

Parameters
----------
prev_id_column : str, optional
Column name to save the edge ID before renumbering.

Returns
-------
DataFrame
Expand Down Expand Up @@ -1753,9 +1773,12 @@ def renumber_edges_by_type(self):
"""
TCN = self.type_col_name

# TODO: keep track if edges are already numbered correctly.
if self.__edge_prop_dataframe is None:
return None
if prev_id_column is not None and prev_id_column in self.__edge_prop_dataframe:
raise ValueError(
f"Can't save previous IDs to existing column {prev_id_column!r}"
)

# Use categorical dtype for the type column
if self.__series_type is cudf.Series:
Expand All @@ -1770,10 +1793,15 @@ def renumber_edges_by_type(self):
cat_dtype
)

self.__edge_prop_dataframe = self.__edge_prop_dataframe.sort_values(
by=TCN, ignore_index=True
)
self.__edge_prop_dataframe.index.name = self.edge_id_col_name
df = self.__edge_prop_dataframe
if prev_id_column is None:
df = df.sort_values(by=TCN, ignore_index=True)
else:
df = df.sort_values(by=TCN)
df.index.name = prev_id_column
df.reset_index(inplace=True)
df.index.name = self.edge_id_col_name
self.__edge_prop_dataframe = df
rv = self._edge_type_value_counts.sort_index().cumsum().to_frame("stop")
rv["start"] = rv["stop"].shift(1, fill_value=0)
rv["stop"] -= 1 # Make inclusive
Expand Down
29 changes: 20 additions & 9 deletions python/cugraph/cugraph/tests/mg/test_mg_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def dataset1_PropertyGraph(request):
return (pG, dataset1)


@pytest.fixture(scope="module")
@pytest.fixture(scope="function")
def dataset1_MGPropertyGraph(dask_client):
"""
Fixture which returns an instance of a PropertyGraph with vertex and edge
Expand Down Expand Up @@ -846,11 +846,14 @@ def test_get_data_empty_graphs(dask_client):
assert pG.get_edge_data([0, 1, 2]) is None


def test_renumber_vertices_by_type(dataset1_MGPropertyGraph):
@pytest.mark.parametrize("prev_id_column", [None, "prev_id"])
def test_renumber_vertices_by_type(dataset1_MGPropertyGraph, prev_id_column):
from cugraph.experimental import MGPropertyGraph

(pG, data) = dataset1_MGPropertyGraph
df_id_ranges = pG.renumber_vertices_by_type()
with pytest.raises(ValueError, match="existing column"):
pG.renumber_vertices_by_type("merchant_size")
df_id_ranges = pG.renumber_vertices_by_type(prev_id_column)
expected = {
"merchants": [0, 4], # stop is inclusive
"users": [5, 8],
Expand All @@ -861,28 +864,34 @@ def test_renumber_vertices_by_type(dataset1_MGPropertyGraph):
df = pG.get_vertex_data(types=[key]).compute()
assert len(df) == stop - start + 1
assert (df["_VERTEX_"] == list(range(start, stop + 1))).all()

if prev_id_column is not None:
cur = df[prev_id_column].sort_values()
expected = sorted(x for x, *args in data[key][1])
assert (cur == expected).all()
# Make sure we renumber vertex IDs in edge data too
df = pG.get_edge_data().compute()
assert 0 <= df[pG.src_col_name].min() < df[pG.src_col_name].max() < 9
assert 0 <= df[pG.dst_col_name].min() < df[pG.dst_col_name].max() < 9

empty_pG = MGPropertyGraph()
assert empty_pG.renumber_vertices_by_type() is None
assert empty_pG.renumber_vertices_by_type(prev_id_column) is None

# Test when vertex IDs only exist in edge data
df = cudf.DataFrame({"src": [99998], "dst": [99999]})
df = dask_cudf.from_cudf(df, npartitions=1)
empty_pG.add_edge_data(df, ["src", "dst"])
with pytest.raises(NotImplementedError, match="only exist in edge"):
empty_pG.renumber_vertices_by_type()
empty_pG.renumber_vertices_by_type(prev_id_column)


def test_renumber_edges_by_type(dataset1_MGPropertyGraph):
@pytest.mark.parametrize("prev_id_column", [None, "prev_id"])
def test_renumber_edges_by_type(dataset1_MGPropertyGraph, prev_id_column):
from cugraph.experimental import MGPropertyGraph

(pG, data) = dataset1_MGPropertyGraph
df_id_ranges = pG.renumber_edges_by_type()
with pytest.raises(ValueError, match="existing column"):
pG.renumber_edges_by_type("time")
df_id_ranges = pG.renumber_edges_by_type(prev_id_column)
expected = {
"referrals": [0, 5], # stop is inclusive
"relationships": [6, 9],
Expand All @@ -894,9 +903,11 @@ def test_renumber_edges_by_type(dataset1_MGPropertyGraph):
df = pG.get_edge_data(types=[key]).compute()
assert len(df) == stop - start + 1
assert (df[pG.edge_id_col_name] == list(range(start, stop + 1))).all()
if prev_id_column is not None:
assert prev_id_column in df.columns

empty_pG = MGPropertyGraph()
assert empty_pG.renumber_edges_by_type() is None
assert empty_pG.renumber_edges_by_type(prev_id_column) is None


def test_add_data_noncontiguous(dask_client):
Expand Down
30 changes: 21 additions & 9 deletions python/cugraph/cugraph/tests/test_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def df_type_id(dataframe_type):
df_types_fixture_params = utils.genFixtureParamsProduct((df_types, df_type_id))


@pytest.fixture(scope="module", params=df_types_fixture_params)
@pytest.fixture(scope="function", params=df_types_fixture_params)
def dataset1_PropertyGraph(request):
"""
Fixture which returns an instance of a PropertyGraph with vertex and edge
Expand Down Expand Up @@ -1733,11 +1733,14 @@ def test_get_data_empty_graphs():
assert pG.get_edge_data([0, 1, 2]) is None


def test_renumber_vertices_by_type(dataset1_PropertyGraph):
@pytest.mark.parametrize("prev_id_column", [None, "prev_id"])
def test_renumber_vertices_by_type(dataset1_PropertyGraph, prev_id_column):
from cugraph.experimental import PropertyGraph

(pG, data) = dataset1_PropertyGraph
df_id_ranges = pG.renumber_vertices_by_type()
with pytest.raises(ValueError, match="existing column"):
pG.renumber_vertices_by_type("merchant_size")
df_id_ranges = pG.renumber_vertices_by_type(prev_id_column)
expected = {
"merchants": [0, 4], # stop is inclusive
"users": [5, 8],
Expand All @@ -1748,27 +1751,33 @@ def test_renumber_vertices_by_type(dataset1_PropertyGraph):
df = pG.get_vertex_data(types=[key])
assert len(df) == stop - start + 1
assert (df["_VERTEX_"] == list(range(start, stop + 1))).all()

if prev_id_column is not None:
cur = df[prev_id_column].sort_values()
expected = sorted(x for x, *args in data[key][1])
assert (cur == expected).all()
# Make sure we renumber vertex IDs in edge data too
df = pG.get_edge_data()
assert 0 <= df[pG.src_col_name].min() < df[pG.src_col_name].max() < 9
assert 0 <= df[pG.dst_col_name].min() < df[pG.dst_col_name].max() < 9

empty_pG = PropertyGraph()
assert empty_pG.renumber_vertices_by_type() is None
assert empty_pG.renumber_vertices_by_type(prev_id_column) is None

# Test when vertex IDs only exist in edge data
df = type(df)({"src": [99998], "dst": [99999]})
empty_pG.add_edge_data(df, ["src", "dst"])
with pytest.raises(NotImplementedError, match="only exist in edge"):
empty_pG.renumber_vertices_by_type()
empty_pG.renumber_vertices_by_type(prev_id_column)


def test_renumber_edges_by_type(dataset1_PropertyGraph):
@pytest.mark.parametrize("prev_id_column", [None, "prev_id"])
def test_renumber_edges_by_type(dataset1_PropertyGraph, prev_id_column):
from cugraph.experimental import PropertyGraph

(pG, data) = dataset1_PropertyGraph
df_id_ranges = pG.renumber_edges_by_type()
with pytest.raises(ValueError, match="existing column"):
pG.renumber_edges_by_type("time")
df_id_ranges = pG.renumber_edges_by_type(prev_id_column)
expected = {
"transactions": [0, 3], # stop is inclusive
"relationships": [4, 7],
Expand All @@ -1784,9 +1793,11 @@ def test_renumber_edges_by_type(dataset1_PropertyGraph):
df = pG.get_edge_data(types=[key])
assert len(df) == stop - start + 1
assert (df[pG.edge_id_col_name] == list(range(start, stop + 1))).all()
if prev_id_column is not None:
assert prev_id_column in df.columns

empty_pG = PropertyGraph()
assert empty_pG.renumber_edges_by_type() is None
assert empty_pG.renumber_edges_by_type(prev_id_column) is None


@pytest.mark.parametrize("df_type", df_types, ids=df_type_id)
Expand Down Expand Up @@ -2068,6 +2079,7 @@ def bench_extract_subgraph_for_rmat(gpubenchmark, rmat_PropertyGraph):
)


@pytest.mark.slow
@pytest.mark.parametrize("n_rows", [15_000_000, 30_000_000, 60_000_000, 120_000_000])
def bench_add_edge_data(gpubenchmark, n_rows):
from cugraph.experimental import PropertyGraph
Expand Down