diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/__init__.py b/python/cugraph-dgl/cugraph_dgl/dataloading/__init__.py
index 6cabea198f6..2fd7d29bd49 100644
--- a/python/cugraph-dgl/cugraph_dgl/dataloading/__init__.py
+++ b/python/cugraph-dgl/cugraph_dgl/dataloading/__init__.py
@@ -13,7 +13,7 @@
 from cugraph_dgl.dataloading.dataset import (
     HomogenousBulkSamplerDataset,
-    HetrogenousBulkSamplerDataset,
+    HeterogenousBulkSamplerDataset,
 )
 from cugraph_dgl.dataloading.neighbor_sampler import NeighborSampler
 from cugraph_dgl.dataloading.dataloader import DataLoader
diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py b/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py
index 0480f61807a..f154b096256 100644
--- a/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py
+++ b/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py
@@ -21,7 +21,7 @@
 from dask.distributed import default_client, Event
 from cugraph_dgl.dataloading import (
     HomogenousBulkSamplerDataset,
-    HetrogenousBulkSamplerDataset,
+    HeterogenousBulkSamplerDataset,
 )
 from cugraph_dgl.dataloading.utils.extract_graph_helpers import (
     create_cugraph_graph_from_edges_dict,
@@ -47,19 +47,20 @@ def __init__(
         graph_sampler: cugraph_dgl.dataloading.NeighborSampler,
         sampling_output_dir: str,
         batches_per_partition: int = 50,
-        seeds_per_call: int = 400_000,
+        seeds_per_call: int = 200_000,
         device: torch.device = None,
         use_ddp: bool = False,
         ddp_seed: int = 0,
         batch_size: int = 1024,
         drop_last: bool = False,
         shuffle: bool = False,
+        sparse_format: str = "coo",
         **kwargs,
     ):
         """
         Constructor for CuGraphStorage:
         -------------------------------
-        graph : CuGraphStorage 
+        graph : CuGraphStorage
            The graph.
        indices : Tensor or dict[ntype, Tensor]
            The set of indices. It can either be a tensor of
@@ -89,7 +90,12 @@ def __init__(
            The seed for shuffling the dataset in
            :class:`torch.utils.data.distributed.DistributedSampler`.
            Only effective when :attr:`use_ddp` is True.
-        batch_size: int,
+        batch_size: int
+            Batch size.
+        sparse_format: str, default = "coo"
+            The sparse format of the emitted sampled graphs. Choose between "csc"
+            and "coo". When using "csc", the graphs are of type
+            cugraph_dgl.nn.SparseGraph.
        kwargs : dict
            Key-word arguments to be passed to the parent PyTorch
            :py:class:`torch.utils.data.DataLoader` class. Common arguments are:
@@ -123,6 +129,12 @@ def __init__(
         ...     for input_nodes, output_nodes, blocks in dataloader:
         ...
         """
+        if sparse_format not in ["coo", "csc"]:
+            raise ValueError(
+                f"sparse_format must be one of 'coo', 'csc', "
+                f"but got {sparse_format}."
+            )
+        self.sparse_format = sparse_format
         self.ddp_seed = ddp_seed
         self.use_ddp = use_ddp
@@ -156,11 +168,12 @@ def __init__(
             self.cugraph_dgl_dataset = HomogenousBulkSamplerDataset(
                 total_number_of_nodes=graph.total_number_of_nodes,
                 edge_dir=self.graph_sampler.edge_dir,
+                sparse_format=sparse_format,
             )
         else:
             etype_id_to_etype_str_dict = {v: k for k, v in graph._etype_id_dict.items()}
-            self.cugraph_dgl_dataset = HetrogenousBulkSamplerDataset(
+            self.cugraph_dgl_dataset = HeterogenousBulkSamplerDataset(
                 num_nodes_dict=graph.num_nodes_dict,
                 etype_id_dict=etype_id_to_etype_str_dict,
                 etype_offset_dict=graph._etype_offset_d,
@@ -210,14 +223,23 @@ def __iter__(self):
         output_dir = os.path.join(
             self._sampling_output_dir, "epoch_" + str(self.epoch_number)
         )
+        kwargs = {}
         if isinstance(self.cugraph_dgl_dataset, HomogenousBulkSamplerDataset):
-            deduplicate_sources = True
-            prior_sources_behavior = "carryover"
-            renumber = True
+            kwargs["deduplicate_sources"] = True
+            kwargs["prior_sources_behavior"] = "carryover"
+            kwargs["renumber"] = True
+
+            if self.sparse_format == "csc":
+                kwargs["compression"] = "CSR"
+                kwargs["compress_per_hop"] = True
+                # The following kwargs will be deprecated in the uniform sampler.
+                kwargs["use_legacy_names"] = False
+                kwargs["include_hop_column"] = False
+
         else:
-            deduplicate_sources = False
-            prior_sources_behavior = None
-            renumber = False
+            kwargs["deduplicate_sources"] = False
+            kwargs["prior_sources_behavior"] = None
+            kwargs["renumber"] = False
 
         bs = BulkSampler(
             output_path=output_dir,
@@ -227,10 +249,9 @@
             seeds_per_call=self._seeds_per_call,
             fanout_vals=self.graph_sampler._reversed_fanout_vals,
             with_replacement=self.graph_sampler.replace,
-            deduplicate_sources=deduplicate_sources,
-            prior_sources_behavior=prior_sources_behavior,
-            renumber=renumber,
+            **kwargs,
         )
+
         if self.shuffle:
             self.tensorized_indices_ds.shuffle()
diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py b/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py
index e0d51bcf4cf..815fd30d8eb 100644
--- a/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py
+++ b/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py
@@ -19,6 +19,7 @@
 from cugraph_dgl.dataloading.utils.sampling_helpers import (
     create_homogeneous_sampled_graphs_from_dataframe,
     create_heterogeneous_sampled_graphs_from_dataframe,
+    create_homogeneous_sampled_graphs_from_dataframe_csc,
 )
@@ -33,17 +34,19 @@ def __init__(
         total_number_of_nodes: int,
         edge_dir: str,
         return_type: str = "dgl.Block",
+        sparse_format: str = "coo",
     ):
         if return_type not in ["dgl.Block", "cugraph_dgl.nn.SparseGraph"]:
             raise ValueError(
-                "return_type must be either 'dgl.Block' or \
-                    'cugraph_dgl.nn.SparseGraph' "
+                "return_type must be either 'dgl.Block' or "
+                "'cugraph_dgl.nn.SparseGraph'."
             )
         # TODO: Deprecate `total_number_of_nodes`
         # as it is no longer needed
         # in the next release
         self.total_number_of_nodes = total_number_of_nodes
         self.edge_dir = edge_dir
+        self.sparse_format = sparse_format
         self._current_batch_fn = None
         self._input_files = None
         self._return_type = return_type
@@ -60,10 +63,20 @@ def __getitem__(self, idx: int):
         fn, batch_offset = self._batch_to_fn_d[idx]
         if fn != self._current_batch_fn:
-            df = _load_sampled_file(dataset_obj=self, fn=fn)
-            self._current_batches = create_homogeneous_sampled_graphs_from_dataframe(
-                sampled_df=df, edge_dir=self.edge_dir, return_type=self._return_type
-            )
+            if self.sparse_format == "csc":
+                df = _load_sampled_file(dataset_obj=self, fn=fn, skip_rename=True)
+                self._current_batches = (
+                    create_homogeneous_sampled_graphs_from_dataframe_csc(df)
+                )
+            else:
+                df = _load_sampled_file(dataset_obj=self, fn=fn)
+                self._current_batches = (
+                    create_homogeneous_sampled_graphs_from_dataframe(
+                        sampled_df=df,
+                        edge_dir=self.edge_dir,
+                        return_type=self._return_type,
+                    )
+                )
         current_offset = idx - batch_offset
         return self._current_batches[current_offset]
@@ -87,7 +100,7 @@ def set_input_files(
         )
 
 
-class HetrogenousBulkSamplerDataset(torch.utils.data.Dataset):
+class HeterogenousBulkSamplerDataset(torch.utils.data.Dataset):
     def __init__(
         self,
         num_nodes_dict: Dict[str, int],
@@ -141,18 +154,18 @@ def set_input_files(
        ----------
        input_directory: str
            input_directory which contains all the files that will be
-           loaded by HetrogenousBulkSamplerDataset
+           loaded by HeterogenousBulkSamplerDataset
        input_file_paths: List[str]
-            File names that will be loaded by the HetrogenousBulkSamplerDataset
+            File names that will be loaded by the HeterogenousBulkSamplerDataset
         """
         _set_input_files(
             self, input_directory=input_directory, input_file_paths=input_file_paths
         )
 
 
-def _load_sampled_file(dataset_obj, fn):
+def _load_sampled_file(dataset_obj, fn, skip_rename=False):
     df = cudf.read_parquet(os.path.join(fn))
-    if dataset_obj.edge_dir == "in":
+    if dataset_obj.edge_dir == "in" and not skip_rename:
         df.rename(
             columns={"sources": "destinations", "destinations": "sources"},
             inplace=True,
@@ -181,7 +194,7 @@ def get_batch_to_fn_d(files):
 
 
 def _set_input_files(
-    dataset_obj: Union[HomogenousBulkSamplerDataset, HetrogenousBulkSamplerDataset],
+    dataset_obj: Union[HomogenousBulkSamplerDataset, HeterogenousBulkSamplerDataset],
     input_directory: Optional[str] = None,
     input_file_paths: Optional[List[str]] = None,
 ) -> None:
diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py b/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py
index bdac3b1a323..a4f64668348 100644
--- a/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py
+++ b/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py
@@ -11,10 +11,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from __future__ import annotations
-from typing import Tuple, Dict, Optional
+from typing import List, Tuple, Dict, Optional
 from collections import defaultdict
 import cudf
+import cupy
 from cugraph.utilities.utils import import_optional
+from cugraph_dgl.nn import SparseGraph
 
 dgl = import_optional("dgl")
 torch = import_optional("torch")
@@ -401,3 +403,154 @@ def create_heterogenous_dgl_block_from_tensors_dict(
     block = dgl.to_block(sampled_graph, dst_nodes=seed_nodes, src_nodes=src_d)
     block.edata[dgl.EID] = sampled_graph.edata[dgl.EID]
     return block
+
+
+def _process_sampled_df_csc(
+    df: cudf.DataFrame,
+    reverse_hop_id: bool = True,
+) -> Tuple[
+    Dict[int, Dict[int, Dict[str, torch.Tensor]]],
+    List[torch.Tensor],
+    List[List[int]],
+]:
+    """
+    Convert a dataframe generated by BulkSampler to a dictionary of tensors, to
+    facilitate MFG creation. The sampled graphs in the dataframe use the CSC
+    format.
+
+    Parameters
+    ----------
+    df: cudf.DataFrame
+        The output from BulkSampler compressed in CSC format. The dataframe
+        should be generated with `compression="CSR"` in BulkSampler,
+        since the sampling routine treats seed nodes as sources.
+
+    reverse_hop_id: bool (default=True)
+        Whether to reverse the hop ids so that MFGs are ordered from the
+        input layer to the output layer.
+
+    Returns
+    -------
+    tensors_dict: dict
+        A nested dictionary keyed by batch id and hop id.
+        `tensors_dict[batch_id][hop_id]` holds "minors" and "major_offsets"
+        values for CSC MFGs.
+
+    renumber_map_list: list
+        List of renumbering maps for looking up global indices of nodes. One
+        map for each batch.
+
+    mfg_sizes: list
+        List of the number of nodes in each message passing layer. For the
+        k-th hop, mfg_sizes[k] and mfg_sizes[k+1] are the numbers of sources
+        and destinations, respectively.
+    """
+    # drop the NaN padding at the end of each column
+    major_offsets = df.major_offsets.dropna().values
+    label_hop_offsets = df.label_hop_offsets.dropna().values
+    renumber_map_offsets = df.renumber_map_offsets.dropna().values
+    renumber_map = df.map.dropna().values
+    minors = df.minors.dropna().values
+
+    n_batches = renumber_map_offsets.size - 1
+    n_hops = int((label_hop_offsets.size - 1) / n_batches)
+
+    # make global offsets local
+    major_offsets -= major_offsets[0]
+    label_hop_offsets -= label_hop_offsets[0]
+    renumber_map_offsets -= renumber_map_offsets[0]
+
+    # get the sizes of each adjacency matrix (for MFGs)
+    mfg_sizes = (label_hop_offsets[1:] - label_hop_offsets[:-1]).reshape(
+        (n_batches, n_hops)
+    )
+    n_nodes = renumber_map_offsets[1:] - renumber_map_offsets[:-1]
+    mfg_sizes = cupy.hstack((mfg_sizes, n_nodes.reshape(n_batches, -1)))
+    if reverse_hop_id:
+        mfg_sizes = mfg_sizes[:, ::-1]
+
+    tensors_dict = {}
+    renumber_map_list = []
+    for batch_id in range(n_batches):
+        batch_dict = {}
+
+        for hop_id in range(n_hops):
+            hop_dict = {}
+            idx = batch_id * n_hops + hop_id  # idx in label_hop_offsets
+            major_offsets_start = label_hop_offsets[idx].item()
+            major_offsets_end = label_hop_offsets[idx + 1].item()
+            minors_start = major_offsets[major_offsets_start].item()
+            minors_end = major_offsets[major_offsets_end].item()
+            # Note: minors and major_offsets from BulkSampler are of type int32
+            # and int64 respectively. Since pylibcugraphops binding code doesn't
+            # support distinct node and edge index types, we simply cast both
+            # to int32 for now.
+            hop_dict["minors"] = torch.as_tensor(
+                minors[minors_start:minors_end], device="cuda"
+            ).int()
+            hop_dict["major_offsets"] = torch.as_tensor(
+                major_offsets[major_offsets_start : major_offsets_end + 1]
+                - major_offsets[major_offsets_start],
+                device="cuda",
+            ).int()
+            if reverse_hop_id:
+                batch_dict[n_hops - 1 - hop_id] = hop_dict
+            else:
+                batch_dict[hop_id] = hop_dict
+
+        tensors_dict[batch_id] = batch_dict
+
+        renumber_map_list.append(
+            torch.as_tensor(
+                renumber_map[
+                    renumber_map_offsets[batch_id] : renumber_map_offsets[batch_id + 1]
+                ],
+                device="cuda",
+            )
+        )
+
+    return tensors_dict, renumber_map_list, mfg_sizes.tolist()
+
+
+def _create_homogeneous_sparse_graphs_from_csc(
+    tensors_dict: Dict[int, Dict[int, Dict[str, torch.Tensor]]],
+    renumber_map_list: List[torch.Tensor],
+    mfg_sizes: List[List[int]],
+) -> List[List]:
+    """Create mini-batches of MFGs. The input arguments are the outputs of
+    the function `_process_sampled_df_csc`.
+
+    Returns
+    -------
+    output: list
+        A list of mini-batches. Each mini-batch is a list that consists of
+        an `input_nodes` tensor, an `output_nodes` tensor, and a list of MFGs.
+    """
+    n_batches, n_hops = len(mfg_sizes), len(mfg_sizes[0]) - 1
+    output = []
+    for b_id in range(n_batches):
+        output_batch = []
+        output_batch.append(renumber_map_list[b_id])
+        output_batch.append(renumber_map_list[b_id][: mfg_sizes[b_id][-1]])
+        mfgs = [
+            SparseGraph(
+                size=(mfg_sizes[b_id][h_id], mfg_sizes[b_id][h_id + 1]),
+                src_ids=tensors_dict[b_id][h_id]["minors"],
+                cdst_ids=tensors_dict[b_id][h_id]["major_offsets"],
+                formats=["csc"],
+                reduce_memory=True,
+            )
+            for h_id in range(n_hops)
+        ]
+
+        output_batch.append(mfgs)
+
+        output.append(output_batch)
+
+    return output
+
+
+def create_homogeneous_sampled_graphs_from_dataframe_csc(sampled_df: cudf.DataFrame):
+    """Public API to create mini-batches of MFGs using a dataframe output by
+    BulkSampler, where the sampled graph is compressed in CSC format."""
+    return _create_homogeneous_sparse_graphs_from_csc(
+        *_process_sampled_df_csc(sampled_df)
+    )
diff --git a/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py b/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py
index 307eb33078e..ddd95a76366 100644
--- a/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py
+++ b/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py
@@ -248,6 +248,13 @@ def csr(self) -> Tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor]]:
         value = value[self._perm_csc2csr]
         return csrc_ids, dst_ids, value
 
+    def __repr__(self) -> str:
+        return (
+            f"{self.__class__.__name__}(num_src_nodes={self._num_src_nodes}, "
+            f"num_dst_nodes={self._num_dst_nodes}, "
+            f"num_edges={self._src_ids.size(0)}, formats={self._formats})"
+        )
+
 
 class BaseConv(torch.nn.Module):
     r"""An abstract base class for cugraph-ops nn module."""
diff --git a/python/cugraph-dgl/tests/test_utils.py b/python/cugraph-dgl/tests/test_utils.py
index 740db59ce7f..4be66758b43 100644
--- a/python/cugraph-dgl/tests/test_utils.py
+++ b/python/cugraph-dgl/tests/test_utils.py
@@ -22,6 +22,7 @@
 from cugraph_dgl.dataloading.utils.sampling_helpers import (
     create_homogeneous_sampled_graphs_from_dataframe,
     _get_source_destination_range,
     _create_homogeneous_cugraph_dgl_nn_sparse_graph,
+    create_homogeneous_sampled_graphs_from_dataframe_csc,
 )
 
 from cugraph.utilities.utils import import_optional
@@ -50,6 +51,23 @@ def get_dummy_sampled_df():
     return df
 
 
+def get_dummy_sampled_df_csc():
+    df_dict = dict(
+        minors=np.array(
+            [1, 1, 2, 1, 0, 3, 1, 3, 2, 3, 2, 4, 0, 1, 1, 0, 3, 2], dtype=np.int32
+        ),
+        major_offsets=np.arange(19, dtype=np.int64),
+        map=np.array(
+            [26, 29, 33, 22, 23, 32, 18, 29, 33, 33, 8, 30, 32], dtype=np.int32
+        ),
+        renumber_map_offsets=np.array([0, 4, 9, 13], dtype=np.int64),
+        label_hop_offsets=np.array([0, 1, 3, 6, 7, 9, 13, 14, 16, 18], dtype=np.int64),
+    )
+
+    # convert values to Series so that NaNs are padded automatically
+    return cudf.DataFrame({k: cudf.Series(v) for k, v in df_dict.items()})
+
+
 def test_get_renumber_map():
     sampled_df = get_dummy_sampled_df()
 
@@ -176,3 +194,13 @@ def test__create_homogeneous_cugraph_dgl_nn_sparse_graph():
     assert sparse_graph.num_src_nodes() == 2
     assert sparse_graph.num_dst_nodes() == seednodes_range + 1
     assert isinstance(sparse_graph, cugraph_dgl.nn.SparseGraph)
+
+
+def test_create_homogeneous_sampled_graphs_from_dataframe_csc():
+    df = get_dummy_sampled_df_csc()
+    batches = create_homogeneous_sampled_graphs_from_dataframe_csc(df)
+
+    assert len(batches) == 3
+    assert torch.equal(batches[0][0], torch.IntTensor([26, 29, 33, 22]).cuda())
+    assert torch.equal(batches[1][0], torch.IntTensor([23, 32, 18, 29, 33]).cuda())
+    assert torch.equal(batches[2][0], torch.IntTensor([33, 8, 30, 32]).cuda())
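
Usage note (not part of the patch): below is a minimal, hypothetical sketch of how the new sparse_format option is exercised from user code. The graph `g`, the seed-node tensor `train_nid`, the fanout values, and the output directory are placeholder assumptions for illustration, not values taken from this change.

    import cugraph_dgl

    # `g` is assumed to be an existing cugraph_dgl graph storage object and
    # `train_nid` a tensor of seed node ids; both are placeholders.
    sampler = cugraph_dgl.dataloading.NeighborSampler([10, 10])
    dataloader = cugraph_dgl.dataloading.DataLoader(
        g,
        train_nid,
        sampler,
        sampling_output_dir="/tmp/samples",  # hypothetical output directory
        batch_size=1024,
        shuffle=True,
        sparse_format="csc",  # new option added in this diff
    )

    for input_nodes, output_nodes, mfgs in dataloader:
        # With sparse_format="csc", each MFG is a cugraph_dgl.nn.SparseGraph
        # (printable via the new __repr__) rather than a dgl.Block.
        ...

With the default sparse_format="coo", the loader keeps its previous behavior and yields dgl.Block MFGs.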