Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve memory footprint and performance of graph creation #3542

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 80 additions & 22 deletions benchmarks/cugraph/standalone/cugraph_graph_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from cugraph.testing.mg_utils import (
generate_edgelist_rmat,
get_allocation_counts_dask_persist,
get_allocation_counts_dask_lazy,
sizeof_fmt,
get_peak_output_ratio_across_workers,
restart_client,
Expand All @@ -24,12 +25,15 @@
stop_dask_client,
enable_spilling,
)
from cugraph.structure.symmetrize import symmetrize_ddf
import cugraph
import cudf
from time import sleep
import pandas as pd
import time


@get_allocation_counts_dask_persist(return_allocations=True, logging=False)
@get_allocation_counts_dask_lazy(return_allocations=True, logging=True)
def construct_graph(dask_dataframe, directed=False, renumber=False):
"""
Args:
Expand All @@ -43,28 +47,78 @@ def construct_graph(dask_dataframe, directed=False, renumber=False):
Returns:
G: cugraph.Graph
"""
st = time.time()
G = cugraph.Graph(directed=directed)
G.from_dask_cudf_edgelist(
dask_dataframe, source="src", destination="dst", renumber=renumber
)
return G
et = time.time()
g_creation_time = et - st
print(f"Graph creation time = {g_creation_time} s")
return G, g_creation_time


@get_allocation_counts_dask_persist(return_allocations=True, logging=True)
def symmetrize_cugraph_df(dask_df, multi=False):
    """Symmetrize the src/dst edgelist in *dask_df* and return the result.

    Wrapped by ``get_allocation_counts_dask_persist`` so the caller also
    receives per-worker allocation counts alongside the symmetrized frame.
    ``multi`` is forwarded to ``symmetrize_ddf`` (multigraph handling).
    """
    return symmetrize_ddf(dask_df, "src", "dst", multi=multi)


def benchmark_cugraph_graph_symmetrize(scale, edgefactor, seed, multi):
    """
    Benchmark cugraph graph symmetrization

    Generates an RMAT edgelist of size ~ 2**scale * edgefactor, symmetrizes
    it via symmetrize_cugraph_df, and reports memory statistics computed
    from the per-worker allocation counts.

    Returns a tuple of
    (num_input_edges, input_to_peak_ratio, output_to_peak_ratio,
     input_memory_per_worker, peak_allocation_across_workers).
    """
    dask_df = generate_edgelist_rmat(
        scale=scale, edgefactor=edgefactor, seed=seed, unweighted=True, mg=True
    )
    # Normalize dtypes and index before measuring the input footprint.
    dask_df = dask_df.astype("int64")
    dask_df = dask_df.reset_index(drop=True)
    input_memory = dask_df.memory_usage().sum().compute()
    num_input_edges = len(dask_df)
    print(f"Number of input edges = {num_input_edges:,}, multi = {multi}")
    # symmetrize_cugraph_df is decorated to also return allocation counts.
    output_df, allocation_counts = symmetrize_cugraph_df(dask_df, multi=multi)
    (
        input_to_peak_ratio,
        output_to_peak_ratio,
        input_memory_per_worker,
        peak_allocation_across_workers,
    ) = get_memory_statistics(
        allocation_counts=allocation_counts, input_memory=input_memory
    )
    print(f"Number of edges after symmetrization = {len(output_df):,}")
    print("-" * 80)
    return (
        num_input_edges,
        input_to_peak_ratio,
        output_to_peak_ratio,
        input_memory_per_worker,
        peak_allocation_across_workers,
    )


