From 22e9e2bb6b7fb6599f3d7c5b1cc35683591fd6c6 Mon Sep 17 00:00:00 2001 From: Iroy30 <41401566+Iroy30@users.noreply.github.com> Date: Tue, 30 Mar 2021 09:23:52 -0500 Subject: [PATCH] Add indirection and replace algorithms with new renumbering (#1484) Authors: - @Iroy30 Approvers: - Chuck Hastings (@ChuckHastings) - Alex Fender (@afender) - Seunghwa Kang (@seunghwak) URL: https://github.com/rapidsai/cugraph/pull/1484 --- python/cugraph/community/egonet_wrapper.pyx | 3 +- .../community/ktruss_subgraph_wrapper.pyx | 4 + .../community/subgraph_extraction_wrapper.pyx | 1 + python/cugraph/cores/k_core_wrapper.pyx | 4 + .../dask/centrality/katz_centrality.py | 12 +- python/cugraph/dask/common/input_utils.py | 14 +- python/cugraph/dask/community/louvain.py | 13 +- python/cugraph/dask/link_analysis/pagerank.py | 13 +- python/cugraph/dask/traversal/bfs.py | 13 +- python/cugraph/dask/traversal/sssp.py | 12 +- python/cugraph/structure/new_number_map.py | 317 -------- python/cugraph/structure/number_map.py | 686 ++++++------------ python/cugraph/structure/renumber_wrapper.pyx | 127 ++-- python/cugraph/tests/test_renumber.py | 140 ++-- 14 files changed, 417 insertions(+), 942 deletions(-) delete mode 100644 python/cugraph/structure/new_number_map.py diff --git a/python/cugraph/community/egonet_wrapper.pyx b/python/cugraph/community/egonet_wrapper.pyx index ff9f2b8b3de..ead41705628 100644 --- a/python/cugraph/community/egonet_wrapper.pyx +++ b/python/cugraph/community/egonet_wrapper.pyx @@ -33,7 +33,7 @@ def egonet(input_graph, vertices, radius=1): np.dtype("float32") : numberTypeEnum.floatType, np.dtype("double") : numberTypeEnum.doubleType} - [src, dst] = [input_graph.edgelist.edgelist_df['src'], input_graph.edgelist.edgelist_df['dst']] + [src, dst] = graph_primtypes_wrapper.datatype_cast([input_graph.edgelist.edgelist_df['src'], input_graph.edgelist.edgelist_df['dst']], [np.int32]) vertex_t = src.dtype edge_t = np.dtype("int32") weights = None @@ -54,6 +54,7 @@ def egonet(input_graph, vertices, radius=1): weight_t = np.dtype("float32") # Pointers for egonet + vertices = vertices.astype('int32') cdef uintptr_t c_source_vertex_ptr = vertices.__cuda_array_interface__['data'][0] n_subgraphs = vertices.size n_streams = 1 diff --git a/python/cugraph/community/ktruss_subgraph_wrapper.pyx b/python/cugraph/community/ktruss_subgraph_wrapper.pyx index 9f38b33d774..d3b7a38ba41 100644 --- a/python/cugraph/community/ktruss_subgraph_wrapper.pyx +++ b/python/cugraph/community/ktruss_subgraph_wrapper.pyx @@ -33,6 +33,10 @@ def ktruss_subgraph_double(input_graph, k, use_weights): def ktruss_subgraph(input_graph, k, use_weights): + [input_graph.edgelist.edgelist_df['src'], + input_graph.edgelist.edgelist_df['dst']] = graph_primtypes_wrapper.datatype_cast([input_graph.edgelist.edgelist_df['src'], + input_graph.edgelist.edgelist_df['dst']], + [np.int32]) if graph_primtypes_wrapper.weight_type(input_graph) == np.float64 and use_weights: return ktruss_subgraph_double(input_graph, k, use_weights) else: diff --git a/python/cugraph/community/subgraph_extraction_wrapper.pyx b/python/cugraph/community/subgraph_extraction_wrapper.pyx index 31c5d2372f0..46dc5c07eaf 100644 --- a/python/cugraph/community/subgraph_extraction_wrapper.pyx +++ b/python/cugraph/community/subgraph_extraction_wrapper.pyx @@ -59,6 +59,7 @@ def subgraph(input_graph, vertices): if weights is not None: c_weights = weights.__cuda_array_interface__['data'][0] + [vertices] = graph_primtypes_wrapper.datatype_cast([vertices], [np.int32]) cdef uintptr_t c_vertices = vertices.__cuda_array_interface__['data'][0] if use_float: diff --git a/python/cugraph/cores/k_core_wrapper.pyx b/python/cugraph/cores/k_core_wrapper.pyx index a0ef99a8e8b..28bb191f4f4 100644 --- a/python/cugraph/cores/k_core_wrapper.pyx +++ b/python/cugraph/cores/k_core_wrapper.pyx @@ -49,6 +49,10 @@ def k_core(input_graph, k, core_number): """ Call k_core """ + [input_graph.edgelist.edgelist_df['src'], + input_graph.edgelist.edgelist_df['dst']] = graph_primtypes_wrapper.datatype_cast([input_graph.edgelist.edgelist_df['src'], + input_graph.edgelist.edgelist_df['dst']], + [np.int32]) if graph_primtypes_wrapper.weight_type(input_graph) == np.float64: return k_core_double(input_graph, k, core_number) else: diff --git a/python/cugraph/dask/centrality/katz_centrality.py b/python/cugraph/dask/centrality/katz_centrality.py index e690e291928..a2f83a0b2a8 100644 --- a/python/cugraph/dask/centrality/katz_centrality.py +++ b/python/cugraph/dask/centrality/katz_centrality.py @@ -14,8 +14,8 @@ # from dask.distributed import wait, default_client -from cugraph.dask.common.input_utils import get_distributed_data -from cugraph.structure.shuffle import shuffle +from cugraph.dask.common.input_utils import (get_distributed_data, + get_vertex_partition_offsets) from cugraph.dask.centrality import\ mg_katz_centrality_wrapper as mg_katz_centrality import cugraph.comms.comms as Comms @@ -133,11 +133,9 @@ def katz_centrality(input_graph, client = default_client() input_graph.compute_renumber_edge_list(transposed=True) - (ddf, - num_verts, - partition_row_size, - partition_col_size, - vertex_partition_offsets) = shuffle(input_graph, transposed=True) + ddf = input_graph.edgelist.edgelist_df + vertex_partition_offsets = get_vertex_partition_offsets(input_graph) + num_verts = vertex_partition_offsets.iloc[-1] num_edges = len(ddf) data = get_distributed_data(ddf) diff --git a/python/cugraph/dask/common/input_utils.py b/python/cugraph/dask/common/input_utils.py index bbc914da502..0248f429a09 100644 --- a/python/cugraph/dask/common/input_utils.py +++ b/python/cugraph/dask/common/input_utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2021, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -217,3 +217,15 @@ def get_distributed_data(input_ddf): if data.worker_info is None and comms is not None: data.calculate_worker_and_rank_info(comms) return data + + +def get_vertex_partition_offsets(input_graph): + import cudf + renumber_vertex_count = input_graph.renumber_map.implementation.ddf.\ + map_partitions(len).compute() + renumber_vertex_cumsum = renumber_vertex_count.cumsum() + vertex_dtype = input_graph.edgelist.edgelist_df['src'].dtype + vertex_partition_offsets = cudf.Series([0], dtype=vertex_dtype) + vertex_partition_offsets = vertex_partition_offsets.append(cudf.Series( + renumber_vertex_cumsum, dtype=vertex_dtype)) + return vertex_partition_offsets diff --git a/python/cugraph/dask/community/louvain.py b/python/cugraph/dask/community/louvain.py index 495061c0f81..c9af0f526c9 100644 --- a/python/cugraph/dask/community/louvain.py +++ b/python/cugraph/dask/community/louvain.py @@ -16,8 +16,8 @@ from dask.distributed import wait, default_client import cugraph.comms.comms as Comms -from cugraph.dask.common.input_utils import get_distributed_data -from cugraph.structure.shuffle import shuffle +from cugraph.dask.common.input_utils import (get_distributed_data, + get_vertex_partition_offsets) from cugraph.dask.community import louvain_wrapper as c_mg_louvain from cugraph.utilities.utils import is_cuda_version_less_than @@ -86,12 +86,9 @@ def louvain(input_graph, max_iter=100, resolution=1.0): input_graph.compute_renumber_edge_list(transposed=False) sorted_by_degree = True - (ddf, - num_verts, - partition_row_size, - partition_col_size, - vertex_partition_offsets) = shuffle(input_graph, transposed=False) - + ddf = input_graph.edgelist.edgelist_df + vertex_partition_offsets = get_vertex_partition_offsets(input_graph) + num_verts = vertex_partition_offsets.iloc[-1] num_edges = len(ddf) data = get_distributed_data(ddf) diff --git a/python/cugraph/dask/link_analysis/pagerank.py b/python/cugraph/dask/link_analysis/pagerank.py index d8a76f1231e..bfaada85a6f 100644 --- a/python/cugraph/dask/link_analysis/pagerank.py +++ b/python/cugraph/dask/link_analysis/pagerank.py @@ -14,8 +14,8 @@ # from dask.distributed import wait, default_client -from cugraph.dask.common.input_utils import get_distributed_data -from cugraph.structure.shuffle import shuffle +from cugraph.dask.common.input_utils import (get_distributed_data, + get_vertex_partition_offsets) from cugraph.dask.link_analysis import mg_pagerank_wrapper as mg_pagerank import cugraph.comms.comms as Comms import dask_cudf @@ -124,11 +124,10 @@ def pagerank(input_graph, client = default_client() input_graph.compute_renumber_edge_list(transposed=True) - (ddf, - num_verts, - partition_row_size, - partition_col_size, - vertex_partition_offsets) = shuffle(input_graph, transposed=True) + + ddf = input_graph.edgelist.edgelist_df + vertex_partition_offsets = get_vertex_partition_offsets(input_graph) + num_verts = vertex_partition_offsets.iloc[-1] num_edges = len(ddf) data = get_distributed_data(ddf) diff --git a/python/cugraph/dask/traversal/bfs.py b/python/cugraph/dask/traversal/bfs.py index 51e0dc0de5d..d108730f665 100644 --- a/python/cugraph/dask/traversal/bfs.py +++ b/python/cugraph/dask/traversal/bfs.py @@ -14,8 +14,8 @@ # from dask.distributed import wait, default_client -from cugraph.dask.common.input_utils import get_distributed_data -from cugraph.structure.shuffle import shuffle +from cugraph.dask.common.input_utils import (get_distributed_data, + get_vertex_partition_offsets) from cugraph.dask.traversal import mg_bfs_wrapper as mg_bfs import cugraph.comms.comms as Comms import cudf @@ -91,11 +91,10 @@ def bfs(graph, client = default_client() graph.compute_renumber_edge_list(transposed=False) - (ddf, - num_verts, - partition_row_size, - partition_col_size, - vertex_partition_offsets) = shuffle(graph, transposed=False) + ddf = graph.edgelist.edgelist_df + vertex_partition_offsets = get_vertex_partition_offsets(graph) + num_verts = vertex_partition_offsets.iloc[-1] + num_edges = len(ddf) data = get_distributed_data(ddf) diff --git a/python/cugraph/dask/traversal/sssp.py b/python/cugraph/dask/traversal/sssp.py index 52f2b9b256c..32e7401023a 100644 --- a/python/cugraph/dask/traversal/sssp.py +++ b/python/cugraph/dask/traversal/sssp.py @@ -14,8 +14,8 @@ # from dask.distributed import wait, default_client -from cugraph.dask.common.input_utils import get_distributed_data -from cugraph.structure.shuffle import shuffle +from cugraph.dask.common.input_utils import (get_distributed_data, + get_vertex_partition_offsets) from cugraph.dask.traversal import mg_sssp_wrapper as mg_sssp import cugraph.comms.comms as Comms import cudf @@ -91,11 +91,9 @@ def sssp(graph, client = default_client() graph.compute_renumber_edge_list(transposed=False) - (ddf, - num_verts, - partition_row_size, - partition_col_size, - vertex_partition_offsets) = shuffle(graph, transposed=False) + ddf = graph.edgelist.edgelist_df + vertex_partition_offsets = get_vertex_partition_offsets(graph) + num_verts = vertex_partition_offsets.iloc[-1] num_edges = len(ddf) data = get_distributed_data(ddf) diff --git a/python/cugraph/structure/new_number_map.py b/python/cugraph/structure/new_number_map.py deleted file mode 100644 index f8a2164d2c4..00000000000 --- a/python/cugraph/structure/new_number_map.py +++ /dev/null @@ -1,317 +0,0 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from dask.distributed import wait, default_client -from cugraph.dask.common.input_utils import get_distributed_data -from cugraph.structure import renumber_wrapper as c_renumber -import cugraph.comms as Comms -import dask_cudf -import numpy as np -import cudf -import cugraph.structure.number_map as legacy_number_map - - -def call_renumber(sID, - data, - num_edges, - is_mnmg, - store_transposed): - wid = Comms.get_worker_id(sID) - handle = Comms.get_handle(sID) - return c_renumber.renumber(data[0], - num_edges, - wid, - handle, - is_mnmg, - store_transposed) - - -class NumberMap: - - class SingleGPU: - def __init__(self, df, src_col_names, dst_col_names, id_type, - store_transposed): - self.col_names = NumberMap.compute_vals(src_col_names) - self.df = cudf.DataFrame() - self.id_type = id_type - self.store_transposed = store_transposed - self.numbered = False - - def to_internal_vertex_id(self, df, col_names): - tmp_df = df[col_names].rename( - columns=dict(zip(col_names, self.col_names)), copy=False - ) - index_name = NumberMap.generate_unused_column_name(df.columns) - tmp_df[index_name] = tmp_df.index - return ( - self.df.merge(tmp_df, on=self.col_names, how="right") - .sort_values(index_name) - .drop(columns=[index_name]) - .reset_index()["id"] - ) - - def from_internal_vertex_id( - self, df, internal_column_name, external_column_names - ): - tmp_df = self.df.merge( - df, - right_on=internal_column_name, - left_on="id", - how="right", - ) - if internal_column_name != "id": - tmp_df = tmp_df.drop(columns=["id"]) - if external_column_names is None: - return tmp_df - else: - return tmp_df.rename( - columns=dict(zip(self.col_names, external_column_names)), - copy=False, - ) - - class MultiGPU: - def __init__( - self, ddf, src_col_names, dst_col_names, id_type, store_transposed - ): - self.col_names = NumberMap.compute_vals(src_col_names) - self.val_types = NumberMap.compute_vals_types(ddf, src_col_names) - self.val_types["count"] = np.int32 - self.id_type = id_type - self.store_transposed = store_transposed - self.numbered = False - - def to_internal_vertex_id(self, ddf, col_names): - return self.ddf.merge( - ddf, - right_on=col_names, - left_on=self.col_names, - how="right", - )["global_id"] - - def from_internal_vertex_id( - self, df, internal_column_name, external_column_names - ): - tmp_df = self.ddf.merge( - df, - right_on=internal_column_name, - left_on="global_id", - how="right" - ).map_partitions(lambda df: df.drop(columns="global_id")) - - if external_column_names is None: - return tmp_df - else: - return tmp_df.map_partitions( - lambda df: - df.rename( - columns=dict( - zip(self.col_names, external_column_names) - ), - copy=False - ) - ) - - def __init__(self, id_type=np.int32): - self.implementation = None - self.id_type = id_type - - def compute_vals_types(df, column_names): - """ - Helper function to compute internal column names and types - """ - return { - str(i): df[column_names[i]].dtype for i in range(len(column_names)) - } - - def generate_unused_column_name(column_names): - """ - Helper function to generate an unused column name - """ - name = 'x' - while name in column_names: - name = name + "x" - - return name - - def compute_vals(column_names): - """ - Helper function to compute internal column names based on external - column names - """ - return [str(i) for i in range(len(column_names))] - - def renumber(df, src_col_names, dst_col_names, preserve_order=False, - store_transposed=False): - - if isinstance(src_col_names, list): - renumber_type = 'legacy' - # elif isinstance(df[src_col_names].dtype, string): - # renumber_type = 'legacy' - else: - renumber_type = 'experimental' - - if renumber_type == 'legacy': - renumber_map, renumbered_df = legacy_number_map.renumber( - df, - src_col_names, - dst_col_names, - preserve_order, - store_transposed) - # Add shuffling once algorithms are switched to new renumber - # (ddf, - # num_verts, - # partition_row_size, - # partition_col_size, - # vertex_partition_offsets) = shuffle(input_graph, transposed=True) - return renumber_map, renumbered_df - - renumber_map = NumberMap() - if not isinstance(src_col_names, list): - src_col_names = [src_col_names] - dst_col_names = [dst_col_names] - if type(df) is cudf.DataFrame: - renumber_map.implementation = NumberMap.SingleGPU( - df, src_col_names, dst_col_names, renumber_map.id_type, - store_transposed - ) - elif type(df) is dask_cudf.DataFrame: - renumber_map.implementation = NumberMap.MultiGPU( - df, src_col_names, dst_col_names, renumber_map.id_type, - store_transposed - ) - else: - raise Exception("df must be cudf.DataFrame or dask_cudf.DataFrame") - - num_edges = len(df) - - if isinstance(df, dask_cudf.DataFrame): - is_mnmg = True - else: - is_mnmg = False - - if is_mnmg: - client = default_client() - data = get_distributed_data(df) - result = [(client.submit(call_renumber, - Comms.get_session_id(), - wf[1], - num_edges, - is_mnmg, - store_transposed, - workers=[wf[0]]), wf[0]) - for idx, wf in enumerate(data.worker_to_parts.items())] - wait(result) - - def get_renumber_map(data): - return data[0] - - def get_renumbered_df(data): - return data[1] - - renumbering_map = dask_cudf.from_delayed( - [client.submit(get_renumber_map, - data, - workers=[wf]) - for (data, wf) in result]) - renumbered_df = dask_cudf.from_delayed( - [client.submit(get_renumbered_df, - data, - workers=[wf]) - for (data, wf) in result]) - - renumber_map.implementation.ddf = renumbering_map - renumber_map.implementation.numbered = True - - return renumbered_df, renumber_map - else: - renumbering_map, renumbered_df = c_renumber.renumber( - df, - num_edges, - 0, - Comms.get_default_handle(), - is_mnmg, - store_transposed) - renumber_map.implementation.df = renumbering_map - renumber_map.implementation.numbered = True - return renumbered_df, renumber_map - - def unrenumber(self, df, column_name, preserve_order=False): - """ - Given a DataFrame containing internal vertex ids in the identified - column, replace this with external vertex ids. If the renumbering - is from a single column, the output dataframe will use the same - name for the external vertex identifiers. If the renumbering is from - a multi-column input, the output columns will be labeled 0 through - n-1 with a suffix of _column_name. - Note that this function does not guarantee order or partitioning in - multi-GPU mode. - Parameters - ---------- - df: cudf.DataFrame or dask_cudf.DataFrame - A DataFrame containing internal vertex identifiers that will be - converted into external vertex identifiers. - column_name: string - Name of the column containing the internal vertex id. - preserve_order: (optional) bool - If True, preserve the ourder of the rows in the output - DataFrame to match the input DataFrame - Returns - --------- - df : cudf.DataFrame or dask_cudf.DataFrame - The original DataFrame columns exist unmodified. The external - vertex identifiers are added to the DataFrame, the internal - vertex identifier column is removed from the dataframe. - Examples - -------- - >>> M = cudf.read_csv('datasets/karate.csv', delimiter=' ', - >>> dtype=['int32', 'int32', 'float32'], header=None) - >>> - >>> df, number_map = NumberMap.renumber(df, '0', '1') - >>> - >>> G = cugraph.Graph() - >>> G.from_cudf_edgelist(df, 'src', 'dst') - >>> - >>> pr = cugraph.pagerank(G, alpha = 0.85, max_iter = 500, - >>> tol = 1.0e-05) - >>> - >>> pr = number_map.unrenumber(pr, 'vertex') - >>> - """ - if len(self.col_names) == 1: - # Output will be renamed to match input - mapping = {"0": column_name} - else: - # Output will be renamed to ${i}_${column_name} - mapping = {} - for nm in self.col_names: - mapping[nm] = nm + "_" + column_name - - if preserve_order: - index_name = NumberMap.generate_unused_column_name(df) - df[index_name] = df.index - - df = self.from_internal_vertex_id(df, column_name, drop=True) - - if preserve_order: - df = df.sort_values( - index_name - ).drop(columns=index_name).reset_index(drop=True) - - if type(df) is dask_cudf.DataFrame: - return df.map_partitions( - lambda df: df.rename(columns=mapping, copy=False) - ) - else: - return df.rename(columns=mapping, copy=False) diff --git a/python/cugraph/structure/number_map.py b/python/cugraph/structure/number_map.py index deb2b9f4114..5f801eb0d90 100644 --- a/python/cugraph/structure/number_map.py +++ b/python/cugraph/structure/number_map.py @@ -1,4 +1,5 @@ # Copyright (c) 2020-2021, NVIDIA CORPORATION. +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -10,100 +11,45 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# -import cudf +from dask.distributed import wait, default_client +from cugraph.dask.common.input_utils import get_distributed_data +from cugraph.structure import renumber_wrapper as c_renumber +import cugraph.comms.comms as Comms import dask_cudf import numpy as np -import bisect +import cudf + + +def call_renumber(sID, + data, + num_edges, + is_mnmg, + store_transposed): + wid = Comms.get_worker_id(sID) + handle = Comms.get_handle(sID) + return c_renumber.renumber(data[0], + num_edges, + wid, + handle, + is_mnmg, + store_transposed) class NumberMap: - """ - Class used to translate external vertex ids to internal vertex ids - in the cuGraph framework. - - Internal vertex ids are assigned by hashing the external vertex ids - into a structure to eliminate duplicates, and the resulting list - of unique vertices are assigned integers from [0, V) where V is - the number of unique vertices. - - In Single GPU mode, internal vertex ids are constructed using - cudf functions, with a cudf.DataFrame containing the mapping - from external vertex identifiers and internal vertex identifiers - allowing for mapping vertex identifiers in either direction. In - this mode, the order of the output from the mapping functions is - non-deterministic. cudf makes no guarantees about order. If - matching the input order is required set the preserve_order - to True. - - In Multi GPU mode, internal vertex ids are constucted using - dask_cudf functions, with a dask_cudf.DataFrame containing - the mapping from external vertex identifiers and internal - vertex identifiers allowing for mapping vertex identifiers - in either direction. In this mode, the partitioning of - the number_map and the output from any of the mapping functions - are non-deterministic. dask_cudf makes no guarantees about the - partitioning or order of the output. As of this release, - there is no mechanism for controlling that, this will be - addressed at some point. - """ class SingleGPU: def __init__(self, df, src_col_names, dst_col_names, id_type, store_transposed): self.col_names = NumberMap.compute_vals(src_col_names) - self.df = cudf.DataFrame() + self.src_col_names = src_col_names + self.dst_col_names = dst_col_names + self.df = df self.id_type = id_type self.store_transposed = store_transposed - - source_count = 0 - dest_count = 0 - - if store_transposed: - dest_count = 1 - else: - source_count = 1 - - tmp = ( - df[src_col_names] - .assign(count=source_count) - .groupby(src_col_names) - .sum() - .reset_index() - .rename( - columns=dict(zip(src_col_names, self.col_names)), - copy=False, - ) - ) - - if dst_col_names is not None: - tmp_dst = ( - df[dst_col_names] - .assign(count=dest_count) - .groupby(dst_col_names) - .sum() - .reset_index() - ) - for newname, oldname in zip(self.col_names, dst_col_names): - self.df[newname] = tmp[newname].append(tmp_dst[oldname]) - self.df['count'] = tmp['count'].append(tmp_dst['count']) - else: - for newname in self.col_names: - self.df[newname] = tmp[newname] - self.df['count'] = tmp['count'] - self.numbered = False - def compute(self): - if not self.numbered: - tmp = self.df.groupby(self.col_names).sum().sort_values( - 'count', ascending=False - ).reset_index().drop(columns='count') - - tmp["id"] = tmp.index.astype(self.id_type) - self.df = tmp - self.numbered = True - def to_internal_vertex_id(self, df, col_names): tmp_df = df[col_names].rename( columns=dict(zip(col_names, self.col_names)), copy=False @@ -117,6 +63,25 @@ def to_internal_vertex_id(self, df, col_names): .reset_index()["id"] ) + def from_internal_vertex_id( + self, df, internal_column_name, external_column_names + ): + tmp_df = self.df.merge( + df, + right_on=internal_column_name, + left_on="id", + how="right", + ) + if internal_column_name != "id": + tmp_df = tmp_df.drop(columns=["id"]) + if external_column_names is None: + return tmp_df + else: + return tmp_df.rename( + columns=dict(zip(self.col_names, external_column_names)), + copy=False, + ) + def add_internal_vertex_id(self, df, id_column_name, col_names, drop, preserve_order): ret = None @@ -162,76 +127,39 @@ def add_internal_vertex_id(self, df, id_column_name, col_names, return ret - def from_internal_vertex_id( - self, df, internal_column_name, external_column_names - ): - tmp_df = self.df.merge( - df, - right_on=internal_column_name, - left_on="id", - how="right", - ) - if internal_column_name != "id": - tmp_df = tmp_df.drop(columns=["id"]) - if external_column_names is None: - return tmp_df - else: - return tmp_df.rename( - columns=dict(zip(self.col_names, external_column_names)), - copy=False, - ) - - class MultiGPU: - def extract_vertices( - df, src_col_names, dst_col_names, - internal_col_names, store_transposed - ): - source_count = 0 - dest_count = 0 - - if store_transposed: - dest_count = 1 - else: - source_count = 1 + def indirection_map(self, df, src_col_names, dst_col_names): + tmp_df = cudf.DataFrame() - s = ( + tmp = ( df[src_col_names] - .assign(count=source_count) .groupby(src_col_names) - .sum() + .count() .reset_index() .rename( - columns=dict(zip(src_col_names, internal_col_names)), + columns=dict(zip(src_col_names, self.col_names)), copy=False, ) ) - d = None if dst_col_names is not None: - d = ( + tmp_dst = ( df[dst_col_names] - .assign(count=dest_count) .groupby(dst_col_names) - .sum() + .count() .reset_index() - .rename( - columns=dict(zip(dst_col_names, internal_col_names)), - copy=False, - ) ) + for newname, oldname in zip(self.col_names, dst_col_names): + tmp_df[newname] = tmp[newname].append(tmp_dst[oldname]) + else: + for newname in self.col_names: + tmp_df[newname] = tmp[newname] - reply = cudf.DataFrame() - - for i in internal_col_names: - if d is None: - reply[i] = s[i] - else: - reply[i] = s[i].append(d[i]) - - reply['count'] = s['count'].append(d['count']) - - return reply + tmp_df = tmp_df.groupby(self.col_names).count().reset_index() + tmp_df["id"] = tmp_df.index.astype(self.id_type) + self.df = tmp_df + return tmp_df + class MultiGPU: def __init__( self, ddf, src_col_names, dst_col_names, id_type, store_transposed ): @@ -239,110 +167,10 @@ def __init__( self.val_types = NumberMap.compute_vals_types(ddf, src_col_names) self.val_types["count"] = np.int32 self.id_type = id_type + self.ddf = ddf self.store_transposed = store_transposed - self.ddf = ddf.map_partitions( - NumberMap.MultiGPU.extract_vertices, - src_col_names, - dst_col_names, - self.col_names, - store_transposed, - meta=self.val_types, - ) self.numbered = False - # Function to compute partitions based on known divisions of the - # hash value - def compute_partition(df, divisions): - sample = df.index[0] - partition_id = bisect.bisect_right(divisions, sample) - 1 - return df.assign(partition=partition_id) - - def assign_internal_identifiers_kernel( - local_id, partition, global_id, base_addresses - ): - for i in range(len(local_id)): - global_id[i] = local_id[i] + base_addresses[partition[i]] - - def assign_internal_identifiers(df, base_addresses, id_type): - df = df.assign(local_id=df.index.astype(np.int64)) - df = df.apply_rows( - NumberMap.MultiGPU.assign_internal_identifiers_kernel, - incols=["local_id", "partition"], - outcols={"global_id": id_type}, - kwargs={"base_addresses": base_addresses}, - ) - - return df.drop(columns=["local_id", "hash", "partition"]) - - def assign_global_id(self, ddf, base_addresses, val_types): - val_types["global_id"] = self.id_type - del val_types["hash"] - del val_types["partition"] - - ddf = ddf.map_partitions( - lambda df: NumberMap.MultiGPU.assign_internal_identifiers( - df, base_addresses, self.id_type - ), - meta=val_types, - ) - return ddf - - def compute(self): - if not self.numbered: - val_types = self.val_types - val_types["hash"] = np.int32 - - vertices = self.ddf.map_partitions( - lambda df: df.assign(hash=df.hash_columns(self.col_names)), - meta=val_types, - ) - - # Redistribute the ddf based on the hash values - rehashed = vertices.set_index("hash", drop=False) - - # Compute the local partition id (obsolete once - # https://github.com/dask/dask/issues/3707 is completed) - val_types["partition"] = np.int32 - - rehashed_with_partition_id = rehashed.map_partitions( - NumberMap.MultiGPU.compute_partition, - rehashed.divisions, - meta=val_types, - ) - - val_types.pop('count') - - numbering_map = rehashed_with_partition_id.map_partitions( - lambda df: df.groupby( - self.col_names + ["hash", "partition"] - ).sum() - .sort_values('count', ascending=False) - .reset_index() - .drop(columns='count'), - meta=val_types - ) - - # - # Compute base address for each partition - # - counts = numbering_map.map_partitions( - lambda df: df.groupby("partition").count() - ).compute()["hash"].to_pandas() - base_addresses = np.zeros(len(counts) + 1, self.id_type) - - for i in range(len(counts)): - base_addresses[i + 1] = base_addresses[i] + counts[i] - - # - # Update each partition with the base address - # - numbering_map = self.assign_global_id( - numbering_map, cudf.Series(base_addresses), val_types - ) - - self.ddf = numbering_map - self.numbered = True - def to_internal_vertex_id(self, ddf, col_names): return self.ddf.merge( ddf, @@ -351,6 +179,29 @@ def to_internal_vertex_id(self, ddf, col_names): how="right", )["global_id"] + def from_internal_vertex_id( + self, df, internal_column_name, external_column_names + ): + tmp_df = self.ddf.merge( + df, + right_on=internal_column_name, + left_on="global_id", + how="right" + ).map_partitions(lambda df: df.drop(columns="global_id")) + + if external_column_names is None: + return tmp_df + else: + return tmp_df.map_partitions( + lambda df: + df.rename( + columns=dict( + zip(self.col_names, external_column_names) + ), + copy=False + ) + ) + def add_internal_vertex_id(self, ddf, id_column_name, col_names, drop, preserve_order): # At the moment, preserve_order cannot be done on @@ -385,39 +236,50 @@ def add_internal_vertex_id(self, ddf, id_column_name, col_names, drop, return ret - def from_internal_vertex_id( - self, df, internal_column_name, external_column_names - ): - tmp_df = self.ddf.merge( - df, - right_on=internal_column_name, - left_on="global_id", - how="right" - ).map_partitions(lambda df: df.drop(columns="global_id")) + def indirection_map(self, ddf, src_col_names, dst_col_names): - if external_column_names is None: - return tmp_df - else: - return tmp_df.map_partitions( - lambda df: - df.rename( - columns=dict( - zip(self.col_names, external_column_names) - ), - copy=False - ) + tmp = ( + ddf[src_col_names] + .groupby(src_col_names) + .count() + .reset_index() + .rename( + columns=dict(zip(src_col_names, self.col_names)), ) + ) + + if dst_col_names is not None: + tmp_dst = ( + ddf[dst_col_names] + .groupby(dst_col_names) + .count() + .reset_index() + ) + for i, (newname, oldname) in enumerate(zip(self.col_names, + dst_col_names)): + if i == 0: + tmp_df = tmp[newname].append(tmp_dst[oldname]).\ + to_frame(name=newname) + else: + tmp_df[newname] = tmp[newname].append(tmp_dst[oldname]) + print(tmp_df.columns) + else: + for newname in self.col_names: + tmp_df[newname] = tmp[newname] + tmp_ddf = tmp_df.groupby(self.col_names).count().reset_index() + + # Set global index + tmp_ddf = tmp_ddf.assign(idx=1) + tmp_ddf['global_id'] = tmp_ddf.idx.cumsum() - 1 + tmp_ddf = tmp_ddf.drop(columns='idx') + + self.ddf = tmp_ddf + return tmp_ddf def __init__(self, id_type=np.int32): self.implementation = None self.id_type = id_type - def aggregate_count_and_partition(df): - d = {} - d['count'] = df['count'].sum() - d['partition'] = df['partition'].min() - return cudf.Series(d, index=['count', 'partition']) - def compute_vals_types(df, column_names): """ Helper function to compute internal column names and types @@ -443,125 +305,19 @@ def compute_vals(column_names): """ return [str(i) for i in range(len(column_names))] - def from_dataframe( - self, df, src_col_names, dst_col_names=None, store_transposed=False - ): - """ - Populate the numbering map with vertices from the specified - columns of the provided DataFrame. - - Parameters - ---------- - df : cudf.DataFrame or dask_cudf.DataFrame - Contains a list of external vertex identifiers that will be - numbered by the NumberMap class. - src_col_names: list of strings - This list of 1 or more strings contain the names - of the columns that uniquely identify an external - vertex identifier for source vertices - dst_col_names: list of strings - This list of 1 or more strings contain the names - of the columns that uniquely identify an external - vertex identifier for destination vertices - store_transposed : bool - Identify how the graph adjacency will be used. - If True, the graph will be organized by destination. - If False, the graph will be organized by source - - """ - if self.implementation is not None: - raise Exception("NumberMap is already populated") - - if dst_col_names is not None and len(src_col_names) != len( - dst_col_names - ): - raise Exception( - "src_col_names must have same length as dst_col_names" - ) - - if type(df) is cudf.DataFrame: - self.implementation = NumberMap.SingleGPU( - df, src_col_names, dst_col_names, self.id_type, - store_transposed - ) - elif type(df) is dask_cudf.DataFrame: - self.implementation = NumberMap.MultiGPU( - df, src_col_names, dst_col_names, self.id_type, - store_transposed - ) - else: - raise Exception("df must be cudf.DataFrame or dask_cudf.DataFrame") - - self.implementation.compute() - - def from_series(self, src_series, dst_series=None, store_transposed=False): - """ - Populate the numbering map with vertices from the specified - pair of series objects, one for the source and one for - the destination - - Parameters - ---------- - src_series: cudf.Series or dask_cudf.Series - Contains a list of external vertex identifiers that will be - numbered by the NumberMap class. - dst_series: cudf.Series or dask_cudf.Series - Contains a list of external vertex identifiers that will be - numbered by the NumberMap class. - store_transposed : bool - Identify how the graph adjacency will be used. - If True, the graph will be organized by destination. - If False, the graph will be organized by source - """ - if self.implementation is not None: - raise Exception("NumberMap is already populated") - - if dst_series is not None and type(src_series) != type(dst_series): - raise Exception("src_series and dst_series must have same type") - - if type(src_series) is cudf.Series: - dst_series_list = None - df = cudf.DataFrame() - df["s"] = src_series - if dst_series is not None: - df["d"] = dst_series - dst_series_list = ["d"] - self.implementation = NumberMap.SingleGPU( - df, ["s"], dst_series_list, self.id_type, store_transposed - ) - elif type(src_series) is dask_cudf.Series: - dst_series_list = None - df = dask_cudf.DataFrame() - df["s"] = src_series - if dst_series is not None: - df["d"] = dst_series - dst_series_list = ["d"] - self.implementation = NumberMap.MultiGPU( - df, ["s"], dst_series_list, self.id_type, store_transposed - ) - else: - raise Exception( - "src_series must be cudf.Series or " "dask_cudf.Series" - ) - - self.implementation.compute() - def to_internal_vertex_id(self, df, col_names=None): """ Given a collection of external vertex ids, return the internal vertex ids - Parameters ---------- df: cudf.DataFrame, cudf.Series, dask_cudf.DataFrame, dask_cudf.Series Contains a list of external vertex identifiers that will be converted into internal vertex identifiers - col_names: (optional) list of strings This list of 1 or more strings contain the names of the columns that uniquely identify an external vertex identifier - Returns --------- vertex_ids : cudf.Series or dask_cudf.Series @@ -569,7 +325,6 @@ def to_internal_vertex_id(self, df, col_names=None): does not guarantee order or partitioning (in the case of dask_cudf) of vertex ids. If order matters use add_internal_vertex_id - """ tmp_df = None tmp_col_names = None @@ -600,34 +355,27 @@ def add_internal_vertex_id( """ Given a collection of external vertex ids, return the internal vertex ids combined with the input data. - If a series-type input is provided then the series will be in a column named '0'. Otherwise the input column names in the DataFrame will be preserved. - Parameters ---------- df: cudf.DataFrame, cudf.Series, dask_cudf.DataFrame, dask_cudf.Series Contains a list of external vertex identifiers that will be converted into internal vertex identifiers - id_column_name: (optional) string The name to be applied to the column containing the id (defaults to 'id') - col_names: (optional) list of strings This list of 1 or more strings contain the names of the columns that uniquely identify an external vertex identifier - drop: (optional) boolean If True, drop the column names specified in col_names from the returned DataFrame. Defaults to False. - preserve_order: (optional) boolean If True, do extra sorting work to preserve the order of the input DataFrame. Defaults to False. - Returns --------- df : cudf.DataFrame or dask_cudf.DataFrame @@ -635,7 +383,6 @@ def add_internal_vertex_id( with an additional column containing the internal vertex id. Note that there is no guarantee of the order or partitioning of elements in the returned DataFrame. - """ tmp_df = None tmp_col_names = None @@ -671,7 +418,6 @@ def from_internal_vertex_id( """ Given a collection of internal vertex ids, return a DataFrame of the external vertex ids - Parameters ---------- df: cudf.DataFrame, cudf.Series, dask_cudf.DataFrame, dask_cudf.Series @@ -681,20 +427,16 @@ def from_internal_vertex_id( in a column labeled 'id'. If df is a dataframe type object then internal_column_name should identify which column corresponds the the internal vertex id that should be converted - internal_column_name: (optional) string Name of the column containing the internal vertex id. If df is a series then this parameter is ignored. If df is a DataFrame this parameter is required. - external_column_names: (optional) string or list of strings Name of the columns that define an external vertex id. If not specified, columns will be labeled '0', '1,', ..., 'n-1' - drop: (optional) boolean If True the internal column name will be dropped from the DataFrame. Defaults to False. - Returns --------- df : cudf.DataFrame or dask_cudf.DataFrame @@ -727,107 +469,117 @@ def from_internal_vertex_id( return output_df - def column_names(self): - """ - Return the list of internal column names - - Returns - ---------- - List of column names ('0', '1', ..., 'n-1') - """ - return self.implementation.col_names - def renumber(df, src_col_names, dst_col_names, preserve_order=False, store_transposed=False): - """ - Given a single GPU or distributed DataFrame, use src_col_names and - dst_col_names to identify the source vertex identifiers and destination - vertex identifiers, respectively. - - Internal vertex identifiers will be created, numbering vertices as - integers starting from 0. - - The function will return a DataFrame containing the original dataframe - contents with a new column labeled 'src' containing the renumbered - source vertices and a new column labeled 'dst' containing the - renumbered dest vertices, along with a NumberMap object that contains - the number map for the numbering that was used. - - Note that this function does not guarantee order in single GPU mode, - and does not guarantee order or partitioning in multi-GPU mode. If you - wish to preserve ordering, add an index column to df and sort the - return by that index column. - - Parameters - ---------- - df: cudf.DataFrame or dask_cudf.DataFrame - Contains a list of external vertex identifiers that will be - numbered by the NumberMap class. - src_col_names: string or list of strings - This list of 1 or more strings contain the names - of the columns that uniquely identify an external - vertex identifier for source vertices - dst_col_names: string or list of strings - This list of 1 or more strings contain the names - of the columns that uniquely identify an external - vertex identifier for destination vertices - store_transposed : bool - Identify how the graph adjacency will be used. - If True, the graph will be organized by destination. - If False, the graph will be organized by source - - Returns - --------- - df : cudf.DataFrame or dask_cudf.DataFrame - The original DataFrame columns exist unmodified. Columns - are added to the DataFrame to identify the external vertex - identifiers. If external_columns is specified, these names - are used as the names of the output columns. If external_columns - is not specifed the columns are labeled '0', ... 'n-1' based on - the number of columns identifying the external vertex identifiers. - - number_map : NumberMap - The number map object object that retains the mapping between - internal vertex identifiers and external vertex identifiers. + if isinstance(src_col_names, list): + renumber_type = 'legacy' + elif not (df[src_col_names].dtype == np.int32 or + df[src_col_names].dtype == np.int64): + renumber_type = 'legacy' + else: + renumber_type = 'experimental' + df = df.rename(columns={src_col_names: "src", + dst_col_names: "dst"}) - Examples - -------- - >>> M = cudf.read_csv('datasets/karate.csv', delimiter=' ', - >>> dtype=['int32', 'int32', 'float32'], header=None) - >>> - >>> df, number_map = NumberMap.renumber(df, '0', '1') - >>> - >>> G = cugraph.Graph() - >>> G.from_cudf_edgelist(df, 'src', 'dst') - """ renumber_map = NumberMap() - - if isinstance(src_col_names, list): - renumber_map.from_dataframe(df, src_col_names, dst_col_names) - df = renumber_map.add_internal_vertex_id( - df, "src", src_col_names, drop=True, - preserve_order=preserve_order + if not isinstance(src_col_names, list): + src_col_names = [src_col_names] + dst_col_names = [dst_col_names] + if type(df) is cudf.DataFrame: + renumber_map.implementation = NumberMap.SingleGPU( + df, src_col_names, dst_col_names, renumber_map.id_type, + store_transposed ) - df = renumber_map.add_internal_vertex_id( - df, "dst", dst_col_names, drop=True, - preserve_order=preserve_order + elif type(df) is dask_cudf.DataFrame: + renumber_map.implementation = NumberMap.MultiGPU( + df, src_col_names, dst_col_names, renumber_map.id_type, + store_transposed ) else: - renumber_map.from_dataframe(df, [src_col_names], [dst_col_names]) + raise Exception("df must be cudf.DataFrame or dask_cudf.DataFrame") + + if renumber_type == 'legacy': + indirection_map = renumber_map.implementation.\ + indirection_map(df, + src_col_names, + dst_col_names) df = renumber_map.add_internal_vertex_id( df, "src", src_col_names, drop=True, preserve_order=preserve_order ) - df = renumber_map.add_internal_vertex_id( df, "dst", dst_col_names, drop=True, preserve_order=preserve_order ) - if type(df) is dask_cudf.DataFrame: - df = df.persist() + num_edges = len(df) + + if isinstance(df, dask_cudf.DataFrame): + is_mnmg = True + else: + is_mnmg = False + + if is_mnmg: + client = default_client() + data = get_distributed_data(df) + result = [(client.submit(call_renumber, + Comms.get_session_id(), + wf[1], + num_edges, + is_mnmg, + store_transposed, + workers=[wf[0]]), wf[0]) + for idx, wf in enumerate(data.worker_to_parts.items())] + wait(result) + + def get_renumber_map(data): + return data[0] + + def get_renumbered_df(data): + return data[1] + + renumbering_map = dask_cudf.from_delayed( + [client.submit(get_renumber_map, + data, + workers=[wf]) + for (data, wf) in result]) + renumbered_df = dask_cudf.from_delayed( + [client.submit(get_renumbered_df, + data, + workers=[wf]) + for (data, wf) in result]) + if renumber_type == 'legacy': + renumber_map.implementation.ddf = indirection_map.merge( + renumbering_map, + right_on='original_ids', left_on='global_id', + how='right').\ + drop(columns=['global_id', 'original_ids'])\ + .rename(columns={'new_ids': 'global_id'}) + else: + renumber_map.implementation.ddf = renumbering_map.rename( + columns={'original_ids': '0', 'new_ids': 'global_id'}) + renumber_map.implementation.numbered = True + return renumbered_df, renumber_map - return df, renumber_map + else: + renumbering_map, renumbered_df = c_renumber.renumber( + df, + num_edges, + 0, + Comms.get_default_handle(), + is_mnmg, + store_transposed) + if renumber_type == 'legacy': + renumber_map.implementation.df = indirection_map.\ + merge(renumbering_map, + right_on='original_ids', left_on='id').\ + drop(columns=['id', 'original_ids'])\ + .rename(columns={'new_ids': 'id'}, copy=False) + else: + renumber_map.implementation.df = renumbering_map.rename( + columns={'original_ids': '0', 'new_ids': 'id'}, copy=False) + renumber_map.implementation.numbered = True + return renumbered_df, renumber_map def unrenumber(self, df, column_name, preserve_order=False): """ @@ -837,30 +589,24 @@ def unrenumber(self, df, column_name, preserve_order=False): name for the external vertex identifiers. If the renumbering is from a multi-column input, the output columns will be labeled 0 through n-1 with a suffix of _column_name. - Note that this function does not guarantee order or partitioning in multi-GPU mode. - Parameters ---------- df: cudf.DataFrame or dask_cudf.DataFrame A DataFrame containing internal vertex identifiers that will be converted into external vertex identifiers. - column_name: string Name of the column containing the internal vertex id. - preserve_order: (optional) bool If True, preserve the ourder of the rows in the output DataFrame to match the input DataFrame - Returns --------- df : cudf.DataFrame or dask_cudf.DataFrame The original DataFrame columns exist unmodified. The external vertex identifiers are added to the DataFrame, the internal vertex identifier column is removed from the dataframe. - Examples -------- >>> M = cudf.read_csv('datasets/karate.csv', delimiter=' ', diff --git a/python/cugraph/structure/renumber_wrapper.pyx b/python/cugraph/structure/renumber_wrapper.pyx index 302fcfe583b..682c6b32a0f 100644 --- a/python/cugraph/structure/renumber_wrapper.pyx +++ b/python/cugraph/structure/renumber_wrapper.pyx @@ -43,8 +43,8 @@ cdef renumber_helper(shuffled_vertices_t* ptr_maj_min_w, vertex_t, weights): shuffled_minor_series = cudf.Series(data=shuffled_minor_buffer, dtype=vertex_t) shuffled_df = cudf.DataFrame() - shuffled_df['src']=shuffled_major_series - shuffled_df['dst']=shuffled_minor_series + shuffled_df['major_vertices']=shuffled_major_series + shuffled_df['minor_vertices']=shuffled_minor_series if weights is not None: weight_t = weights.dtype @@ -53,7 +53,7 @@ cdef renumber_helper(shuffled_vertices_t* ptr_maj_min_w, vertex_t, weights): shuffled_weights_series = cudf.Series(data=shuffled_weights_buffer, dtype=weight_t) - shuffled_df['weights']= shuffled_weights_series + shuffled_df['value']= shuffled_weights_series return shuffled_df @@ -84,7 +84,7 @@ def renumber(input_df, # maybe use cpdef ? if num_global_edges > (2**31 - 1): edge_t = np.dtype("int64") else: - edge_t = np.dtype("int32") + edge_t = vertex_t if "value" in input_df.columns: weights = input_df['value'] weight_t = weights.dtype @@ -150,15 +150,19 @@ def renumber(input_df, # maybe use cpdef ? num_partition_edges, is_hyper_partitioned).release()) shuffled_df = renumber_helper(ptr_shuffled_32_32.get(), vertex_t, weights) + major_vertices = shuffled_df['major_vertices'] + minor_vertices = shuffled_df['minor_vertices'] + num_partition_edges = len(shuffled_df) + if not transposed: + major = 'src'; minor = 'dst' + else: + major = 'dst'; minor = 'src' + shuffled_df = shuffled_df.rename(columns={'major_vertices':major, 'minor_vertices':minor}, copy=False) else: - shuffled_df = input_df - - shuffled_src = shuffled_df['src'] - shuffled_dst = shuffled_df['dst'] - num_partition_edges = len(shuffled_df) + shuffled_df = input_df - shuffled_major = shuffled_src.__cuda_array_interface__['data'][0] - shuffled_minor = shuffled_dst.__cuda_array_interface__['data'][0] + shuffled_major = major_vertices.__cuda_array_interface__['data'][0] + shuffled_minor = minor_vertices.__cuda_array_interface__['data'][0] ptr_renum_quad_32_32.reset(call_renumber[int, int](deref(handle_ptr), shuffled_major, shuffled_minor, @@ -209,15 +213,19 @@ def renumber(input_df, # maybe use cpdef ? is_hyper_partitioned).release()) shuffled_df = renumber_helper(ptr_shuffled_32_64.get(), vertex_t, weights) + major_vertices = shuffled_df['major_vertices'] + minor_vertices = shuffled_df['minor_vertices'] + num_partition_edges = len(shuffled_df) + if not transposed: + major = 'src'; minor = 'dst' + else: + major = 'dst'; minor = 'src' + shuffled_df = shuffled_df.rename(columns={'major_vertices':major, 'minor_vertices':minor}, copy=False) else: shuffled_df = input_df - - shuffled_src = shuffled_df['src'] - shuffled_dst = shuffled_df['dst'] - num_partition_edges = len(shuffled_df) - - shuffled_major = shuffled_src.__cuda_array_interface__['data'][0] - shuffled_minor = shuffled_dst.__cuda_array_interface__['data'][0] + + shuffled_major = major_vertices.__cuda_array_interface__['data'][0] + shuffled_minor = minor_vertices.__cuda_array_interface__['data'][0] ptr_renum_quad_32_32.reset(call_renumber[int, int](deref(handle_ptr), shuffled_major, @@ -259,6 +267,7 @@ def renumber(input_df, # maybe use cpdef ? renumbered_map['new_ids'] = new_series return renumbered_map, shuffled_df + elif ( edge_t == np.dtype("int64")): if( weight_t == np.dtype("float32")): if(is_multi_gpu): @@ -270,15 +279,19 @@ def renumber(input_df, # maybe use cpdef ? is_hyper_partitioned).release()) shuffled_df = renumber_helper(ptr_shuffled_32_32.get(), vertex_t, weights) + major_vertices = shuffled_df['major_vertices'] + minor_vertices = shuffled_df['minor_vertices'] + num_partition_edges = len(shuffled_df) + if not transposed: + major = 'src'; minor = 'dst' + else: + major = 'dst'; minor = 'src' + shuffled_df = shuffled_df.rename(columns={'major_vertices':major, 'minor_vertices':minor}, copy=False) else: shuffled_df = input_df - - shuffled_src = shuffled_df['src'] - shuffled_dst = shuffled_df['dst'] - num_partition_edges = len(shuffled_df) - - shuffled_major = shuffled_src.__cuda_array_interface__['data'][0] - shuffled_minor = shuffled_dst.__cuda_array_interface__['data'][0] + + shuffled_major = major_vertices.__cuda_array_interface__['data'][0] + shuffled_minor = minor_vertices.__cuda_array_interface__['data'][0] ptr_renum_quad_32_64.reset(call_renumber[int, long](deref(handle_ptr), shuffled_major, @@ -330,15 +343,19 @@ def renumber(input_df, # maybe use cpdef ? is_hyper_partitioned).release()) shuffled_df = renumber_helper(ptr_shuffled_32_64.get(), vertex_t, weights) + major_vertices = shuffled_df['major_vertices'] + minor_vertices = shuffled_df['minor_vertices'] + num_partition_edges = len(shuffled_df) + if not transposed: + major = 'src'; minor = 'dst' + else: + major = 'dst'; minor = 'src' + shuffled_df = shuffled_df.rename(columns={'major_vertices':major, 'minor_vertices':minor}, copy=False) else: shuffled_df = input_df - - shuffled_src = shuffled_df['src'] - shuffled_dst = shuffled_df['dst'] - num_partition_edges = len(shuffled_df) - - shuffled_major = shuffled_src.__cuda_array_interface__['data'][0] - shuffled_minor = shuffled_dst.__cuda_array_interface__['data'][0] + + shuffled_major = major_vertices.__cuda_array_interface__['data'][0] + shuffled_minor = minor_vertices.__cuda_array_interface__['data'][0] ptr_renum_quad_32_64.reset(call_renumber[int, long](deref(handle_ptr), shuffled_major, @@ -379,6 +396,7 @@ def renumber(input_df, # maybe use cpdef ? renumbered_map['new_ids'] = new_series return renumbered_map, shuffled_df + elif (vertex_t == np.dtype("int64")): if ( edge_t == np.dtype("int64")): if( weight_t == np.dtype("float32")): @@ -391,15 +409,19 @@ def renumber(input_df, # maybe use cpdef ? is_hyper_partitioned).release()) shuffled_df = renumber_helper(ptr_shuffled_64_32.get(), vertex_t, weights) + major_vertices = shuffled_df['major_vertices'] + minor_vertices = shuffled_df['minor_vertices'] + num_partition_edges = len(shuffled_df) + if not transposed: + major = 'src'; minor = 'dst' + else: + major = 'dst'; minor = 'src' + shuffled_df = shuffled_df.rename(columns={'major_vertices':major, 'minor_vertices':minor}, copy=False) else: shuffled_df = input_df - - shuffled_src = shuffled_df['src'] - shuffled_dst = shuffled_df['dst'] - num_partition_edges = len(shuffled_df) - - shuffled_major = shuffled_src.__cuda_array_interface__['data'][0] - shuffled_minor = shuffled_dst.__cuda_array_interface__['data'][0] + + shuffled_major = major_vertices.__cuda_array_interface__['data'][0] + shuffled_minor = minor_vertices.__cuda_array_interface__['data'][0] ptr_renum_quad_64_64.reset(call_renumber[long, long](deref(handle_ptr), shuffled_major, @@ -428,8 +450,8 @@ def renumber(input_df, # maybe use cpdef ? uniq_partition_vector_64.get()[0].at(rank_indx+1)), dtype=vertex_t) else: - new_series = cudf.Series(np.arange(uniq_partition_vector_32.get()[0].at(0), - uniq_partition_vector_32.get()[0].at(1)), + new_series = cudf.Series(np.arange(uniq_partition_vector_64.get()[0].at(0), + uniq_partition_vector_64.get()[0].at(1)), dtype=vertex_t) # create new cudf df @@ -441,6 +463,7 @@ def renumber(input_df, # maybe use cpdef ? renumbered_map['new_ids'] = new_series return renumbered_map, shuffled_df + elif( weight_t == np.dtype("float64")): if(is_multi_gpu): ptr_shuffled_64_64.reset(call_shuffle[long, long, double](deref(handle_ptr), @@ -451,15 +474,19 @@ def renumber(input_df, # maybe use cpdef ? is_hyper_partitioned).release()) shuffled_df = renumber_helper(ptr_shuffled_64_64.get(), vertex_t, weights) + major_vertices = shuffled_df['major_vertices'] + minor_vertices = shuffled_df['minor_vertices'] + num_partition_edges = len(shuffled_df) + if not transposed: + major = 'src'; minor = 'dst' + else: + major = 'dst'; minor = 'src' + shuffled_df = shuffled_df.rename(columns={'major_vertices':major, 'minor_vertices':minor}, copy=False) else: shuffled_df = input_df - - shuffled_src = shuffled_df['src'] - shuffled_dst = shuffled_df['dst'] - num_partition_edges = len(shuffled_df) - - shuffled_major = shuffled_src.__cuda_array_interface__['data'][0] - shuffled_minor = shuffled_dst.__cuda_array_interface__['data'][0] + + shuffled_major = major_vertices.__cuda_array_interface__['data'][0] + shuffled_minor = minor_vertices.__cuda_array_interface__['data'][0] ptr_renum_quad_64_64.reset(call_renumber[long, long](deref(handle_ptr), shuffled_major, @@ -488,8 +515,8 @@ def renumber(input_df, # maybe use cpdef ? uniq_partition_vector_64.get()[0].at(rank_indx+1)), dtype=vertex_t) else: - new_series = cudf.Series(np.arange(uniq_partition_vector_32.get()[0].at(0), - uniq_partition_vector_32.get()[0].at(1)), + new_series = cudf.Series(np.arange(uniq_partition_vector_64.get()[0].at(0), + uniq_partition_vector_64.get()[0].at(1)), dtype=vertex_t) # create new cudf df diff --git a/python/cugraph/tests/test_renumber.py b/python/cugraph/tests/test_renumber.py index 6f88d5f85c4..5362d3f5804 100644 --- a/python/cugraph/tests/test_renumber.py +++ b/python/cugraph/tests/test_renumber.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2020, NVIDIA CORPORATION. +# Copyright (c) 2019-2021, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -44,13 +44,14 @@ def test_renumber_ips(): gdf["source_as_int"] = gdf["source_list"].str.ip2int() gdf["dest_as_int"] = gdf["dest_list"].str.ip2int() - numbering = NumberMap() - numbering.from_series(gdf["source_as_int"], gdf["dest_as_int"]) - src = numbering.to_internal_vertex_id(gdf["source_as_int"]) - dst = numbering.to_internal_vertex_id(gdf["dest_as_int"]) + renumbered_gdf, renumber_map = NumberMap.renumber(gdf, + "source_as_int", + "dest_as_int") - check_src = numbering.from_internal_vertex_id(src)["0"] - check_dst = numbering.from_internal_vertex_id(dst)["0"] + check_src = renumber_map.from_internal_vertex_id(renumbered_gdf['src'] + )["0"] + check_dst = renumber_map.from_internal_vertex_id(renumbered_gdf['dst'] + )["0"] assert check_src.equals(gdf["source_as_int"]) assert check_dst.equals(gdf["dest_as_int"]) @@ -78,13 +79,14 @@ def test_renumber_ips_cols(): gdf["source_as_int"] = gdf["source_list"].str.ip2int() gdf["dest_as_int"] = gdf["dest_list"].str.ip2int() - numbering = NumberMap() - numbering.from_dataframe(gdf, ["source_as_int"], ["dest_as_int"]) - src = numbering.to_internal_vertex_id(gdf["source_as_int"]) - dst = numbering.to_internal_vertex_id(gdf["dest_as_int"]) + renumbered_gdf, renumber_map = NumberMap.renumber(gdf, + ["source_as_int"], + ["dest_as_int"]) - check_src = numbering.from_internal_vertex_id(src)["0"] - check_dst = numbering.from_internal_vertex_id(dst)["0"] + check_src = renumber_map.from_internal_vertex_id(renumbered_gdf['src'] + )["0"] + check_dst = renumber_map.from_internal_vertex_id(renumbered_gdf['dst'] + )["0"] assert check_src.equals(gdf["source_as_int"]) assert check_dst.equals(gdf["dest_as_int"]) @@ -110,13 +112,14 @@ def test_renumber_ips_str_cols(): gdf = cudf.from_pandas(pdf) - numbering = NumberMap() - numbering.from_dataframe(gdf, ["source_list"], ["dest_list"]) - src = numbering.to_internal_vertex_id(gdf["source_list"]) - dst = numbering.to_internal_vertex_id(gdf["dest_list"]) + renumbered_gdf, renumber_map = NumberMap.renumber(gdf, + ["source_as_int"], + ["dest_as_int"]) - check_src = numbering.from_internal_vertex_id(src)["0"] - check_dst = numbering.from_internal_vertex_id(dst)["0"] + check_src = renumber_map.from_internal_vertex_id(renumbered_gdf['src'] + )["0"] + check_dst = renumber_map.from_internal_vertex_id(renumbered_gdf['dst'] + )["0"] assert check_src.equals(gdf["source_list"]) assert check_dst.equals(gdf["dest_list"]) @@ -130,13 +133,14 @@ def test_renumber_negative(): gdf = cudf.DataFrame.from_pandas(df[["source_list", "dest_list"]]) - numbering = NumberMap() - numbering.from_dataframe(gdf, ["source_list"], ["dest_list"]) - src = numbering.to_internal_vertex_id(gdf["source_list"]) - dst = numbering.to_internal_vertex_id(gdf["dest_list"]) + renumbered_gdf, renumber_map = NumberMap.renumber(gdf, + "source_list", + "dest_list") - check_src = numbering.from_internal_vertex_id(src)["0"] - check_dst = numbering.from_internal_vertex_id(dst)["0"] + check_src = renumber_map.from_internal_vertex_id(renumbered_gdf['src'] + )["0"] + check_dst = renumber_map.from_internal_vertex_id(renumbered_gdf['dst'] + )["0"] assert check_src.equals(gdf["source_list"]) assert check_dst.equals(gdf["dest_list"]) @@ -150,19 +154,21 @@ def test_renumber_negative_col(): gdf = cudf.DataFrame.from_pandas(df[["source_list", "dest_list"]]) - numbering = NumberMap() - numbering.from_dataframe(gdf, ["source_list"], ["dest_list"]) - src = numbering.to_internal_vertex_id(gdf["source_list"]) - dst = numbering.to_internal_vertex_id(gdf["dest_list"]) + renumbered_gdf, renumber_map = NumberMap.renumber(gdf, + "source_list", + "dest_list") - check_src = numbering.from_internal_vertex_id(src)["0"] - check_dst = numbering.from_internal_vertex_id(dst)["0"] + check_src = renumber_map.from_internal_vertex_id(renumbered_gdf['src'] + )["0"] + check_dst = renumber_map.from_internal_vertex_id(renumbered_gdf['dst'] + )["0"] assert check_src.equals(gdf["source_list"]) assert check_dst.equals(gdf["dest_list"]) # Test all combinations of default/managed and pooled/non-pooled allocation +@pytest.mark.skip(reason="dropped renumbering from series support") @pytest.mark.parametrize("graph_file", utils.DATASETS) def test_renumber_series(graph_file): gc.collect() @@ -215,19 +221,21 @@ def test_renumber_files(graph_file): df["dst"] = cudf.Series([x + translate for x in destinations. values_host]) - numbering = NumberMap() - numbering.from_series(df["src"], df["dst"]) + exp_src = cudf.Series([x + translate for x in sources. + values_host]) + exp_dst = cudf.Series([x + translate for x in destinations. + values_host]) - renumbered_df = numbering.add_internal_vertex_id( - numbering.add_internal_vertex_id(df, "src_id", ["src"]), - "dst_id", ["dst"] - ) + renumbered_df, renumber_map = NumberMap.renumber(df, "src", "dst", + preserve_order=True) - check_src = numbering.from_internal_vertex_id(renumbered_df, "src_id") - check_dst = numbering.from_internal_vertex_id(renumbered_df, "dst_id") + unrenumbered_df = renumber_map.unrenumber(renumbered_df, "src", + preserve_order=True) + unrenumbered_df = renumber_map.unrenumber(unrenumbered_df, "dst", + preserve_order=True) - assert check_src["src"].equals(check_src["0"]) - assert check_dst["dst"].equals(check_dst["0"]) + assert exp_src.equals(unrenumbered_df["src"]) + assert exp_dst.equals(unrenumbered_df["dst"]) # Test all combinations of default/managed and pooled/non-pooled allocation @@ -246,19 +254,21 @@ def test_renumber_files_col(graph_file): gdf['dst'] = cudf.Series([x + translate for x in destinations. values_host]) - numbering = NumberMap() - numbering.from_dataframe(gdf, ["src"], ["dst"]) + exp_src = cudf.Series([x + translate for x in sources. + values_host]) + exp_dst = cudf.Series([x + translate for x in destinations. + values_host]) - renumbered_df = numbering.add_internal_vertex_id( - numbering.add_internal_vertex_id(gdf, "src_id", ["src"]), - "dst_id", ["dst"] - ) + renumbered_df, renumber_map = NumberMap.renumber(gdf, ["src"], ["dst"], + preserve_order=True) - check_src = numbering.from_internal_vertex_id(renumbered_df, "src_id") - check_dst = numbering.from_internal_vertex_id(renumbered_df, "dst_id") + unrenumbered_df = renumber_map.unrenumber(renumbered_df, "src", + preserve_order=True) + unrenumbered_df = renumber_map.unrenumber(unrenumbered_df, "dst", + preserve_order=True) - assert check_src["src"].equals(check_src["0"]) - assert check_dst["dst"].equals(check_dst["0"]) + assert exp_src.equals(unrenumbered_df["src"]) + assert exp_dst.equals(unrenumbered_df["dst"]) # Test all combinations of default/managed and pooled/non-pooled allocation @@ -278,21 +288,17 @@ def test_renumber_files_multi_col(graph_file): gdf["src"] = sources + translate gdf["dst"] = destinations + translate - numbering = NumberMap() - numbering.from_dataframe(gdf, ["src", "src_old"], ["dst", "dst_old"]) + renumbered_df, renumber_map = NumberMap.renumber(gdf, + ["src", "src_old"], + ["dst", "dst_old"], + preserve_order=True) - renumbered_df = numbering.add_internal_vertex_id( - numbering.add_internal_vertex_id( - gdf, "src_id", ["src", "src_old"] - ), - "dst_id", - ["dst", "dst_old"], - ) + unrenumbered_df = renumber_map.unrenumber(renumbered_df, "src", + preserve_order=True) + unrenumbered_df = renumber_map.unrenumber(unrenumbered_df, "dst", + preserve_order=True) - check_src = numbering.from_internal_vertex_id(renumbered_df, "src_id") - check_dst = numbering.from_internal_vertex_id(renumbered_df, "dst_id") - - assert check_src["src"].equals(check_src["0"]) - assert check_src["src_old"].equals(check_src["1"]) - assert check_dst["dst"].equals(check_dst["0"]) - assert check_dst["dst_old"].equals(check_dst["1"]) + assert gdf["src"].equals(unrenumbered_df["0_src"]) + assert gdf["src_old"].equals(unrenumbered_df["1_src"]) + assert gdf["dst"].equals(unrenumbered_df["0_dst"]) + assert gdf["dst_old"].equals(unrenumbered_df["1_dst"])