From 967f94aaa2f67d5335d52263afa1b308f8983fc4 Mon Sep 17 00:00:00 2001 From: Chun Han <116052805+MrPresent-Han@users.noreply.github.com> Date: Sat, 12 Oct 2024 17:25:15 +0800 Subject: [PATCH] enhance: enable compatible for iterator(#2278) (#2297) related: #2278 Signed-off-by: MrPresent-Han Co-authored-by: MrPresent-Han --- pymilvus/orm/iterator.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/pymilvus/orm/iterator.py b/pymilvus/orm/iterator.py index 6fe1f64f6..2f3fd9a80 100644 --- a/pymilvus/orm/iterator.py +++ b/pymilvus/orm/iterator.py @@ -1,3 +1,4 @@ +import datetime import logging from copy import deepcopy from pathlib import Path @@ -43,6 +44,7 @@ ) from .schema import CollectionSchema from .types import DataType +from .utility import mkts_from_datetime LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.INFO) @@ -52,6 +54,11 @@ log = logging.getLogger(__name__) +def fall_back_to_latest_session_ts(): + d = datetime.datetime.now() + return mkts_from_datetime(d, milliseconds=1000.0) + + def extend_batch_size(batch_size: int, next_param: dict, to_extend_batch_size: bool) -> int: extend_rate = 1 if to_extend_batch_size: @@ -177,11 +184,17 @@ def __setup_ts_by_request(self): timeout=self._timeout, **init_ts_kwargs, ) - if res is not None and res.extra is not None: + if res is None: + raise MilvusException( + message="failed to connect to milvus for setting up " + "mvccTs, check milvus servers' status" + ) + if res.extra is not None: self._session_ts = res.extra.get(ITERATOR_SESSION_TS_FIELD, 0) - self._kwargs[GUARANTEE_TIMESTAMP] = self._session_ts - else: - raise MilvusException(message="failed to set up mvccTs for query iterator") + if self._session_ts <= 0: + log.warning("failed to get mvccTs from milvus server, use client-side ts instead") + self._session_ts = fall_back_to_latest_session_ts() + self._kwargs[GUARANTEE_TIMESTAMP] = self._session_ts def __set_up_ts_cp(self): self._cp_file_path = self._kwargs.get(ITERATOR_SESSION_CP_FILE, None) @@ -429,7 +442,8 @@ def __init_search_iterator(self): init_page = self.__execute_next_search(self._param, self._expr, False) self._session_ts = init_page.get_session_ts() if self._session_ts <= 0: - raise MilvusException(message="failed to set up mvccTs for search iterator") + log.warning("failed to set up mvccTs from milvus server, use client-side ts instead") + self._session_ts = fall_back_to_latest_session_ts() self._kwargs[GUARANTEE_TIMESTAMP] = self._session_ts if len(init_page) == 0: message = (