Skip to content

Commit

Permalink
Update the list of algos to benchmark (#2337)
Browse files Browse the repository at this point in the history
This PR
1. Update the way `uniform neighbor sample` is imported( it has been removed from experimental)
2. Ping `libraft-headers` and `pyraft` to 22.08
3. Add `Triangle count` to the list of algos to benchmarks

Authors:
  - Joseph Nke (https://github.com/jnke2016)

Approvers:
  - Rick Ratzel (https://github.com/rlratzel)

URL: #2337
  • Loading branch information
jnke2016 authored Jun 21, 2022
1 parent 78b394c commit 0bcb6e0
Show file tree
Hide file tree
Showing 21 changed files with 659 additions and 304 deletions.
67 changes: 40 additions & 27 deletions benchmarks/python_e2e/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,16 @@ def __init__(self,
# FIXME: need to accept and save individual algo args
self.construct_graph = benchmark(construct_graph_func)

#add starting node to algos: BFS and SSSP
# add starting node to algos: BFS and SSSP
# FIXME: Refactor BenchmarkRun __init__ because all the work
# done below should be done elsewhere
for i, algo in enumerate (algo_func_param_list):
if benchmark(algo).name in ["bfs", "sssp", "neighborhood_sampling"]:
if benchmark(algo).name in ["bfs", "sssp", "uniform_neighbor_sample"]:
param={}
param["start"]=self.input_dataframe['src'].head()[0]
if benchmark(algo).name in ["neighborhood_sampling"]:
if benchmark(algo).name in ["uniform_neighbor_sample"]:
start = [param.pop("start")]
labels = [0]
param["start_info_list"] = (start, labels)
param["start_list"] = start
param["fanout_vals"] = [1]
algo_func_param_list[i]=(algo,)+(param,)

Expand Down Expand Up @@ -128,32 +129,44 @@ def run(self):
self.__log("done.")
G = result.retval
self.results.append(result)

#algos with transposed=True : PageRank, Katz
#algos with transposed=False: BFS, SSSP, Louvain, HITS, Neighborhood_sampling
#algos supporting the legacy_renum_only: HITS, Neighborhood_sampling
#
# Algos with transposed=True : PageRank, Katz.
# Algos with transposed=False: BFS, SSSP, Louvain, HITS,
# Neighborhood_sampling.
# Algos supporting the legacy_renum_only: HITS, Neighborhood_sampling
#
for i in range(len(self.algos)):
if self.algos[i][0].name in ["pagerank", "katz"]: #set transpose=True when renumbering
if self.algos[i][0].name == "katz" and self.construct_graph.name == "from_dask_cudf_edgelist":
largest_out_degree = G.out_degree().compute().\
nlargest(n=1, columns="degree") #compute outdegree before renumbering because outdegree has transpose=False
largest_out_degree = largest_out_degree["degree"].iloc[0]
katz_alpha = 1 / (largest_out_degree + 1)
self.algos[i][1]["alpha"] = katz_alpha
elif self.algos[i][0].name == "katz" and self.construct_graph.name == "from_cudf_edgelist":
largest_out_degree = G.out_degree().nlargest(n=1, columns="degree")
largest_out_degree = largest_out_degree["degree"].iloc[0]
katz_alpha = 1 / (largest_out_degree + 1)
self.algos[i][1]["alpha"] = katz_alpha
if hasattr(G, "compute_renumber_edge_list"):
G.compute_renumber_edge_list(transposed=True)
elif self.algos[i][0].name in ["neighborhood_sampling", "hits"]:
if hasattr(G, "compute_renumber_edge_list"):
G.compute_renumber_edge_list(transposed=False, legacy_renum_only=True)
# set transpose=True when renumbering
if self.algos[i][0].name in ["pagerank", "katz"]:
if self.algos[i][0].name == "katz":
if self.construct_graph.name == "from_dask_cudf_edgelist":
# compute out_degree before renumbering because out_degree
# has transpose=False
degree_max = G.degree()['degree'].max().compute()
katz_alpha = 1 / (degree_max)
self.algos[i][1]["alpha"] = katz_alpha
elif self.construct_graph.name == "from_cudf_edgelist":
degree_max = G.degree()['degree'].max()
katz_alpha = 1 / (degree_max)
self.algos[i][1]["alpha"] = katz_alpha
if hasattr(G, "compute_renumber_edge_list"):
G.compute_renumber_edge_list(
transposed=True, legacy_renum_only=True)
else:
# FIXME: Pagerank still follows the old path. Update this once it
# follows the pylibcugraph/C path
if hasattr(G, "compute_renumber_edge_list"):
G.compute_renumber_edge_list(transposed=True)
else: #set transpose=False when renumbering
self.__log("running compute_renumber_edge_list...", end="")
if hasattr(G, "compute_renumber_edge_list"):
G.compute_renumber_edge_list(transposed=False)
if self.algos[i][0].name in ["wcc", "louvain"]:
# FIXME: Pagerank and Louvain still follow the old path.
# Update this once it follows the pylibcugraph/C path
G.compute_renumber_edge_list(transposed=False)
else:
G.compute_renumber_edge_list(
transposed=False, legacy_renum_only=True)
self.__log("done.")
# FIXME: need to handle individual algo args
for ((algo, params), validator) in zip(self.algos, self.validators):
Expand Down
36 changes: 22 additions & 14 deletions benchmarks/python_e2e/cugraph_dask_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
from cugraph.structure.symmetrize import symmetrize_ddf
from cugraph.dask.common.mg_utils import get_visible_devices
from dask_cuda.initialize import initialize
from cugraph.experimental.dask import uniform_neighborhood_sampling
import cudf

import cugraph
from cugraph.dask.comms import comms as Comms
from cugraph.dask.common.mg_utils import get_visible_devices
from cugraph.generators import rmat
import tempfile

Expand Down Expand Up @@ -109,10 +107,15 @@ def construct_graph(dask_dataframe, symmetric=False):
object must be symmetrized and have self loops removed.
"""

G = cugraph.DiGraph()
if symmetric:
G = cugraph.Graph(directed=False)
else:
G = cugraph.Graph(directed=True)

if len(dask_dataframe.columns) > 2:
if symmetric: #symmetrize dask dataframe
dask_dataframe = symmetrize_ddf(dask_dataframe, 'src', 'dst', 'weight')
dask_dataframe = symmetrize_ddf(
dask_dataframe, 'src', 'dst', 'weight')

G.from_dask_cudf_edgelist(
dask_dataframe, source="src", destination="dst", edge_attr="weight")
Expand All @@ -130,11 +133,12 @@ def construct_graph(dask_dataframe, symmetric=False):


def bfs(G, start):
return cugraph.dask.bfs(G, start=start, return_distances=True)
return cugraph.dask.bfs(
G, start=start, return_distances=True, check_start=False)


def sssp(G, start):
return cugraph.dask.sssp(G, source=start)
return cugraph.dask.sssp(G, source=start, check_start=False)


def wcc(G):
Expand All @@ -156,15 +160,19 @@ def katz(G, alpha=None):
def hits(G):
return cugraph.dask.hits(G)

def neighborhood_sampling(G, start_info_list=None, fanout_vals=None):
def uniform_neighbor_sample(G, start_list=None, fanout_vals=None):
# convert list to cudf.Series
start_info_list = (
cudf.Series(start_info_list[0], dtype="int32"),
cudf.Series(start_info_list[1], dtype="int32"),
)

return uniform_neighborhood_sampling(
G, start_info_list=start_info_list, fanout_vals=fanout_vals)
start_list = cudf.Series(start_list, dtype="int32")
return cugraph.dask.uniform_neighbor_sample(
G, start_list=start_list, fanout_vals=fanout_vals)

def triangle_count(G):
# FIXME: Update this calls once triangle_count is promoted
return cugraph.dask.triangle_count(G)

def eigenvector_centrality(G):
# FIXME: Update this calls once triangle_count is promoted
return cugraph.dask.eigenvector_centrality(G)

################################################################################
# Session-wide setup and teardown
Expand Down
23 changes: 20 additions & 3 deletions benchmarks/python_e2e/cugraph_funcs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -15,6 +15,7 @@

import cugraph
from cugraph.generators import rmat
import cudf


def generate_edgelist(scale,
Expand Down Expand Up @@ -96,9 +97,9 @@ def construct_graph(dataframe, symmetric=False):
symmetrized and have self loops removed.
"""
if symmetric:
G = cugraph.Graph()
G = cugraph.Graph(directed=False)
else:
G = cugraph.DiGraph()
G = cugraph.Graph(directed=True)

if len(dataframe.columns) > 2:
G.from_cudf_edgelist(
Expand Down Expand Up @@ -137,6 +138,22 @@ def pagerank(G):
def katz(G, alpha=None):
return cugraph.katz_centrality(G, alpha)

def hits(G):
return cugraph.hits(G)

def uniform_neighbor_sample(G, start_list=None, fanout_vals=None):
# convert list to cudf.Series
start_list = cudf.Series(start_list, dtype="int32")
return cugraph.uniform_neighbor_sample(
G, start_list=start_list, fanout_vals=fanout_vals)

def triangle_count(G):
# FIXME: Update this calls once triangle_count is promoted
return cugraph.experimental.triangle_count(G)

def eigenvector_centrality(G):
# FIXME: Update this calls once triangle_count is promoted
return cugraph.eigenvector_centrality(G)

################################################################################
# Session-wide setup and teardown
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/python_e2e/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ def run(algos,
"katz": funcs.katz,
"wcc": funcs.wcc,
"hits": funcs.hits,
"neighborhood_sampling": funcs.neighborhood_sampling,
"uniform_neighbor_sample": funcs.uniform_neighbor_sample,
"triangle_count": funcs.triangle_count,
"eigenvector_centrality": funcs.eigenvector_centrality,
}

if algos:
Expand Down
8 changes: 4 additions & 4 deletions benchmarks/python_e2e/reporting.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -43,15 +43,15 @@ def generate_console_report(benchmark_result_list):
# the graph_create run, then a run of each algo.
r = benchmark_result_list[0]
name = f"{r.name}({__namify_dict(r.params)})"
space = " " * (30 - len(name))
space = " " * (70 - len(name))
retstring += f"{name}{space}{r.runtime:.6}\n"

remaining_results = benchmark_result_list[1:]

for r in remaining_results:
retstring += f"{'-'*60}\n"
retstring += f"{'-'*80}\n"
name = f"{r.name}({__namify_dict(r.params)})"
space = " " * (30 - len(name))
space = " " * (70 - len(name))
retstring += f"{name}{space}{r.runtime:.6}\n"

return retstring
Expand Down
18 changes: 8 additions & 10 deletions python/cugraph/cugraph/centrality/katz_centrality.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@


def katz_centrality(
G, alpha=None, beta=None, max_iter=100, tol=1.0e-6,
G, alpha=None, beta=1.0, max_iter=100, tol=1.0e-6,
nstart=None, normalized=True
):
"""
Expand Down Expand Up @@ -114,11 +114,16 @@ def katz_centrality(
>>> kc = cugraph.katz_centrality(G)
"""
G, isNx = ensure_cugraph_obj_for_nx(G)

if alpha is None:
degree_max = G.degree()['degree'].max()
alpha = 1 / (degree_max)

if (alpha is not None) and (alpha <= 0.0):
raise ValueError(f"'alpha' must be a positive float or None, "
f"got: {alpha}")
if beta is None:
beta = 1.0

elif (not isinstance(beta, float)) or (beta <= 0.0):
raise ValueError(f"'beta' must be a positive float or None, "
f"got: {beta}")
Expand All @@ -128,8 +133,6 @@ def katz_centrality(
if (not isinstance(tol, float)) or (tol <= 0.0):
raise ValueError(f"'tol' must be a positive float, got: {tol}")

G, isNx = ensure_cugraph_obj_for_nx(G)

srcs = G.edgelist.edgelist_df['src']
dsts = G.edgelist.edgelist_df['dst']
if 'weights' in G.edgelist.edgelist_df.columns:
Expand All @@ -139,11 +142,6 @@ def katz_centrality(
# with type hardcoded to float32 is passed into wrapper
weights = cudf.Series((srcs + 1) / (srcs + 1), dtype="float32")

if alpha is None:
largest_out_degree = G.degrees().nlargest(n=1, columns="out_degree")
largest_out_degree = largest_out_degree["out_degree"].iloc[0]
alpha = 1 / (largest_out_degree + 1)

if nstart is not None:
if G.renumbered is True:
if len(G.renumber_map.implementation.col_names) > 1:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ def eigenvector_centrality(
"""
client = default_client()
# Calling renumbering results in data that is sorted by degree
input_graph.compute_renumber_edge_list(transposed=False)
input_graph.compute_renumber_edge_list(
transposed=False, legacy_renum_only=True)

graph_properties = GraphProperties(
is_multigraph=False)
Expand Down
19 changes: 15 additions & 4 deletions python/cugraph/cugraph/dask/centrality/katz_centrality.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,21 @@ def katz_centrality(
"""
client = default_client()

if alpha is None:
degree_max = input_graph.degree()['degree'].max().compute()
alpha = 1 / (degree_max)

if (alpha is not None) and (alpha <= 0.0):
raise ValueError(f"'alpha' must be a positive float or None, "
f"got: {alpha}")

# FIXME: 'legacy_renum_only' will not trigger the C++ renumbering
# In the future, once all the algos follow the C/Pylibcugraph path,
# compute_renumber_edge_list will only be used for multicolumn and
# string vertices since the renumbering will be done in pylibcugraph
input_graph.compute_renumber_edge_list(transposed=True,
legacy_renum_only=False)

graph_properties = GraphProperties(
is_multigraph=False)

Expand All @@ -188,10 +203,6 @@ def katz_centrality(
num_edges = len(ddf)
data = get_distributed_data(ddf)

# FIXME: Incorporate legacy_renum_only=True to only trigger the python
# renumbering when more support is added in the C/C++ API
input_graph.compute_renumber_edge_list(transposed=True,
legacy_renum_only=False)
vertex_partition_offsets = get_vertex_partition_offsets(input_graph)
num_verts = vertex_partition_offsets.iloc[-1]

Expand Down
14 changes: 9 additions & 5 deletions python/cugraph/cugraph/dask/common/part_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,18 @@ async def _extract_partitions(dask_obj, client=None, batch_enabled=False):
persisted = [client.persist(
dask_obj.get_partition(p), workers=w) for p, w in enumerate(
worker_list[:dask_obj.npartitions])]
# Persist empty dataframe with the remaining workers if there are
# less partitions than workers
# Persist empty dataframe/series with the remaining workers if
# there are less partitions than workers
if dask_obj.npartitions < len(worker_list):
# The empty df should have the same column names and dtypes as
# dask_obj
empty_df = cudf.DataFrame(columns=list(dask_obj.columns))
empty_df = empty_df.astype(dict(zip(
dask_obj.columns, dask_obj.dtypes)))
if isinstance(dask_obj, dask_cudf.DataFrame):
empty_df = cudf.DataFrame(columns=list(dask_obj.columns))
empty_df = empty_df.astype(dict(zip(
dask_obj.columns, dask_obj.dtypes)))
else:
empty_df = cudf.Series(dtype=dask_obj.dtype)

for p, w in enumerate(worker_list[dask_obj.npartitions:]):
empty_ddf = dask_cudf.from_cudf(empty_df, npartitions=1)
persisted.append(client.persist(empty_ddf, workers=w))
Expand Down
Loading

0 comments on commit 0bcb6e0

Please sign in to comment.