Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMP] Sample with Offsets in the Bulk Sampler #3524

Merged
merged 64 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
f462ec9
select edge props
alexbarghi-nv Apr 17, 2023
5bab68b
remove unwanted files
alexbarghi-nv Apr 17, 2023
ed462b7
fix style
alexbarghi-nv Apr 17, 2023
8a3bd1e
Merge branch 'branch-23.06' of https://github.com/rapidsai/cugraph in…
alexbarghi-nv Apr 19, 2023
f13f2b6
updates to store
alexbarghi-nv Apr 24, 2023
695f493
Merge branch 'branch-23.06' of https://github.com/rapidsai/cugraph in…
alexbarghi-nv Apr 24, 2023
8fe9acb
throw exception for unweighted sssp
alexbarghi-nv Apr 24, 2023
4012f4f
Merge branch 'branch-23.06' into select-edge-props
alexbarghi-nv Apr 24, 2023
7a29d57
add tests for unweighted graphs in C API, update C API implementation…
ChuckHastings Apr 25, 2023
b1f9fa7
Merge branch 'branch-23.06' into capi_handle_default_weights
ChuckHastings Apr 25, 2023
1e8323a
Merge branch 'capi_handle_default_weights' of https://github.com/chuc…
alexbarghi-nv Apr 26, 2023
a44483e
refactor to use new fill_edge_property
ChuckHastings Apr 26, 2023
442e1ce
Rename graph_helper.cu
ChuckHastings Apr 26, 2023
7e347de
need to sort after shuffling
ChuckHastings Apr 26, 2023
8232c13
Merge branch 'branch-23.06' into alex_uns_bug
ChuckHastings Apr 26, 2023
b27b544
Merge branch 'branch-23.06' into alex_uns_bug
ChuckHastings Apr 27, 2023
d2ba15f
Merge branch 'branch-23.06' into capi_handle_default_weights
ChuckHastings Apr 27, 2023
9e451ca
Merge branch 'capi_handle_default_weights' of https://github.com/chuc…
alexbarghi-nv Apr 27, 2023
d34af46
pull in chuck's changes, update sssp tests
alexbarghi-nv Apr 27, 2023
e7bfba4
style
alexbarghi-nv Apr 27, 2023
6b4bf78
Merge branch 'alex_uns_bug' of https://github.com/chuckhastings/cugra…
alexbarghi-nv Apr 27, 2023
90eb5cc
remove unused header. Modify MG egonet test to work properly
ChuckHastings Apr 28, 2023
01d46f7
update cugraph-pyg and cugraph-dgl
alexbarghi-nv Apr 28, 2023
9a30a7e
fix style and copyright
alexbarghi-nv May 1, 2023
c5eb9aa
Merge branch 'branch-23.06' into sampling-with-offsets
alexbarghi-nv May 1, 2023
5ef7b1c
Merge branch 'branch-23.06' into sampling-with-offsets
alexbarghi-nv May 2, 2023
7ed1861
Merge branch 'branch-23.06' into select-edge-props
alexbarghi-nv May 2, 2023
5a0124a
Merge branch 'capi_handle_default_weights' of https://github.com/chuc…
alexbarghi-nv May 2, 2023
cc305dd
Merge branch 'select-edge-props' into sampling-with-offsets
alexbarghi-nv May 2, 2023
4f732a8
fix pylibcugraph empty weights issue, update tests
alexbarghi-nv May 2, 2023
6fc0e31
Merge branch 'branch-23.06' into select-edge-props
alexbarghi-nv May 5, 2023
918bdbe
style,copyright
alexbarghi-nv May 5, 2023
b131cc3
Merge branch 'select-edge-props' into sampling-with-offsets
alexbarghi-nv May 5, 2023
4c30882
style
alexbarghi-nv May 5, 2023
aa8cca5
style
alexbarghi-nv May 5, 2023
e8b05a5
iterator, print statement fix
alexbarghi-nv May 8, 2023
99e2491
remove egonet prints
alexbarghi-nv May 8, 2023
11a1a62
fix
alexbarghi-nv May 8, 2023
3f23d4d
style fix
alexbarghi-nv May 8, 2023
911517c
remove print
alexbarghi-nv May 8, 2023
c1ddad8
reformat
alexbarghi-nv May 8, 2023
b2d5af2
Merge branch 'select-edge-props' into sampling-with-offsets
alexbarghi-nv May 8, 2023
d54cd6a
update docstrings
alexbarghi-nv May 8, 2023
bb35c6c
Merge branch 'select-edge-props' into sampling-with-offsets
alexbarghi-nv May 8, 2023
09bd0fa
Merge branch 'branch-23.06' into sampling-with-offsets
alexbarghi-nv May 8, 2023
5f52c08
correct error message
alexbarghi-nv May 8, 2023
4cdb783
correct error message
alexbarghi-nv May 8, 2023
df7f0ab
update graph creation with warning when edge id type doesn't match
alexbarghi-nv May 8, 2023
66cebf4
Merge branch 'select-edge-props' of https://github.com/alexbarghi-nv/…
alexbarghi-nv May 8, 2023
fef1184
Merge branch 'select-edge-props' into sampling-with-offsets
alexbarghi-nv May 8, 2023
155536f
fix style
alexbarghi-nv May 9, 2023
1bb9271
fix style
alexbarghi-nv May 9, 2023
67b57e8
Merge branch 'branch-23.06' into select-edge-props
alexbarghi-nv May 9, 2023
ad051c1
remove rank option
alexbarghi-nv May 9, 2023
d34190d
remove explicit gtest, gmock dependencies
alexbarghi-nv May 9, 2023
70632dc
Merge branch 'select-edge-props' of https://github.com/alexbarghi-nv/…
alexbarghi-nv May 9, 2023
bc4b2a1
generate
alexbarghi-nv May 9, 2023
a2bd389
update gtest to 1.13
alexbarghi-nv May 9, 2023
b9202dd
generate
alexbarghi-nv May 9, 2023
abbe83e
fix dependencies
alexbarghi-nv May 9, 2023
4b89269
generate
alexbarghi-nv May 9, 2023
3a0958f
Merge branch 'select-edge-props' into sampling-with-offsets
alexbarghi-nv May 9, 2023
56b43d1
Merge branch 'branch-23.06' into sampling-with-offsets
BradReesWork May 9, 2023
438cfeb
Merge branch 'branch-23.06' into sampling-with-offsets
alexbarghi-nv May 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cpp/src/c_api/graph_mg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,19 +338,19 @@ extern "C" cugraph_error_code_t cugraph_mg_graph_create(
weight_type = cugraph_data_type_id_t::FLOAT32;
}

