diff --git a/python/cugraph/cugraph/dask/structure/mg_property_graph.py b/python/cugraph/cugraph/dask/structure/mg_property_graph.py index db6144e2d0e..2d2b6f930e0 100644 --- a/python/cugraph/cugraph/dask/structure/mg_property_graph.py +++ b/python/cugraph/cugraph/dask/structure/mg_property_graph.py @@ -363,20 +363,19 @@ def add_vertex_data( f"{vertex_col_name} is not a column in " f"dataframe: {dataframe.columns}" ) - if (type_name is not None) and not isinstance(type_name, str): - raise TypeError("type_name must be a string, got: " f"{type(type_name)}") + if type_name is not None and not isinstance(type_name, str): + raise TypeError(f"type_name must be a string, got: {type(type_name)}") if type_name is None: type_name = self._default_type_name if property_columns: if type(property_columns) is not list: raise TypeError( - "property_columns must be a list, got: " f"{type(property_columns)}" + f"property_columns must be a list, got: {type(property_columns)}" ) invalid_columns = set(property_columns).difference(dataframe.columns) if invalid_columns: raise ValueError( - "property_columns contains column(s) not " - "found in dataframe: " + "property_columns contains column(s) not found in dataframe: " f"{list(invalid_columns)}" ) @@ -385,52 +384,30 @@ def add_vertex_data( self.__num_vertices = None self.__vertex_type_value_counts = None # Could update instead - # Initialize the __vertex_prop_dataframe if necessary using the same - # type as the incoming dataframe. + # Add `type_name` to the TYPE categorical dtype if necessary TCN = self.type_col_name - default_vertex_columns = [self.vertex_col_name, TCN] - if self.__vertex_prop_dataframe is None: - temp_dataframe = cudf.DataFrame(columns=default_vertex_columns) - self.__vertex_prop_dataframe = dask_cudf.from_cudf( - temp_dataframe, npartitions=self.__num_workers - ) - # Initialize the new columns to the same dtype as the appropriate - # column in the incoming dataframe, since the initial merge may not - # result in the same dtype. (see - # https://github.com/rapidsai/cudf/issues/9981) - self.__update_dataframe_dtypes( - self.__vertex_prop_dataframe, - {self.vertex_col_name: dataframe[vertex_col_name].dtype}, - ) - self.__vertex_prop_dataframe = self.__vertex_prop_dataframe.set_index( - self.vertex_col_name - ) - - # Use categorical dtype for the type column + is_first_data = self.__vertex_prop_dataframe is None + if is_first_data: if self.__series_type is dask_cudf.Series: cat_class = cudf.CategoricalDtype else: cat_class = pd.CategoricalDtype cat_dtype = cat_class([type_name], ordered=False) - self.__vertex_prop_dataframe[TCN] = self.__vertex_prop_dataframe[ - TCN - ].astype(cat_dtype) - - # Ensure that both the predetermined vertex ID column name and vertex - # type column name are present for proper merging. + else: + cat_dtype = self.__update_categorical_dtype( + self.__vertex_prop_dataframe, TCN, type_name + ) # NOTE: This copies the incoming DataFrame in order to add the new # columns. The copied DataFrame is then merged (another copy) and then # deleted when out-of-scope. + + # Ensure that both the predetermined vertex ID column name and vertex + # type column name are present for proper merging. tmp_df = dataframe.copy() tmp_df[self.vertex_col_name] = tmp_df[vertex_col_name] # FIXME: handle case of a type_name column already being in tmp_df - # Add `type_name` to the categorical dtype if necessary - cat_dtype = self.__update_categorical_dtype( - self.__vertex_prop_dataframe, TCN, type_name - ) - tmp_df[TCN] = type_name tmp_df[TCN] = tmp_df[TCN].astype(cat_dtype) @@ -439,7 +416,7 @@ def add_vertex_data( column_names_to_drop = set(tmp_df.columns) # remove the ones to keep column_names_to_drop.difference_update( - property_columns + default_vertex_columns + property_columns + [self.vertex_col_name, TCN] ) else: column_names_to_drop = {vertex_col_name} @@ -448,27 +425,41 @@ def add_vertex_data( # Save the original dtypes for each new column so they can be restored # prior to constructing subgraphs (since column dtypes may get altered # during merge to accommodate NaN values). - new_col_info = self.__get_new_column_dtypes( - tmp_df, self.__vertex_prop_dataframe - ) + if is_first_data: + new_col_info = tmp_df.dtypes.items() + else: + new_col_info = self.__get_new_column_dtypes( + tmp_df, self.__vertex_prop_dataframe + ) self.__vertex_prop_dtypes.update(new_col_info) - # Join on shared columns and the indices + # TODO: allow tmp_df to come in with vertex id already as index tmp_df = tmp_df.persist().set_index(self.vertex_col_name).persist() - cols = self.__vertex_prop_dataframe.columns.intersection( - tmp_df.columns - ).to_list() - cols.append(self.vertex_col_name) - # FIXME: workaround for: https://github.com/rapidsai/cudf/issues/11550 - self.__vertex_prop_dataframe = ( - self.__vertex_prop_dataframe.reset_index() - .merge(tmp_df.reset_index(), on=cols, how="outer") - .persist() - .set_index(self.vertex_col_name) - .persist() - ) - # self.__vertex_prop_dataframe = \ - # self.__vertex_prop_dataframe.merge(tmp_df, on=cols, how="outer") + self.__update_dataframe_dtypes(tmp_df, self.__vertex_prop_dtypes) + + if is_first_data: + self.__vertex_prop_dataframe = tmp_df + else: + # Join on vertex ids (the index) + # TODO: can we automagically determine when we to use concat? + df = self.__vertex_prop_dataframe.join( + tmp_df, + how="outer", + rsuffix="_NEW_", + # npartitions=self.__num_workers # TODO: see how this behaves + ).persist() + cols = self.__vertex_prop_dataframe.columns.intersection( + tmp_df.columns + ).to_list() + rename_cols = {f"{col}_NEW_": col for col in cols} + new_cols = list(rename_cols) + sub_df = df[new_cols].rename(columns=rename_cols) + # This only adds data--it doesn't replace existing data + df = df.drop(columns=new_cols).fillna(sub_df).persist() + if df.npartitions > 4 * self.__num_workers: + # TODO: better understand behavior of npartitions argument in join + df = df.repartition(npartitions=2 * self.__num_workers).persist() + self.__vertex_prop_dataframe = df # Update the vertex eval dict with the latest column instances latest = { @@ -580,20 +571,19 @@ def add_edge_data( "vertex_col_names contains column(s) not found " f"in dataframe: {list(invalid_columns)}" ) - if (type_name is not None) and not isinstance(type_name, str): - raise TypeError("type_name must be a string, got: " f"{type(type_name)}") + if type_name is not None and not isinstance(type_name, str): + raise TypeError(f"type_name must be a string, got: {type(type_name)}") if type_name is None: type_name = self._default_type_name if property_columns: if type(property_columns) is not list: raise TypeError( - "property_columns must be a list, got: " f"{type(property_columns)}" + f"property_columns must be a list, got: {type(property_columns)}" ) invalid_columns = set(property_columns).difference(dataframe.columns) if invalid_columns: raise ValueError( - "property_columns contains column(s) not " - "found in dataframe: " + "property_columns contains column(s) not found in dataframe: " f"{list(invalid_columns)}" ) if self.__is_edge_id_autogenerated is False and edge_id_col_name is None: @@ -614,31 +604,20 @@ def add_edge_data( self.__num_vertices = None self.__edge_type_value_counts = None # Could update instead + # Add `type_name` to the categorical dtype if necessary TCN = self.type_col_name - default_edge_columns = [self.src_col_name, self.dst_col_name, TCN] - if self.__edge_prop_dataframe is None: - temp_dataframe = cudf.DataFrame(columns=default_edge_columns) - self.__update_dataframe_dtypes( - temp_dataframe, - { - self.src_col_name: dataframe[vertex_col_names[0]].dtype, - self.dst_col_name: dataframe[vertex_col_names[1]].dtype, - }, - ) - temp_dataframe.index.name = self.edge_id_col_name - - # Use categorical dtype for the type column + is_first_data = self.__edge_prop_dataframe is None + if is_first_data: if self.__series_type is dask_cudf.Series: cat_class = cudf.CategoricalDtype else: cat_class = pd.CategoricalDtype cat_dtype = cat_class([type_name], ordered=False) - temp_dataframe[TCN] = temp_dataframe[TCN].astype(cat_dtype) - - self.__edge_prop_dataframe = dask_cudf.from_cudf( - temp_dataframe, npartitions=self.__num_workers - ) self.__is_edge_id_autogenerated = edge_id_col_name is None + else: + cat_dtype = self.__update_categorical_dtype( + self.__edge_prop_dataframe, TCN, type_name + ) # NOTE: This copies the incoming DataFrame in order to add the new # columns. The copied DataFrame is then merged (another copy) and then @@ -647,11 +626,6 @@ def add_edge_data( tmp_df[self.src_col_name] = tmp_df[vertex_col_names[0]] tmp_df[self.dst_col_name] = tmp_df[vertex_col_names[1]] - # Add `type_name` to the categorical dtype if necessary - cat_dtype = self.__update_categorical_dtype( - self.__edge_prop_dataframe, TCN, type_name - ) - tmp_df[TCN] = type_name tmp_df[TCN] = tmp_df[TCN].astype(cat_dtype) @@ -679,7 +653,7 @@ def add_edge_data( column_names_to_drop = set(tmp_df.columns) # remove the ones to keep column_names_to_drop.difference_update( - property_columns + default_edge_columns + property_columns + [self.src_col_name, self.dst_col_name, TCN] ) else: column_names_to_drop = {vertex_col_names[0], vertex_col_names[1]} @@ -688,22 +662,40 @@ def add_edge_data( # Save the original dtypes for each new column so they can be restored # prior to constructing subgraphs (since column dtypes may get altered # during merge to accommodate NaN values). - new_col_info = self.__get_new_column_dtypes(tmp_df, self.__edge_prop_dataframe) + if is_first_data: + new_col_info = tmp_df.dtypes.items() + else: + new_col_info = self.__get_new_column_dtypes( + tmp_df, self.__edge_prop_dataframe + ) self.__edge_prop_dtypes.update(new_col_info) - # Join on shared columns and the indices - cols = self.__edge_prop_dataframe.columns.intersection(tmp_df.columns).to_list() - cols.append(self.edge_id_col_name) - # FIXME: workaround for: https://github.com/rapidsai/cudf/issues/11550 - self.__edge_prop_dataframe = ( - self.__edge_prop_dataframe.reset_index() - .merge(tmp_df.reset_index(), on=cols, how="outer") - .persist() - .set_index(self.edge_id_col_name) - .persist() - ) - # self.__edge_prop_dataframe = \ - # self.__edge_prop_dataframe.merge(tmp_df, on=cols, how="outer") + # TODO: allow tmp_df to come in with edge id already as index + self.__update_dataframe_dtypes(tmp_df, self.__edge_prop_dtypes) + + if is_first_data: + self.__edge_prop_dataframe = tmp_df + else: + # Join on edge ids (the index) + # TODO: can we automagically determine when we to use concat? + df = self.__edge_prop_dataframe.join( + tmp_df, + how="outer", + rsuffix="_NEW_", + # npartitions=self.__num_workers # TODO: see how this behaves + ).persist() + cols = self.__edge_prop_dataframe.columns.intersection( + tmp_df.columns + ).to_list() + rename_cols = {f"{col}_NEW_": col for col in cols} + new_cols = list(rename_cols) + sub_df = df[new_cols].rename(columns=rename_cols) + # This only adds data--it doesn't replace existing data + df = df.drop(columns=new_cols).fillna(sub_df).persist() + if df.npartitions > 4 * self.__num_workers: + # TODO: better understand behavior of npartitions argument in join + df = df.repartition(npartitions=2 * self.__num_workers).persist() + self.__edge_prop_dataframe = df # Update the edge eval dict with the latest column instances latest = dict( @@ -843,7 +835,7 @@ def extract_subgraph( -------- >>> """ - if (selection is not None) and not isinstance( + if selection is not None and not isinstance( selection, EXPERIMENTAL__MGPropertySelection ): raise TypeError( @@ -857,14 +849,14 @@ def extract_subgraph( # dtypes (eg. int64 to float64 in order to add NaN entries). This # should not be a problem since the conversions do not change the # values. - if (selection is not None) and (selection.vertex_selections is not None): + if selection is not None and selection.vertex_selections is not None: selected_vertex_dataframe = self.__vertex_prop_dataframe[ selection.vertex_selections ] else: selected_vertex_dataframe = None - if (selection is not None) and (selection.edge_selections is not None): + if selection is not None and selection.edge_selections is not None: selected_edge_dataframe = self.__edge_prop_dataframe[ selection.edge_selections ] @@ -877,7 +869,8 @@ def extract_subgraph( # selected verts in both src and dst if ( selected_vertex_dataframe is not None - ) and not selected_vertex_dataframe.empty: + and not selected_vertex_dataframe.empty + ): has_srcs = selected_edge_dataframe[self.src_col_name].isin( selected_vertex_dataframe.index ) @@ -958,8 +951,7 @@ def edge_props_to_graph( if prop_col.count().compute() != prop_col.size: if default_edge_weight is None: raise ValueError( - "edge_weight_property " - f'"{edge_weight_property}" ' + f'edge_weight_property "{edge_weight_property}" ' "contains NA values in the subgraph and " "default_edge_weight is not set" ) @@ -1023,7 +1015,7 @@ def edge_props_to_graph( # take place. The C renumbering only occurs for pylibcugraph algos, # hence the reason these extracted subgraphs only work with PLC algos. if renumber_graph is False: - raise ValueError("currently, renumber_graph must be set to True " "for MG") + raise ValueError("currently, renumber_graph must be set to True for MG") legacy_renum_only = True col_names = [self.src_col_name, self.dst_col_name] @@ -1075,7 +1067,7 @@ def renumber_vertices_by_type(self): else: cat_class = pd.CategoricalDtype - is_cat = isinstance(self.__vertex_prop_dataframe[TCN].dtype, cat_class) + is_cat = isinstance(self.__vertex_prop_dataframe.dtypes[TCN], cat_class) if not is_cat: cat_dtype = cat_class([TCN], ordered=False) self.__vertex_prop_dataframe[TCN] = self.__vertex_prop_dataframe[ @@ -1124,12 +1116,8 @@ def renumber_vertices_by_type(self): cat_dtype = df.index.dtype df.index = df.index.astype(str) - rv = ( - # self._vertex_type_value_counts - df.sort_index() - .cumsum() - .to_frame("stop") - ) + # self._vertex_type_value_counts + rv = df.sort_index().cumsum().to_frame("stop") # FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795 df.index = df.index.astype(cat_dtype) @@ -1169,12 +1157,8 @@ def renumber_edges_by_type(self): assert df.index.dtype == cat_dtype df.index = df.index.astype(str) - rv = ( - # self._edge_type_value_counts - df.sort_index() - .cumsum() - .to_frame("stop") - ) + # self._edge_type_value_counts + rv = df.sort_index().cumsum().to_frame("stop") # FIXME DASK_CUDF: https://github.com/rapidsai/cudf/issues/11795 df.index = df.index.astype(cat_dtype) @@ -1249,7 +1233,7 @@ def __get_new_column_dtypes(from_df, to_df): column in from_df that is not present in to_df. """ new_cols = set(from_df.columns) - set(to_df.columns) - return [(col, from_df[col].dtype) for col in new_cols] + return [(col, from_df.dtypes[col]) for col in new_cols] @staticmethod def __update_dataframe_dtypes(df, column_dtype_dict): @@ -1259,6 +1243,8 @@ def __update_dataframe_dtypes(df, column_dtype_dict): integer dtypes, needed to accommodate NA values in columns. """ for (col, dtype) in column_dtype_dict.items(): + if col not in df.columns: + continue # If the DataFrame is Pandas and the dtype is an integer type, # ensure a nullable integer array is used by specifying the correct # dtype. The alias for these dtypes is simply a capitalized string @@ -1267,7 +1253,7 @@ def __update_dataframe_dtypes(df, column_dtype_dict): dtype_str = str(dtype) if dtype_str in ["int32", "int64"]: dtype_str = dtype_str.title() - if str(df[col].dtype) != dtype_str: + if str(df.dtypes[col]) != dtype_str: df[col] = df[col].astype(dtype_str) def __update_categorical_dtype(self, df, column, val): diff --git a/python/cugraph/cugraph/structure/property_graph.py b/python/cugraph/cugraph/structure/property_graph.py index bd6b15cc4de..e732f3183a6 100644 --- a/python/cugraph/cugraph/structure/property_graph.py +++ b/python/cugraph/cugraph/structure/property_graph.py @@ -372,25 +372,24 @@ def add_vertex_data( f"{vertex_col_name} is not a column in " f"dataframe: {dataframe.columns}" ) - if (type_name is not None) and not isinstance(type_name, str): - raise TypeError("type_name must be a string, got: " f"{type(type_name)}") + if type_name is not None and not isinstance(type_name, str): + raise TypeError(f"type_name must be a string, got: {type(type_name)}") if type_name is None: type_name = self._default_type_name if property_columns: if type(property_columns) is not list: raise TypeError( - "property_columns must be a list, got: " f"{type(property_columns)}" + f"property_columns must be a list, got: {type(property_columns)}" ) invalid_columns = set(property_columns).difference(dataframe.columns) if invalid_columns: raise ValueError( - "property_columns contains column(s) not " - "found in dataframe: " + "property_columns contains column(s) not found in dataframe: " f"{list(invalid_columns)}" ) # Save the DataFrame and Series types for future instantiations - if (self.__dataframe_type is None) or (self.__series_type is None): + if self.__dataframe_type is None or self.__series_type is None: self.__dataframe_type = type(dataframe) self.__series_type = type(dataframe[dataframe.columns[0]]) else: @@ -406,49 +405,30 @@ def add_vertex_data( self.__num_vertices = None self.__vertex_type_value_counts = None # Could update instead - # Initialize the __vertex_prop_dataframe if necessary using the same - # type as the incoming dataframe. + # Add `type_name` to the TYPE categorical dtype if necessary TCN = self.type_col_name - default_vertex_columns = [self.vertex_col_name, TCN] - if self.__vertex_prop_dataframe is None: - self.__vertex_prop_dataframe = self.__dataframe_type( - columns=default_vertex_columns - ) - # Initialize the new columns to the same dtype as the appropriate - # column in the incoming dataframe, since the initial merge may not - # result in the same dtype. (see - # https://github.com/rapidsai/cudf/issues/9981) - self.__vertex_prop_dataframe = self.__update_dataframe_dtypes( - self.__vertex_prop_dataframe, - {self.vertex_col_name: dataframe[vertex_col_name].dtype}, - ) - self.__vertex_prop_dataframe.set_index(self.vertex_col_name, inplace=True) - - # Use categorical dtype for the type column + is_first_data = self.__vertex_prop_dataframe is None + if is_first_data: if self.__series_type is cudf.Series: cat_class = cudf.CategoricalDtype else: cat_class = pd.CategoricalDtype cat_dtype = cat_class([type_name], ordered=False) - self.__vertex_prop_dataframe[TCN] = self.__vertex_prop_dataframe[ - TCN - ].astype(cat_dtype) - - # Ensure that both the predetermined vertex ID column name and vertex - # type column name are present for proper merging. + else: + cat_dtype = self.__update_categorical_dtype( + self.__vertex_prop_dataframe, TCN, type_name + ) # NOTE: This copies the incoming DataFrame in order to add the new # columns. The copied DataFrame is then merged (another copy) and then # deleted when out-of-scope. + + # Ensure that both the predetermined vertex ID column name and vertex + # type column name are present for proper merging. tmp_df = dataframe.copy(deep=True) tmp_df[self.vertex_col_name] = tmp_df[vertex_col_name] # FIXME: handle case of a type_name column already being in tmp_df - # Add `type_name` to the categorical dtype if necessary - cat_dtype = self.__update_categorical_dtype( - self.__vertex_prop_dataframe, TCN, type_name - ) - if self.__series_type is cudf.Series: # cudf does not yet support initialization with a scalar tmp_df[TCN] = cudf.Series( @@ -463,7 +443,7 @@ def add_vertex_data( column_names_to_drop = set(tmp_df.columns) # remove the ones to keep column_names_to_drop.difference_update( - property_columns + default_vertex_columns + property_columns + [self.vertex_col_name, TCN] ) else: column_names_to_drop = {vertex_col_name} @@ -472,20 +452,34 @@ def add_vertex_data( # Save the original dtypes for each new column so they can be restored # prior to constructing subgraphs (since column dtypes may get altered # during merge to accommodate NaN values). - new_col_info = self.__get_new_column_dtypes( - tmp_df, self.__vertex_prop_dataframe - ) + if is_first_data: + new_col_info = tmp_df.dtypes.items() + else: + new_col_info = self.__get_new_column_dtypes( + tmp_df, self.__vertex_prop_dataframe + ) self.__vertex_prop_dtypes.update(new_col_info) - # Join on shared columns and the indices + # TODO: allow tmp_df to come in with vertex id already as index tmp_df.set_index(self.vertex_col_name, inplace=True) - cols = self.__vertex_prop_dataframe.columns.intersection( - tmp_df.columns - ).to_list() - cols.append(self.vertex_col_name) - self.__vertex_prop_dataframe = self.__vertex_prop_dataframe.merge( - tmp_df, on=cols, how="outer" - ) + tmp_df = self.__update_dataframe_dtypes(tmp_df, self.__vertex_prop_dtypes) + + if is_first_data: + self.__vertex_prop_dataframe = tmp_df + else: + # Join on vertex ids (the index) + # TODO: can we automagically determine when we to use concat? + df = self.__vertex_prop_dataframe.join(tmp_df, how="outer", rsuffix="_NEW_") + cols = self.__vertex_prop_dataframe.columns.intersection( + tmp_df.columns + ).to_list() + rename_cols = {f"{col}_NEW_": col for col in cols} + new_cols = list(rename_cols) + sub_df = df[new_cols].rename(columns=rename_cols) + df.drop(columns=new_cols, inplace=True) + # This only adds data--it doesn't replace existing data + df.fillna(sub_df, inplace=True) + self.__vertex_prop_dataframe = df # Update the vertex eval dict with the latest column instances if self.__series_type is cudf.Series: @@ -603,25 +597,24 @@ def add_edge_data( "vertex_col_names contains column(s) not found " f"in dataframe: {list(invalid_columns)}" ) - if (type_name is not None) and not isinstance(type_name, str): - raise TypeError("type_name must be a string, got: " f"{type(type_name)}") + if type_name is not None and not isinstance(type_name, str): + raise TypeError(f"type_name must be a string, got: {type(type_name)}") if type_name is None: type_name = self._default_type_name if property_columns: if type(property_columns) is not list: raise TypeError( - "property_columns must be a list, got: " f"{type(property_columns)}" + f"property_columns must be a list, got: {type(property_columns)}" ) invalid_columns = set(property_columns).difference(dataframe.columns) if invalid_columns: raise ValueError( - "property_columns contains column(s) not " - "found in dataframe: " + "property_columns contains column(s) not found in dataframe: " f"{list(invalid_columns)}" ) # Save the DataFrame and Series types for future instantiations - if (self.__dataframe_type is None) or (self.__series_type is None): + if self.__dataframe_type is None or self.__series_type is None: self.__dataframe_type = type(dataframe) self.__series_type = type(dataframe[dataframe.columns[0]]) else: @@ -649,35 +642,20 @@ def add_edge_data( self.__num_vertices = None self.__edge_type_value_counts = None # Could update instead + # Add `type_name` to the categorical dtype if necessary TCN = self.type_col_name - default_edge_columns = [self.src_col_name, self.dst_col_name, TCN] - if self.__edge_prop_dataframe is None: - self.__edge_prop_dataframe = self.__dataframe_type( - columns=default_edge_columns - ) - # Initialize the new columns to the same dtype as the appropriate - # column in the incoming dataframe, since the initial merge may not - # result in the same dtype. (see - # https://github.com/rapidsai/cudf/issues/9981) - self.__edge_prop_dataframe = self.__update_dataframe_dtypes( - self.__edge_prop_dataframe, - { - self.src_col_name: dataframe[vertex_col_names[0]].dtype, - self.dst_col_name: dataframe[vertex_col_names[1]].dtype, - }, - ) - self.__edge_prop_dataframe.index.name = self.edge_id_col_name - - # Use categorical dtype for the type column + is_first_data = self.__edge_prop_dataframe is None + if is_first_data: if self.__series_type is cudf.Series: cat_class = cudf.CategoricalDtype else: cat_class = pd.CategoricalDtype cat_dtype = cat_class([type_name], ordered=False) - self.__edge_prop_dataframe[TCN] = self.__edge_prop_dataframe[TCN].astype( - cat_dtype - ) self.__is_edge_id_autogenerated = edge_id_col_name is None + else: + cat_dtype = self.__update_categorical_dtype( + self.__edge_prop_dataframe, TCN, type_name + ) # NOTE: This copies the incoming DataFrame in order to add the new # columns. The copied DataFrame is then merged (another copy) and then @@ -686,11 +664,6 @@ def add_edge_data( tmp_df[self.src_col_name] = tmp_df[vertex_col_names[0]] tmp_df[self.dst_col_name] = tmp_df[vertex_col_names[1]] - # Add `type_name` to the categorical dtype if necessary - cat_dtype = self.__update_categorical_dtype( - self.__edge_prop_dataframe, TCN, type_name - ) - if self.__series_type is cudf.Series: # cudf does not yet support initialization with a scalar tmp_df[TCN] = cudf.Series( @@ -720,7 +693,7 @@ def add_edge_data( column_names_to_drop = set(tmp_df.columns) # remove the ones to keep column_names_to_drop.difference_update( - property_columns + default_edge_columns + property_columns + [self.src_col_name, self.dst_col_name, TCN] ) else: column_names_to_drop = {vertex_col_names[0], vertex_col_names[1]} @@ -729,15 +702,33 @@ def add_edge_data( # Save the original dtypes for each new column so they can be restored # prior to constructing subgraphs (since column dtypes may get altered # during merge to accommodate NaN values). - new_col_info = self.__get_new_column_dtypes(tmp_df, self.__edge_prop_dataframe) + if is_first_data: + new_col_info = tmp_df.dtypes.items() + else: + new_col_info = self.__get_new_column_dtypes( + tmp_df, self.__edge_prop_dataframe + ) self.__edge_prop_dtypes.update(new_col_info) - # Join on shared columns and the indices - cols = self.__edge_prop_dataframe.columns.intersection(tmp_df.columns).to_list() - cols.append(self.edge_id_col_name) - self.__edge_prop_dataframe = self.__edge_prop_dataframe.merge( - tmp_df, on=cols, how="outer" - ) + # TODO: allow tmp_df to come in with edge id already as index + tmp_df = self.__update_dataframe_dtypes(tmp_df, self.__edge_prop_dtypes) + + if is_first_data: + self.__edge_prop_dataframe = tmp_df + else: + # Join on edge ids (the index) + # TODO: can we automagically determine when we to use concat? + df = self.__edge_prop_dataframe.join(tmp_df, how="outer", rsuffix="_NEW_") + cols = self.__edge_prop_dataframe.columns.intersection( + tmp_df.columns + ).to_list() + rename_cols = {f"{col}_NEW_": col for col in cols} + new_cols = list(rename_cols) + sub_df = df[new_cols].rename(columns=rename_cols) + df.drop(columns=new_cols, inplace=True) + # This only adds data--it doesn't replace existing data + df.fillna(sub_df, inplace=True) + self.__edge_prop_dataframe = df # Update the edge eval dict with the latest column instances if self.__series_type is cudf.Series: @@ -824,8 +815,9 @@ def select_vertices(self, expr, from_previous_selection=None): # Check if the expr is to be evaluated in the context of properties # from only the previously selected vertices (as opposed to all # properties from all vertices) - if (from_previous_selection is not None) and ( - from_previous_selection.vertex_selections is not None + if ( + from_previous_selection is not None + and from_previous_selection.vertex_selections is not None ): previously_selected_rows = self.__vertex_prop_dataframe[ from_previous_selection.vertex_selections @@ -938,7 +930,7 @@ def extract_subgraph( -------- >>> """ - if (selection is not None) and not isinstance( + if selection is not None and not isinstance( selection, EXPERIMENTAL__PropertySelection ): raise TypeError( @@ -952,14 +944,14 @@ def extract_subgraph( # dtypes (eg. int64 to float64 in order to add NaN entries). This # should not be a problem since the conversions do not change the # values. - if (selection is not None) and (selection.vertex_selections is not None): + if selection is not None and selection.vertex_selections is not None: selected_vertex_dataframe = self.__vertex_prop_dataframe[ selection.vertex_selections ] else: selected_vertex_dataframe = None - if (selection is not None) and (selection.edge_selections is not None): + if selection is not None and selection.edge_selections is not None: selected_edge_dataframe = self.__edge_prop_dataframe[ selection.edge_selections ] @@ -972,7 +964,8 @@ def extract_subgraph( # selected verts in both src and dst if ( selected_vertex_dataframe is not None - ) and not selected_vertex_dataframe.empty: + and not selected_vertex_dataframe.empty + ): has_srcs = selected_edge_dataframe[self.src_col_name].isin( selected_vertex_dataframe.index ) @@ -1084,7 +1077,7 @@ def annotate_dataframe(self, df, G, edge_vertex_col_names): # restore the original dtypes new_df = self.__update_dataframe_dtypes(new_df, self.__edge_prop_dtypes) for col in df.columns: - new_df[col] = new_df[col].astype(df[col].dtype) + new_df[col] = new_df[col].astype(df.dtypes[col]) # FIXME: consider removing internal columns (_EDGE_ID_, etc.) and # columns from edge types not included in the edges in df. @@ -1103,6 +1096,8 @@ def edge_props_to_graph( """ Create and return a Graph from the edges in edge_prop_df. """ + # Don't mutate input data, and ensure DataFrame is not a view + edge_prop_df = edge_prop_df.copy() # FIXME: check default_edge_weight is valid if edge_weight_property: if ( @@ -1125,13 +1120,15 @@ def edge_props_to_graph( if prop_col.count() != prop_col.size: if default_edge_weight is None: raise ValueError( - "edge_weight_property " - f'"{edge_weight_property}" ' + f'edge_weight_property "{edge_weight_property}" ' "contains NA values in the subgraph and " "default_edge_weight is not set" ) + prop_col = prop_col.fillna(default_edge_weight) + if edge_weight_property in edge_prop_df.columns: + edge_prop_df[edge_weight_property] = prop_col else: - prop_col.fillna(default_edge_weight, inplace=True) + edge_prop_df.index = prop_col edge_attr = edge_weight_property # If a default_edge_weight was specified but an edge_weight_property @@ -1225,7 +1222,7 @@ def renumber_vertices_by_type(self): else: cat_class = pd.CategoricalDtype - is_cat = isinstance(self.__vertex_prop_dataframe[TCN].dtype, cat_class) + is_cat = isinstance(self.__vertex_prop_dataframe.dtypes[TCN], cat_class) if not is_cat: cat_dtype = cat_class([TCN], ordered=False) self.__vertex_prop_dataframe[TCN] = self.__vertex_prop_dataframe[ @@ -1268,7 +1265,7 @@ def renumber_edges_by_type(self): else: cat_class = pd.CategoricalDtype - is_cat = isinstance(self.__edge_prop_dataframe[TCN].dtype, cat_class) + is_cat = isinstance(self.__edge_prop_dataframe.dtypes[TCN], cat_class) if not is_cat: cat_dtype = cat_class([TCN], ordered=False) self.__edge_prop_dataframe[TCN] = self.__edge_prop_dataframe[TCN].astype( @@ -1343,7 +1340,7 @@ def __get_new_column_dtypes(from_df, to_df): column in from_df that is not present in to_df. """ new_cols = set(from_df.columns) - set(to_df.columns) - return [(col, from_df[col].dtype) for col in new_cols] + return [(col, from_df.dtypes[col]) for col in new_cols] @staticmethod def __update_dataframe_dtypes(df, column_dtype_dict): @@ -1354,6 +1351,8 @@ def __update_dataframe_dtypes(df, column_dtype_dict): """ update_cols = {} for (col, dtype) in column_dtype_dict.items(): + if col not in df.columns: + continue # If the DataFrame is Pandas and the dtype is an integer type, # ensure a nullable integer array is used by specifying the correct # dtype. The alias for these dtypes is simply a capitalized string @@ -1362,7 +1361,7 @@ def __update_dataframe_dtypes(df, column_dtype_dict): dtype_str = str(dtype) if dtype_str in ["int32", "int64"]: dtype_str = dtype_str.title() - if str(df[col].dtype) != dtype_str: + if str(df.dtypes[col]) != dtype_str: # Assigning to df[col] produces a (false?) warning with Pandas, # but assigning to df.loc[:,col] does not update the df in # cudf, so do one or the other based on type. diff --git a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py index ad1e21ea027..aa72320d282 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py @@ -825,6 +825,10 @@ def test_get_edge_data_repeated(dask_client): } ) df1[pG.type_col_name] = df1[pG.type_col_name].astype(str) # Undo category + + # Order and indices don't matter + df1 = df1.sort_values(df1.columns).reset_index(drop=True) + expected = expected.sort_values(df1.columns).reset_index(drop=True) assert_frame_equal(df1, expected) @@ -935,7 +939,11 @@ def test_add_data_noncontiguous(): check_names=False, ) - df["vertex"] = 10 * df["src"] + df["dst"] + df["vertex"] = ( + 100 * df["src"] + + df["dst"] + + df["edge_type"].map({"pig": 0, "dog": 10, "cat": 20}) + ) pG = MGPropertyGraph() for edge_type in ["cat", "dog", "pig"]: pG.add_vertex_data( diff --git a/python/cugraph/cugraph/tests/test_property_graph.py b/python/cugraph/cugraph/tests/test_property_graph.py index 596e6640fb6..1fd4ec1fda6 100644 --- a/python/cugraph/cugraph/tests/test_property_graph.py +++ b/python/cugraph/cugraph/tests/test_property_graph.py @@ -724,6 +724,7 @@ def test_get_vertex_data_repeated(df_type): afe = assert_frame_equal else: afe = pd.testing.assert_frame_equal + expected["feat"] = expected["feat"].astype("Int64") afe(df1, expected) @@ -819,6 +820,8 @@ def test_get_edge_data_repeated(df_type): afe = assert_frame_equal else: afe = pd.testing.assert_frame_equal + for col in ["edge_feat", pG.src_col_name, pG.dst_col_name]: + expected[col] = expected[col].astype("Int64") afe(df1, expected) @@ -1829,7 +1832,11 @@ def test_add_data_noncontiguous(df_type): check_names=False, ) - df["vertex"] = 10 * df["src"] + df["dst"] + df["vertex"] = ( + 100 * df["src"] + + df["dst"] + + df["edge_type"].map({"pig": 0, "dog": 10, "cat": 20}) + ) pG = PropertyGraph() for edge_type in ["cat", "dog", "pig"]: pG.add_vertex_data(