def benchmark_cugraph_graph_creation(scale, edgefactor, seed, directed, renumber):
"""
Entry point for the benchmark.
"""
dask_df = generate_edgelist_rmat(
scale=scale, edgefactor=edgefactor, seed=seed, unweighted=True, mg=True,
scale=scale,
edgefactor=edgefactor,
seed=seed,
unweighted=True,
mg=True,
)
# We do below to remove the rmat memory overhead
# which holds on to GPU memory
dask_df = dask_df.map_partitions(lambda df: df.to_pandas()).persist()
dask_df = dask_df.map_partitions(cudf.from_pandas)
dask_df = dask_df.astype("int64")
dask_df = dask_df.reset_index(drop=True)
input_memory = dask_df.memory_usage().sum().compute()
num_input_edges = len(dask_df)
print(
f"Number of input edges = {num_input_edges:,}, directed = {directed}, renumber = {renumber}"
)
G, allocation_counts = construct_graph(
(G, g_creation_time), allocation_counts = construct_graph(
dask_df, directed=directed, renumber=renumber
)
(
Expand All @@ -83,6 +137,7 @@ def benchmark_cugraph_graph_creation(scale, edgefactor, seed, directed, renumber
output_to_peak_ratio,
input_memory_per_worker,
peak_allocation_across_workers,
g_creation_time,
)


Expand All @@ -109,13 +164,13 @@ def get_memory_statistics(allocation_counts, input_memory):
peak_allocation_across_workers,
)

# call __main__ function

if __name__ == "__main__":
client, cluster = start_dask_client(dask_worker_devices=[1], jit_unspill=False)
enable_spilling()
stats_ls = []
client.run(enable_spilling)
for scale in [22, 23, 24]:
for scale in [23, 24, 25]:
for directed in [True, False]:
for renumber in [True, False]:
try:
Expand All @@ -126,6 +181,7 @@ def get_memory_statistics(allocation_counts, input_memory):
output_to_peak_ratio,
input_memory_per_worker,
peak_allocation_across_workers,
g_creation_time,
) = benchmark_cugraph_graph_creation(
scale=scale,
edgefactor=16,
Expand All @@ -137,35 +193,37 @@ def get_memory_statistics(allocation_counts, input_memory):
stats_d["num_input_edges"] = num_input_edges
stats_d["directed"] = directed
stats_d["renumber"] = renumber
stats_d["input_memory_per_worker"] = sizeof_fmt(input_memory_per_worker)
stats_d["input_memory_per_worker"] = sizeof_fmt(
input_memory_per_worker
)
stats_d["peak_allocation_across_workers"] = sizeof_fmt(
peak_allocation_across_workers
)
stats_d["input_to_peak_ratio"] = input_to_peak_ratio
stats_d["output_to_peak_ratio"] = output_to_peak_ratio
stats_d["g_creation_time"] = g_creation_time
stats_ls.append(stats_d)
except Exception as e:
print(e)
restart_client(client)
sleep(10)

print("-" * 40 + f"renumber completed" + "-" * 40)

stats_df = pd.DataFrame(
stats_df = pd.DataFrame(
stats_ls,
columns=[
"scale",
"num_input_edges",
"directed",
"renumber",
"input_memory_per_worker",
"peak_allocation_across_workers",
"input_to_peak_ratio",
"output_to_peak_ratio",
],
)
stats_df.to_csv("cugraph_graph_creation_stats.csv")
columns=[
"scale",
"num_input_edges",
"directed",
"renumber",
"input_memory_per_worker",
"peak_allocation_across_workers",
"input_to_peak_ratio",
"output_to_peak_ratio",
"g_creation_time",
],
)
stats_df.to_csv("cugraph_graph_creation_stats.csv")
print("-" * 40 + f"scale = {scale} completed" + "-" * 40)

# Cleanup Dask Cluster
stop_dask_client(client, cluster)
5 changes: 2 additions & 3 deletions python/cugraph/cugraph/dask/common/input_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,6 @@


from collections.abc import Sequence

from collections import OrderedDict
from dask_cudf.core import DataFrame as dcDataFrame
from dask_cudf.core import Series as daskSeries
Expand Down Expand Up @@ -101,7 +100,7 @@ def create(cls, data, client=None, batch_enabled=False):
if isinstance(first(data) if multiple else data, (dcDataFrame, daskSeries)):
datatype = "cudf"
else:
raise Exception("Graph data must be dask-cudf dataframe")
raise TypeError("Graph data must be dask-cudf dataframe")

gpu_futures = client.sync(
_extract_partitions, data, client, batch_enabled=batch_enabled
Expand Down
37 changes: 36 additions & 1 deletion python/cugraph/cugraph/dask/common/part_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.
# Copyright (c) 2019-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -79,6 +79,41 @@ def persist_distributed_data(dask_df, client):
return parts


def _create_empty_dask_df_future(meta_df, client, worker):
    """Scatter a zero-row copy of *meta_df* to *worker*.

    Returns a single-element list holding the resulting future, so callers
    that expect a list of partition futures can use it directly.
    """
    empty_frame = meta_df.head(0)
    future = client.scatter(empty_frame, workers=[worker])
    wait(future)
    return [future]


def get_persisted_df_worker_map(dask_df, client):
    """Map each worker to the partition futures of *dask_df* it holds.

    Workers that currently hold no partition of the frame are given a
    scattered zero-row placeholder so every worker appears in the map with
    at least one future.
    """
    all_futures = futures_of(dask_df)
    worker_map = {}
    for worker, held_keys in client.has_what().items():
        held = [fut for fut in all_futures if str(fut.key) in held_keys]
        if not held:
            held = _create_empty_dask_df_future(dask_df._meta, client, worker)
        worker_map[worker] = held
    return worker_map


def _chunk_lst(ls, num_parts):
return [ls[i::num_parts] for i in range(num_parts)]


def persist_dask_df_equal_parts_per_worker(dask_df, client):
    """Persist *dask_df* with its partitions spread evenly across workers.

    The frame's delayed partitions are divided round-robin among the
    cluster's workers, each share is persisted pinned to its worker
    (``allow_other_workers=False``), and the pieces are reassembled into a
    single persisted dask_cudf DataFrame, which is rebalanced and returned.
    """
    delayed_parts = dask_df.to_delayed()
    workers = client.scheduler_info()["workers"].keys()
    per_worker_parts = _chunk_lst(delayed_parts, len(workers))

    futures = []
    for worker, parts in zip(workers, per_worker_parts):
        pinned = client.persist(parts, workers=worker, allow_other_workers=False)
        futures.extend(pinned)

    rebuilt = dask_cudf.from_delayed(futures, meta=dask_df._meta).persist()
    wait(rebuilt)
    client.rebalance(rebuilt)
    return rebuilt


async def _extract_partitions(dask_obj, client=None, batch_enabled=False):
client = default_client() if client is None else client
worker_list = Comms.get_workers()
Expand Down
Loading