Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow PropertyGraph default_edge_weight to be used to add an edge weight value on extracted Graphs even when a weight property wasn't specified #2071

Merged
Merged
116 changes: 69 additions & 47 deletions python/cugraph/cugraph/structure/property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class EXPERIMENTAL__PropertyGraph:
type_col_name = "_TYPE_"
edge_id_col_name = "_EDGE_ID_"
vertex_id_col_name = "_VERTEX_ID_"
weight_col_name = "_WEIGHT_"

def __init__(self):
# The dataframe containing the properties for each vertex.
Expand Down Expand Up @@ -215,7 +216,7 @@ def vertices_ids(self):

def add_vertex_data(self,
dataframe,
vertex_id_column,
vertex_col_name,
type_name=None,
property_columns=None
):
Expand All @@ -227,7 +228,7 @@ def add_vertex_data(self,
dataframe : DataFrame-compatible instance
A DataFrame instance with a compatible Pandas-like DataFrame
interface.
vertex_id_column : string
vertex_col_name : string
The column name that contains the values to be used as vertex IDs.
type_name : string
The name to be assigned to the type of property being added. For
Expand All @@ -250,8 +251,8 @@ def add_vertex_data(self,
if type(dataframe) not in _dataframe_types:
raise TypeError("dataframe must be one of the following types: "
f"{_dataframe_types}, got: {type(dataframe)}")
if vertex_id_column not in dataframe.columns:
raise ValueError(f"{vertex_id_column} is not a column in "
if vertex_col_name not in dataframe.columns:
raise ValueError(f"{vertex_col_name} is not a column in "
f"dataframe: {dataframe.columns}")
if (type_name is not None) and not(isinstance(type_name, str)):
raise TypeError("type_name must be a string, got: "
Expand Down Expand Up @@ -293,7 +294,7 @@ def add_vertex_data(self,
# https://github.com/rapidsai/cudf/issues/9981)
self.__update_dataframe_dtypes(
self.__vertex_prop_dataframe,
{self.vertex_col_name: dataframe[vertex_id_column].dtype})
{self.vertex_col_name: dataframe[vertex_col_name].dtype})

# Ensure that both the predetermined vertex ID column name and vertex
# type column name are present for proper merging.
Expand All @@ -302,7 +303,7 @@ def add_vertex_data(self,
# columns. The copied DataFrame is then merged (another copy) and then
# deleted when out-of-scope.
tmp_df = dataframe.copy(deep=True)
tmp_df[self.vertex_col_name] = tmp_df[vertex_id_column]
tmp_df[self.vertex_col_name] = tmp_df[vertex_col_name]
# FIXME: handle case of a type_name column already being in tmp_df
tmp_df[self.type_col_name] = type_name

Expand Down Expand Up @@ -331,7 +332,7 @@ def add_vertex_data(self,

def add_edge_data(self,
dataframe,
vertex_id_columns,
vertex_col_names,
type_name=None,
property_columns=None
):
Expand All @@ -343,7 +344,7 @@ def add_edge_data(self,
dataframe : DataFrame-compatible instance
A DataFrame instance with a compatible Pandas-like DataFrame
interface.
vertex_id_columns : list of strings
vertex_col_names : list of strings
The column names that contain the values to be used as the source
and destination vertex IDs for the edges.
type_name : string
Expand All @@ -367,12 +368,12 @@ def add_edge_data(self,
if type(dataframe) not in _dataframe_types:
raise TypeError("dataframe must be one of the following types: "
f"{_dataframe_types}, got: {type(dataframe)}")
if type(vertex_id_columns) not in [list, tuple]:
raise TypeError("vertex_id_columns must be a list or tuple, got: "
f"{type(vertex_id_columns)}")
invalid_columns = set(vertex_id_columns).difference(dataframe.columns)
if type(vertex_col_names) not in [list, tuple]:
raise TypeError("vertex_col_names must be a list or tuple, got: "
f"{type(vertex_col_names)}")
invalid_columns = set(vertex_col_names).difference(dataframe.columns)
if invalid_columns:
raise ValueError("vertex_id_columns contains column(s) not found "
raise ValueError("vertex_col_names contains column(s) not found "
f"in dataframe: {list(invalid_columns)}")
if (type_name is not None) and not(isinstance(type_name, str)):
raise TypeError("type_name must be a string, got: "
Expand Down Expand Up @@ -415,16 +416,16 @@ def add_edge_data(self,
# https://github.com/rapidsai/cudf/issues/9981)
self.__update_dataframe_dtypes(
self.__edge_prop_dataframe,
{self.src_col_name: dataframe[vertex_id_columns[0]].dtype,
self.dst_col_name: dataframe[vertex_id_columns[1]].dtype,
{self.src_col_name: dataframe[vertex_col_names[0]].dtype,
self.dst_col_name: dataframe[vertex_col_names[1]].dtype,
self.edge_id_col_name: "Int64"})

# NOTE: This copies the incoming DataFrame in order to add the new
# columns. The copied DataFrame is then merged (another copy) and then
# deleted when out-of-scope.
tmp_df = dataframe.copy(deep=True)
tmp_df[self.src_col_name] = tmp_df[vertex_id_columns[0]]
tmp_df[self.dst_col_name] = tmp_df[vertex_id_columns[1]]
tmp_df[self.src_col_name] = tmp_df[vertex_col_names[0]]
tmp_df[self.dst_col_name] = tmp_df[vertex_col_names[1]]
# FIXME: handle case of a type_name column already being in tmp_df
tmp_df[self.type_col_name] = type_name

Expand Down Expand Up @@ -605,7 +606,7 @@ def extract_subgraph(self,
selected_vertex_dataframe = \
self.__vertex_prop_dataframe[selection.vertex_selections]
else:
selected_vertex_dataframe = self.__vertex_prop_dataframe
selected_vertex_dataframe = None

if (selection is not None) and \
(selection.edge_selections is not None):
Expand All @@ -629,25 +630,6 @@ def extract_subgraph(self,
else:
edges = selected_edge_dataframe

if edge_weight_property:
if edge_weight_property not in edges.columns:
raise ValueError("edge_weight_property "
f'"{edge_weight_property}" was not found in '
"the properties of the subgraph")

# Ensure a valid edge_weight_property can be used for applying
# weights to the subgraph, and if a default_edge_weight was
# specified, apply it to all NAs in the weight column.
prop_col = edges[edge_weight_property]
if prop_col.count() != prop_col.size:
if default_edge_weight is None:
raise ValueError("edge_weight_property "
f'"{edge_weight_property}" '
"contains NA values in the subgraph and "
"default_edge_weight is not set")
else:
prop_col.fillna(default_edge_weight, inplace=True)

# The __*_prop_dataframes have likely been merged several times and
# possibly had their dtypes converted in order to accommodate NaN
# values. Restore the original dtypes in the resulting edges df prior
Expand All @@ -658,23 +640,24 @@ def extract_subgraph(self,
edges,
create_using=create_using,
edge_weight_property=edge_weight_property,
default_edge_weight=default_edge_weight,
allow_multi_edges=allow_multi_edges)

def annotate_dataframe(self, df, G, edge_vertex_id_columns):
def annotate_dataframe(self, df, G, edge_vertex_col_names):
"""
Add properties to df that represent the vertices and edges in graph G.

Parameters
----------
df : cudf.DataFrame or pandas.DataFrame
A DataFrame containing edges identified by edge_vertex_id_columns
A DataFrame containing edges identified by edge_vertex_col_names
which will have properties for those edges added to it.
G : cugraph.Graph (or subclass of) instance.
Graph containing the edges specified in df. The Graph instance must
have been generated from a prior call to extract_subgraph() in
order to have the edge meta-data used to look up the correct
properties.
edge_vertex_id_columns : tuple of strings
edge_vertex_col_names : tuple of strings
The column names in df that represent the source and destination
vertices, used for identifying edges.

Expand All @@ -689,7 +672,7 @@ def annotate_dataframe(self, df, G, edge_vertex_id_columns):
>>>
"""
# FIXME: check all args
(src_col_name, dst_col_name) = edge_vertex_id_columns
(src_col_name, dst_col_name) = edge_vertex_col_names

df_type = type(df)
if df_type is not self.__dataframe_type:
Expand Down Expand Up @@ -729,15 +712,42 @@ def edge_props_to_graph(self,
edge_prop_df,
create_using,
edge_weight_property=None,
default_edge_weight=None,
allow_multi_edges=False):
"""
Create and return a Graph from the edges in edge_prop_df.
"""
if edge_weight_property and \
(edge_weight_property not in edge_prop_df.columns):
raise ValueError("edge_weight_property "
f'"{edge_weight_property}" was not found in '
"edge_prop_df")
# FIXME: check default_edge_weight is valid

if edge_weight_property:
if edge_weight_property not in edge_prop_df.columns:
raise ValueError("edge_weight_property "
f'"{edge_weight_property}" was not found in '
"edge_prop_df")

# Ensure a valid edge_weight_property can be used for applying
# weights to the subgraph, and if a default_edge_weight was
# specified, apply it to all NAs in the weight column.
prop_col = edge_prop_df[edge_weight_property]
if prop_col.count() != prop_col.size:
if default_edge_weight is None:
raise ValueError("edge_weight_property "
f'"{edge_weight_property}" '
"contains NA values in the subgraph and "
"default_edge_weight is not set")
else:
prop_col.fillna(default_edge_weight, inplace=True)
edge_attr = edge_weight_property

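# If a truthy default_edge_weight was given without an
# edge_weight_property, a new edge weight column must be added to hold it.
# NOTE: the check below is a truthiness test, so a default_edge_weight of
# 0 is treated the same as None and no weight column is added.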
elif default_edge_weight:
edge_attr = self.__gen_unique_name(edge_prop_df.columns,
prefix=self.weight_col_name)
edge_prop_df[edge_attr] = default_edge_weight

else:
edge_attr = None

# Set up the new Graph to return
if isinstance(create_using, cugraph.Graph):
Expand Down Expand Up @@ -771,7 +781,7 @@ def edge_props_to_graph(self,

create_args = {"source": self.src_col_name,
"destination": self.dst_col_name,
"edge_attr": edge_weight_property,
"edge_attr": edge_attr,
"renumber": True,
}
if type(edge_prop_df) is cudf.DataFrame:
Expand Down Expand Up @@ -851,6 +861,18 @@ def __get_all_vertices_series(self):
vert_sers.append(epd[self.dst_col_name])
return vert_sers

@staticmethod
def __gen_unique_name(current_names, prefix="col"):
    """
    Return a name that is not present in current_names.

    Parameters
    ----------
    current_names : container of names
        The names already in use. Only membership tests ("in") are
        performed on it.
    prefix : string
        The first candidate name, and the stem of every later candidate.

    Returns
    -------
    prefix itself if it is unused, otherwise prefix followed by the first
    integer suffix (starting at 2 and increasing by 1) that yields an
    unused name.
    """
    candidate = prefix
    suffix = 2
    while True:
        # The first candidate that is not already taken wins.
        if candidate not in current_names:
            return candidate
        candidate = f"{prefix}{suffix}"
        suffix += 1

@staticmethod
def __get_new_column_dtypes(from_df, to_df):
"""
Expand Down
8 changes: 4 additions & 4 deletions python/cugraph/cugraph/tests/test_graph_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_using_pgraph(graph_file):
pG = PropertyGraph()
pG.add_edge_data(cu_M,
type_name="edge",
vertex_id_columns=("0", "1"),
vertex_col_names=("0", "1"),
property_columns=None)

gstore = cugraph.gnn.CuGraphStore(graph=pG)
Expand All @@ -70,7 +70,7 @@ def test_node_data_pg(graph_file):
pG = PropertyGraph()
pG.add_edge_data(cu_M,
type_name="edge",
vertex_id_columns=("0", "1"),
vertex_col_names=("0", "1"),
property_columns=None)

gstore = cugraph.gnn.CuGraphStore(graph=pG)
Expand All @@ -91,7 +91,7 @@ def test_egonet(graph_file):
pG = PropertyGraph()
pG.add_edge_data(cu_M,
type_name="edge",
vertex_id_columns=("0", "1"),
vertex_col_names=("0", "1"),
property_columns=None)

gstore = cugraph.gnn.CuGraphStore(graph=pG)
Expand All @@ -117,7 +117,7 @@ def test_workflow(graph_file):
pg = PropertyGraph()
pg.add_edge_data(cu_M,
type_name="edge",
vertex_id_columns=("0", "1"),
vertex_col_names=("0", "1"),
property_columns=["2"])

gstore = cugraph.gnn.CuGraphStore(graph=pg)
Expand Down
Loading