Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moves more MG graph ETL to libcugraph and re-enables MG tests in CI #3941

Merged
Merged
Show file tree
Hide file tree
Changes from 56 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
b59c94f
placeholder for re-enabling the mg testing
jnke2016 Oct 17, 2023
d07af7d
propose C API changes
ChuckHastings Nov 7, 2023
c66a50e
new graph creation implementation in C API
ChuckHastings Nov 13, 2023
37278d1
Merge branch 'branch-23.12' into new_graph_creation_methods
ChuckHastings Nov 13, 2023
d470bc6
add proper test for multiple input lists in MG
ChuckHastings Nov 14, 2023
0dbea1a
Merge branch 'branch-23.12' into new_graph_creation_methods
ChuckHastings Nov 14, 2023
67e7383
update branch with the latest changes
jnke2016 Nov 15, 2023
a56b30a
fetch and merge CAPI graph update
jnke2016 Nov 15, 2023
d0bf54c
support isolated vertices for sg graph
jnke2016 Nov 15, 2023
52b3162
add support for dropping self loops and removing multi edges to C API…
ChuckHastings Nov 16, 2023
0cca2cd
Merge branch 'branch-23.12' into new_graph_creation_methods
ChuckHastings Nov 16, 2023
d96ba62
refactor remove_self_loops and sort_and_remove_multi_edges to reduce …
ChuckHastings Nov 17, 2023
f1ab784
add support for isolated vertices, list of edges
jnke2016 Nov 17, 2023
9242207
Merge remote-tracking branch 'upstream/new_graph_creation_methods' in…
jnke2016 Nov 17, 2023
d1b104a
check weights before extracting its type
jnke2016 Nov 17, 2023
dfafeec
remove deprecated parameter 'num_edges'
jnke2016 Nov 17, 2023
e7c0cf9
support list of edges
jnke2016 Nov 17, 2023
a44a1ae
remove debug print
jnke2016 Nov 17, 2023
12bf121
fix style
jnke2016 Nov 17, 2023
0fc84bd
Merge remote-tracking branch 'upstream/branch-23.12' into branch-23.1…
jnke2016 Nov 17, 2023
76e5c15
pass list of edgelist to the plc graph creation
jnke2016 Nov 18, 2023
172ea07
update check
jnke2016 Nov 20, 2023
42913b8
update data persistence
jnke2016 Nov 20, 2023
3e233f1
cleanup code and fix bugs
jnke2016 Nov 20, 2023
a612a0a
fix style
jnke2016 Nov 20, 2023
4513d19
Merge remote-tracking branch 'upstream/branch-23.12' into branch-23.1…
jnke2016 Nov 20, 2023
cc113db
pass keyword argument to accommodate for the plc graph creation signa…
jnke2016 Nov 20, 2023
0ceeb4f
update doctests
jnke2016 Nov 20, 2023
e757ff9
update doctest examples
jnke2016 Nov 20, 2023
68e76b8
re-enable single gpu dask python tests
jnke2016 Nov 20, 2023
64bb371
fix style
jnke2016 Nov 20, 2023
9915497
update copyright
jnke2016 Nov 20, 2023
08c0d05
update copyright
jnke2016 Nov 20, 2023
6db1050
lower tolerance
jnke2016 Nov 20, 2023
39bded6
fix docstring examples
jnke2016 Nov 20, 2023
129a226
Merge branch 'branch-23.12' into branch-23.12_re-enable-mg-testing
naimnv Nov 20, 2023
e275714
Remove another persist and decrease memory footprint of drop_duplicates
VibhuJawa Nov 21, 2023
6069f3c
decrease memory footprint of drop_duplicates
VibhuJawa Nov 21, 2023
64ec881
decrease memory footprint of drop_duplicates
VibhuJawa Nov 21, 2023
a32fd3d
Revert bulk sampling changes
VibhuJawa Nov 21, 2023
5f7d4e5
Revert bulk sampling changes
VibhuJawa Nov 21, 2023
008e1d0
Merge remote-tracking branch 'upstream/branch-23.12' into branch-23.1…
rlratzel Nov 22, 2023
8b3b051
properly handle smaller graphs
jnke2016 Nov 22, 2023
51b5a90
remove extra persist
jnke2016 Nov 22, 2023
c8c0abe
Merge remote-tracking branch 'upstream/branch-23.12_re-enable-mg-test…
jnke2016 Nov 22, 2023
3b3d0c6
fix style
jnke2016 Nov 22, 2023
2d4e5ac
undo changes when resolving merge conflict
jnke2016 Nov 22, 2023
e4db01d
clean up code
jnke2016 Nov 22, 2023
d29c10d
update docstrings
jnke2016 Nov 22, 2023
3a4255d
update docstrings
jnke2016 Nov 22, 2023
c607a58
properly handle list of device arrays and clean up code
jnke2016 Nov 22, 2023
7f869b3
explicitly increase the timeout per worker
jnke2016 Nov 22, 2023
71fb5e5
temporarily lower the timeout value
jnke2016 Nov 22, 2023
9f8b131
fix style and add comment
jnke2016 Nov 22, 2023
912701a
Merge remote-tracking branch 'upstream/branch-23.12' into branch-23.1…
jnke2016 Nov 22, 2023
35a6feb
fix style
jnke2016 Nov 22, 2023
fdeaa57
refactor distribution of dask objects across workers
jnke2016 Nov 27, 2023
07bcd2e
fix style
jnke2016 Nov 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/test_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pytest \
--cov=cugraph \
--cov-report=xml:"${RAPIDS_COVERAGE_DIR}/cugraph-coverage.xml" \
--cov-report=term \
-k "not _mg" \
-k "not test_property_graph_mg" \
tests
popd

