Adds parameterized benchmarks for uniform_neighbor_sampling, updates benchmarks dir for future additions #3048

Merged

Changes from 27 commits

Commits (34)
517d6ab
initial (untested) refactoring of test utils to move shared gen_fixtu…
rlratzel Nov 19, 2022
1c8bb1c
Merge remote-tracking branch 'upstream/branch-23.02' into branch-23.0…
rlratzel Nov 19, 2022
7db35e6
Added util to generate fixture params from a specific list, updated t…
rlratzel Nov 20, 2022
291ebd4
Updated fixture parameterization, added more marks for parameterized …
rlratzel Nov 22, 2022
c24eb23
WIP: adding code for graph creation
rlratzel Nov 22, 2022
a58aadd
Updates to run uniform_neighbor_sample for local SG.
rlratzel Nov 22, 2022
fc6fa32
Added temporary hack to get unique vertices for the start_list, added…
rlratzel Nov 22, 2022
63d1150
Merge remote-tracking branch 'upstream/branch-23.02' into branch-23.0…
rlratzel Nov 22, 2022
74b5629
Initial refactoring for supporting both local and remote Graphs.
rlratzel Nov 23, 2022
8cf460a
Merge remote-tracking branch 'upstream/branch-23.02' into branch-23.0…
rlratzel Nov 23, 2022
5bdad72
Added meta-data about extensions to value returned from get_server_in…
rlratzel Nov 23, 2022
24c962e
Added server extensions for benchmarks and testing which are packaged…
rlratzel Nov 23, 2022
0a95b74
Updates to support initial RemoteGraph runs, still WIP.
rlratzel Nov 24, 2022
810cfac
Updates to use strings instead of objects for making benchmark report…
rlratzel Nov 27, 2022
03c1bbd
updated run-dask-process for newer CLI args for UCX, added support fo…
rlratzel Nov 29, 2022
3b382bc
Merge remote-tracking branch 'upstream/branch-23.02' into branch-23.0…
rlratzel Nov 29, 2022
2c2e999
Merge remote-tracking branch 'upstream/branch-23.02' into branch-23.0…
rlratzel Nov 30, 2022
b10346f
Re-organizing top-level benchmarks dir to better accommodate more ben…
rlratzel Dec 5, 2022
878d32b
Moving uniform_neighbor_sampling benchmarks and params to new common …
rlratzel Dec 5, 2022
9489483
Added PLC standalone benchmark not committed before, renamed pytest d…
rlratzel Dec 5, 2022
a013a6c
Moved params out of benchmark file into separate params.py file so ti…
rlratzel Dec 5, 2022
d5bcfa8
Further refactorings to separate local from remote: removed unneeded …
rlratzel Dec 6, 2022
5d02856
Additional updates for moving genFixtureParamsProduct to PLC, added m…
rlratzel Dec 6, 2022
e354057
Updated parameterization to match perf tracking spreadsheet, added ex…
rlratzel Dec 6, 2022
fbf7792
Merge remote-tracking branch 'upstream/branch-23.02' into branch-23.0…
rlratzel Dec 6, 2022
e6e1c4e
Copyright updates for current year.
rlratzel Dec 6, 2022
93fac88
Added temporary debug prints, changed param names to be more accurate…
rlratzel Dec 8, 2022
0cfe4a9
Merge remote-tracking branch 'upstream/branch-23.02' into branch-23.0…
rlratzel Dec 13, 2022
f650429
Using MultiGraph instead of Graph for benchmarks, added additional RM…
rlratzel Dec 14, 2022
5d0bd77
Merge remote-tracking branch 'upstream/branch-23.02' into branch-23.0…
rlratzel Dec 14, 2022
f2860bc
Merge remote-tracking branch 'upstream/branch-23.02' into branch-23.0…
rlratzel Dec 15, 2022
227a113
Fixed new flake8 style complaints
rlratzel Dec 15, 2022
1e78e05
Undo-ing variable rename in order to match the rest of the code.
rlratzel Dec 15, 2022
86cd55d
Updated cugraph-service-server setup.py to include subpackages.
rlratzel Dec 16, 2022
@@ -0,0 +1,264 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import os
from pathlib import Path

import pytest
import numpy as np

# If the rapids-pytest-benchmark plugin is installed, the "gpubenchmark"
# fixture will be available automatically. Check that this fixture is available
# by trying to import rapids_pytest_benchmark, and if that fails, set
# "gpubenchmark" to the standard "benchmark" fixture provided by
# pytest-benchmark.
try:
    import rapids_pytest_benchmark  # noqa: F401
except ImportError:
    import pytest_benchmark

    gpubenchmark = pytest_benchmark.plugin.benchmark

from cugraph_service_client import CugraphServiceClient
from cugraph_service_client.exceptions import CugraphServiceError
from cugraph_service_client import RemoteGraph
from cugraph_service_server.testing import utils

from cugraph_benchmarking import params

_seed = 42


def create_remote_graph(graph_data, is_mg, client):
    """
    Create a remote graph instance based on the data to be loaded/generated,
    relying on server-side graph creation extensions.

    The server extension is part of the
    "cugraph_service_server.testing.benchmark_server_extension" and is loaded
    in the ensure_running_service_for_sampling() helper.
    """
    # Assume strings are names of datasets in the datasets package
    if isinstance(graph_data, str):
        gid = client.call_graph_creation_extension(
            "create_graph_from_builtin_dataset", graph_data
        )

    # Assume a dictionary contains RMAT params
    elif isinstance(graph_data, dict):
        scale = graph_data["scale"]
        num_edges = (2**scale) * graph_data["edgefactor"]
        seed = _seed
        gid = client.call_graph_creation_extension(
            "create_graph_from_rmat_generator",
            scale=scale,
            num_edges=num_edges,
            seed=seed,
            mg=is_mg,
        )
    else:
        raise TypeError(f"graph_data can only be str or dict, got {type(graph_data)}")

    G = RemoteGraph(client, gid)
    return G
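
# Example usage (a minimal sketch, assuming a server is reachable on
# localhost:9090 and that "karate" is an available built-in dataset name):
#
#   client = CugraphServiceClient("localhost", 9090)
#   G = create_remote_graph("karate", is_mg=False, client=client)
#   G_rmat = create_remote_graph({"scale": 16, "edgefactor": 16},
#                                is_mg=False, client=client)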


def get_uniform_neighbor_sample_args(
    G, seed, batch_size, fanout, with_replacement
):
    """
    Return a dictionary containing the args for uniform_neighbor_sample based
    on the graph and desired args passed in. For example, if a large start
    list and a small fanout list are desired, a "large" (based on graph size)
    list of valid vert IDs for the graph passed in and a "small" list of
    fanout values will be returned.

    Returning a dictionary makes it easy to support additional args later
    without having to maintain an ordering of values in a return tuple.
    """
    if with_replacement not in [True, False]:
        raise ValueError(f"got unexpected value {with_replacement=}")

    num_verts = G.number_of_vertices()

    if batch_size > num_verts:
        num_start_verts = int(num_verts * 0.25)
    else:
        num_start_verts = batch_size

    # Create the start_list on the server, since generating a list of actual
    # IDs requires unrenumbering steps that cannot be easily done remotely.
    start_list = G._client.call_extension(
        "gen_vertex_list", G._graph_id, num_start_verts, seed
    )

    return {
        "start_list": list(start_list),
        "fanout": fanout,
        "with_replacement": with_replacement,
    }
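
# Shape of the returned dictionary (a sketch; the actual start_list contents
# depend on the graph, the seed, and the server-side gen_vertex_list
# extension):
#
#   {
#       "start_list": [4, 17, 99],      # num_start_verts valid vertex IDs
#       "fanout": [10, 25],             # passed through unchanged
#       "with_replacement": False,      # passed through unchanged
#   }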


def ensure_running_service_for_sampling(dask_scheduler_file=None,
                                        start_local_cuda_cluster=False):
    """
    Returns a tuple containing a Popen object for the running cugraph-service
    server subprocess, and a client object connected to it. If a server was
    detected already running, the Popen object will be None.
    """
    host = "localhost"
    port = 9090
    client = CugraphServiceClient(host, port)
    server_process = None

    try:
        client.uptime()
        print("FOUND RUNNING SERVER, ASSUMING IT SHOULD BE USED FOR TESTING!")

    except CugraphServiceError:
        # A server was not found, so start one for testing then stop it when
        # testing is done.
        server_process = utils.start_server_subprocess(
            host=host,
            port=port,
            start_local_cuda_cluster=start_local_cuda_cluster,
            dask_scheduler_file=dask_scheduler_file,
        )

    # Ensure the extensions needed for these benchmarks are loaded
    required_graph_creation_extension_module = "benchmark_server_extension"
    server_data = client.get_server_info()
    # .stem excludes .py extensions, so it can match a python module name
    loaded_graph_creation_extension_modules = [
        Path(m).stem for m in server_data["graph_creation_extensions"]
    ]
    if (
        required_graph_creation_extension_module
        not in loaded_graph_creation_extension_modules
    ):
        modules_loaded = client.load_graph_creation_extensions(
            "cugraph_service_server.testing.benchmark_server_extension"
        )
        if len(modules_loaded) < 1:
            raise RuntimeError(
                "failed to load graph creation extension "
                f"{required_graph_creation_extension_module}"
            )

    loaded_extension_modules = [Path(m).stem for m in server_data["extensions"]]
    if required_graph_creation_extension_module not in loaded_extension_modules:
        modules_loaded = client.load_extensions(
            "cugraph_service_server.testing.benchmark_server_extension"
        )
        if len(modules_loaded) < 1:
            raise RuntimeError(
                "failed to load extension "
                f"{required_graph_creation_extension_module}"
            )

    return (server_process, client)
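
# Standalone usage (a minimal sketch): terminate the server subprocess only
# if this process started it, mirroring the fixture teardown below.
#
#   (server_process, client) = ensure_running_service_for_sampling()
#   try:
#       client.uptime()
#   finally:
#       if server_process is not None:
#           server_process.terminate()
#           server_process.wait(timeout=60)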


def remote_uniform_neighbor_sample(G, start_list, fanout_vals, with_replacement=True):
    """
    Calls uniform_neighbor_sample() on the server using the client assigned to
    the RemoteGraph instance G.
    """
    assert G.is_remote()
    result = G._client.uniform_neighbor_sample(
        start_list, fanout_vals, with_replacement, graph_id=G._graph_id, result_device=1
    )
    return result
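
# The result object exposes the sampled arrays; result_device=1 above asks the
# server to return the results on device 1. A consumption sketch (field names
# as used in the benchmark below):
#
#   result = remote_uniform_neighbor_sample(G, start_list=[0, 1, 2],
#                                           fanout_vals=[10, 25])
#   print(len(result.sources), result.sources.dtype)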


@pytest.fixture(scope="module", params=params.graph_obj_fixture_params)
def remote_graph_objs(request):
"""
Fixture that returns a RemoteGraph object populated with graph data and
algo callable based on the parameters. This also ensures a cugraph-service
server if not.
"""
(gpu_config, graph_data) = request.param
server_process = None

if gpu_config not in ["SG", "SNMG", "MNMG"]:
raise RuntimeError(f"got unexpected gpu_config value: {gpu_config}")

# Ensure the appropriate server is running
if gpu_config == "SG":
(server_process, cgs_client) = ensure_running_service_for_sampling()
is_mg = False

elif gpu_config == "SNMG":
dask_scheduler_file = os.environ.get("SCHEDULER_FILE")
if dask_scheduler_file is None:
(server_process, cgs_client) = ensure_running_service_for_sampling(
start_local_cuda_cluster=True
)
else:
assert Path(dask_scheduler_file).exists()
(server_process, cgs_client) = ensure_running_service_for_sampling(
dask_scheduler_file=dask_scheduler_file
)
is_mg = True

else:
raise NotImplementedError(f"{gpu_config=}")

print("creating graph...")
st = time.perf_counter_ns()
G = create_remote_graph(graph_data, is_mg, cgs_client)
print(f"done creating graph, took {((time.perf_counter_ns() - st) / 1e9)}s")

uns_func = remote_uniform_neighbor_sample

yield (G, uns_func)

del G # is this necessary?
if server_process is not None:
print("\nTerminating server...", end="", flush=True)
server_process.terminate()
server_process.wait(timeout=60)
print("done.", flush=True)


################################################################################
# Benchmarks
@pytest.mark.parametrize("batch_size", params.batch_sizes.values())
@pytest.mark.parametrize("fanout", [params.fanout_10_25, params.fanout_5_10_15])
@pytest.mark.parametrize(
    "with_replacement", [False], ids=lambda v: f"with_replacement={v}"
)
def bench_cgs_uniform_neighbor_sample(
    gpubenchmark, remote_graph_objs, batch_size, fanout, with_replacement
):
    (G, uniform_neighbor_sample_func) = remote_graph_objs

    uns_args = get_uniform_neighbor_sample_args(
        G, _seed, batch_size, fanout, with_replacement
    )
    # print(f"\n{uns_args}")
    # FIXME: uniform_neighbor_sample cannot take a np.ndarray for start_list
    result = gpubenchmark(
        uniform_neighbor_sample_func,
        G,
        start_list=uns_args["start_list"],
        fanout_vals=uns_args["fanout"],
        with_replacement=uns_args["with_replacement"],
    )
    dtmap = {"int32": 32 // 8, "int64": 64 // 8}
    dt = str(result.sources.dtype)
    llen = len(result.sources)
    print(f"\nresult list len: {llen} (x3), dtype={dt}, total bytes={3*llen*dtmap[dt]}")
@@ -32,32 +32,44 @@ def setFixtureParamNames(*args, **kwargs):
 import cugraph
 from cugraph.structure.number_map import NumberMap
 from cugraph.testing import utils
+from pylibcugraph.testing import gen_fixture_params_product
 from cugraph.utilities.utils import is_device_version_less_than
 import rmm
 
-from .params import FIXTURE_PARAMS
+from cugraph_benchmarking.params import (
+    directed_datasets,
+    undirected_datasets,
+    managed_memory,
+    pool_allocator,
+)
+
+fixture_params = gen_fixture_params_product(
+    (directed_datasets + undirected_datasets, "ds"),
+    (managed_memory, "mm"),
+    (pool_allocator, "pa"))
 
 ###############################################################################
 # Helpers
 def createGraph(csvFileName, graphType=None):
     """
-    Helper function to create a Graph or DiGraph based on csvFileName.
+    Helper function to create a Graph (directed or undirected) based on
+    csvFileName.
     """
     if graphType is None:
-        # There's potential value in verifying that a DiGraph can be created
-        # from a undirected dataset, and a Graph from a directed. (For now?) do
-        # not include those combinations to keep benchmark runtime and
-        # complexity lower, and assume tests have coverage to verify
-        # correctness for those combinations.
+        # There's potential value in verifying that a directed graph can be
+        # created from an undirected dataset, and an undirected from a
+        # directed dataset. (For now?) do not include those combinations to
+        # keep benchmark runtime and complexity lower, and assume tests have
+        # coverage to verify correctness for those combinations.
         if "directed" in csvFileName.parts:
-            graphType = cugraph.structure.graph_classes.DiGraph
+            graph_obj = cugraph.Graph(directed=True)
         else:
-            graphType = cugraph.structure.graph_classes.Graph
+            graph_obj = cugraph.Graph()
 
     return cugraph.from_cudf_edgelist(
         utils.read_csv_file(csvFileName),
         source="0", destination="1", edge_attr="2",
-        create_using=graphType,
+        create_using=graph_obj,
         renumber=True)
 
 
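
Context for the hunk above: gen_fixture_params_product builds the cartesian
product of the parameter lists, pairing each list with a short ID label, so
fixture_params replaces the previous FIXTURE_PARAMS import. A minimal sketch of
the same idea in plain Python (illustrative values, not the real lists):

    from itertools import product

    directed_datasets = ["karate-directed"]    # illustrative
    undirected_datasets = ["netscience"]       # illustrative
    managed_memory = [True, False]
    pool_allocator = [True, False]

    # every (dataset, managed-memory, pool-allocator) combination
    combos = list(product(directed_datasets + undirected_datasets,
                          managed_memory, pool_allocator))
    assert len(combos) == 8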
@@ -93,7 +105,7 @@ def reinitRMM(managed_mem, pool_alloc):
 # For benchmarks, the operations performed in fixtures are not measured as part
 # of the benchmark.
 @pytest.fixture(scope="module",
-                params=FIXTURE_PARAMS)
+                params=fixture_params)
 def edgelistCreated(request):
     """
     Returns a new edgelist created from a CSV, which is specified as part of
@@ -111,7 +123,7 @@ def edgelistCreated(request):
 
 
 @pytest.fixture(scope="module",
-                params=FIXTURE_PARAMS)
+                params=fixture_params)
 def graphWithAdjListComputed(request):
     """
     Create a Graph obj from the CSV file in param, compute the adjacency list
@@ -127,11 +139,11 @@ def graphWithAdjListComputed(request):
 
 
 @pytest.fixture(scope="module",
-                params=FIXTURE_PARAMS)
+                params=fixture_params)
 def anyGraphWithAdjListComputed(request):
     """
-    Create a Graph (or DiGraph) obj based on the param, compute the adjacency
-    list and return it.
+    Create a Graph (directed or undirected) obj based on the param, compute the
+    adjacency list and return it.
     """
     setFixtureParamNames(request, ["dataset", "managed_mem", "pool_allocator"])
     csvFileName = request.param[0]
@@ -143,11 +155,11 @@ def anyGraphWithAdjListComputed(request):
 
 
 @pytest.fixture(scope="module",
-                params=FIXTURE_PARAMS)
+                params=fixture_params)
 def anyGraphWithTransposedAdjListComputed(request):
     """
-    Create a Graph (or DiGraph) obj based on the param, compute the transposed
-    adjacency list and return it.
+    Create a Graph (directed or undirected) obj based on the param, compute the
+    transposed adjacency list and return it.
     """
     setFixtureParamNames(request, ["dataset", "managed_mem", "pool_allocator"])
     csvFileName = request.param[0]
@@ -169,9 +181,9 @@ def bench_create_graph(gpubenchmark, edgelistCreated):
                  renumber=False)
 
 
-# Creating DiGraphs on small datasets runs in micro-seconds, which results in
-# thousands of rounds before the default threshold is met, so lower the
-# max_time for this benchmark.
+# Creating directed Graphs on small datasets runs in micro-seconds, which
+# results in thousands of rounds before the default threshold is met, so lower
+# the max_time for this benchmark.
 @pytest.mark.ETL
 @pytest.mark.benchmark(
     warmup=True,
@@ -182,7 +194,7 @@ def bench_create_digraph(gpubenchmark, edgelistCreated):
     gpubenchmark(cugraph.from_cudf_edgelist,
                  edgelistCreated,
                  source="0", destination="1",
-                 create_using=cugraph.structure.graph_classes.DiGraph,
+                 create_using=cugraph.Graph(directed=True),
                  renumber=False)
 
 
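
The create_using change in the hunk above follows cugraph's move away from the
DiGraph class in favor of Graph(directed=True). A minimal sketch of the new
construction (a small hypothetical edgelist):

    import cudf
    import cugraph

    df = cudf.DataFrame({"0": [0, 1, 2], "1": [1, 2, 0]})
    G = cugraph.from_cudf_edgelist(df, source="0", destination="1",
                                   create_using=cugraph.Graph(directed=True),
                                   renumber=False)
    assert G.is_directed()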