Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

COH-29986 - Implement missing apis - add_index and remove_index in Python client #139

Merged
merged 8 commits into from
Apr 29, 2024
46 changes: 46 additions & 0 deletions src/coherence/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@
from .aggregator import AverageAggregator, EntryAggregator, PriorityAggregator, SumAggregator
from .comparator import Comparator
from .event import MapLifecycleEvent, MapListener, SessionLifecycleEvent
from .extractor import ValueExtractor
from .filter import Filter
from .messages_pb2 import PageRequest # type: ignore
from .processor import EntryProcessor
from .serialization import Serializer, SerializerRegistry
from .services_pb2_grpc import NamedCacheServiceStub
from .util import RequestFactory

E = TypeVar("E")
K = TypeVar("K")
V = TypeVar("V")
R = TypeVar("R")
Expand Down Expand Up @@ -456,6 +458,34 @@ def entries(
:return: an AsyncIterator of MapEntry instances that satisfy the specified criteria
"""

@abc.abstractmethod
def add_index(
self, extractor: ValueExtractor[T, E], ordered: bool = False, comparator: Optional[Comparator] = None
) -> None:
"""
Add an index to this map.

:param extractor: The :class: `coherence.extractor.ValueExtractor` object that is used to extract
an indexable Object from a value stored in the
indexed Map. Must not be 'None'.
:param ordered: true if the contents of the indexed information
should be ordered false otherwise.
:param comparator: The :class: `coherence.comparator.Comparator` object which imposes an ordering
on entries in the indexed map or None if the
entries' values natural ordering should be used.
"""

@abc.abstractmethod
def remove_index(self, extractor: ValueExtractor[T, E]) -> None:
"""
Removes an index on this `NamedMap`.

:param extractor: The :class: `coherence.extractor.ValueExtractor` object that is used to extract
an indexable Object from a value stored in the
indexed Map. Must not be 'None'.

"""


class NamedCache(NamedMap[K, V]):
"""
Expand Down Expand Up @@ -743,6 +773,22 @@ async def remove_map_listener(self, listener: MapListener[K, V], listener_for: O
else:
await self._events_manager._remove_key_listener(listener, listener_for)

@_pre_call_cache
dhirupandey marked this conversation as resolved.
Show resolved Hide resolved
async def add_index(
self, extractor: ValueExtractor[T, E], ordered: bool = False, comparator: Optional[Comparator] = None
) -> None:
if extractor is None:
raise ValueError("A ValueExtractor must be specified")
r = self._request_factory.add_index_request(extractor, ordered, comparator)
await self._client_stub.addIndex(r)

@_pre_call_cache
async def remove_index(self, extractor: ValueExtractor[T, E]) -> None:
if extractor is None:
raise ValueError("A ValueExtractor must be specified")
r = self._request_factory.remove_index_request(extractor)
await self._client_stub.removeIndex(r)

def _setup_event_handlers(self) -> None:
"""
Setup handlers to notify cache-level handlers of events.
Expand Down
29 changes: 29 additions & 0 deletions src/coherence/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@

from .aggregator import EntryAggregator
from .comparator import Comparator
from .extractor import ValueExtractor
from .filter import Filter, Filters, MapEventFilter
from .messages_pb2 import ( # type: ignore
AddIndexRequest,
AggregateRequest,
ClearRequest,
ContainsKeyRequest,
Expand All @@ -29,6 +31,7 @@
PutAllRequest,
PutIfAbsentRequest,
PutRequest,
RemoveIndexRequest,
RemoveMappingRequest,
RemoveRequest,
ReplaceMappingRequest,
Expand Down Expand Up @@ -352,3 +355,29 @@ def __generate_next_request_id(self, prefix: str) -> str:
"""Generates a prefix map-specific prefix when starting a MapEvent gRPC stream."""
self.__next_request_id += 1
return prefix + self.__uidPrefix + str(self.__next_request_id)

def add_index_request(
self, extractor: ValueExtractor[T, E], ordered: bool = False, comparator: Optional[Comparator] = None
) -> AddIndexRequest:
r = AddIndexRequest(
scope=self._scope,
cache=self._cache_name,
format=self._serializer.format,
extractor=self._serializer.serialize(extractor),
)
r.sorted = ordered

if comparator is not None:
r.comparator = self._serializer.serialize(comparator)

return r

def remove_index_request(self, extractor: ValueExtractor[T, E]) -> RemoveIndexRequest:
r = RemoveIndexRequest(
scope=self._scope,
cache=self._cache_name,
format=self._serializer.format,
extractor=self._serializer.serialize(extractor),
)

return r
45 changes: 43 additions & 2 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import pytest_asyncio

import tests
from coherence import Filters, MapEntry, NamedCache, Session
from coherence import Aggregators, Filters, MapEntry, NamedCache, Session
from coherence.event import MapLifecycleEvent
from coherence.extractor import ChainedExtractor, UniversalExtractor
from coherence.extractor import ChainedExtractor, Extractors, UniversalExtractor
from coherence.processor import ExtractorProcessor
from tests.address import Address
from tests.person import Person
Expand Down Expand Up @@ -562,3 +562,44 @@ def callback(n: str) -> None:
assert not cache.active
finally:
await session.close()


# noinspection PyShadowingNames,DuplicatedCode,PyUnresolvedReferences
@pytest.mark.asyncio
async def test_add_remove_index(setup_and_teardown_person_cache: NamedCache[str, Person]) -> None:
cache: NamedCache[str, Person] = setup_and_teardown_person_cache

await cache.add_index(Extractors.extract("age"))
result = await cache.aggregate(Aggregators.record(), None, Filters.greater("age", 25))
# print(result)
# {'@class': 'util.SimpleQueryRecord', 'results': [{'@class': 'util.SimpleQueryRecord.PartialResult',
# 'partitionSet': {'@class': 'net.partition.PartitionSet', 'bits': [2147483647], 'markedCount': -1,
# 'partitionCount': 31, 'tailMask': 2147483647}, 'steps': [{'@class': 'util.SimpleQueryRecord.PartialResult.Step',
# 'efficiency': 5, 'filter': 'GreaterFilter(.age, 25)',
# 'indexLookupRecords': [{'@class': 'util.SimpleQueryRecord.PartialResult.IndexLookupRecord',
# 'bytes': 6839, 'distinctValues': 5, 'extractor': '.age', 'index': 'Partitioned: Footprint=6.67KB, Size=5',
# 'indexDesc': 'Partitioned: ', 'ordered': False}], 'keySetSizePost': 0, 'keySetSizePre': 7, 'millis': 0,
# 'subSteps': []}]}], 'type': {'@class': 'aggregator.QueryRecorder.RecordType', 'enum': 'EXPLAIN'}}

idx_rec = result["results"][0].get("steps")[0].get("indexLookupRecords")[0]
# print(idx_rec)
# {'@class': 'util.SimpleQueryRecord.PartialResult.IndexLookupRecord', 'bytes': 6839, 'distinctValues': 5,
# 'extractor': '.age', 'index': 'Partitioned: Footprint=6.67KB, Size=5', 'indexDesc': 'Partitioned: ',
# 'ordered': False}
assert "index" in idx_rec

await cache.remove_index(Extractors.extract("age"))
result2 = await cache.aggregate(Aggregators.record(), None, Filters.greater("age", 25))
print(result2)
# {'@class': 'util.SimpleQueryRecord', 'results': [{'@class': 'util.SimpleQueryRecord.PartialResult',
# 'partitionSet': {'@class': 'net.partition.PartitionSet', 'bits': [2147483647], 'markedCount': -1,
# 'partitionCount': 31, 'tailMask': 2147483647}, 'steps': [{'@class': 'util.SimpleQueryRecord.PartialResult.Step',
# 'efficiency': 7000, 'filter': 'GreaterFilter(.age, 25)',
# 'indexLookupRecords': [{'@class': 'util.SimpleQueryRecord.PartialResult.IndexLookupRecord', 'bytes': -1,
# 'distinctValues': -1, 'extractor': '.age', 'ordered': False}], 'keySetSizePost': 0, 'keySetSizePre': 7,
# 'millis': 0, 'subSteps': []}]}], 'type': {'@class': 'aggregator.QueryRecorder.RecordType', 'enum': 'EXPLAIN'}}
idx_rec = result2["results"][0].get("steps")[0].get("indexLookupRecords")[0]
# print(idx_rec)
# {'@class': 'util.SimpleQueryRecord.PartialResult.IndexLookupRecord', 'bytes': -1, 'distinctValues': -1,
# 'extractor': '.age', 'ordered': False}
assert "index" not in idx_rec