Skip to content

Commit

Permalink
enhance: enable compatible for iterator(#2278) (#2297)
Browse files Browse the repository at this point in the history
related: #2278

Signed-off-by: MrPresent-Han <[email protected]>
Co-authored-by: MrPresent-Han <[email protected]>
  • Loading branch information
MrPresent-Han and MrPresent-Han authored Oct 12, 2024
1 parent 70118a4 commit 967f94a
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions pymilvus/orm/iterator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import logging
from copy import deepcopy
from pathlib import Path
Expand Down Expand Up @@ -43,6 +44,7 @@
)
from .schema import CollectionSchema
from .types import DataType
from .utility import mkts_from_datetime

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)
Expand All @@ -52,6 +54,11 @@
log = logging.getLogger(__name__)


def fall_back_to_latest_session_ts():
d = datetime.datetime.now()
return mkts_from_datetime(d, milliseconds=1000.0)


def extend_batch_size(batch_size: int, next_param: dict, to_extend_batch_size: bool) -> int:
extend_rate = 1
if to_extend_batch_size:
Expand Down Expand Up @@ -177,11 +184,17 @@ def __setup_ts_by_request(self):
timeout=self._timeout,
**init_ts_kwargs,
)
if res is not None and res.extra is not None:
if res is None:
raise MilvusException(
message="failed to connect to milvus for setting up "
"mvccTs, check milvus servers' status"
)
if res.extra is not None:
self._session_ts = res.extra.get(ITERATOR_SESSION_TS_FIELD, 0)
self._kwargs[GUARANTEE_TIMESTAMP] = self._session_ts
else:
raise MilvusException(message="failed to set up mvccTs for query iterator")
if self._session_ts <= 0:
log.warning("failed to get mvccTs from milvus server, use client-side ts instead")
self._session_ts = fall_back_to_latest_session_ts()
self._kwargs[GUARANTEE_TIMESTAMP] = self._session_ts

def __set_up_ts_cp(self):
self._cp_file_path = self._kwargs.get(ITERATOR_SESSION_CP_FILE, None)
Expand Down Expand Up @@ -429,7 +442,8 @@ def __init_search_iterator(self):
init_page = self.__execute_next_search(self._param, self._expr, False)
self._session_ts = init_page.get_session_ts()
if self._session_ts <= 0:
raise MilvusException(message="failed to set up mvccTs for search iterator")
log.warning("failed to set up mvccTs from milvus server, use client-side ts instead")
self._session_ts = fall_back_to_latest_session_ts()
self._kwargs[GUARANTEE_TIMESTAMP] = self._session_ts
if len(init_page) == 0:
message = (
Expand Down

0 comments on commit 967f94a

Please sign in to comment.