From 7a1a1ebb7e06e0260c755cf5f4f305576489144e Mon Sep 17 00:00:00 2001 From: annie-mac Date: Sat, 17 Aug 2024 18:17:56 -0700 Subject: [PATCH] remove async keyword from changeFeed query in aio package --- .../_change_feed/aio/change_feed_iterable.py | 104 +++++++++++++----- .../_change_feed/aio/change_feed_state.py | 56 +++++++--- ...feed_range_composite_continuation_token.py | 42 ++++--- .../azure/cosmos/_change_feed/feed_range.py | 102 +++++++++++++++++ .../azure/cosmos/aio/_container.py | 71 ++---------- .../azure-cosmos/azure/cosmos/container.py | 1 - .../azure/cosmos/partition_key.py | 18 ++- .../test/test_change_feed_async.py | 30 ++--- 8 files changed, 281 insertions(+), 143 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range.py diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/change_feed_iterable.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/change_feed_iterable.py index 501f3a7e4150..16a431653e9c 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/change_feed_iterable.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/change_feed_iterable.py @@ -21,11 +21,13 @@ """Iterable change feed results in the Azure Cosmos database service. """ + from azure.core.async_paging import AsyncPageIterator +from azure.cosmos import PartitionKey from azure.cosmos._change_feed.aio.change_feed_fetcher import ChangeFeedFetcherV1, ChangeFeedFetcherV2 from azure.cosmos._change_feed.aio.change_feed_state import ChangeFeedStateV1, ChangeFeedState -from azure.cosmos._utils import is_base64_encoded +from azure.cosmos._utils import is_base64_encoded, is_key_exists_and_not_none class ChangeFeedIterable(AsyncPageIterator): @@ -57,40 +59,30 @@ def __init__( self._options = options self._fetch_function = fetch_function self._collection_link = collection_link + self._change_feed_fetcher = None - change_feed_state = self._options.get("changeFeedState") - if not change_feed_state: - raise ValueError("Missing changeFeedState in feed options") + if not is_key_exists_and_not_none(self._options, "changeFeedStateContext"): + raise ValueError("Missing changeFeedStateContext in feed options") - if isinstance(change_feed_state, ChangeFeedStateV1): - if continuation_token: - if is_base64_encoded(continuation_token): - raise ValueError("Incompatible continuation token") - else: - change_feed_state.apply_server_response_continuation(continuation_token) + change_feed_state_context = self._options.pop("changeFeedStateContext") - self._change_feed_fetcher = ChangeFeedFetcherV1( - self._client, - self._collection_link, - self._options, - fetch_function - ) - else: - if continuation_token: - if not is_base64_encoded(continuation_token): - raise ValueError("Incompatible continuation token") + continuation = continuation_token if continuation_token is not None else change_feed_state_context.pop("continuation", None) - effective_change_feed_context = {"continuationFeedRange": continuation_token} - effective_change_feed_state = ChangeFeedState.from_json(change_feed_state.container_rid, effective_change_feed_context) - # replace with the effective change feed state - self._options["continuationFeedRange"] = effective_change_feed_state + # analysis and validate continuation token + # there are two types of continuation token we support currently: + # v1 version: the continuation token would just be the _etag, + # which is being returned when customer is using partition_key_range_id, + # which is under deprecation and does not support split/merge + # v2 version: the continuation token will be base64 encoded composition token which includes full change feed state + if continuation is not None: + if is_base64_encoded(continuation): + change_feed_state_context["continuationFeedRange"] = continuation + else: + change_feed_state_context["continuationPkRangeId"] = continuation + + self._validate_change_feed_state_context(change_feed_state_context) + self._options["changeFeedStateContext"] = change_feed_state_context - self._change_feed_fetcher = ChangeFeedFetcherV2( - self._client, - self._collection_link, - self._options, - fetch_function - ) super(ChangeFeedIterable, self).__init__(self._fetch_next, self._unpack, continuation_token=continuation_token) async def _unpack(self, block): @@ -112,7 +104,59 @@ async def _fetch_next(self, *args): # pylint: disable=unused-argument :return: List of results. :rtype: list """ + if self._change_feed_fetcher is None: + await self._initialize_change_feed_fetcher() + block = await self._change_feed_fetcher.fetch_next_block() if not block: raise StopAsyncIteration return block + + async def _initialize_change_feed_fetcher(self): + change_feed_state_context = self._options.pop("changeFeedStateContext") + conn_properties = await change_feed_state_context.pop("containerProperties") + if is_key_exists_and_not_none(change_feed_state_context, "partitionKey"): + change_feed_state_context["partitionKey"] = await change_feed_state_context.pop("partitionKey") + + pk_properties = conn_properties.get("partitionKey") + partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"]) + + change_feed_state =\ + ChangeFeedState.from_json(self._collection_link, conn_properties["_rid"], partition_key_definition, change_feed_state_context) + self._options["changeFeedState"] = change_feed_state + + if isinstance(change_feed_state, ChangeFeedStateV1): + self._change_feed_fetcher = ChangeFeedFetcherV1( + self._client, + self._collection_link, + self._options, + self._fetch_function + ) + else: + self._change_feed_fetcher = ChangeFeedFetcherV2( + self._client, + self._collection_link, + self._options, + self._fetch_function + ) + + def _validate_change_feed_state_context(self, change_feed_state_context: dict[str, any]) -> None: + + if is_key_exists_and_not_none(change_feed_state_context, "continuationPkRangeId"): + # if continuation token is in v1 format, throw exception if feed_range is set + if is_key_exists_and_not_none(change_feed_state_context, "feedRange"): + raise ValueError("feed_range and continuation are incompatible") + elif is_key_exists_and_not_none(change_feed_state_context, "continuationFeedRange"): + # if continuation token is in v2 format, since the token itself contains the full change feed state + # so we will ignore other parameters (including incompatible parameters) if they passed in + pass + else: + # validation when no continuation is passed + exclusive_keys = ["partitionKeyRangeId", "partitionKey", "feedRange"] + count = sum(1 for key in exclusive_keys if + key in change_feed_state_context and change_feed_state_context[key] is not None) + if count > 1: + raise ValueError( + "partition_key_range_id, partition_key, feed_range are exclusive parameters, please only set one of them") + + diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/change_feed_state.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/change_feed_state.py index ae2e37568bd4..eede9bd4fe15 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/change_feed_state.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/change_feed_state.py @@ -29,11 +29,12 @@ from abc import ABC, abstractmethod from typing import Optional, Union, List, Any -from azure.cosmos import http_constants +from azure.cosmos import http_constants, PartitionKey from azure.cosmos._change_feed.aio.change_feed_start_from import ChangeFeedStartFromETagAndFeedRange, \ ChangeFeedStartFromInternal from azure.cosmos._change_feed.aio.composite_continuation_token import CompositeContinuationToken from azure.cosmos._change_feed.aio.feed_range_composite_continuation_token import FeedRangeCompositeContinuation +from azure.cosmos._change_feed.feed_range import FeedRangeEpk, FeedRangePartitionKey, FeedRange from azure.cosmos._routing.aio.routing_map_provider import SmartRoutingMapProvider from azure.cosmos._routing.routing_range import Range from azure.cosmos._utils import is_key_exists_and_not_none @@ -49,7 +50,10 @@ def populate_feed_options(self, feed_options: dict[str, any]) -> None: pass @abstractmethod - async def populate_request_headers(self, routing_provider: SmartRoutingMapProvider, request_headers: dict[str, any]) -> None: + async def populate_request_headers( + self, + routing_provider: SmartRoutingMapProvider, + request_headers: dict[str, any]) -> None: pass @abstractmethod @@ -57,7 +61,11 @@ def apply_server_response_continuation(self, continuation: str) -> None: pass @staticmethod - def from_json(container_link: str, container_rid: str, data: dict[str, Any]): + def from_json( + container_link: str, + container_rid: str, + partition_key_definition: PartitionKey, + data: dict[str, Any]): if is_key_exists_and_not_none(data, "partitionKeyRangeId") or is_key_exists_and_not_none(data, "continuationPkRangeId"): return ChangeFeedStateV1.from_json(container_link, container_rid, data) else: @@ -69,11 +77,11 @@ def from_json(container_link: str, container_rid: str, data: dict[str, Any]): if version is None: raise ValueError("Invalid base64 encoded continuation string [Missing version]") elif version == "V2": - return ChangeFeedStateV2.from_continuation(container_link, container_rid, continuation_json) + return ChangeFeedStateV2.from_continuation(container_link, container_rid, partition_key_definition, continuation_json) else: raise ValueError("Invalid base64 encoded continuation string [Invalid version]") # when there is no continuation token, by default construct ChangeFeedStateV2 - return ChangeFeedStateV2.from_initial_state(container_link, container_rid, data) + return ChangeFeedStateV2.from_initial_state(container_link, container_rid, partition_key_definition, data) class ChangeFeedStateV1(ChangeFeedState): """Change feed state v1 implementation. This is used when partition key range id is used or the continuation is just simple _etag @@ -110,7 +118,10 @@ def from_json(cls, container_link: str, container_rid: str, data: dict[str, Any] data.get("continuationPkRangeId") ) - async def populate_request_headers(self, routing_provider: SmartRoutingMapProvider, headers: dict[str, Any]) -> None: + async def populate_request_headers( + self, + routing_provider: SmartRoutingMapProvider, + headers: dict[str, Any]) -> None: headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue # When a merge happens, the child partition will contain documents ordered by LSN but the _ts/creation time @@ -140,7 +151,8 @@ def __init__( self, container_link: str, container_rid: str, - feed_range: Range, + partition_key_definition: PartitionKey, + feed_range: FeedRange, change_feed_start_from: ChangeFeedStartFromInternal, continuation: Optional[FeedRangeCompositeContinuation] = None): @@ -151,7 +163,9 @@ def __init__( self._continuation = continuation if self._continuation is None: composite_continuation_token_queue = collections.deque() - composite_continuation_token_queue.append(CompositeContinuationToken(self._feed_range, None)) + composite_continuation_token_queue.append(CompositeContinuationToken( + self._feed_range.get_normalized_range(partition_key_definition), + None)) self._continuation =\ FeedRangeCompositeContinuation(self._container_rid, self._feed_range, composite_continuation_token_queue) @@ -168,7 +182,10 @@ def to_dict(self) -> dict[str, Any]: self.continuation_property_name: self._continuation.to_dict() } - async def populate_request_headers(self, routing_provider: SmartRoutingMapProvider, headers: dict[str, any]) -> None: + async def populate_request_headers( + self, + routing_provider: SmartRoutingMapProvider, + headers: dict[str, any]) -> None: headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue # When a merge happens, the child partition will contain documents ordered by LSN but the _ts/creation time @@ -224,6 +241,7 @@ def from_continuation( cls, container_link: str, container_rid: str, + partition_key_definition: PartitionKey, continuation_json: dict[str, Any]) -> 'ChangeFeedStateV2': container_rid_from_continuation = continuation_json.get(ChangeFeedStateV2.container_rid_property_name) @@ -244,6 +262,7 @@ def from_continuation( return ChangeFeedStateV2( container_link=container_link, container_rid=container_rid, + partition_key_definition=partition_key_definition, feed_range=continuation.feed_range, change_feed_start_from=change_feed_start_from, continuation=continuation) @@ -253,26 +272,29 @@ def from_initial_state( cls, container_link: str, collection_rid: str, + partition_key_definition: PartitionKey, data: dict[str, Any]) -> 'ChangeFeedStateV2': if is_key_exists_and_not_none(data, "feedRange"): feed_range_str = base64.b64decode(data["feedRange"]).decode('utf-8') feed_range_json = json.loads(feed_range_str) - feed_range = Range.ParseFromDict(feed_range_json) - elif is_key_exists_and_not_none(data, "partitionKeyFeedRange"): - feed_range = data["partitionKeyFeedRange"] + feed_range = FeedRangeEpk(Range.ParseFromDict(feed_range_json)) + elif is_key_exists_and_not_none(data, "partitionKey"): + feed_range = FeedRangePartitionKey(data["partitionKey"]) else: # default to full range - feed_range = Range( - "", - "FF", - True, - False) + feed_range = FeedRangeEpk( + Range( + "", + "FF", + True, + False)) change_feed_start_from = ChangeFeedStartFromInternal.from_start_time(data.get("startTime")) return cls( container_link=container_link, container_rid=collection_rid, + partition_key_definition=partition_key_definition, feed_range=feed_range, change_feed_start_from=change_feed_start_from, continuation=None) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/feed_range_composite_continuation_token.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/feed_range_composite_continuation_token.py index 6e1b8f974eea..d7bf97c0a903 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/feed_range_composite_continuation_token.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/feed_range_composite_continuation_token.py @@ -27,20 +27,21 @@ from typing import Any from azure.cosmos._change_feed.aio.composite_continuation_token import CompositeContinuationToken +from azure.cosmos._change_feed.feed_range import FeedRange, FeedRangeEpk, FeedRangePartitionKey from azure.cosmos._routing.aio.routing_map_provider import SmartRoutingMapProvider from azure.cosmos._routing.routing_range import Range +from azure.cosmos._utils import is_key_exists_and_not_none class FeedRangeCompositeContinuation(object): - _version_property_name = "V" - _container_rid_property_name = "Rid" - _continuation_property_name = "Continuation" - _feed_range_property_name = "Range" + _version_property_name = "v" + _container_rid_property_name = "rid" + _continuation_property_name = "continuation" def __init__( self, container_rid: str, - feed_range: Range, + feed_range: FeedRange, continuation: collections.deque[CompositeContinuationToken]): if container_rid is None: raise ValueError("container_rid is missing") @@ -55,31 +56,34 @@ def __init__( def current_token(self): return self._current_token + def get_feed_range(self) -> FeedRange: + if isinstance(self._feed_range, FeedRangeEpk): + return FeedRangeEpk(self.current_token.feed_range) + else: + return self._feed_range + def to_dict(self) -> dict[str, Any]: - return { - self._version_property_name: "v1", #TODO: should this start from v2 + json_data = { + self._version_property_name: "v2", self._container_rid_property_name: self._container_rid, self._continuation_property_name: [childToken.to_dict() for childToken in self._continuation], - self._feed_range_property_name: self._feed_range.to_dict() } + json_data.update(self._feed_range.to_dict()) + return json_data + @classmethod def from_json(cls, data) -> 'FeedRangeCompositeContinuation': version = data.get(cls._version_property_name) if version is None: raise ValueError(f"Invalid feed range composite continuation token [Missing {cls._version_property_name}]") - if version != "v1": + if version != "v2": raise ValueError("Invalid feed range composite continuation token [Invalid version]") container_rid = data.get(cls._container_rid_property_name) if container_rid is None: raise ValueError(f"Invalid feed range composite continuation token [Missing {cls._container_rid_property_name}]") - feed_range_data = data.get(cls._feed_range_property_name) - if feed_range_data is None: - raise ValueError(f"Invalid feed range composite continuation token [Missing {cls._feed_range_property_name}]") - feed_range = Range.ParseFromDict(feed_range_data) - continuation_data = data.get(cls._continuation_property_name) if continuation_data is None: raise ValueError(f"Invalid feed range composite continuation token [Missing {cls._continuation_property_name}]") @@ -87,6 +91,14 @@ def from_json(cls, data) -> 'FeedRangeCompositeContinuation': raise ValueError(f"Invalid feed range composite continuation token [The {cls._continuation_property_name} must be non-empty array]") continuation = [CompositeContinuationToken.from_json(child_range_continuation_token) for child_range_continuation_token in continuation_data] + # parsing feed range + if is_key_exists_and_not_none(data, FeedRangeEpk.type_property_name): + feed_range = FeedRangeEpk.from_json({ FeedRangeEpk.type_property_name: data[FeedRangeEpk.type_property_name] }) + elif is_key_exists_and_not_none(data, FeedRangePartitionKey.type_property_name): + feed_range = FeedRangePartitionKey.from_json({ FeedRangePartitionKey.type_property_name: data[FeedRangePartitionKey.type_property_name] }) + else: + raise ValueError("Invalid feed range composite continuation token [Missing feed range scope]") + return cls(container_rid=container_rid, feed_range=feed_range, continuation=deque(continuation)) async def handle_feed_range_gone(self, routing_provider: SmartRoutingMapProvider, collection_link: str) -> None: @@ -130,5 +142,5 @@ def apply_not_modified_response(self) -> None: self._initial_no_result_range = self._current_token.feed_range @property - def feed_range(self) -> Range: + def feed_range(self) -> FeedRange: return self._feed_range diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range.py new file mode 100644 index 000000000000..a4b4b5dfedda --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range.py @@ -0,0 +1,102 @@ +# The MIT License (MIT) +# Copyright (c) 2014 Microsoft Corporation + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Internal class for feed range implementation in the Azure Cosmos +database service. +""" +import json +from abc import ABC, abstractmethod +from typing import Union, List + +from azure.cosmos import PartitionKey +from azure.cosmos._routing.routing_range import Range +from azure.cosmos._utils import is_key_exists_and_not_none +from azure.cosmos.partition_key import _Undefined, _Empty + + +class FeedRange(ABC): + + @abstractmethod + def get_normalized_range(self, partition_key_range_definition: PartitionKey) -> Range: + pass + + @abstractmethod + def to_dict(self) -> dict[str, any]: + pass + +class FeedRangePartitionKey(FeedRange): + type_property_name = "PK" + + def __init__( + self, + pk_value: Union[str, int, float, bool, List[Union[str, int, float, bool]], _Empty, _Undefined]): + if pk_value is None: + raise ValueError("PartitionKey cannot be None") + + self._pk_value = pk_value + + def get_normalized_range(self, partition_key_definition: PartitionKey) -> Range: + return partition_key_definition._get_epk_range_for_partition_key(self._pk_value).to_normalized_range() + + def to_dict(self) -> dict[str, any]: + if isinstance(self._pk_value, _Undefined): + return { self.type_property_name: [{}] } + elif isinstance(self._pk_value, _Empty): + return { self.type_property_name: [] } + else: + return { self.type_property_name: json.dumps(self._pk_value) } + + @classmethod + def from_json(cls, data: dict[str, any]) -> 'FeedRangePartitionKey': + if is_key_exists_and_not_none(data, cls.type_property_name): + pk_value = data.get(cls.type_property_name) + if isinstance(pk_value, list): + if not pk_value: + return cls(_Empty()) + if pk_value == [{}]: + return cls(_Undefined()) + + return cls(json.loads(data.get(cls.type_property_name))) + raise ValueError(f"Can not parse FeedRangePartitionKey from the json, there is no property {cls.type_property_name}") + +class FeedRangeEpk(FeedRange): + type_property_name = "Range" + + def __init__(self, feed_range: Range): + if feed_range is None: + raise ValueError("feed_range cannot be None") + + self._range = feed_range + + def get_normalized_range(self, partition_key_definition: PartitionKey) -> Range: + return self._range.to_normalized_range() + + def to_dict(self) -> dict[str, any]: + return { + self.type_property_name: self._range.to_dict() + } + + @classmethod + def from_json(cls, data: dict[str, any]) -> 'FeedRangeEpk': + if is_key_exists_and_not_none(data, cls.type_property_name): + feed_range = Range.ParseFromDict(data.get(cls.type_property_name)) + return cls(feed_range) + raise ValueError(f"Can not parse FeedRangeEPK from the json, there is no property {cls.type_property_name}") \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py index a8559839aad7..c68ddad7eb0d 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py @@ -41,10 +41,9 @@ GenerateGuidId, _set_properties_cache ) -from .._change_feed.aio.change_feed_state import ChangeFeedState from .._routing import routing_range from .._routing.routing_range import Range -from .._utils import is_key_exists_and_not_none, is_base64_encoded +from .._utils import is_key_exists_and_not_none from ..offer import ThroughputProperties from ..partition_key import ( NonePartitionKeyValue, @@ -137,25 +136,12 @@ async def _set_partition_key( return _return_undefined_or_empty_partition_key(await self.is_system_key) return cast(Union[str, int, float, bool, List[Union[str, int, float, bool]]], partition_key) - async def _get_epk_range_for_partition_key(self, partition_key_value: PartitionKeyType) -> Range: - container_properties = await self._get_properties() - partition_key_definition = container_properties.get("partitionKey") - partition_key = PartitionKey(path=partition_key_definition["paths"], kind=partition_key_definition["kind"]) - - is_prefix_partition_key = await self.__is_prefix_partition_key(partition_key_value) - - return partition_key._get_epk_range_for_partition_key(partition_key_value, is_prefix_partition_key) - async def __is_prefix_partition_key(self, partition_key: PartitionKeyType) -> bool: properties = await self._get_properties() pk_properties = properties.get("partitionKey") partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"]) - if partition_key_definition.kind != "MultiHash": - return False - if isinstance(partition_key, list) and len(partition_key_definition['paths']) == len(partition_key): - return False - return True + return partition_key_definition._is_prefix_partition_key(partition_key) @distributed_trace_async async def read( @@ -506,13 +492,12 @@ def query_items( return items @overload - async def query_items_change_feed( + def query_items_change_feed( self, *, max_item_count: Optional[int] = None, start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, partition_key: Optional[PartitionKeyType] = None, - # -> would RU usage be more efficient, bug to backend team? deprecate it or using FeedRange to convert? priority: Optional[Literal["High", "Low"]] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: @@ -561,7 +546,7 @@ async def query_items_change_feed( ... @overload - async def query_items_change_feed( + def query_items_change_feed( self, *, continuation: Optional[str] = None, @@ -582,7 +567,7 @@ async def query_items_change_feed( ... @distributed_trace - async def query_items_change_feed( + def query_items_change_feed( self, *args: Any, **kwargs: Any @@ -637,16 +622,7 @@ async def query_items_change_feed( continuation = feed_options.pop('continuation') except KeyError: continuation = args[2] - - # there are two types of continuation token we support currently: - # v1 version: the continuation token would just be the _etag, - # which is being returned when customer is using partition_key_range_id, - # which is under deprecation and does not support split/merge - # v2 version: the continuation token will be base64 encoded composition token which includes full change feed state - if is_base64_encoded(continuation): - change_feed_state_context["continuationFeedRange"] = continuation - else: - change_feed_state_context["continuationPkRangeId"] = continuation + change_feed_state_context["continuation"] = continuation if len(args) >= 4 and args[3] is not None or is_key_exists_and_not_none(kwargs, "max_item_count"): try: @@ -655,42 +631,21 @@ async def query_items_change_feed( feed_options["maxItemCount"] = args[3] if is_key_exists_and_not_none(kwargs, "partition_key"): - partition_key = kwargs.pop("partition_key") - change_feed_state_context["partitionKey"] = await self._set_partition_key(partition_key) - change_feed_state_context["partitionKeyFeedRange"] = await self._get_epk_range_for_partition_key(partition_key) + change_feed_state_context["partitionKey"] = self._set_partition_key(kwargs.pop("partition_key")) if is_key_exists_and_not_none(kwargs, "feed_range"): change_feed_state_context["feedRange"] = kwargs.pop('feed_range') - # validate exclusive or in-compatible parameters - if is_key_exists_and_not_none(change_feed_state_context, "continuationPkRangeId"): - # if continuation token is in v1 format, throw exception if feed_range is set - if is_key_exists_and_not_none(change_feed_state_context, "feedRange"): - raise ValueError("feed_range and continuation are incompatible") - elif is_key_exists_and_not_none(change_feed_state_context, "continuationFeedRange"): - # if continuation token is in v2 format, since the token itself contains the full change feed state - # so we will ignore other parameters if they passed in - if is_key_exists_and_not_none(change_feed_state_context, "partitionKeyRangeId"): - raise ValueError("partition_key_range_id and continuation are incompatible") - else: - # validation when no continuation is passed - exclusive_keys = ["partitionKeyRangeId", "partitionKey", "feedRange"] - count = sum(1 for key in exclusive_keys if - key in change_feed_state_context and change_feed_state_context[key] is not None) - if count > 1: - raise ValueError( - "partition_key_range_id, partition_key, feed_range are exclusive parameters, please only set one of them") - - container_properties = await self._get_properties() - container_rid = container_properties.get("_rid") - change_feed_state = ChangeFeedState.from_json(self.container_link, container_rid, change_feed_state_context) - feed_options["changeFeedState"] = change_feed_state - feed_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"] + change_feed_state_context["containerProperties"] = self._get_properties() + feed_options["changeFeedStateContext"] = change_feed_state_context response_hook = kwargs.pop('response_hook', None) if hasattr(response_hook, "clear"): response_hook.clear() + if self.container_link in self.__get_client_container_caches(): + feed_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"] + result = self.client_connection.QueryItemsChangeFeed( self.container_link, options=feed_options, response_hook=response_hook, **kwargs ) @@ -1269,5 +1224,3 @@ async def read_feed_ranges( [Range("", "FF", True, False)]) return [routing_range.Range.PartitionKeyRangeToRange(partitionKeyRange).to_base64_encoded_string() for partitionKeyRange in partition_key_ranges] - - diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index 32fd818075f8..d4f1d5480241 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -327,7 +327,6 @@ def query_items_change_feed( max_item_count: Optional[int] = None, start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, partition_key: Optional[PartitionKeyType] = None, - # -> would RU usage be more efficient, bug to backend team? deprecate it or using FeedRange to convert? priority: Optional[Literal["High", "Low"]] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py b/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py index 9f0a5cde29a2..c89e1d9ac771 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py @@ -175,17 +175,14 @@ def _get_epk_range_for_prefix_partition_key( def _get_epk_range_for_partition_key( self, - pk_value: Sequence[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]], - is_prefix_pk_value: bool = False + pk_value: Sequence[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]] ) -> _Range: - if is_prefix_pk_value: + if self._is_prefix_partition_key(pk_value): return self._get_epk_range_for_prefix_partition_key(pk_value) # else return point range effective_partition_key_string = self._get_effective_partition_key_string(pk_value) - partition_key_range = _Range(effective_partition_key_string, effective_partition_key_string, True, True) - - return partition_key_range.to_normalized_range() + return _Range(effective_partition_key_string, effective_partition_key_string, True, True) def _get_effective_partition_key_for_hash_partitioning(self) -> str: # We shouldn't be supporting V1 @@ -279,6 +276,15 @@ def _get_effective_partition_key_for_multi_hash_partitioning_v2( return ''.join(sb).upper() + def _is_prefix_partition_key( + self, + partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]], Type[NonePartitionKeyValue]]) -> bool: + if self.kind!= "MultiHash": + return False + if isinstance(partition_key, list) and len(self.path) == len(partition_key): + return False + return True + def _return_undefined_or_empty_partition_key(is_system_key: bool) -> Union[_Empty, _Undefined]: if is_system_key: diff --git a/sdk/cosmos/azure-cosmos/test/test_change_feed_async.py b/sdk/cosmos/azure-cosmos/test/test_change_feed_async.py index c3246768a796..b4165af0601c 100644 --- a/sdk/cosmos/azure-cosmos/test/test_change_feed_async.py +++ b/sdk/cosmos/azure-cosmos/test/test_change_feed_async.py @@ -76,12 +76,12 @@ async def test_query_change_feed_with_different_filter_async(self, change_feed_f filter_param = None # Read change feed without passing any options - query_iterable = await created_collection.query_items_change_feed() + query_iterable = created_collection.query_items_change_feed() iter_list = [item async for item in query_iterable] assert len(iter_list) == 0 # Read change feed from current should return an empty list - query_iterable = await created_collection.query_items_change_feed(filter_param) + query_iterable = created_collection.query_items_change_feed(filter_param) iter_list = [item async for item in query_iterable] assert len(iter_list) == 0 if 'Etag' in created_collection.client_connection.last_response_headers: @@ -92,7 +92,7 @@ async def test_query_change_feed_with_different_filter_async(self, change_feed_f fail("No Etag or etag found in last response headers") # Read change feed from beginning should return an empty list - query_iterable = await created_collection.query_items_change_feed( + query_iterable = created_collection.query_items_change_feed( is_start_from_beginning=True, **filter_param ) @@ -109,7 +109,7 @@ async def test_query_change_feed_with_different_filter_async(self, change_feed_f # Create a document. Read change feed should return be able to read that document document_definition = {'pk': 'pk', 'id': 'doc1'} await created_collection.create_item(body=document_definition) - query_iterable = await created_collection.query_items_change_feed( + query_iterable = created_collection.query_items_change_feed( is_start_from_beginning=True, **filter_param ) @@ -134,7 +134,7 @@ async def test_query_change_feed_with_different_filter_async(self, change_feed_f for pageSize in [2, 100]: # verify iterator - query_iterable = await created_collection.query_items_change_feed( + query_iterable = created_collection.query_items_change_feed( continuation=continuation2, max_item_count=pageSize, **filter_param) @@ -147,7 +147,7 @@ async def test_query_change_feed_with_different_filter_async(self, change_feed_f # verify by_page # the options is not copied, therefore it need to be restored - query_iterable = await created_collection.query_items_change_feed( + query_iterable = created_collection.query_items_change_feed( continuation=continuation2, max_item_count=pageSize, **filter_param @@ -167,7 +167,7 @@ async def test_query_change_feed_with_different_filter_async(self, change_feed_f assert actual_ids == expected_ids # verify reading change feed from the beginning - query_iterable = await created_collection.query_items_change_feed( + query_iterable = created_collection.query_items_change_feed( is_start_from_beginning=True, **filter_param ) @@ -184,7 +184,7 @@ async def test_query_change_feed_with_different_filter_async(self, change_feed_f fail("No Etag or etag found in last response headers") # verify reading empty change feed - query_iterable = await created_collection.query_items_change_feed( + query_iterable = created_collection.query_items_change_feed( continuation=continuation3, is_start_from_beginning=True, **filter_param @@ -235,7 +235,7 @@ async def create_random_items(container, batch_size): await create_random_items(created_collection, batchSize) # now query change feed based on start time - change_feed_iter = [i async for i in await created_collection.query_items_change_feed(start_time=start_time)] + change_feed_iter = [i async for i in created_collection.query_items_change_feed(start_time=start_time)] totalCount = len(change_feed_iter) # now check if the number of items that were changed match the batch size @@ -243,13 +243,13 @@ async def create_random_items(container, batch_size): # negative test: pass in a valid time in the future future_time = start_time + timedelta(hours=1) - change_feed_iter = [i async for i in await created_collection.query_items_change_feed(start_time=future_time)] + change_feed_iter = [i async for i in created_collection.query_items_change_feed(start_time=future_time)] totalCount = len(change_feed_iter) # A future time should return 0 assert totalCount == 0 # test a date that is not utc, will be converted to utc by sdk - change_feed_iter = [i async for i in await created_collection.query_items_change_feed(start_time=not_utc_time)] + change_feed_iter = [i async for i in created_collection.query_items_change_feed(start_time=not_utc_time)] totalCount = len(change_feed_iter) # Should equal batch size assert totalCount == batchSize @@ -257,7 +257,7 @@ async def create_random_items(container, batch_size): # test an invalid value, Attribute error will be raised for passing non datetime object invalid_time = "Invalid value" try: - change_feed_iter = [i async for i in await created_collection.query_items_change_feed(start_time=invalid_time)] + change_feed_iter = [i async for i in created_collection.query_items_change_feed(start_time=invalid_time)] fail("Cannot format date on a non datetime object.") except ValueError as e: assert ("Invalid start_time 'Invalid value'" == e.args[0]) @@ -270,7 +270,7 @@ async def test_query_change_feed_with_split_async(self, setup): offer_throughput=400) # initial change feed query returns empty result - query_iterable = await created_collection.query_items_change_feed(start_time="Beginning") + query_iterable = created_collection.query_items_change_feed(start_time="Beginning") iter_list = [item async for item in query_iterable] assert len(iter_list) == 0 continuation = created_collection.client_connection.last_response_headers['etag'] @@ -279,7 +279,7 @@ async def test_query_change_feed_with_split_async(self, setup): # create one doc and make sure change feed query can return the document document_definition = {'pk': 'pk', 'id': 'doc1'} await created_collection.create_item(body=document_definition) - query_iterable = await created_collection.query_items_change_feed(continuation=continuation) + query_iterable = created_collection.query_items_change_feed(continuation=continuation) iter_list = [item async for item in query_iterable] assert len(iter_list) == 1 continuation = created_collection.client_connection.last_response_headers['etag'] @@ -309,7 +309,7 @@ async def test_query_change_feed_with_split_async(self, setup): for document in new_documents: await created_collection.create_item(body=document) - query_iterable = await created_collection.query_items_change_feed(continuation=continuation) + query_iterable = created_collection.query_items_change_feed(continuation=continuation) it = query_iterable.__aiter__() actual_ids = [] async for item in it: