diff --git a/pymilvus/client/prepare.py b/pymilvus/client/prepare.py index 1642e6a20..24ea4b79c 100644 --- a/pymilvus/client/prepare.py +++ b/pymilvus/client/prepare.py @@ -1026,6 +1026,12 @@ def query_request( ignore_growing = kwargs.get("ignore_growing", False) stop_reduce_for_best = kwargs.get(REDUCE_STOP_FOR_BEST, False) + is_iterator = kwargs.get(ITERATOR_FIELD) + if is_iterator is not None: + req.query_params.append( + common_types.KeyValuePair(key=ITERATOR_FIELD, value=is_iterator) + ) + req.query_params.append( common_types.KeyValuePair(key="ignore_growing", value=str(ignore_growing)) ) diff --git a/pymilvus/orm/constants.py b/pymilvus/orm/constants.py index b4980204d..7890718f9 100644 --- a/pymilvus/orm/constants.py +++ b/pymilvus/orm/constants.py @@ -39,6 +39,7 @@ IS_PRIMARY = "is_primary" REDUCE_STOP_FOR_BEST = "reduce_stop_for_best" ITERATOR_FIELD = "iterator" +PRINT_ITERATOR_CURSOR = "print_iterator_cursor" DEFAULT_MAX_L2_DISTANCE = 99999999.0 DEFAULT_MIN_IP_DISTANCE = -99999999.0 DEFAULT_MAX_HAMMING_DISTANCE = 99999999.0 diff --git a/pymilvus/orm/iterator.py b/pymilvus/orm/iterator.py index 0d84ad016..82b6edfe4 100644 --- a/pymilvus/orm/iterator.py +++ b/pymilvus/orm/iterator.py @@ -31,6 +31,7 @@ MILVUS_LIMIT, OFFSET, PARAMS, + PRINT_ITERATOR_CURSOR, RADIUS, RANGE_FILTER, REDUCE_STOP_FOR_BEST, @@ -40,10 +41,12 @@ from .types import DataType LOGGER = logging.getLogger(__name__) -LOGGER.setLevel(logging.ERROR) +LOGGER.setLevel(logging.INFO) QueryIterator = TypeVar("QueryIterator") SearchIterator = TypeVar("SearchIterator") +log = logging.getLogger(__name__) + def extend_batch_size(batch_size: int, next_param: dict, to_extend_batch_size: bool) -> int: extend_rate = 1 @@ -56,6 +59,10 @@ def extend_batch_size(batch_size: int, next_param: dict, to_extend_batch_size: b return min(MAX_BATCH_SIZE, batch_size * extend_rate) +def check_set_flag(obj: Any, flag_name: str, kwargs: Dict[str, Any], key: str): + setattr(obj, flag_name, kwargs.get(key, False)) + + class QueryIterator: def __init__( self, @@ -81,6 +88,7 @@ def __init__( self.__check_set_batch_size(batch_size) self._limit = limit self.__check_set_reduce_stop_for_best() + check_set_flag(self, "_print_iterator_cursor", self._kwargs, PRINT_ITERATOR_CURSOR) self._returned_count = 0 self.__setup__pk_prop() self.__set_up_expr(expr) @@ -156,6 +164,8 @@ def next(self): else: iterator_cache.release_cache(self._cache_id_in_use) current_expr = self.__setup_next_expr() + if self._print_iterator_cursor: + log.info(f"query_iterator_next_expr:{current_expr}") res = self._conn.query( collection_name=self._collection_name, expr=current_expr, @@ -194,7 +204,7 @@ def __setup__pk_prop(self): if self._pk_field_name is None or self._pk_field_name == "": raise MilvusException(message="schema must contain pk field, broke") - def __setup_next_expr(self) -> None: + def __setup_next_expr(self) -> str: current_expr = self._expr if self._next_id is None: return current_expr @@ -331,6 +341,7 @@ def __init__( self.__check_rm_range_search_parameters() self.__setup__pk_prop() self.__init_search_iterator() + check_set_flag(self, "_print_iterator_cursor", self._kwargs, PRINT_ITERATOR_CURSOR) def __init_search_iterator(self): init_page = self.__execute_next_search(self._param, self._expr, False) @@ -538,6 +549,8 @@ def __try_search_fill(self) -> SearchPage: def __execute_next_search( self, next_params: dict, next_expr: str, to_extend_batch: bool ) -> SearchPage: + if self.__print_iterator_cursor: + log.info(f"search_iterator_next_expr:{next_expr}, next_params:{next_params}") res = self._conn.search( self._iterator_params["collection_name"], self._iterator_params["data"], @@ -552,6 +565,7 @@ def __execute_next_search( schema=self._schema, **self._kwargs, ) + return SearchPage(res[0]) # at present, the range_filter parameter means 'larger/less and equal',