Skip to content

Commit

Permalink
Add indirection and replace algorithms with new renumbering (#1484)
Browse files Browse the repository at this point in the history
Authors:
  - @Iroy30

Approvers:
  - Chuck Hastings (@ChuckHastings)
  - Alex Fender (@afender)
  - Seunghwa Kang (@seunghwak)

URL: #1484
  • Loading branch information
Iroy30 authored Mar 30, 2021
1 parent e60d9f7 commit 22e9e2b
Show file tree
Hide file tree
Showing 14 changed files with 417 additions and 942 deletions.
3 changes: 2 additions & 1 deletion python/cugraph/community/egonet_wrapper.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def egonet(input_graph, vertices, radius=1):
np.dtype("float32") : <int>numberTypeEnum.floatType,
np.dtype("double") : <int>numberTypeEnum.doubleType}

[src, dst] = [input_graph.edgelist.edgelist_df['src'], input_graph.edgelist.edgelist_df['dst']]
[src, dst] = graph_primtypes_wrapper.datatype_cast([input_graph.edgelist.edgelist_df['src'], input_graph.edgelist.edgelist_df['dst']], [np.int32])
vertex_t = src.dtype
edge_t = np.dtype("int32")
weights = None
Expand All @@ -54,6 +54,7 @@ def egonet(input_graph, vertices, radius=1):
weight_t = np.dtype("float32")

# Pointers for egonet
vertices = vertices.astype('int32')
cdef uintptr_t c_source_vertex_ptr = vertices.__cuda_array_interface__['data'][0]
n_subgraphs = vertices.size
n_streams = 1
Expand Down
4 changes: 4 additions & 0 deletions python/cugraph/community/ktruss_subgraph_wrapper.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ def ktruss_subgraph_double(input_graph, k, use_weights):


def ktruss_subgraph(input_graph, k, use_weights):
[input_graph.edgelist.edgelist_df['src'],
input_graph.edgelist.edgelist_df['dst']] = graph_primtypes_wrapper.datatype_cast([input_graph.edgelist.edgelist_df['src'],
input_graph.edgelist.edgelist_df['dst']],
[np.int32])
if graph_primtypes_wrapper.weight_type(input_graph) == np.float64 and use_weights:
return ktruss_subgraph_double(input_graph, k, use_weights)
else:
Expand Down
1 change: 1 addition & 0 deletions python/cugraph/community/subgraph_extraction_wrapper.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def subgraph(input_graph, vertices):
if weights is not None:
c_weights = weights.__cuda_array_interface__['data'][0]

[vertices] = graph_primtypes_wrapper.datatype_cast([vertices], [np.int32])
cdef uintptr_t c_vertices = vertices.__cuda_array_interface__['data'][0]

if use_float:
Expand Down
4 changes: 4 additions & 0 deletions python/cugraph/cores/k_core_wrapper.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ def k_core(input_graph, k, core_number):
"""
Call k_core
"""
[input_graph.edgelist.edgelist_df['src'],
input_graph.edgelist.edgelist_df['dst']] = graph_primtypes_wrapper.datatype_cast([input_graph.edgelist.edgelist_df['src'],
input_graph.edgelist.edgelist_df['dst']],
[np.int32])
if graph_primtypes_wrapper.weight_type(input_graph) == np.float64:
return k_core_double(input_graph, k, core_number)
else:
Expand Down
12 changes: 5 additions & 7 deletions python/cugraph/dask/centrality/katz_centrality.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
#