Expand Down
2 changes: 1 addition & 1 deletion ci/test_wheel.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ else
DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="1000s" \
DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="1000s" \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT="1000s" \
python -m pytest -k "not _mg" ./python/${package_name}/${python_package_name}/tests
python -m pytest ./python/${package_name}/${python_package_name}/tests
fi
3 changes: 2 additions & 1 deletion cpp/include/cugraph_c/graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ cugraph_error_code_t cugraph_sg_graph_create(
* Note that setting this flag will arbitrarily select one instance of a multi edge to be the
* edge that survives. If the edges have properties that should be honored (e.g. sum the
weights,
* or take the maximum weight), the caller should do that on not rely on this flag.
* or take the maximum weight), the caller should remove specific edges themselves and not rely
* on this flag.
* @param [in] do_expensive_check If true, do expensive checks to validate the input data
* is consistent with software assumptions. If false bypass these checks.
* @param [out] graph A pointer to the graph object
Expand Down
16 changes: 13 additions & 3 deletions python/cugraph/cugraph/dask/community/leiden.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,19 @@ def leiden(

Examples
--------
>>> from cugraph.datasets import karate
>>> G = karate.get_graph(fetch=True)
>>> parts, modularity_score = cugraph.leiden(G)
>>> import cugraph.dask as dcg
>>> import dask_cudf
>>> # ... Init a DASK Cluster
>>> # see https://docs.rapids.ai/api/cugraph/stable/dask-cugraph.html
>>> # Download dataset from https://github.com/rapidsai/cugraph/datasets/..
>>> chunksize = dcg.get_chunksize(datasets_path / "karate.csv")
>>> ddf = dask_cudf.read_csv(datasets_path / "karate.csv",
... chunksize=chunksize, delimiter=" ",
... names=["src", "dst", "value"],
... dtype=["int32", "int32", "float32"])
>>> dg = cugraph.Graph()
>>> dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst')
>>> parts, modularity_score = dcg.leiden(dg)

"""

Expand Down
16 changes: 13 additions & 3 deletions python/cugraph/cugraph/dask/community/louvain.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,19 @@ def louvain(

Examples
--------
>>> from cugraph.datasets import karate
>>> G = karate.get_graph(fetch=True)
>>> parts = cugraph.louvain(G)
>>> import cugraph.dask as dcg
>>> import dask_cudf
>>> # ... Init a DASK Cluster
>>> # see https://docs.rapids.ai/api/cugraph/stable/dask-cugraph.html
>>> # Download dataset from https://github.com/rapidsai/cugraph/datasets/..
>>> chunksize = dcg.get_chunksize(datasets_path / "karate.csv")
>>> ddf = dask_cudf.read_csv(datasets_path / "karate.csv",
... chunksize=chunksize, delimiter=" ",
... names=["src", "dst", "value"],
... dtype=["int32", "int32", "float32"])
>>> dg = cugraph.Graph()
>>> dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst')
>>> parts, modularity_score = dcg.louvain(dg)

"""

Expand Down
17 changes: 13 additions & 4 deletions python/cugraph/cugraph/dask/link_analysis/pagerank.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
)

