Skip to content

Commit

Permalink
enhance: support print iterator info(milvus-io#2261)
Browse files Browse the repository at this point in the history
Signed-off-by: MrPresent-Han <[email protected]>
  • Loading branch information
MrPresent-Han committed Sep 11, 2024
1 parent baaca92 commit 4130caa
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
6 changes: 6 additions & 0 deletions pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,12 @@ def query_request(

ignore_growing = kwargs.get("ignore_growing", False)
stop_reduce_for_best = kwargs.get(REDUCE_STOP_FOR_BEST, False)
is_iterator = kwargs.get(ITERATOR_FIELD)
if is_iterator is not None:
req.query_params.append(
common_types.KeyValuePair(key=ITERATOR_FIELD, value=is_iterator)
)

req.query_params.append(
common_types.KeyValuePair(key="ignore_growing", value=str(ignore_growing))
)
Expand Down
1 change: 1 addition & 0 deletions pymilvus/orm/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
IS_PRIMARY = "is_primary"
REDUCE_STOP_FOR_BEST = "reduce_stop_for_best"
ITERATOR_FIELD = "iterator"
PRINT_ITERATOR_CURSOR = "print_iterator_cursor"
DEFAULT_MAX_L2_DISTANCE = 99999999.0
DEFAULT_MIN_IP_DISTANCE = -99999999.0
DEFAULT_MAX_HAMMING_DISTANCE = 99999999.0
Expand Down
18 changes: 16 additions & 2 deletions pymilvus/orm/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
MILVUS_LIMIT,
OFFSET,
PARAMS,
PRINT_ITERATOR_CURSOR,
RADIUS,
RANGE_FILTER,
REDUCE_STOP_FOR_BEST,
Expand All @@ -40,10 +41,12 @@
from .types import DataType

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.ERROR)
LOGGER.setLevel(logging.INFO)
QueryIterator = TypeVar("QueryIterator")
SearchIterator = TypeVar("SearchIterator")

log = logging.getLogger(__name__)


def extend_batch_size(batch_size: int, next_param: dict, to_extend_batch_size: bool) -> int:
extend_rate = 1
Expand All @@ -56,6 +59,10 @@ def extend_batch_size(batch_size: int, next_param: dict, to_extend_batch_size: b
return min(MAX_BATCH_SIZE, batch_size * extend_rate)


def check_set_flag(obj: Any, flag_name: str, kwargs: Dict[str, Any], key: str):
setattr(obj, flag_name, kwargs.get(key, False))


class QueryIterator:
def __init__(
self,
Expand All @@ -81,6 +88,7 @@ def __init__(
self.__check_set_batch_size(batch_size)
self._limit = limit
self.__check_set_reduce_stop_for_best()
check_set_flag(self, "_print_iterator_cursor", self._kwargs, PRINT_ITERATOR_CURSOR)
self._returned_count = 0
self.__setup__pk_prop()
self.__set_up_expr(expr)
Expand Down Expand Up @@ -156,6 +164,8 @@ def next(self):
else:
iterator_cache.release_cache(self._cache_id_in_use)
current_expr = self.__setup_next_expr()
if self._print_iterator_cursor:
log.info(f"query_iterator_next_expr:{current_expr}")
res = self._conn.query(
collection_name=self._collection_name,
expr=current_expr,
Expand Down Expand Up @@ -194,7 +204,7 @@ def __setup__pk_prop(self):
if self._pk_field_name is None or self._pk_field_name == "":
raise MilvusException(message="schema must contain pk field, broke")

def __setup_next_expr(self) -> None:
def __setup_next_expr(self) -> str:
current_expr = self._expr
if self._next_id is None:
return current_expr
Expand Down Expand Up @@ -331,6 +341,7 @@ def __init__(
self.__check_rm_range_search_parameters()
self.__setup__pk_prop()
self.__init_search_iterator()
check_set_flag(self, "_print_iterator_cursor", self._kwargs, PRINT_ITERATOR_CURSOR)

def __init_search_iterator(self):
init_page = self.__execute_next_search(self._param, self._expr, False)
Expand Down Expand Up @@ -538,6 +549,8 @@ def __try_search_fill(self) -> SearchPage:
def __execute_next_search(
self, next_params: dict, next_expr: str, to_extend_batch: bool
) -> SearchPage:
if self.__print_iterator_cursor:
log.info(f"search_iterator_next_expr:{next_expr}, next_params:{next_params}")
res = self._conn.search(
self._iterator_params["collection_name"],
self._iterator_params["data"],
Expand All @@ -552,6 +565,7 @@ def __execute_next_search(
schema=self._schema,
**self._kwargs,
)

return SearchPage(res[0])

# at present, the range_filter parameter means 'larger/less and equal',
Expand Down

0 comments on commit 4130caa

Please sign in to comment.