CAPI_EXPECTS((edge_type_ids == nullptr) || (p_edge_ids->type_ == edge_type),
CAPI_EXPECTS((edge_ids == nullptr) || (p_edge_ids->type_ == edge_type),
CUGRAPH_INVALID_INPUT,
"Invalid input arguments: Edge id type must match edge (src/dst) type",
*error);

CAPI_EXPECTS((edge_type_ids == nullptr) || (p_edge_type_ids->size_ == p_src->size_),
CAPI_EXPECTS((edge_ids == nullptr) || (p_edge_ids->size_ == p_src->size_),
CUGRAPH_INVALID_INPUT,
"Invalid input arguments: src size != edge prop size",
"Invalid input arguments: src size != edge id prop size",
*error);

CAPI_EXPECTS((edge_ids == nullptr) || (p_edge_ids->size_ == p_src->size_),
CAPI_EXPECTS((edge_type_ids == nullptr) || (p_edge_type_ids->size_ == p_src->size_),
CUGRAPH_INVALID_INPUT,
"Invalid input arguments: src size != edge prop size",
"Invalid input arguments: src size != edge type prop size",
*error);

cugraph_data_type_id_t edge_type_id_type;
Expand Down
27 changes: 10 additions & 17 deletions cpp/src/c_api/graph_sg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,26 +513,19 @@ extern "C" cugraph_error_code_t cugraph_sg_graph_create(
weight_type = cugraph_data_type_id_t::FLOAT32;
}

// FIXME: The combination of edge_ids != nullptr, edge_type_ids == nullptr
// logically should be valid, but the code will currently break if
// that is that is specified
CAPI_EXPECTS(
(edge_type_ids == nullptr && edge_ids == nullptr) ||
(edge_type_ids != nullptr && edge_ids != nullptr),
CUGRAPH_INVALID_INPUT,
"Invalid input arguments: either none or both of edge ids and edge types must be provided.",
*error);
CAPI_EXPECTS((edge_ids == nullptr) || (p_edge_ids->type_ == edge_type),
CUGRAPH_INVALID_INPUT,
"Invalid input arguments: Edge id type must match edge (src/dst) type",
*error);

CAPI_EXPECTS(
(edge_type_ids == nullptr && edge_ids == nullptr) || (p_edge_ids->type_ == edge_type),
CUGRAPH_INVALID_INPUT,
"Invalid input arguments: Edge id type must match edge (src/dst) type",
*error);
CAPI_EXPECTS((edge_ids == nullptr) || (p_edge_ids->size_ == p_src->size_),
CUGRAPH_INVALID_INPUT,
"Invalid input arguments: src size != edge id prop size",
*error);

CAPI_EXPECTS((edge_type_ids == nullptr && edge_ids == nullptr) ||
(p_edge_ids->size_ == p_src->size_ && p_edge_type_ids->size_ == p_dst->size_),
CAPI_EXPECTS((edge_type_ids == nullptr) || (p_edge_type_ids->size_ == p_src->size_),
CUGRAPH_INVALID_INPUT,
"Invalid input arguments: src size != edge prop size",
"Invalid input arguments: src size != edge type prop size",
*error);

cugraph_data_type_id_t edge_type_id_type = cugraph_data_type_id_t::INT32;
Expand Down
3 changes: 0 additions & 3 deletions python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,12 @@ def __iter__(self):
output_dir = os.path.join(
self._sampling_output_dir, "epoch_" + str(self.epoch_number)
)
rank = self._rank
bs = BulkSampler(
output_path=output_dir,
batch_size=self._batch_size,
graph=self._cugraph_graph,
batches_per_partition=self._batches_per_partition,
seeds_per_call=self._seeds_per_call,
rank=rank,
fanout_vals=self.graph_sampler._reversed_fanout_vals,
with_replacement=self.graph_sampler.replace,
)
Expand All @@ -226,7 +224,6 @@ def __iter__(self):
batch_df = create_batch_df(self.tensorized_indices_ds)
bs.add_batches(batch_df, start_col_name="start", batch_col_name="batch_id")
bs.flush()
output_dir = output_dir + f"/rank={rank}/"
self.cugraph_dgl_dataset.set_input_files(input_directory=output_dir)
self.epoch_number = self.epoch_number + 1
return super().__iter__()
Expand Down
6 changes: 2 additions & 4 deletions python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,6 @@ def __construct_graph(
{
"src": pandas.Series(na_src),
"dst": pandas.Series(na_dst),
"w": pandas.Series(np.zeros(len(na_src))),
"eid": pandas.Series(np.arange(len(na_src))),
"etp": pandas.Series(na_etp),
}
)
Expand All @@ -441,15 +439,15 @@ def __construct_graph(
df,
source="src",
destination="dst",
edge_attr=["w", "eid", "etp"],
edge_type="etp",
)
distributed.get_client().publish_dataset(cugraph_graph=graph)
else:
graph.from_cudf_edgelist(
df,
source="src",
destination="dst",
edge_attr=["w", "eid", "etp"],
edge_type="etp",
)

return graph
Expand Down
46 changes: 24 additions & 22 deletions python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import tempfile

import os
import re

import cupy
import cudf
Expand All @@ -31,6 +32,9 @@


class EXPERIMENTAL__BulkSampleLoader:

__ex_parquet_file = re.compile(r"batch=([0-9]+)\-([0-9]+)\.parquet")

def __init__(
self,
feature_store: CuGraphStore,
Expand All @@ -40,7 +44,6 @@ def __init__(
shuffle=False,
edge_types: Sequence[Tuple[str]] = None,
directory=None,
rank=0,
starting_batch_id=0,
batches_per_partition=100,
# Sampler args
Expand Down Expand Up @@ -84,10 +87,6 @@ def __init__(
The path of the directory to write samples to.
Defaults to a new generated temporary directory.

rank: int (optional, default=0)
The rank of the current worker. Should be provided
when there are multiple workers.

starting_batch_id: int (optional, default=0)
The starting id for each batch. Defaults to 0.
Generally used when loading previously-sampled
Expand All @@ -102,16 +101,16 @@ def __init__(

self.__feature_store = feature_store
self.__graph_store = graph_store
self.__rank = rank
self.__next_batch = starting_batch_id
self.__end_exclusive = starting_batch_id
self.__next_batch = -1
self.__end_exclusive = -1
self.__batches_per_partition = batches_per_partition
self.__starting_batch_id = starting_batch_id

if isinstance(all_indices, int):
# Will be loading from disk
self.__num_batches = all_indices
self.__directory = directory
iter(os.listdir(self.__directory))
return

if batch_size is None or batch_size < 1:
Expand All @@ -123,7 +122,6 @@ def __init__(
batch_size,
self.__directory.name,
self.__graph_store._subgraph(edge_types),
rank=rank,
fanout_vals=num_neighbors,
with_replacement=replace,
batches_per_partition=self.__batches_per_partition,
Expand Down Expand Up @@ -161,33 +159,36 @@ def __init__(
)

bulk_sampler.flush()
self.__input_files = iter(os.listdir(self.__directory.name))

def __next__(self):
# Quit iterating if there are no batches left
if self.__next_batch >= self.__num_batches + self.__starting_batch_id:
raise StopIteration

# Load the next set of sampling results if necessary
if self.__next_batch >= self.__end_exclusive:
if self.__directory is None:
raise StopIteration

# Read the next parquet file into memory
dir_path = (
self.__directory
if isinstance(self.__directory, str)
else self.__directory.name
)
rank_path = os.path.join(dir_path, f"rank={self.__rank}")

file_end_batch_incl = min(
self.__end_exclusive + self.__batches_per_partition - 1,
self.__starting_batch_id + self.__num_batches - 1,
)
# Will raise StopIteration if there are no files left
fname = next(self.__input_files)

m = self.__ex_parquet_file.match(fname)
if m is None:
raise ValueError(f"Invalid parquet filename {fname}")

self.__next_batch, end_inclusive = [int(g) for g in m.groups()]
self.__end_exclusive = end_inclusive + 1

parquet_path = os.path.join(
rank_path,
f"batch={self.__end_exclusive}" f"-{file_end_batch_incl}.parquet",
dir_path,
fname,
)

self.__end_exclusive += self.__batches_per_partition

columns = {
"sources": "int64",
"destinations": "int64",
Expand All @@ -212,6 +213,7 @@ def __next__(self):
if self.__next_batch >= self.__num_batches + self.__starting_batch_id:
# Won't delete a non-temp dir (since it would just be deleting a string)
del self.__directory
self.__directory = None

# Get and return the sampled subgraph
if isinstance(torch_geometric, MissingModule):
Expand Down
7 changes: 4 additions & 3 deletions python/cugraph/cugraph/community/egonet.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -132,13 +132,14 @@ def ego_graph(G, n, radius=1, center=True, undirected=None, distance=None):
df = cudf.DataFrame()
df["src"] = source
df["dst"] = destination
df["weight"] = weight
if weight is not None:
df["weight"] = weight

if G.renumbered:
df, src_names = G.unrenumber(df, "src", get_column_names=True)
df, dst_names = G.unrenumber(df, "dst", get_column_names=True)
else:
# FIXME: THe original 'src' and 'dst' are not stored in 'simpleGraph'
# FIXME: The original 'src' and 'dst' are not stored in 'simpleGraph'
src_names = "src"
dst_names = "dst"

Expand Down
5 changes: 4 additions & 1 deletion python/cugraph/cugraph/dask/community/egonet.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ def convert_to_cudf(cp_arrays):
df = cudf.DataFrame()
df["src"] = cp_src
df["dst"] = cp_dst
df["weight"] = cp_weight
if cp_weight is None:
df["weight"] = None
else:
df["weight"] = cp_weight

offsets = cudf.Series(cp_offsets)

Expand Down
17 changes: 13 additions & 4 deletions python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ def convert_to_cudf(cp_arrays, weight_t, with_edge_properties, return_offsets=Fa
df[edge_type_n] = edge_types
df[hop_id_n] = hop_ids

print(
f"sources: {sources}\n"
f"destinations: {destinations}\n"
f"batch: {batch_ids}\n"
f"offset: {offsets}\n"
)

if return_offsets:
offsets_df = cudf.DataFrame(
{
Expand All @@ -141,10 +148,11 @@ def convert_to_cudf(cp_arrays, weight_t, with_edge_properties, return_offsets=Fa
df[dst_n] = cupy_destinations
df[indices_n] = cupy_indices

if weight_t == "int32":
df.indices = df.indices.astype("int32")
elif weight_t == "int64":
df.indices = df.indices.astype("int64")
if cupy_indices is not None:
if weight_t == "int32":
df.indices = df.indices.astype("int32")
elif weight_t == "int64":
df.indices = df.indices.astype("int64")

return df

Expand Down Expand Up @@ -296,6 +304,7 @@ def uniform_neighbor_sample(
List of output GPUs (by rank) corresponding to batch
id labels in the label list. Used to assign each batch
id to a GPU.
Must be in ascending order (i.e. [0, 0, 1, 2]).

random_state: int, optional
Random seed to use when making sampling calls.
Expand Down
11 changes: 5 additions & 6 deletions python/cugraph/cugraph/dask/traversal/sssp.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import cudf
import dask_cudf
from pylibcugraph import sssp as pylibcugraph_sssp, ResourceHandle
import warnings


def _call_plc_sssp(
Expand Down Expand Up @@ -102,12 +101,12 @@ def sssp(input_graph, source, cutoff=None, check_source=True):

# FIXME: Implement a better way to check if the graph is weighted similar
# to 'simpleGraph'
if len(input_graph.edgelist.edgelist_df.columns) != 3:
warning_msg = (
"'SSSP' requires the input graph to be weighted: Unweighted "
"graphs will not be supported in the next release."
if not input_graph.weighted:
err_msg = (
"'SSSP' requires the input graph to be weighted."
"'BFS' should be used instead of 'SSSP' for unweighted graphs."
)
warnings.warn(warning_msg, PendingDeprecationWarning)
raise ValueError(err_msg)

client = default_client()

Expand Down
Loading