From 4e75c0aa2ae997c7f2e697a392a205959d07c661 Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Tue, 11 Oct 2022 21:23:45 -0700 Subject: [PATCH 1/5] PG: join new vertex data by vertex ids Fixes #2793. We are using the assumption that there should be a single row for each vertex id. --- .../cugraph/structure/property_graph.py | 29 ++++++++++++++++--- .../cugraph/tests/test_property_graph.py | 6 +++- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/python/cugraph/cugraph/structure/property_graph.py b/python/cugraph/cugraph/structure/property_graph.py index 13e8788bd00..9871ee3bc90 100644 --- a/python/cugraph/cugraph/structure/property_graph.py +++ b/python/cugraph/cugraph/structure/property_graph.py @@ -468,15 +468,36 @@ def add_vertex_data(self, tmp_df, self.__vertex_prop_dataframe) self.__vertex_prop_dtypes.update(new_col_info) - # Join on shared columns and the indices + # Join on vertex ids (the index) tmp_df.set_index(self.vertex_col_name, inplace=True) + # Option 1 + # df = self.__vertex_prop_dataframe.merge( + # tmp_df, + # on=self.vertex_col_name, + # how="outer", + # suffixes=("", "_NEW_"), + # ) + # Option 2 + df = self.__vertex_prop_dataframe.join( + tmp_df, + how="outer", + rsuffix="_NEW_", + ) cols = ( self.__vertex_prop_dataframe.columns.intersection(tmp_df.columns) .to_list() ) - cols.append(self.vertex_col_name) - self.__vertex_prop_dataframe = \ - self.__vertex_prop_dataframe.merge(tmp_df, on=cols, how="outer") + if cols: + rename_cols = {f"{col}_NEW_": col for col in cols} + new_cols = list(rename_cols) + sub_df = df[new_cols].rename(columns=rename_cols) + df.drop(columns=new_cols, inplace=True) + # TODO: should we use the values from `sub_df` if both are non-null? + df.fillna(sub_df, inplace=True) # Option 1 + # df[sub_df.columns] = sub_df.fillna(df[sub_df.columns]) # Option 2 + # Should we update the dtypes? + + self.__vertex_prop_dataframe = df # Update the vertex eval dict with the latest column instances if self.__series_type is cudf.Series: diff --git a/python/cugraph/cugraph/tests/test_property_graph.py b/python/cugraph/cugraph/tests/test_property_graph.py index 5bb81c2b05d..c715d9ea195 100644 --- a/python/cugraph/cugraph/tests/test_property_graph.py +++ b/python/cugraph/cugraph/tests/test_property_graph.py @@ -1721,7 +1721,11 @@ def test_add_data_noncontiguous(df_type): check_names=False, ) - df['vertex'] = 10 * df['src'] + df['dst'] + df["vertex"] = ( + 100 * df["src"] + + df["dst"] + + df["edge_type"].map({"pig": 0, "dog": 10, "cat": 20}) + ) pG = PropertyGraph() for edge_type in ["cat", "dog", "pig"]: pG.add_vertex_data( From 6de76998f59a66663e1361e0bde9abb7d65acce0 Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Mon, 24 Oct 2022 22:12:40 -0700 Subject: [PATCH 2/5] Update join for add edge data, and for MG (which is failing) --- .../dask/structure/mg_property_graph.py | 214 ++++++++--------- .../cugraph/structure/property_graph.py | 215 ++++++++---------- 2 files changed, 189 insertions(+), 240 deletions(-) diff --git a/python/cugraph/cugraph/dask/structure/mg_property_graph.py b/python/cugraph/cugraph/dask/structure/mg_property_graph.py index 04bb4f9b035..23e66ea554e 100644 --- a/python/cugraph/cugraph/dask/structure/mg_property_graph.py +++ b/python/cugraph/cugraph/dask/structure/mg_property_graph.py @@ -365,20 +365,19 @@ def add_vertex_data( f"{vertex_col_name} is not a column in " f"dataframe: {dataframe.columns}" ) - if (type_name is not None) and not isinstance(type_name, str): - raise TypeError("type_name must be a string, got: " f"{type(type_name)}") + if type_name is not None and not isinstance(type_name, str): + raise TypeError(f"type_name must be a string, got: {type(type_name)}") if type_name is None: type_name = self._default_type_name if property_columns: if type(property_columns) is not list: raise TypeError( - "property_columns must be a list, got: " f"{type(property_columns)}" + f"property_columns must be a list, got: {type(property_columns)}" ) invalid_columns = set(property_columns).difference(dataframe.columns) if invalid_columns: raise ValueError( - "property_columns contains column(s) not " - "found in dataframe: " + "property_columns contains column(s) not found in dataframe: " f"{list(invalid_columns)}" ) @@ -387,52 +386,30 @@ def add_vertex_data( self.__num_vertices = None self.__vertex_type_value_counts = None # Could update instead - # Initialize the __vertex_prop_dataframe if necessary using the same - # type as the incoming dataframe. + # Add `type_name` to the TYPE categorical dtype if necessary TCN = self.type_col_name - default_vertex_columns = [self.vertex_col_name, TCN] - if self.__vertex_prop_dataframe is None: - temp_dataframe = cudf.DataFrame(columns=default_vertex_columns) - self.__vertex_prop_dataframe = dask_cudf.from_cudf( - temp_dataframe, npartitions=self.__num_workers - ) - # Initialize the new columns to the same dtype as the appropriate - # column in the incoming dataframe, since the initial merge may not - # result in the same dtype. (see - # https://github.com/rapidsai/cudf/issues/9981) - self.__update_dataframe_dtypes( - self.__vertex_prop_dataframe, - {self.vertex_col_name: dataframe[vertex_col_name].dtype}, - ) - self.__vertex_prop_dataframe = self.__vertex_prop_dataframe.set_index( - self.vertex_col_name - ) - - # Use categorical dtype for the type column + is_first_data = self.__vertex_prop_dataframe is None + if is_first_data: if self.__series_type is dask_cudf.Series: cat_class = cudf.CategoricalDtype else: cat_class = pd.CategoricalDtype cat_dtype = cat_class([type_name], ordered=False) - self.__vertex_prop_dataframe[TCN] = self.__vertex_prop_dataframe[ - TCN - ].astype(cat_dtype) - - # Ensure that both the predetermined vertex ID column name and vertex - # type column name are present for proper merging. + else: + cat_dtype = self.__update_categorical_dtype( + self.__vertex_prop_dataframe, TCN, type_name + ) # NOTE: This copies the incoming DataFrame in order to add the new # columns. The copied DataFrame is then merged (another copy) and then # deleted when out-of-scope. + + # Ensure that both the predetermined vertex ID column name and vertex + # type column name are present for proper merging. tmp_df = dataframe.copy() tmp_df[self.vertex_col_name] = tmp_df[vertex_col_name] # FIXME: handle case of a type_name column already being in tmp_df - # Add `type_name` to the categorical dtype if necessary - cat_dtype = self.__update_categorical_dtype( - self.__vertex_prop_dataframe, TCN, type_name - ) - tmp_df[TCN] = type_name tmp_df[TCN] = tmp_df[TCN].astype(cat_dtype) @@ -441,7 +418,7 @@ def add_vertex_data( column_names_to_drop = set(tmp_df.columns) # remove the ones to keep column_names_to_drop.difference_update( - property_columns + default_vertex_columns + property_columns + [self.vertex_col_name, TCN] ) else: column_names_to_drop = {vertex_col_name} @@ -450,25 +427,33 @@ def add_vertex_data( # Save the original dtypes for each new column so they can be restored # prior to constructing subgraphs (since column dtypes may get altered # during merge to accommodate NaN values). - new_col_info = self.__get_new_column_dtypes( - tmp_df, self.__vertex_prop_dataframe - ) + if is_first_data: + new_col_info = tmp_df.dtypes.items() + else: + new_col_info = self.__get_new_column_dtypes( + tmp_df, self.__vertex_prop_dataframe + ) self.__vertex_prop_dtypes.update(new_col_info) - # Join on shared columns and the indices - tmp_df = tmp_df.set_index(self.vertex_col_name) - cols = self.__vertex_prop_dataframe.columns.intersection( - tmp_df.columns - ).to_list() - cols.append(self.vertex_col_name) - # FIXME: workaround for: https://github.com/rapidsai/cudf/issues/11550 - self.__vertex_prop_dataframe = ( - self.__vertex_prop_dataframe.reset_index() - .merge(tmp_df.reset_index(), on=cols, how="outer") - .set_index(self.vertex_col_name) - ) - # self.__vertex_prop_dataframe = \ - # self.__vertex_prop_dataframe.merge(tmp_df, on=cols, how="outer") + # TODO: allow tmp_df to come in with vertex id already as index + tmp_df = tmp_df.persist().set_index(self.vertex_col_name).persist() + self.__update_dataframe_dtypes(tmp_df, self.__vertex_prop_dtypes) + + if is_first_data: + self.__vertex_prop_dataframe = tmp_df + else: + # Join on vertex ids (the index) + # TODO: can we automagically determine when we to use concat? + df = self.__vertex_prop_dataframe.join(tmp_df, how="outer", rsuffix="_NEW_") + cols = self.__vertex_prop_dataframe.columns.intersection( + tmp_df.columns + ).to_list() + rename_cols = {f"{col}_NEW_": col for col in cols} + new_cols = list(rename_cols) + sub_df = df[new_cols].rename(columns=rename_cols) + # This only adds data--it doesn't replace existing data + df = df.drop(columns=new_cols).fillna(sub_df) + self.__vertex_prop_dataframe = df # Update the vertex eval dict with the latest column instances latest = { @@ -479,8 +464,6 @@ def add_vertex_data( self.__vertex_prop_eval_dict[ self.vertex_col_name ] = self.__vertex_prop_dataframe.index - # Should we persist? - # self.__vertex_prop_dataframe = self.__vertex_prop_dataframe.persist() def get_vertex_data(self, vertex_ids=None, types=None, columns=None): """ @@ -582,20 +565,19 @@ def add_edge_data( "vertex_col_names contains column(s) not found " f"in dataframe: {list(invalid_columns)}" ) - if (type_name is not None) and not isinstance(type_name, str): - raise TypeError("type_name must be a string, got: " f"{type(type_name)}") + if type_name is not None and not isinstance(type_name, str): + raise TypeError(f"type_name must be a string, got: {type(type_name)}") if type_name is None: type_name = self._default_type_name if property_columns: if type(property_columns) is not list: raise TypeError( - "property_columns must be a list, got: " f"{type(property_columns)}" + f"property_columns must be a list, got: {type(property_columns)}" ) invalid_columns = set(property_columns).difference(dataframe.columns) if invalid_columns: raise ValueError( - "property_columns contains column(s) not " - "found in dataframe: " + "property_columns contains column(s) not found in dataframe: " f"{list(invalid_columns)}" ) if self.__is_edge_id_autogenerated is False and edge_id_col_name is None: @@ -616,31 +598,20 @@ def add_edge_data( self.__num_vertices = None self.__edge_type_value_counts = None # Could update instead + # Add `type_name` to the categorical dtype if necessary TCN = self.type_col_name - default_edge_columns = [self.src_col_name, self.dst_col_name, TCN] - if self.__edge_prop_dataframe is None: - temp_dataframe = cudf.DataFrame(columns=default_edge_columns) - self.__update_dataframe_dtypes( - temp_dataframe, - { - self.src_col_name: dataframe[vertex_col_names[0]].dtype, - self.dst_col_name: dataframe[vertex_col_names[1]].dtype, - }, - ) - temp_dataframe.index.name = self.edge_id_col_name - - # Use categorical dtype for the type column + is_first_data = self.__edge_prop_dataframe is None + if is_first_data: if self.__series_type is dask_cudf.Series: cat_class = cudf.CategoricalDtype else: cat_class = pd.CategoricalDtype cat_dtype = cat_class([type_name], ordered=False) - temp_dataframe[TCN] = temp_dataframe[TCN].astype(cat_dtype) - - self.__edge_prop_dataframe = dask_cudf.from_cudf( - temp_dataframe, npartitions=self.__num_workers - ) self.__is_edge_id_autogenerated = edge_id_col_name is None + else: + cat_dtype = self.__update_categorical_dtype( + self.__edge_prop_dataframe, TCN, type_name + ) # NOTE: This copies the incoming DataFrame in order to add the new # columns. The copied DataFrame is then merged (another copy) and then @@ -649,11 +620,6 @@ def add_edge_data( tmp_df[self.src_col_name] = tmp_df[vertex_col_names[0]] tmp_df[self.dst_col_name] = tmp_df[vertex_col_names[1]] - # Add `type_name` to the categorical dtype if necessary - cat_dtype = self.__update_categorical_dtype( - self.__edge_prop_dataframe, TCN, type_name - ) - tmp_df[TCN] = type_name tmp_df[TCN] = tmp_df[TCN].astype(cat_dtype) @@ -682,7 +648,7 @@ def add_edge_data( column_names_to_drop = set(tmp_df.columns) # remove the ones to keep column_names_to_drop.difference_update( - property_columns + default_edge_columns + property_columns + [self.src_col_name, self.dst_col_name, TCN] ) else: column_names_to_drop = {vertex_col_names[0], vertex_col_names[1]} @@ -691,20 +657,32 @@ def add_edge_data( # Save the original dtypes for each new column so they can be restored # prior to constructing subgraphs (since column dtypes may get altered # during merge to accommodate NaN values). - new_col_info = self.__get_new_column_dtypes(tmp_df, self.__edge_prop_dataframe) + if is_first_data: + new_col_info = tmp_df.dtypes.items() + else: + new_col_info = self.__get_new_column_dtypes( + tmp_df, self.__edge_prop_dataframe + ) self.__edge_prop_dtypes.update(new_col_info) - # Join on shared columns and the indices - cols = self.__edge_prop_dataframe.columns.intersection(tmp_df.columns).to_list() - cols.append(self.edge_id_col_name) - # FIXME: workaround for: https://github.com/rapidsai/cudf/issues/11550 - self.__edge_prop_dataframe = ( - self.__edge_prop_dataframe.reset_index() - .merge(tmp_df.reset_index(), on=cols, how="outer") - .set_index(self.edge_id_col_name) - ) - # self.__edge_prop_dataframe = \ - # self.__edge_prop_dataframe.merge(tmp_df, on=cols, how="outer") + # TODO: allow tmp_df to come in with edge id already as index + self.__update_dataframe_dtypes(tmp_df, self.__edge_prop_dtypes) + + if is_first_data: + self.__edge_prop_dataframe = tmp_df + else: + # Join on edge ids (the index) + # TODO: can we automagically determine when we to use concat? + df = self.__edge_prop_dataframe.join(tmp_df, how="outer", rsuffix="_NEW_") + cols = self.__edge_prop_dataframe.columns.intersection( + tmp_df.columns + ).to_list() + rename_cols = {f"{col}_NEW_": col for col in cols} + new_cols = list(rename_cols) + sub_df = df[new_cols].rename(columns=rename_cols) + # This only adds data--it doesn't replace existing data + df = df.drop(columns=new_cols).fillna(sub_df) + self.__edge_prop_dataframe = df # Update the edge eval dict with the latest column instances latest = dict( @@ -717,8 +695,6 @@ def add_edge_data( self.__edge_prop_eval_dict[ self.edge_id_col_name ] = self.__edge_prop_dataframe.index - # Should we persist? - # self.__edge_prop_dataframe = self.__edge_prop_dataframe.persist() def get_edge_data(self, edge_ids=None, types=None, columns=None): """ @@ -846,7 +822,7 @@ def extract_subgraph( -------- >>> """ - if (selection is not None) and not isinstance( + if selection is not None and not isinstance( selection, EXPERIMENTAL__MGPropertySelection ): raise TypeError( @@ -860,14 +836,14 @@ def extract_subgraph( # dtypes (eg. int64 to float64 in order to add NaN entries). This # should not be a problem since the conversions do not change the # values. - if (selection is not None) and (selection.vertex_selections is not None): + if selection is not None and selection.vertex_selections is not None: selected_vertex_dataframe = self.__vertex_prop_dataframe[ selection.vertex_selections ] else: selected_vertex_dataframe = None - if (selection is not None) and (selection.edge_selections is not None): + if selection is not None and selection.edge_selections is not None: selected_edge_dataframe = self.__edge_prop_dataframe[ selection.edge_selections ] @@ -880,7 +856,8 @@ def extract_subgraph( # selected verts in both src and dst if ( selected_vertex_dataframe is not None - ) and not selected_vertex_dataframe.empty: + and not selected_vertex_dataframe.empty + ): has_srcs = selected_edge_dataframe[self.src_col_name].isin( selected_vertex_dataframe.index ) @@ -961,8 +938,7 @@ def edge_props_to_graph( if prop_col.count().compute() != prop_col.size: if default_edge_weight is None: raise ValueError( - "edge_weight_property " - f'"{edge_weight_property}" ' + f'edge_weight_property "{edge_weight_property}" ' "contains NA values in the subgraph and " "default_edge_weight is not set" ) @@ -1026,7 +1002,7 @@ def edge_props_to_graph( # take place. The C renumbering only occurs for pylibcugraph algos, # hence the reason these extracted subgraphs only work with PLC algos. if renumber_graph is False: - raise ValueError("currently, renumber_graph must be set to True " "for MG") + raise ValueError("currently, renumber_graph must be set to True for MG") legacy_renum_only = True col_names = [self.src_col_name, self.dst_col_name] @@ -1078,7 +1054,7 @@ def renumber_vertices_by_type(self): else: cat_class = pd.CategoricalDtype - is_cat = isinstance(self.__vertex_prop_dataframe[TCN].dtype, cat_class) + is_cat = isinstance(self.__vertex_prop_dataframe.dtypes[TCN], cat_class) if not is_cat: cat_dtype = cat_class([TCN], ordered=False) self.__vertex_prop_dataframe[TCN] = self.__vertex_prop_dataframe[ @@ -1127,12 +1103,8 @@ def renumber_vertices_by_type(self): cat_dtype = df.index.dtype df.index = df.index.astype(str) - rv = ( - # self._vertex_type_value_counts - df.sort_index() - .cumsum() - .to_frame("stop") - ) + # self._vertex_type_value_counts + rv = df.sort_index().cumsum().to_frame("stop") # FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795 df.index = df.index.astype(cat_dtype) @@ -1172,12 +1144,8 @@ def renumber_edges_by_type(self): assert df.index.dtype == cat_dtype df.index = df.index.astype(str) - rv = ( - # self._edge_type_value_counts - df.sort_index() - .cumsum() - .to_frame("stop") - ) + # self._edge_type_value_counts + rv = df.sort_index().cumsum().to_frame("stop") # FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795 df.index = df.index.astype(cat_dtype) @@ -1252,7 +1220,7 @@ def __get_new_column_dtypes(from_df, to_df): column in from_df that is not present in to_df. """ new_cols = set(from_df.columns) - set(to_df.columns) - return [(col, from_df[col].dtype) for col in new_cols] + return [(col, from_df.dtypes[col]) for col in new_cols] @staticmethod def __update_dataframe_dtypes(df, column_dtype_dict): @@ -1262,6 +1230,8 @@ def __update_dataframe_dtypes(df, column_dtype_dict): integer dtypes, needed to accommodate NA values in columns. """ for (col, dtype) in column_dtype_dict.items(): + if col not in df.columns: + continue # If the DataFrame is Pandas and the dtype is an integer type, # ensure a nullable integer array is used by specifying the correct # dtype. The alias for these dtypes is simply a capitalized string @@ -1270,7 +1240,7 @@ def __update_dataframe_dtypes(df, column_dtype_dict): dtype_str = str(dtype) if dtype_str in ["int32", "int64"]: dtype_str = dtype_str.title() - if str(df[col].dtype) != dtype_str: + if str(df.dtypes[col]) != dtype_str: df[col] = df[col].astype(dtype_str) def __update_categorical_dtype(self, df, column, val): diff --git a/python/cugraph/cugraph/structure/property_graph.py b/python/cugraph/cugraph/structure/property_graph.py index 9b7aa3c883a..9cd2613763c 100644 --- a/python/cugraph/cugraph/structure/property_graph.py +++ b/python/cugraph/cugraph/structure/property_graph.py @@ -364,25 +364,24 @@ def add_vertex_data( f"{vertex_col_name} is not a column in " f"dataframe: {dataframe.columns}" ) - if (type_name is not None) and not isinstance(type_name, str): - raise TypeError("type_name must be a string, got: " f"{type(type_name)}") + if type_name is not None and not isinstance(type_name, str): + raise TypeError(f"type_name must be a string, got: {type(type_name)}") if type_name is None: type_name = self._default_type_name if property_columns: if type(property_columns) is not list: raise TypeError( - "property_columns must be a list, got: " f"{type(property_columns)}" + f"property_columns must be a list, got: {type(property_columns)}" ) invalid_columns = set(property_columns).difference(dataframe.columns) if invalid_columns: raise ValueError( - "property_columns contains column(s) not " - "found in dataframe: " + "property_columns contains column(s) not found in dataframe: " f"{list(invalid_columns)}" ) # Save the DataFrame and Series types for future instantiations - if (self.__dataframe_type is None) or (self.__series_type is None): + if self.__dataframe_type is None or self.__series_type is None: self.__dataframe_type = type(dataframe) self.__series_type = type(dataframe[dataframe.columns[0]]) else: @@ -398,49 +397,30 @@ def add_vertex_data( self.__num_vertices = None self.__vertex_type_value_counts = None # Could update instead - # Initialize the __vertex_prop_dataframe if necessary using the same - # type as the incoming dataframe. + # Add `type_name` to the TYPE categorical dtype if necessary TCN = self.type_col_name - default_vertex_columns = [self.vertex_col_name, TCN] - if self.__vertex_prop_dataframe is None: - self.__vertex_prop_dataframe = self.__dataframe_type( - columns=default_vertex_columns - ) - # Initialize the new columns to the same dtype as the appropriate - # column in the incoming dataframe, since the initial merge may not - # result in the same dtype. (see - # https://github.com/rapidsai/cudf/issues/9981) - self.__vertex_prop_dataframe = self.__update_dataframe_dtypes( - self.__vertex_prop_dataframe, - {self.vertex_col_name: dataframe[vertex_col_name].dtype}, - ) - self.__vertex_prop_dataframe.set_index(self.vertex_col_name, inplace=True) - - # Use categorical dtype for the type column + is_first_data = self.__vertex_prop_dataframe is None + if is_first_data: if self.__series_type is cudf.Series: cat_class = cudf.CategoricalDtype else: cat_class = pd.CategoricalDtype cat_dtype = cat_class([type_name], ordered=False) - self.__vertex_prop_dataframe[TCN] = self.__vertex_prop_dataframe[ - TCN - ].astype(cat_dtype) - - # Ensure that both the predetermined vertex ID column name and vertex - # type column name are present for proper merging. + else: + cat_dtype = self.__update_categorical_dtype( + self.__vertex_prop_dataframe, TCN, type_name + ) # NOTE: This copies the incoming DataFrame in order to add the new # columns. The copied DataFrame is then merged (another copy) and then # deleted when out-of-scope. + + # Ensure that both the predetermined vertex ID column name and vertex + # type column name are present for proper merging. tmp_df = dataframe.copy(deep=True) tmp_df[self.vertex_col_name] = tmp_df[vertex_col_name] # FIXME: handle case of a type_name column already being in tmp_df - # Add `type_name` to the categorical dtype if necessary - cat_dtype = self.__update_categorical_dtype( - self.__vertex_prop_dataframe, TCN, type_name - ) - if self.__series_type is cudf.Series: # cudf does not yet support initialization with a scalar tmp_df[TCN] = cudf.Series( @@ -455,7 +435,7 @@ def add_vertex_data( column_names_to_drop = set(tmp_df.columns) # remove the ones to keep column_names_to_drop.difference_update( - property_columns + default_vertex_columns + property_columns + [self.vertex_col_name, TCN] ) else: column_names_to_drop = {vertex_col_name} @@ -464,40 +444,34 @@ def add_vertex_data( # Save the original dtypes for each new column so they can be restored # prior to constructing subgraphs (since column dtypes may get altered # during merge to accommodate NaN values). - new_col_info = self.__get_new_column_dtypes( - tmp_df, self.__vertex_prop_dataframe - ) + if is_first_data: + new_col_info = tmp_df.dtypes.items() + else: + new_col_info = self.__get_new_column_dtypes( + tmp_df, self.__vertex_prop_dataframe + ) self.__vertex_prop_dtypes.update(new_col_info) - # Join on vertex ids (the index) + # TODO: allow tmp_df to come in with vertex id already as index tmp_df.set_index(self.vertex_col_name, inplace=True) - # Option 1 - # df = self.__vertex_prop_dataframe.merge( - # tmp_df, - # on=self.vertex_col_name, - # how="outer", - # suffixes=("", "_NEW_"), - # ) - # Option 2 - df = self.__vertex_prop_dataframe.join( - tmp_df, - how="outer", - rsuffix="_NEW_", - ) - cols = self.__vertex_prop_dataframe.columns.intersection( - tmp_df.columns - ).to_list() - if cols: + tmp_df = self.__update_dataframe_dtypes(tmp_df, self.__vertex_prop_dtypes) + + if is_first_data: + self.__vertex_prop_dataframe = tmp_df + else: + # Join on vertex ids (the index) + # TODO: can we automagically determine when we to use concat? + df = self.__vertex_prop_dataframe.join(tmp_df, how="outer", rsuffix="_NEW_") + cols = self.__vertex_prop_dataframe.columns.intersection( + tmp_df.columns + ).to_list() rename_cols = {f"{col}_NEW_": col for col in cols} new_cols = list(rename_cols) sub_df = df[new_cols].rename(columns=rename_cols) df.drop(columns=new_cols, inplace=True) - # TODO: should we use the values from `sub_df` if both are non-null? - df.fillna(sub_df, inplace=True) # Option 1 - # df[sub_df.columns] = sub_df.fillna(df[sub_df.columns]) # Option 2 - # Should we update the dtypes? - - self.__vertex_prop_dataframe = df + # This only adds data--it doesn't replace existing data + df.fillna(sub_df, inplace=True) + self.__vertex_prop_dataframe = df # Update the vertex eval dict with the latest column instances if self.__series_type is cudf.Series: @@ -615,25 +589,24 @@ def add_edge_data( "vertex_col_names contains column(s) not found " f"in dataframe: {list(invalid_columns)}" ) - if (type_name is not None) and not isinstance(type_name, str): - raise TypeError("type_name must be a string, got: " f"{type(type_name)}") + if type_name is not None and not isinstance(type_name, str): + raise TypeError(f"type_name must be a string, got: {type(type_name)}") if type_name is None: type_name = self._default_type_name if property_columns: if type(property_columns) is not list: raise TypeError( - "property_columns must be a list, got: " f"{type(property_columns)}" + f"property_columns must be a list, got: {type(property_columns)}" ) invalid_columns = set(property_columns).difference(dataframe.columns) if invalid_columns: raise ValueError( - "property_columns contains column(s) not " - "found in dataframe: " + "property_columns contains column(s) not found in dataframe: " f"{list(invalid_columns)}" ) # Save the DataFrame and Series types for future instantiations - if (self.__dataframe_type is None) or (self.__series_type is None): + if self.__dataframe_type is None or self.__series_type is None: self.__dataframe_type = type(dataframe) self.__series_type = type(dataframe[dataframe.columns[0]]) else: @@ -661,35 +634,20 @@ def add_edge_data( self.__num_vertices = None self.__edge_type_value_counts = None # Could update instead + # Add `type_name` to the categorical dtype if necessary TCN = self.type_col_name - default_edge_columns = [self.src_col_name, self.dst_col_name, TCN] - if self.__edge_prop_dataframe is None: - self.__edge_prop_dataframe = self.__dataframe_type( - columns=default_edge_columns - ) - # Initialize the new columns to the same dtype as the appropriate - # column in the incoming dataframe, since the initial merge may not - # result in the same dtype. (see - # https://github.com/rapidsai/cudf/issues/9981) - self.__edge_prop_dataframe = self.__update_dataframe_dtypes( - self.__edge_prop_dataframe, - { - self.src_col_name: dataframe[vertex_col_names[0]].dtype, - self.dst_col_name: dataframe[vertex_col_names[1]].dtype, - }, - ) - self.__edge_prop_dataframe.index.name = self.edge_id_col_name - - # Use categorical dtype for the type column + is_first_data = self.__edge_prop_dataframe is None + if is_first_data: if self.__series_type is cudf.Series: cat_class = cudf.CategoricalDtype else: cat_class = pd.CategoricalDtype cat_dtype = cat_class([type_name], ordered=False) - self.__edge_prop_dataframe[TCN] = self.__edge_prop_dataframe[TCN].astype( - cat_dtype - ) self.__is_edge_id_autogenerated = edge_id_col_name is None + else: + cat_dtype = self.__update_categorical_dtype( + self.__edge_prop_dataframe, TCN, type_name + ) # NOTE: This copies the incoming DataFrame in order to add the new # columns. The copied DataFrame is then merged (another copy) and then @@ -698,11 +656,6 @@ def add_edge_data( tmp_df[self.src_col_name] = tmp_df[vertex_col_names[0]] tmp_df[self.dst_col_name] = tmp_df[vertex_col_names[1]] - # Add `type_name` to the categorical dtype if necessary - cat_dtype = self.__update_categorical_dtype( - self.__edge_prop_dataframe, TCN, type_name - ) - if self.__series_type is cudf.Series: # cudf does not yet support initialization with a scalar tmp_df[TCN] = cudf.Series( @@ -732,7 +685,7 @@ def add_edge_data( column_names_to_drop = set(tmp_df.columns) # remove the ones to keep column_names_to_drop.difference_update( - property_columns + default_edge_columns + property_columns + [self.src_col_name, self.dst_col_name, TCN] ) else: column_names_to_drop = {vertex_col_names[0], vertex_col_names[1]} @@ -741,15 +694,33 @@ def add_edge_data( # Save the original dtypes for each new column so they can be restored # prior to constructing subgraphs (since column dtypes may get altered # during merge to accommodate NaN values). - new_col_info = self.__get_new_column_dtypes(tmp_df, self.__edge_prop_dataframe) + if is_first_data: + new_col_info = tmp_df.dtypes.items() + else: + new_col_info = self.__get_new_column_dtypes( + tmp_df, self.__edge_prop_dataframe + ) self.__edge_prop_dtypes.update(new_col_info) - # Join on shared columns and the indices - cols = self.__edge_prop_dataframe.columns.intersection(tmp_df.columns).to_list() - cols.append(self.edge_id_col_name) - self.__edge_prop_dataframe = self.__edge_prop_dataframe.merge( - tmp_df, on=cols, how="outer" - ) + # TODO: allow tmp_df to come in with edge id already as index + tmp_df = self.__update_dataframe_dtypes(tmp_df, self.__edge_prop_dtypes) + + if is_first_data: + self.__edge_prop_dataframe = tmp_df + else: + # Join on edge ids (the index) + # TODO: can we automagically determine when we to use concat? + df = self.__edge_prop_dataframe.join(tmp_df, how="outer", rsuffix="_NEW_") + cols = self.__edge_prop_dataframe.columns.intersection( + tmp_df.columns + ).to_list() + rename_cols = {f"{col}_NEW_": col for col in cols} + new_cols = list(rename_cols) + sub_df = df[new_cols].rename(columns=rename_cols) + df.drop(columns=new_cols, inplace=True) + # This only adds data--it doesn't replace existing data + df.fillna(sub_df, inplace=True) + self.__edge_prop_dataframe = df # Update the edge eval dict with the latest column instances if self.__series_type is cudf.Series: @@ -836,8 +807,9 @@ def select_vertices(self, expr, from_previous_selection=None): # Check if the expr is to be evaluated in the context of properties # from only the previously selected vertices (as opposed to all # properties from all vertices) - if (from_previous_selection is not None) and ( - from_previous_selection.vertex_selections is not None + if ( + from_previous_selection is not None + and from_previous_selection.vertex_selections is not None ): previously_selected_rows = self.__vertex_prop_dataframe[ from_previous_selection.vertex_selections @@ -950,7 +922,7 @@ def extract_subgraph( -------- >>> """ - if (selection is not None) and not isinstance( + if selection is not None and not isinstance( selection, EXPERIMENTAL__PropertySelection ): raise TypeError( @@ -964,14 +936,14 @@ def extract_subgraph( # dtypes (eg. int64 to float64 in order to add NaN entries). This # should not be a problem since the conversions do not change the # values. - if (selection is not None) and (selection.vertex_selections is not None): + if selection is not None and selection.vertex_selections is not None: selected_vertex_dataframe = self.__vertex_prop_dataframe[ selection.vertex_selections ] else: selected_vertex_dataframe = None - if (selection is not None) and (selection.edge_selections is not None): + if selection is not None and selection.edge_selections is not None: selected_edge_dataframe = self.__edge_prop_dataframe[ selection.edge_selections ] @@ -984,7 +956,8 @@ def extract_subgraph( # selected verts in both src and dst if ( selected_vertex_dataframe is not None - ) and not selected_vertex_dataframe.empty: + and not selected_vertex_dataframe.empty + ): has_srcs = selected_edge_dataframe[self.src_col_name].isin( selected_vertex_dataframe.index ) @@ -1096,7 +1069,7 @@ def annotate_dataframe(self, df, G, edge_vertex_col_names): # restore the original dtypes new_df = self.__update_dataframe_dtypes(new_df, self.__edge_prop_dtypes) for col in df.columns: - new_df[col] = new_df[col].astype(df[col].dtype) + new_df[col] = new_df[col].astype(df.dtypes[col]) # FIXME: consider removing internal columns (_EDGE_ID_, etc.) and # columns from edge types not included in the edges in df. @@ -1115,6 +1088,8 @@ def edge_props_to_graph( """ Create and return a Graph from the edges in edge_prop_df. """ + # Don't mutate input data, and ensure DataFrame is not a view + edge_prop_df = edge_prop_df.copy() # FIXME: check default_edge_weight is valid if edge_weight_property: if ( @@ -1137,13 +1112,15 @@ def edge_props_to_graph( if prop_col.count() != prop_col.size: if default_edge_weight is None: raise ValueError( - "edge_weight_property " - f'"{edge_weight_property}" ' + f'edge_weight_property "{edge_weight_property}" ' "contains NA values in the subgraph and " "default_edge_weight is not set" ) + prop_col = prop_col.fillna(default_edge_weight) + if edge_weight_property in edge_prop_df.columns: + edge_prop_df[edge_weight_property] = prop_col else: - prop_col.fillna(default_edge_weight, inplace=True) + edge_prop_df.index = prop_col edge_attr = edge_weight_property # If a default_edge_weight was specified but an edge_weight_property @@ -1237,7 +1214,7 @@ def renumber_vertices_by_type(self): else: cat_class = pd.CategoricalDtype - is_cat = isinstance(self.__vertex_prop_dataframe[TCN].dtype, cat_class) + is_cat = isinstance(self.__vertex_prop_dataframe.dtypes[TCN], cat_class) if not is_cat: cat_dtype = cat_class([TCN], ordered=False) self.__vertex_prop_dataframe[TCN] = self.__vertex_prop_dataframe[ @@ -1280,7 +1257,7 @@ def renumber_edges_by_type(self): else: cat_class = pd.CategoricalDtype - is_cat = isinstance(self.__edge_prop_dataframe[TCN].dtype, cat_class) + is_cat = isinstance(self.__edge_prop_dataframe.dtypes[TCN], cat_class) if not is_cat: cat_dtype = cat_class([TCN], ordered=False) self.__edge_prop_dataframe[TCN] = self.__edge_prop_dataframe[TCN].astype( @@ -1355,7 +1332,7 @@ def __get_new_column_dtypes(from_df, to_df): column in from_df that is not present in to_df. """ new_cols = set(from_df.columns) - set(to_df.columns) - return [(col, from_df[col].dtype) for col in new_cols] + return [(col, from_df.dtypes[col]) for col in new_cols] @staticmethod def __update_dataframe_dtypes(df, column_dtype_dict): @@ -1366,6 +1343,8 @@ def __update_dataframe_dtypes(df, column_dtype_dict): """ update_cols = {} for (col, dtype) in column_dtype_dict.items(): + if col not in df.columns: + continue # If the DataFrame is Pandas and the dtype is an integer type, # ensure a nullable integer array is used by specifying the correct # dtype. The alias for these dtypes is simply a capitalized string @@ -1374,7 +1353,7 @@ def __update_dataframe_dtypes(df, column_dtype_dict): dtype_str = str(dtype) if dtype_str in ["int32", "int64"]: dtype_str = dtype_str.title() - if str(df[col].dtype) != dtype_str: + if str(df.dtypes[col]) != dtype_str: # Assigning to df[col] produces a (false?) warning with Pandas, # but assigning to df.loc[:,col] does not update the df in # cudf, so do one or the other based on type. From 453177944fa1ca59a49984b4214d571475199ca3 Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Mon, 24 Oct 2022 22:49:34 -0700 Subject: [PATCH 3/5] Fix SG PG tests --- python/cugraph/cugraph/tests/test_property_graph.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/cugraph/cugraph/tests/test_property_graph.py b/python/cugraph/cugraph/tests/test_property_graph.py index bf857a23c0f..1fd4ec1fda6 100644 --- a/python/cugraph/cugraph/tests/test_property_graph.py +++ b/python/cugraph/cugraph/tests/test_property_graph.py @@ -724,6 +724,7 @@ def test_get_vertex_data_repeated(df_type): afe = assert_frame_equal else: afe = pd.testing.assert_frame_equal + expected["feat"] = expected["feat"].astype("Int64") afe(df1, expected) @@ -819,6 +820,8 @@ def test_get_edge_data_repeated(df_type): afe = assert_frame_equal else: afe = pd.testing.assert_frame_equal + for col in ["edge_feat", pG.src_col_name, pG.dst_col_name]: + expected[col] = expected[col].astype("Int64") afe(df1, expected) From 18de9b28072e094263c6888c620735b2de93fa52 Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Tue, 25 Oct 2022 23:17:52 -0700 Subject: [PATCH 4/5] Fix tests, and repartition after join when too many partitions --- .../cugraph/dask/structure/mg_property_graph.py | 10 ++++++++-- .../cugraph/cugraph/tests/mg/test_mg_property_graph.py | 10 +++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/python/cugraph/cugraph/dask/structure/mg_property_graph.py b/python/cugraph/cugraph/dask/structure/mg_property_graph.py index 4d7bfc5e620..302380011c5 100644 --- a/python/cugraph/cugraph/dask/structure/mg_property_graph.py +++ b/python/cugraph/cugraph/dask/structure/mg_property_graph.py @@ -445,7 +445,7 @@ def add_vertex_data( # Join on vertex ids (the index) # TODO: can we automagically determine when we to use concat? df = self.__vertex_prop_dataframe.join( - tmp_df, how="outer", rsuffix="_NEW_" + tmp_df, how="outer", rsuffix="_NEW_", npartitions=self.__num_workers ).persist() cols = self.__vertex_prop_dataframe.columns.intersection( tmp_df.columns @@ -455,6 +455,9 @@ def add_vertex_data( sub_df = df[new_cols].rename(columns=rename_cols) # This only adds data--it doesn't replace existing data df = df.drop(columns=new_cols).fillna(sub_df).persist() + if df.npartitions > 2 * self.__num_workers: + # TODO: better understand behavior of npartitions argument in join + df = df.repartition(npartitions=self.__num_workers).persist() self.__vertex_prop_dataframe = df # Update the vertex eval dict with the latest column instances @@ -675,7 +678,7 @@ def add_edge_data( # Join on edge ids (the index) # TODO: can we automagically determine when we to use concat? df = self.__edge_prop_dataframe.join( - tmp_df, how="outer", rsuffix="_NEW_" + tmp_df, how="outer", rsuffix="_NEW_", npartitions=self.__num_workers ).persist() cols = self.__edge_prop_dataframe.columns.intersection( tmp_df.columns @@ -685,6 +688,9 @@ def add_edge_data( sub_df = df[new_cols].rename(columns=rename_cols) # This only adds data--it doesn't replace existing data df = df.drop(columns=new_cols).fillna(sub_df).persist() + if df.npartitions > 2 * self.__num_workers: + # TODO: better understand behavior of npartitions argument in join + df = df.repartition(npartitions=self.__num_workers).persist() self.__edge_prop_dataframe = df # Update the edge eval dict with the latest column instances diff --git a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py index ad1e21ea027..aa72320d282 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py @@ -825,6 +825,10 @@ def test_get_edge_data_repeated(dask_client): } ) df1[pG.type_col_name] = df1[pG.type_col_name].astype(str) # Undo category + + # Order and indices don't matter + df1 = df1.sort_values(df1.columns).reset_index(drop=True) + expected = expected.sort_values(df1.columns).reset_index(drop=True) assert_frame_equal(df1, expected) @@ -935,7 +939,11 @@ def test_add_data_noncontiguous(): check_names=False, ) - df["vertex"] = 10 * df["src"] + df["dst"] + df["vertex"] = ( + 100 * df["src"] + + df["dst"] + + df["edge_type"].map({"pig": 0, "dog": 10, "cat": 20}) + ) pG = MGPropertyGraph() for edge_type in ["cat", "dog", "pig"]: pG.add_vertex_data( From d21030c36aea14dc11be89ee12d07efd4a550c61 Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Tue, 8 Nov 2022 11:11:37 -0800 Subject: [PATCH 5/5] Repartition to 2 * num_workers --- .../dask/structure/mg_property_graph.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/python/cugraph/cugraph/dask/structure/mg_property_graph.py b/python/cugraph/cugraph/dask/structure/mg_property_graph.py index 070567059ad..2d2b6f930e0 100644 --- a/python/cugraph/cugraph/dask/structure/mg_property_graph.py +++ b/python/cugraph/cugraph/dask/structure/mg_property_graph.py @@ -443,7 +443,10 @@ def add_vertex_data( # Join on vertex ids (the index) # TODO: can we automagically determine when we to use concat? df = self.__vertex_prop_dataframe.join( - tmp_df, how="outer", rsuffix="_NEW_", npartitions=self.__num_workers + tmp_df, + how="outer", + rsuffix="_NEW_", + # npartitions=self.__num_workers # TODO: see how this behaves ).persist() cols = self.__vertex_prop_dataframe.columns.intersection( tmp_df.columns @@ -453,9 +456,9 @@ def add_vertex_data( sub_df = df[new_cols].rename(columns=rename_cols) # This only adds data--it doesn't replace existing data df = df.drop(columns=new_cols).fillna(sub_df).persist() - if df.npartitions > 2 * self.__num_workers: + if df.npartitions > 4 * self.__num_workers: # TODO: better understand behavior of npartitions argument in join - df = df.repartition(npartitions=self.__num_workers).persist() + df = df.repartition(npartitions=2 * self.__num_workers).persist() self.__vertex_prop_dataframe = df # Update the vertex eval dict with the latest column instances @@ -676,7 +679,10 @@ def add_edge_data( # Join on edge ids (the index) # TODO: can we automagically determine when we to use concat? df = self.__edge_prop_dataframe.join( - tmp_df, how="outer", rsuffix="_NEW_", npartitions=self.__num_workers + tmp_df, + how="outer", + rsuffix="_NEW_", + # npartitions=self.__num_workers # TODO: see how this behaves ).persist() cols = self.__edge_prop_dataframe.columns.intersection( tmp_df.columns @@ -686,9 +692,9 @@ def add_edge_data( sub_df = df[new_cols].rename(columns=rename_cols) # This only adds data--it doesn't replace existing data df = df.drop(columns=new_cols).fillna(sub_df).persist() - if df.npartitions > 2 * self.__num_workers: + if df.npartitions > 4 * self.__num_workers: # TODO: better understand behavior of npartitions argument in join - df = df.repartition(npartitions=self.__num_workers).persist() + df = df.repartition(npartitions=2 * self.__num_workers).persist() self.__edge_prop_dataframe = df # Update the edge eval dict with the latest column instances