Integrate renumbering and compression to cugraph-dgl to accelerate MFG creation #3887
@@ -11,10 +11,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from __future__ import annotations
-from typing import Tuple, Dict, Optional
+from typing import List, Tuple, Dict, Optional
 from collections import defaultdict
 import cudf
+import cupy
 from cugraph.utilities.utils import import_optional
+from cugraph_dgl.nn import SparseGraph

 dgl = import_optional("dgl")
 torch = import_optional("torch")
@@ -401,3 +403,154 @@ def create_heterogenous_dgl_block_from_tensors_dict(
    block = dgl.to_block(sampled_graph, dst_nodes=seed_nodes, src_nodes=src_d)
    block.edata[dgl.EID] = sampled_graph.edata[dgl.EID]
    return block


def _process_sampled_df_csc(
    df: cudf.DataFrame,
    reverse_hop_id: bool = True,
) -> Tuple[
    Dict[int, Dict[int, Dict[str, torch.Tensor]]],
    List[torch.Tensor],
    List[List[int, int]],
]:
    """
    Convert a dataframe generated by BulkSampler to a dictionary of tensors, to
    facilitate MFG creation. The sampled graphs in the dataframe use the CSC format.

    Parameters
    ----------
    df: cudf.DataFrame
        The output from BulkSampler compressed in CSC format. The dataframe
        should be generated with `compression="CSR"` in BulkSampler,
        since the sampling routine treats seed nodes as sources.

    reverse_hop_id: bool (default=True)
        Whether to reverse the hop ids.

    Returns
    -------
    tensors_dict: dict
        A nested dictionary keyed by batch id and hop id.
        `tensors_dict[batch_id][hop_id]` holds "minors" and "major_offsets"
        values for CSC MFGs.

    renumber_map_list: list
        List of renumbering maps for looking up global indices of nodes. One
        map for each batch.

    mfg_sizes: list
        List of the number of nodes in each message passing layer. For the
        k-th hop, mfg_sizes[k] and mfg_sizes[k+1] are the numbers of sources and
        destinations, respectively.
    """
    # dropna
    major_offsets = df.major_offsets.dropna().values
    label_hop_offsets = df.label_hop_offsets.dropna().values
    renumber_map_offsets = df.renumber_map_offsets.dropna().values
    renumber_map = df.map.dropna().values
    minors = df.minors.dropna().values
Comment on lines +447 to +451:

> I think this assumes that the length of the renumber_map is smaller than major_offsets. I will check this again if possible.

> I don't think we are making any assumptions here.

> Ahh, my bad. I just re-read the code below and I think we should be fine.

> For each batch:
    n_batches = renumber_map_offsets.size - 1
    n_hops = int((label_hop_offsets.size - 1) / n_batches)

    # make global offsets local
    major_offsets -= major_offsets[0]
    label_hop_offsets -= label_hop_offsets[0]
    renumber_map_offsets -= renumber_map_offsets[0]

    # get the sizes of each adjacency matrix (for MFGs)
    mfg_sizes = (label_hop_offsets[1:] - label_hop_offsets[:-1]).reshape(
        (n_batches, n_hops)
    )
    n_nodes = renumber_map_offsets[1:] - renumber_map_offsets[:-1]
    mfg_sizes = cupy.hstack((mfg_sizes, n_nodes.reshape(n_batches, -1)))
    if reverse_hop_id:
        mfg_sizes = mfg_sizes[:, ::-1]

    tensors_dict = {}
    renumber_map_list = []
    for batch_id in range(n_batches):
        batch_dict = {}

        for hop_id in range(n_hops):
            hop_dict = {}
            idx = batch_id * n_hops + hop_id  # idx in label_hop_offsets
            major_offsets_start = label_hop_offsets[idx].item()
            major_offsets_end = label_hop_offsets[idx + 1].item()
            minors_start = major_offsets[major_offsets_start].item()
            minors_end = major_offsets[major_offsets_end].item()
            # Note: minors and major_offsets from BulkSampler are of type int32
            # and int64, respectively. Since the pylibcugraphops binding code
            # does not support distinct node and edge index types, we simply
            # cast both to int32 for now.
            hop_dict["minors"] = torch.as_tensor(
                minors[minors_start:minors_end], device="cuda"
            ).int()
            hop_dict["major_offsets"] = torch.as_tensor(
                major_offsets[major_offsets_start : major_offsets_end + 1]
                - major_offsets[major_offsets_start],
                device="cuda",
            ).int()
            if reverse_hop_id:
                batch_dict[n_hops - 1 - hop_id] = hop_dict
            else:
                batch_dict[hop_id] = hop_dict

        tensors_dict[batch_id] = batch_dict

        renumber_map_list.append(
            torch.as_tensor(
                renumber_map[
                    renumber_map_offsets[batch_id] : renumber_map_offsets[batch_id + 1]
                ],
                device="cuda",
            )
        )

    return tensors_dict, renumber_map_list, mfg_sizes.tolist()
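The `mfg_sizes` arithmetic above can be illustrated with a small self-contained sketch in plain NumPy (CuPy exposes the same API). All offset values below are made up for illustration, not taken from a real sampler run:

```python
import numpy as np

# Illustrative offsets for 2 batches x 2 hops (made-up values).
label_hop_offsets = np.array([0, 3, 7, 10, 14])  # one segment per (batch, hop) pair
renumber_map_offsets = np.array([0, 8, 17])      # renumbered-node boundaries per batch

n_batches = renumber_map_offsets.size - 1            # 2
n_hops = (label_hop_offsets.size - 1) // n_batches   # 2

# Segment lengths give the number of destination nodes for each (batch, hop).
mfg_sizes = (label_hop_offsets[1:] - label_hop_offsets[:-1]).reshape(n_batches, n_hops)

# Append the total number of renumbered nodes per batch as the last column,
# then reverse each row so the largest (input) layer comes first.
n_nodes = renumber_map_offsets[1:] - renumber_map_offsets[:-1]
mfg_sizes = np.hstack((mfg_sizes, n_nodes.reshape(n_batches, -1)))[:, ::-1]

print(mfg_sizes.tolist())  # [[8, 4, 3], [9, 4, 3]]
```

For each batch row, consecutive entries `mfg_sizes[b][k]` and `mfg_sizes[b][k + 1]` are the source and destination counts of the k-th MFG, matching the docstring above.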


def _create_homogeneous_sparse_graphs_from_csc(
    tensors_dict: Dict[int, Dict[int, Dict[str, torch.Tensor]]],
    renumber_map_list: List[torch.Tensor],
    mfg_sizes: List[int, int],
) -> List[List[torch.Tensor, torch.Tensor, List[SparseGraph]]]:
    """Create mini-batches of MFGs. The input arguments are the outputs of
    the function `_process_sampled_df_csc`.

    Returns
    -------
    output: list
        A list of mini-batches. Each mini-batch is a list that consists of
        an `input_nodes` tensor, an `output_nodes` tensor, and a list of MFGs.
    """
    n_batches, n_hops = len(mfg_sizes), len(mfg_sizes[0]) - 1
    output = []
    for b_id in range(n_batches):
        output_batch = []
        output_batch.append(renumber_map_list[b_id])
        output_batch.append(renumber_map_list[b_id][: mfg_sizes[b_id][-1]])
        mfgs = [
            SparseGraph(
                size=(mfg_sizes[b_id][h_id], mfg_sizes[b_id][h_id + 1]),
                src_ids=tensors_dict[b_id][h_id]["minors"],
                cdst_ids=tensors_dict[b_id][h_id]["major_offsets"],
                formats=["csc"],
                reduce_memory=True,
            )
            for h_id in range(n_hops)
        ]

        output_batch.append(mfgs)

        output.append(output_batch)

    return output


def create_homogeneous_sampled_graphs_from_dataframe_csc(sampled_df: cudf.DataFrame):
    """Public API to create mini-batches of MFGs using a dataframe output by
    BulkSampler, where the sampled graph is compressed in CSC format."""
    return _create_homogeneous_sparse_graphs_from_csc(
        *(_process_sampled_df_csc(sampled_df))
    )
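The per-hop slicing performed inside `_process_sampled_df_csc` — locating each hop's segment of `minors` via `label_hop_offsets` and `major_offsets`, then shifting the offsets so each hop starts at zero — can likewise be sketched with toy NumPy arrays (all values below are made up for illustration):

```python
import numpy as np

# Toy CSC data for a single batch with 2 hops (made-up values).
# label_hop_offsets marks where each hop's destinations start in major_offsets.
label_hop_offsets = np.array([0, 2, 5])
# major_offsets is a CSC indptr over all destinations; minors holds source ids.
major_offsets = np.array([0, 2, 3, 5, 8, 9])
minors = np.array([4, 1, 3, 0, 2, 4, 1, 3, 2])

hop_slices = {}
n_hops = label_hop_offsets.size - 1
for hop_id in range(n_hops):
    start = label_hop_offsets[hop_id]
    end = label_hop_offsets[hop_id + 1]
    minors_start = major_offsets[start]
    minors_end = major_offsets[end]
    hop_slices[hop_id] = {
        "minors": minors[minors_start:minors_end],
        # re-localize so each hop's offsets start at zero
        "major_offsets": major_offsets[start : end + 1] - major_offsets[start],
    }

print(hop_slices[0]["major_offsets"].tolist())  # [0, 2, 3]
print(hop_slices[1]["minors"].tolist())         # [0, 2, 4, 1, 3, 2]
```

Each `hop_slices[hop_id]` pair is exactly what the PR stores (as int32 CUDA tensors) in `hop_dict["minors"]` and `hop_dict["major_offsets"]` before building `SparseGraph` objects.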
Comment thread:

> @tingyu66, can we also change `seeds_per_call` to `100_000` to make a better default based on your testing?

> Should we change it to the default value of BulkSampler, `200_000`? After our call the other day, I tested a wide range of `seeds_per_call` values and none of the runs threw an OOM error.

> Interesting, if it just works we can probably set the default to None and let upstream handle it? What do you think? Any default which is reasonable and just works is fine by me.

> I updated the value to `200_000` in 45f93f2 to align with BulkSampler. I did not set it to None, to avoid the extra step of handling the None case.