From cf0d9dd1e3532305957b0b568de25fd99c4c9d58 Mon Sep 17 00:00:00 2001
From: Muhammed Fatih Balin
Date: Thu, 18 Jul 2024 08:39:12 -0400
Subject: [PATCH] [GraphBolt] Add CPUCachedFeature.

---
 graphbolt/src/cache_policy.cc                 |   4 +-
 python/dgl/graphbolt/impl/__init__.py         |   1 +
 .../dgl/graphbolt/impl/cpu_cached_feature.py  | 121 ++++++++++++++++++
 python/dgl/graphbolt/impl/feature_cache.py    |  15 ++-
 .../graphbolt/impl/test_cpu_cached_feature.py |  95 ++++++++++++++
 .../graphbolt/impl/test_feature_cache.py      |   2 +-
 6 files changed, 233 insertions(+), 5 deletions(-)
 create mode 100644 python/dgl/graphbolt/impl/cpu_cached_feature.py
 create mode 100644 tests/python/pytorch/graphbolt/impl/test_cpu_cached_feature.py

diff --git a/graphbolt/src/cache_policy.cc b/graphbolt/src/cache_policy.cc
index 94324d62e2c8..91e9b582274e 100644
--- a/graphbolt/src/cache_policy.cc
+++ b/graphbolt/src/cache_policy.cc
@@ -77,7 +77,9 @@ torch::Tensor BaseCachePolicy::ReplaceImpl(
           const auto pos = pos_optional ? *pos_optional : policy.Insert(key);
           positions_ptr[i] = pos;
           TORCH_CHECK(
-              std::get<1>(position_set.insert(pos)),
+              // If there are duplicate values and the key was just inserted,
+              // we do not have to check for the uniqueness of the positions.
+              pos_optional.has_value() || std::get<1>(position_set.insert(pos)),
               "Can't insert all, larger cache capacity is needed.");
         }
       }));
diff --git a/python/dgl/graphbolt/impl/__init__.py b/python/dgl/graphbolt/impl/__init__.py
index d952c194ceac..5b92bb83f078 100644
--- a/python/dgl/graphbolt/impl/__init__.py
+++ b/python/dgl/graphbolt/impl/__init__.py
@@ -14,3 +14,4 @@
 from .uniform_negative_sampler import *
 from .gpu_graph_cache import *
 from .feature_cache import *
+from .cpu_cached_feature import *
diff --git a/python/dgl/graphbolt/impl/cpu_cached_feature.py b/python/dgl/graphbolt/impl/cpu_cached_feature.py
new file mode 100644
index 000000000000..7dd5bf39306b
--- /dev/null
+++ b/python/dgl/graphbolt/impl/cpu_cached_feature.py
@@ -0,0 +1,121 @@
+"""CPU cached feature for GraphBolt."""
+
+import torch
+
+from ..feature_store import Feature
+
+from .feature_cache import CPUFeatureCache
+
+__all__ = ["CPUCachedFeature"]
+
+
+def num_cache_items(cache_capacity_in_bytes, single_item):
+    """Returns the number of rows to be cached."""
+    item_bytes = single_item.nbytes
+    # Round up so that we never get a size of 0, unless bytes is 0.
+    return (cache_capacity_in_bytes + item_bytes - 1) // item_bytes
+
+
+class CPUCachedFeature(Feature):
+    r"""CPU cached feature wrapping a fallback feature.
+
+    Parameters
+    ----------
+    fallback_feature : Feature
+        The fallback feature.
+    max_cache_size_in_bytes : int
+        The capacity of the cache in bytes.
+    policy:
+        The cache eviction policy algorithm name. See gb.impl.CPUFeatureCache
+        for the list of available policies.
+    pin_memory:
+        Whether the cache storage should be allocated on system pinned memory.
+    """
+
+    def __init__(
+        self,
+        fallback_feature: Feature,
+        max_cache_size_in_bytes: int,
+        policy: str = None,
+        pin_memory=False,
+    ):
+        super(CPUCachedFeature, self).__init__()
+        assert isinstance(fallback_feature, Feature), (
+            f"The fallback_feature must be an instance of Feature, but got "
+            f"{type(fallback_feature)}."
+        )
+        self._fallback_feature = fallback_feature
+        self.max_cache_size_in_bytes = max_cache_size_in_bytes
+        # Fetching the feature dimension from the underlying feature.
+        feat0 = fallback_feature.read(torch.tensor([0]))
+        cache_size = num_cache_items(max_cache_size_in_bytes, feat0)
+        self._feature = CPUFeatureCache(
+            (cache_size,) + feat0.shape[1:],
+            feat0.dtype,
+            policy=policy,
+            pin_memory=pin_memory,
+        )
+
+    def read(self, ids: torch.Tensor = None):
+        """Read the feature by index.
+
+        The returned tensor is always in CPU memory, no matter whether the
+        fallback feature is in memory or on disk.
+
+        Parameters
+        ----------
+        ids : torch.Tensor, optional
+            The index of the feature. If specified, only the specified indices
+            of the feature are read. If None, the entire feature is returned.
+
+        Returns
+        -------
+        torch.Tensor
+            The read feature.
+        """
+        if ids is None:
+            return self._fallback_feature.read()
+        values, missing_index, missing_keys = self._feature.query(ids)
+        missing_values = self._fallback_feature.read(missing_keys)
+        values[missing_index] = missing_values
+        self._feature.replace(missing_keys, missing_values)
+        return values
+
+    def size(self):
+        """Get the size of the feature.
+
+        Returns
+        -------
+        torch.Size
+            The size of the feature.
+        """
+        return self._fallback_feature.size()
+
+    def update(self, value: torch.Tensor, ids: torch.Tensor = None):
+        """Update the feature.
+
+        Parameters
+        ----------
+        value : torch.Tensor
+            The updated value of the feature.
+        ids : torch.Tensor, optional
+            The indices of the feature to update. If specified, only the
+            specified indices of the feature will be updated. For the feature,
+            the `ids[i]` row is updated to `value[i]`. So the indices and value
+            must have the same length. If None, the entire feature will be
+            updated.
+        """
+        if ids is None:
+            feat0 = value[:1]
+            self._fallback_feature.update(value)
+            cache_size = min(
+                num_cache_items(self.max_cache_size_in_bytes, feat0),
+                value.shape[0],
+            )
+            self._feature = None  # Destroy the existing cache first.
+            self._feature = CPUFeatureCache(
+                (cache_size,) + feat0.shape[1:], feat0.dtype
+            )
+        else:
+            self._fallback_feature.update(value, ids)
+            self._feature.replace(ids, value)
diff --git a/python/dgl/graphbolt/impl/feature_cache.py b/python/dgl/graphbolt/impl/feature_cache.py
index cd6550176d9e..1cdeaaa3867a 100644
--- a/python/dgl/graphbolt/impl/feature_cache.py
+++ b/python/dgl/graphbolt/impl/feature_cache.py
@@ -1,7 +1,7 @@
 """CPU Feature Cache implementation wrapper for graphbolt."""
 import torch
 
-__all__ = ["FeatureCache"]
+__all__ = ["CPUFeatureCache"]
 
 caching_policies = {
     "s3-fifo": torch.ops.graphbolt.s3_fifo_cache_policy,
@@ -11,7 +11,7 @@
 }
 
 
-class FeatureCache(object):
+class CPUFeatureCache(object):
     r"""High level wrapper for the CPU feature cache.
 
     Parameters
@@ -34,15 +34,24 @@ def __init__(
         self,
         cache_shape,
         dtype,
-        policy="sieve",
+        policy=None,
         num_parts=None,
         pin_memory=False,
     ):
+        if policy is None:
+            policy = "sieve"
         assert (
             policy in caching_policies
         ), f"{list(caching_policies.keys())} are the available caching policies."
         if num_parts is None:
             num_parts = torch.get_num_threads()
+        min_num_cache_items = num_parts * (10 if policy == "s3-fifo" else 1)
+        # Since we partition the cache, each partition needs to have a positive
+        # number of slots. In addition, each "s3-fifo" partition needs at least
+        # 10 slots since the small queue is 10% and the small queue needs a
+        # positive size.
+        if cache_shape[0] < min_num_cache_items:
+            cache_shape = (min_num_cache_items,) + cache_shape[1:]
         self._policy = caching_policies[policy](cache_shape[0], num_parts)
         self._cache = torch.ops.graphbolt.feature_cache(
             cache_shape, dtype, pin_memory
diff --git a/tests/python/pytorch/graphbolt/impl/test_cpu_cached_feature.py b/tests/python/pytorch/graphbolt/impl/test_cpu_cached_feature.py
new file mode 100644
index 000000000000..e8a46edd948d
--- /dev/null
+++ b/tests/python/pytorch/graphbolt/impl/test_cpu_cached_feature.py
@@ -0,0 +1,95 @@
+import backend as F
+
+import pytest
+import torch
+
+from dgl import graphbolt as gb
+
+
+@pytest.mark.parametrize(
+    "dtype",
+    [
+        torch.bool,
+        torch.uint8,
+        torch.int8,
+        torch.int16,
+        torch.int32,
+        torch.int64,
+        torch.float16,
+        torch.bfloat16,
+        torch.float32,
+        torch.float64,
+    ],
+)
+@pytest.mark.parametrize("policy", ["s3-fifo", "sieve", "lru", "clock"])
+def test_cpu_cached_feature(dtype, policy):
+    cache_size_a = 32
+    cache_size_b = 64
+    a = torch.tensor([[1, 2, 3], [4, 5, 6]], dtype=dtype)
+    b = torch.tensor([[[1, 2], [3, 4]], [[4, 5], [6, 7]]], dtype=dtype)
+
+    pin_memory = F._default_context_str == "gpu"
+
+    cache_size_a *= a[:1].nbytes
+    cache_size_b *= b[:1].nbytes
+
+    feat_store_a = gb.CPUCachedFeature(
+        gb.TorchBasedFeature(a), cache_size_a, policy, pin_memory
+    )
+    feat_store_b = gb.CPUCachedFeature(
+        gb.TorchBasedFeature(b), cache_size_b, policy, pin_memory
+    )
+
+    # Test read the entire feature.
+    assert torch.equal(feat_store_a.read(), a)
+    assert torch.equal(feat_store_b.read(), b)
+
+    # Test read with ids.
+    assert torch.equal(
+        feat_store_a.read(torch.tensor([0])),
+        torch.tensor([[1, 2, 3]], dtype=dtype),
+    )
+    assert torch.equal(
+        feat_store_b.read(torch.tensor([1, 1])),
+        torch.tensor([[[4, 5], [6, 7]], [[4, 5], [6, 7]]], dtype=dtype),
+    )
+    assert torch.equal(
+        feat_store_a.read(torch.tensor([1, 1])),
+        torch.tensor([[4, 5, 6], [4, 5, 6]], dtype=dtype),
+    )
+    assert torch.equal(
+        feat_store_b.read(torch.tensor([0])),
+        torch.tensor([[[1, 2], [3, 4]]], dtype=dtype),
+    )
+    # The cache should be full now for the large cache sizes, 100% hit rate expected.
+    total_miss = feat_store_a._feature.total_miss
+    feat_store_a.read(torch.tensor([0, 1]))
+    assert total_miss == feat_store_a._feature.total_miss
+    total_miss = feat_store_b._feature.total_miss
+    feat_store_b.read(torch.tensor([0, 1]))
+    assert total_miss == feat_store_b._feature.total_miss
+
+    # Test the size of the entire feature.
+    assert feat_store_a.size() == torch.Size([3])
+    assert feat_store_b.size() == torch.Size([2, 2])
+
+    # Test update the entire feature.
+    feat_store_a.update(torch.tensor([[0, 1, 2], [3, 5, 2]], dtype=dtype))
+    assert torch.equal(
+        feat_store_a.read(),
+        torch.tensor([[0, 1, 2], [3, 5, 2]], dtype=dtype),
+    )
+
+    # Test update with ids.
+    feat_store_a.update(
+        torch.tensor([[2, 0, 1]], dtype=dtype),
+        torch.tensor([0]),
+    )
+    assert torch.equal(
+        feat_store_a.read(),
+        torch.tensor([[2, 0, 1], [3, 5, 2]], dtype=dtype),
+    )
+
+    # Test with different dimensionality
+    feat_store_a.update(b)
+    assert torch.equal(feat_store_a.read(), b)
diff --git a/tests/python/pytorch/graphbolt/impl/test_feature_cache.py b/tests/python/pytorch/graphbolt/impl/test_feature_cache.py
index ac6e15117f52..b9c5eed2c766 100644
--- a/tests/python/pytorch/graphbolt/impl/test_feature_cache.py
+++ b/tests/python/pytorch/graphbolt/impl/test_feature_cache.py
@@ -29,7 +29,7 @@ def test_feature_cache(dtype, feature_size, num_parts, policy):
         torch.get_num_threads() if num_parts is None else num_parts
     )
    a = torch.randint(0, 2, [1024, feature_size], dtype=dtype)
-    cache = gb.impl.FeatureCache(
+    cache = gb.impl.CPUFeatureCache(
         (cache_size,) + a.shape[1:], a.dtype, policy, num_parts
     )
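
Usage note (not part of the patch): the snippet below is a minimal sketch of how the new CPUCachedFeature wrapper can be used, following the call patterns exercised in test_cpu_cached_feature.py. The tensor shapes, the ids, and the "sieve" policy choice are illustrative assumptions rather than anything mandated by the patch.

import torch
from dgl import graphbolt as gb

# Fallback feature holding the full data; a disk-backed feature would work too.
full_data = torch.randn(1000, 128)
fallback = gb.TorchBasedFeature(full_data)

# The cache capacity is given in bytes; here we budget room for roughly 100 rows.
cache_size_in_bytes = 100 * full_data[:1].nbytes
cached_feature = gb.CPUCachedFeature(fallback, cache_size_in_bytes, policy="sieve")

ids = torch.tensor([0, 5, 42])
values = cached_feature.read(ids)  # Misses are fetched from the fallback and cached.
values = cached_feature.read(ids)  # Served from the CPU cache on the second read.

# Updates are written through to the fallback and reflected in the cache.
cached_feature.update(torch.zeros(3, 128), ids)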