Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEA] Support Link Prediction and Negative Sampling in cuGraph-DGL #50

Draft
wants to merge 11 commits into
base: branch-25.02
Choose a base branch
from
215 changes: 207 additions & 8 deletions python/cugraph-dgl/cugraph_dgl/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def __init__(

self.__graph = None
self.__vertex_offsets = None
self.__edge_lookup_table = None
self.__handle = None
self.__is_multi_gpu = is_multi_gpu

Expand All @@ -127,6 +128,11 @@ def __init__(
def is_multi_gpu(self):
return self.__is_multi_gpu

def _clear_graph(self):
self.__graph = None
self.__edge_lookup_table = None
self.__vertex_offsets = None

def to_canonical_etype(
self, etype: Union[str, Tuple[str, str, str]]
) -> Tuple[str, str, str]:
Expand All @@ -146,6 +152,20 @@ def to_canonical_etype(

raise ValueError("Unknown relation type " + etype)

def _to_numeric_etype(self, etype: Union[str, Tuple[str, str, str]]) -> int:
if etype is None:
if len(self.canonical_etypes) > 1:
raise ValueError("Edge type is required for heterogeneous graphs.")
return 0

etype = self.to_canonical_etype(etype)
return {
k: i
for i, k in enumerate(
sorted(self.__edge_indices.keys(leaves_only=True, include_nested=True))
)
}[etype]

def add_nodes(
self,
global_num_nodes: int,
Expand Down Expand Up @@ -217,8 +237,7 @@ def add_nodes(
_cast_to_torch_tensor(feature_tensor), **self.__wg_kwargs
)

self.__graph = None
self.__vertex_offsets = None
self._clear_graph()

def __check_node_ids(self, ntype: str, ids: TensorType):
"""
Expand Down Expand Up @@ -309,8 +328,7 @@ def add_edges(

self.__num_edges_dict[dgl_can_edge_type] = int(num_edges)

self.__graph = None
self.__vertex_offsets = None
self._clear_graph()

def num_nodes(self, ntype: Optional[str] = None) -> int:
"""
Expand Down Expand Up @@ -537,7 +555,7 @@ def _graph(
self.__graph["direction"] != direction
or self.__graph["prob_attr"] != prob_attr
):
self.__graph = None
self._clear_graph()

if self.__graph is None:
src_col, dst_col = ("src", "dst") if direction == "out" else ("dst", "src")
Expand Down Expand Up @@ -620,9 +638,6 @@ def _get_n_emb(
)

try:
print(
u,
)
return self.__ndata_storage[ntype, emb_name].fetch(
_cast_to_torch_tensor(u), "cuda"
)
Expand Down Expand Up @@ -895,6 +910,190 @@ def all_edges(
else:
raise ValueError(f"Invalid form {form}")

@property
def _edge_lookup_table(self):
if self.__edge_lookup_table is None:
self.__edge_lookup_table = pylibcugraph.EdgeIdLookupTable(
self._resource_handle,
self._graph("out") if self.__graph is None else self.__graph["graph"],
)

return self.__edge_lookup_table

def find_edges(
self, eid: "torch.Tensor", etype: Union[str, Tuple[str, str, str]] = None
) -> Tuple["torch.Tensor", "torch.Tensor"]:
"""
Looks up and returns the appropriate src/dst pairs given a sequence of edge
ids and an edge type.
"""

# Have to properly de-offset the vertices based on edge type
etype = self.to_canonical_etype(etype)
num_edge_type = self._to_numeric_etype(etype)
out = self._edge_lookup_table.lookup_vertex_ids(
cupy.asarray(eid), num_edge_type
)

src_name = "sources" if self.__graph["direction"] == "out" else "destinations"
dst_name = "destinations" if self.__graph["direction"] == "out" else "sources"
offsets = self._vertex_offsets

return (
torch.as_tensor(out[src_name], device="cuda") - offsets[etype[0]],
torch.as_tensor(out[dst_name], device="cuda") - offsets[etype[2]],
)

def global_uniform_negative_sampling(
self,
num_samples: int,
exclude_self_loops: bool = True,
replace: bool = False,
etype: Optional[Union[str, Tuple[str, str, str]]] = None,
redundancy: Optional[float] = None,
):
"""
Performs negative sampling, which constructs a set of source and destination
pairs that do not exist in this graph.

Parameters
----------
num_samples: int
Target number of negative edges to generate. May generate less depending
on whether the existing set of edges allows for it.
exclude_self_loops: bool
Whether to drop edges where the source and destination is the same.
Defaults to True.
replace: bool
Whether to sample with replacement. Sampling with replacement is not
supported by the cuGraph-DGL generator. Defaults to False.
etype: str or tuple[str, str, str] (Optional)
The edge type to generate negative edges for. Optional if there is
only one edge type in the graph.
redundancy: float (Optional)
Not supported by the cuGraph-DGL generator.
"""

if redundancy:
warnings.warn("The 'redudancy' parameter is ignored by cuGraph-DGL.")
if replace:
raise NotImplementedError(
"Negative sampling with replacement is not supported by cuGraph-DGL."
)

if len(self.ntypes) == 1:
vertices = torch.arange(self.num_nodes())
src_vertex_offset = 0
dst_vertex_offset = 0
src_bias = cupy.ones(len(vertices), dtype="float32")
dst_bias = src_bias
else:
can_edge_type = self.to_canonical_etype(etype)
src_vertex_offset = self._vertex_offsets[can_edge_type[0]]
dst_vertex_offset = self._vertex_offsets[can_edge_type[2]]

# Limit sampled vertices to those of the given edge type.
if can_edge_type[0] == can_edge_type[2]:
vertices = torch.arange(
src_vertex_offset,
src_vertex_offset + self.num_nodes(can_edge_type[0]),
dtype=torch.int64,
device="cuda",
)
src_bias = cupy.ones(self.num_nodes(can_edge_type[0]), dtype="float32")
dst_bias = src_bias

else:
vertices = torch.concat(
[
torch.arange(
src_vertex_offset,
src_vertex_offset + self.num_nodes(can_edge_type[0]),
dtype=torch.int64,
device="cuda",
),
torch.arange(
dst_vertex_offset,
dst_vertex_offset + self.num_nodes(can_edge_type[2]),
dtype=torch.int64,
device="cuda",
),
]
)

src_bias = cupy.concatenate(
[
cupy.ones(self.num_nodes(can_edge_type[0]), dtype="float32"),
cupy.zeros(self.num_nodes(can_edge_type[2]), dtype="float32"),
]
)

dst_bias = cupy.concatenate(
[
cupy.zeros(self.num_nodes(can_edge_type[0]), dtype="float32"),
cupy.ones(self.num_nodes(can_edge_type[2]), dtype="float32"),
]
)

if self.is_multi_gpu:
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()

num_samples_global = torch.tensor([num_samples], device="cuda")
torch.distributed.all_reduce(
num_samples_global, op=torch.distributed.ReduceOp.SUM
)
num_samples_global = int(num_samples_global)

vertices = torch.tensor_split(vertices, world_size)[rank]

src_bias = cupy.array_split(src_bias, world_size)[rank]
dst_bias = (
src_bias
if can_edge_type[0] == can_edge_type[2]
else cupy.array_split(dst_bias, world_size)[rank]
)
else:
num_samples_global = num_samples

graph = (
self.__graph["graph"]
if self.__graph is not None and self.__graph["direction"] == "out"
else self._graph(
"out", None if self.__graph is None else self.__graph["prob_attr"]
)
)

result_dict = pylibcugraph.negative_sampling(
self._resource_handle,
graph,
num_samples_global,
vertices=cupy.asarray(vertices),
src_bias=src_bias,
dst_bias=dst_bias,
remove_duplicates=True,
remove_false_negatives=True,
exact_number_of_samples=True,
do_expensive_check=False,
)

# TODO remove this workaround once the C API is updated to take a local number
# of negatives (rapidsai/cugraph#4672)
src_neg = (
torch.as_tensor(result_dict["sources"], device="cuda")[:num_samples]
- src_vertex_offset
)
dst_neg = (
torch.as_tensor(result_dict["destinations"], device="cuda")[:num_samples]
- dst_vertex_offset
)

if exclude_self_loops:
f = src_neg != dst_neg
return src_neg[f], dst_neg[f]
else:
return src_neg, dst_neg

@property
def ndata(self) -> HeteroNodeDataView:
"""
Expand Down
70 changes: 70 additions & 0 deletions python/cugraph-dgl/cugraph_dgl/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@

import dgl
import torch
import numpy as np

import cugraph_dgl

from cugraph.testing.mg_utils import (
start_dask_client,
stop_dask_client,
)

from cugraph.datasets import karate


@pytest.fixture(scope="module")
def dask_client():
Expand Down Expand Up @@ -66,3 +71,68 @@ def dgl_graph_1():
src = torch.tensor([0, 1, 0, 2, 3, 0, 4, 0, 5, 0, 6, 7, 0, 8, 9])
dst = torch.tensor([1, 9, 2, 9, 9, 4, 9, 5, 9, 6, 9, 9, 8, 9, 0])
return dgl.graph((src, dst))


def create_karate_bipartite(multi_gpu: bool = False):
df = karate.get_edgelist()
df.src = df.src.astype("int64")
df.dst = df.dst.astype("int64")

graph = cugraph_dgl.Graph(is_multi_gpu=multi_gpu)
total_num_nodes = max(df.src.max(), df.dst.max()) + 1

num_nodes_group_1 = total_num_nodes // 2
num_nodes_group_2 = total_num_nodes - num_nodes_group_1

node_x_1 = np.random.random((num_nodes_group_1,))
node_x_2 = np.random.random((num_nodes_group_2,))

if multi_gpu:
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()

node_x_1 = np.array_split(node_x_1, world_size)[rank]
node_x_2 = np.array_split(node_x_2, world_size)[rank]

graph.add_nodes(num_nodes_group_1, {"x": node_x_1}, "type1")
graph.add_nodes(num_nodes_group_2, {"x": node_x_2}, "type2")

edges = {}
edges["type1", "e1", "type1"] = df[
(df.src < num_nodes_group_1) & (df.dst < num_nodes_group_1)
]
edges["type1", "e2", "type2"] = df[
(df.src < num_nodes_group_1) & (df.dst >= num_nodes_group_1)
]
edges["type2", "e3", "type1"] = df[
(df.src >= num_nodes_group_1) & (df.dst < num_nodes_group_1)
]
edges["type2", "e4", "type2"] = df[
(df.src >= num_nodes_group_1) & (df.dst >= num_nodes_group_1)
]

edges["type1", "e2", "type2"].dst -= num_nodes_group_1
edges["type2", "e3", "type1"].src -= num_nodes_group_1
edges["type2", "e4", "type2"].dst -= num_nodes_group_1
edges["type2", "e4", "type2"].src -= num_nodes_group_1

if multi_gpu:
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()

edges_local = {
etype: edf.iloc[np.array_split(np.arange(len(edf)), world_size)[rank]]
for etype, edf in edges.items()
}
else:
edges_local = edges

for etype, edf in edges_local.items():
graph.add_edges(edf.src, edf.dst, etype=etype)

return graph, edges, (num_nodes_group_1, num_nodes_group_2)


@pytest.fixture
def karate_bipartite():
return create_karate_bipartite(False)
Loading
Loading