from dask.distributed import wait, default_client
from cugraph.dask.common.input_utils import get_distributed_data
from cugraph.structure.shuffle import shuffle
from cugraph.dask.common.input_utils import (get_distributed_data,
get_vertex_partition_offsets)
from cugraph.dask.centrality import\
mg_katz_centrality_wrapper as mg_katz_centrality
import cugraph.comms.comms as Comms
Expand Down Expand Up @@ -133,11 +133,9 @@ def katz_centrality(input_graph,
client = default_client()

input_graph.compute_renumber_edge_list(transposed=True)
(ddf,
num_verts,
partition_row_size,
partition_col_size,
vertex_partition_offsets) = shuffle(input_graph, transposed=True)
ddf = input_graph.edgelist.edgelist_df
vertex_partition_offsets = get_vertex_partition_offsets(input_graph)
num_verts = vertex_partition_offsets.iloc[-1]
num_edges = len(ddf)
data = get_distributed_data(ddf)

Expand Down
14 changes: 13 additions & 1 deletion python/cugraph/dask/common/input_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -217,3 +217,15 @@ def get_distributed_data(input_ddf):
if data.worker_info is None and comms is not None:
data.calculate_worker_and_rank_info(comms)
return data


def get_vertex_partition_offsets(input_graph):
import cudf
renumber_vertex_count = input_graph.renumber_map.implementation.ddf.\
map_partitions(len).compute()
renumber_vertex_cumsum = renumber_vertex_count.cumsum()
vertex_dtype = input_graph.edgelist.edgelist_df['src'].dtype
vertex_partition_offsets = cudf.Series([0], dtype=vertex_dtype)
vertex_partition_offsets = vertex_partition_offsets.append(cudf.Series(
renumber_vertex_cumsum, dtype=vertex_dtype))
return vertex_partition_offsets
13 changes: 5 additions & 8 deletions python/cugraph/dask/community/louvain.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
from dask.distributed import wait, default_client

import cugraph.comms.comms as Comms
from cugraph.dask.common.input_utils import get_distributed_data
from cugraph.structure.shuffle import shuffle
from cugraph.dask.common.input_utils import (get_distributed_data,
get_vertex_partition_offsets)
from cugraph.dask.community import louvain_wrapper as c_mg_louvain
from cugraph.utilities.utils import is_cuda_version_less_than

Expand Down Expand Up @@ -86,12 +86,9 @@ def louvain(input_graph, max_iter=100, resolution=1.0):
input_graph.compute_renumber_edge_list(transposed=False)
sorted_by_degree = True

(ddf,
num_verts,
partition_row_size,
partition_col_size,
vertex_partition_offsets) = shuffle(input_graph, transposed=False)

ddf = input_graph.edgelist.edgelist_df
vertex_partition_offsets = get_vertex_partition_offsets(input_graph)
num_verts = vertex_partition_offsets.iloc[-1]
num_edges = len(ddf)
data = get_distributed_data(ddf)

Expand Down
13 changes: 6 additions & 7 deletions python/cugraph/dask/link_analysis/pagerank.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
#

from dask.distributed import wait, default_client
from cugraph.dask.common.input_utils import get_distributed_data
from cugraph.structure.shuffle import shuffle
from cugraph.dask.common.input_utils import (get_distributed_data,
get_vertex_partition_offsets)
from cugraph.dask.link_analysis import mg_pagerank_wrapper as mg_pagerank
import cugraph.comms.comms as Comms
import dask_cudf
Expand Down Expand Up @@ -124,11 +124,10 @@ def pagerank(input_graph,
client = default_client()

input_graph.compute_renumber_edge_list(transposed=True)
(ddf,
num_verts,
partition_row_size,
partition_col_size,
vertex_partition_offsets) = shuffle(input_graph, transposed=True)

ddf = input_graph.edgelist.edgelist_df
vertex_partition_offsets = get_vertex_partition_offsets(input_graph)
num_verts = vertex_partition_offsets.iloc[-1]
num_edges = len(ddf)
data = get_distributed_data(ddf)

Expand Down
13 changes: 6 additions & 7 deletions python/cugraph/dask/traversal/bfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
#

from dask.distributed import wait, default_client
from cugraph.dask.common.input_utils import get_distributed_data
from cugraph.structure.shuffle import shuffle
from cugraph.dask.common.input_utils import (get_distributed_data,
get_vertex_partition_offsets)
from cugraph.dask.traversal import mg_bfs_wrapper as mg_bfs
import cugraph.comms.comms as Comms
import cudf
Expand Down Expand Up @@ -91,11 +91,10 @@ def bfs(graph,
client = default_client()

graph.compute_renumber_edge_list(transposed=False)
(ddf,
num_verts,
partition_row_size,
partition_col_size,
vertex_partition_offsets) = shuffle(graph, transposed=False)
ddf = graph.edgelist.edgelist_df
vertex_partition_offsets = get_vertex_partition_offsets(graph)
num_verts = vertex_partition_offsets.iloc[-1]

num_edges = len(ddf)
data = get_distributed_data(ddf)

Expand Down
12 changes: 5 additions & 7 deletions python/cugraph/dask/traversal/sssp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
#

from dask.distributed import wait, default_client
from cugraph.dask.common.input_utils import get_distributed_data
from cugraph.structure.shuffle import shuffle
from cugraph.dask.common.input_utils import (get_distributed_data,
get_vertex_partition_offsets)
from cugraph.dask.traversal import mg_sssp_wrapper as mg_sssp
import cugraph.comms.comms as Comms
import cudf
Expand Down Expand Up @@ -91,11 +91,9 @@ def sssp(graph,
client = default_client()

graph.compute_renumber_edge_list(transposed=False)
(ddf,
num_verts,
partition_row_size,
partition_col_size,
vertex_partition_offsets) = shuffle(graph, transposed=False)
ddf = graph.edgelist.edgelist_df
vertex_partition_offsets = get_vertex_partition_offsets(graph)
num_verts = vertex_partition_offsets.iloc[-1]
num_edges = len(ddf)
data = get_distributed_data(ddf)

Expand Down
Loading

0 comments on commit 22e9e2b

Please sign in to comment.