import cugraph.dask.comms.comms as Comms
from cugraph.dask.common.input_utils import get_distributed_data
from cugraph.dask.common.part_utils import (
persist_dask_df_equal_parts_per_worker,
)
from cugraph.exceptions import FailedToConvergeError


Expand Down Expand Up @@ -352,7 +354,14 @@ def pagerank(
personalization, npartitions=len(Comms.get_workers())
)

data_prsztn = get_distributed_data(personalization_ddf)
data_prsztn = persist_dask_df_equal_parts_per_worker(
personalization_ddf, client, return_type="dict"
)

empty_df = cudf.DataFrame(columns=list(personalization_ddf.columns))
empty_df = empty_df.astype(
dict(zip(personalization_ddf.columns, personalization_ddf.dtypes))
)

result = [
client.submit(
Expand All @@ -361,7 +370,7 @@ def pagerank(
input_graph._plc_graph[w],
precomputed_vertex_out_weight_vertices,
precomputed_vertex_out_weight_sums,
data_personalization[0],
data_personalization[0] if data_personalization else empty_df,
initial_guess_vertices,
initial_guess_values,
alpha,
Expand All @@ -372,7 +381,7 @@ def pagerank(
workers=[w],
allow_other_workers=False,
)
for w, data_personalization in data_prsztn.worker_to_parts.items()
for w, data_personalization in data_prsztn.items()
]
else:
result = [
Expand Down
15 changes: 11 additions & 4 deletions python/cugraph/cugraph/dask/traversal/bfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
from pylibcugraph import ResourceHandle, bfs as pylibcugraph_bfs

from dask.distributed import wait, default_client
from cugraph.dask.common.input_utils import get_distributed_data
from cugraph.dask.common.part_utils import (
persist_dask_df_equal_parts_per_worker,
)
import cugraph.dask.comms.comms as Comms
import cudf
import dask_cudf
Expand Down Expand Up @@ -159,8 +161,13 @@ def bfs(input_graph, start, depth_limit=None, return_distances=True, check_start
tmp_col_names = None

start = input_graph.lookup_internal_vertex_id(start, tmp_col_names)
vertex_dtype = start.dtype # if the edgelist was renumbered, update
# the vertex type accordingly

data_start = persist_dask_df_equal_parts_per_worker(
start, client, return_type="dict"
)

data_start = get_distributed_data(start)
do_expensive_check = False
# FIXME: Why is 'direction_optimizing' not part of the python cugraph API
# and why is it set to 'False' by default
Expand All @@ -171,15 +178,15 @@ def bfs(input_graph, start, depth_limit=None, return_distances=True, check_start
_call_plc_bfs,
Comms.get_session_id(),
input_graph._plc_graph[w],
st[0],
st[0] if st else cudf.Series(dtype=vertex_dtype),
depth_limit,
direction_optimizing,
return_distances,
do_expensive_check,
workers=[w],
allow_other_workers=False,
)
for w, st in data_start.worker_to_parts.items()
for w, st in data_start.items()
]

wait(cupy_result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
from cugraph.structure.symmetrize import symmetrize
from cugraph.dask.common.part_utils import (
get_persisted_df_worker_map,
get_length_of_parts,
persist_dask_df_equal_parts_per_worker,
_chunk_lst,
)
from cugraph.dask import get_n_workers
import cugraph.dask.comms.comms as Comms
Expand Down Expand Up @@ -81,6 +81,10 @@ def __init__(self, properties):
self.destination_columns = None
self.weight_column = None
self.vertex_columns = None
self.vertex_type = None
self.weight_type = None
self.edge_id_type = None
self.edge_type_id_type = None

def _make_plc_graph(
sID,
Expand All @@ -89,51 +93,69 @@ def _make_plc_graph(
src_col_name,
dst_col_name,
store_transposed,
num_edges,
vertex_type,
weight_type,
edge_id_type,
edge_type_id,
):

weights = None
edge_ids = None
edge_types = None

if simpleDistributedGraphImpl.edgeWeightCol in edata_x[0]:
weights = _get_column_from_ls_dfs(
edata_x, simpleDistributedGraphImpl.edgeWeightCol
jnke2016 marked this conversation as resolved.
Show resolved Hide resolved
)
if weights.dtype == "int32":
weights = weights.astype("float32")
elif weights.dtype == "int64":
weights = weights.astype("float64")

if simpleDistributedGraphImpl.edgeIdCol in edata_x[0]:
edge_ids = _get_column_from_ls_dfs(
edata_x, simpleDistributedGraphImpl.edgeIdCol
)
if edata_x[0][src_col_name].dtype == "int64" and edge_ids.dtype != "int64":
edge_ids = edge_ids.astype("int64")
num_arrays = len(edata_x)
if weight_type is not None:
weights = [
edata_x[i][simpleDistributedGraphImpl.edgeWeightCol]
for i in range(num_arrays)
]
if weight_type == "int32":
weights = [w_array.astype("float32") for w_array in weights]
elif weight_type == "int64":
weights = [w_array.astype("float64") for w_array in weights]

if edge_id_type is not None:
edge_ids = [
edata_x[i][simpleDistributedGraphImpl.edgeIdCol]
for i in range(num_arrays)
]
if vertex_type == "int64" and edge_id_type != "int64":
edge_ids = [e_id_array.astype("int64") for e_id_array in edge_ids]
warnings.warn(
f"Vertex type is int64 but edge id type is {edge_ids.dtype}"
f"Vertex type is int64 but edge id type is {edge_ids[0].dtype}"
", automatically casting edge id type to int64. "
"This may cause extra memory usage. Consider passing"
" a int64 list of edge ids instead."
)
if simpleDistributedGraphImpl.edgeTypeCol in edata_x[0]:
edge_types = _get_column_from_ls_dfs(
edata_x, simpleDistributedGraphImpl.edgeTypeCol
)
if edge_type_id is not None:
edge_types = [
edata_x[i][simpleDistributedGraphImpl.edgeTypeCol]
for i in range(num_arrays)
]

return MGGraph(
src_array = [edata_x[i][src_col_name] for i in range(num_arrays)]
dst_array = [edata_x[i][dst_col_name] for i in range(num_arrays)]
plc_graph = MGGraph(
resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
graph_properties=graph_props,
src_array=_get_column_from_ls_dfs(edata_x, src_col_name),
dst_array=_get_column_from_ls_dfs(edata_x, dst_col_name),
weight_array=weights,
edge_id_array=edge_ids,
edge_type_array=edge_types,
src_array=src_array if src_array else cudf.Series(dtype=vertex_type),
dst_array=dst_array if dst_array else cudf.Series(dtype=vertex_type),
weight_array=weights
if weights
else ([cudf.Series(dtype=weight_type)] if weight_type else None),
edge_id_array=edge_ids
if edge_ids
else ([cudf.Series(dtype=edge_id_type)] if edge_id_type else None),
edge_type_array=edge_types
if edge_types
else ([cudf.Series(dtype=edge_type_id)] if edge_type_id else None),
num_arrays=num_arrays,
store_transposed=store_transposed,
num_edges=num_edges,
do_expensive_check=False,
)
del edata_x
gc.collect()

return plc_graph

# Functions
def __from_edgelist(
Expand Down Expand Up @@ -182,7 +204,6 @@ def __from_edgelist(
workers = _client.scheduler_info()["workers"]
# Repartition to 2 partitions per GPU for memory efficient process
input_ddf = input_ddf.repartition(npartitions=len(workers) * 2)
input_ddf = input_ddf.map_partitions(lambda df: df.copy())
# The dataframe will be symmetrized iff the graph is undirected
# otherwise, the inital dataframe will be returned
if edge_attr is not None:
Expand Down Expand Up @@ -314,19 +335,25 @@ def __from_edgelist(
dst_col_name = self.renumber_map.renumbered_dst_col_name

ddf = self.edgelist.edgelist_df

# Get the edgelist dtypes
self.vertex_type = ddf[src_col_name].dtype
if simpleDistributedGraphImpl.edgeWeightCol in ddf.columns:
self.weight_type = ddf[simpleDistributedGraphImpl.edgeWeightCol].dtype
if simpleDistributedGraphImpl.edgeIdCol in ddf.columns:
self.edge_id_type = ddf[simpleDistributedGraphImpl.edgeIdCol].dtype
if simpleDistributedGraphImpl.edgeTypeCol in ddf.columns:
self.edge_type_id_type = ddf[simpleDistributedGraphImpl.edgeTypeCol].dtype

graph_props = GraphProperties(
is_multigraph=self.properties.multi_edge,
is_symmetric=not self.properties.directed,
)
ddf = ddf.repartition(npartitions=len(workers) * 2)
persisted_keys_d = persist_dask_df_equal_parts_per_worker(
ddf, _client, return_type="dict"
)
del ddf
length_of_parts = get_length_of_parts(persisted_keys_d, _client)
num_edges = sum(
[item for sublist in length_of_parts.values() for item in sublist]
)
ddf_keys = ddf.to_delayed()
workers = _client.scheduler_info()["workers"].keys()
ddf_keys_ls = _chunk_lst(ddf_keys, len(workers))

delayed_tasks_d = {
w: delayed(simpleDistributedGraphImpl._make_plc_graph)(
Comms.get_session_id(),
Expand All @@ -335,9 +362,12 @@ def __from_edgelist(
src_col_name,
dst_col_name,
store_transposed,
num_edges,
self.vertex_type,
self.weight_type,
self.edge_id_type,
self.edge_type_id_type,
)
for w, edata in persisted_keys_d.items()
for w, edata in zip(workers, ddf_keys_ls)
}
self._plc_graph = {
w: _client.compute(
Expand All @@ -346,8 +376,9 @@ def __from_edgelist(
for w, delayed_task in delayed_tasks_d.items()
}
wait(list(self._plc_graph.values()))
del persisted_keys_d
del ddf_keys
del delayed_tasks_d
gc.collect()
_client.run(gc.collect)

@property
Expand Down Expand Up @@ -1189,18 +1220,3 @@ def vertex_column_size(self):
@property
def _npartitions(self) -> int:
return len(self._plc_graph)


def _get_column_from_ls_dfs(lst_df, col_name):
"""
This function concatenates the column
and drops it from the input list
"""
len_df = sum([len(df) for df in lst_df])
if len_df == 0:
return lst_df[0][col_name]
output_col = cudf.concat([df[col_name] for df in lst_df], ignore_index=True)
for df in lst_df:
df.drop(columns=[col_name], inplace=True)
gc.collect()
return output_col
6 changes: 2 additions & 4 deletions python/cugraph/cugraph/structure/symmetrize.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,8 @@ def _memory_efficient_drop_duplicates(ddf, vertex_col_name, num_workers):
Drop duplicate edges from the input dataframe.
"""
# drop duplicates has a 5x+ overhead
# and does not seem to be working as expected
# TODO: Triage an MRE
ddf = ddf.reset_index(drop=True).repartition(npartitions=num_workers * 2)
ddf = ddf.groupby(by=[*vertex_col_name], as_index=False).min(
split_out=num_workers * 2
ddf = ddf.drop_duplicates(
subset=[*vertex_col_name], ignore_index=True, split_out=num_workers * 2
)
return ddf
2 changes: 2 additions & 0 deletions python/cugraph/cugraph/testing/mg_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def start_dask_client(
rmm_pool_size=None,
dask_worker_devices=None,
jit_unspill=False,
worker_class=None,
device_memory_limit=0.8,
):
"""
Expand Down Expand Up @@ -141,6 +142,7 @@ def start_dask_client(
rmm_async=rmm_async,
CUDA_VISIBLE_DEVICES=dask_worker_devices,
jit_unspill=jit_unspill,
worker_class=worker_class,
device_memory_limit=device_memory_limit,
)
client = Client(cluster)
Expand Down
Loading