Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: cannot pass consistency level for delete #2350

Merged
merged 3 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion pymilvus/client/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,27 @@
from .singleton_utils import Singleton


def validate_strs(**kwargs):
"""validate if all values are legal non-emtpy str"""
invalid_pair = {k: v for k, v in kwargs.items() if not validate_str(v)}
if invalid_pair:
msg = f"Illegal str variables: {invalid_pair}, expect non-empty str"
raise ParamError(message=msg)


def validate_nullable_strs(**kwargs):
"""validate if all values are either None or legal non-empty str"""
invalid_pair = {k: v for k, v in kwargs.items() if v is not None and not validate_str(v)}
if invalid_pair:
msg = f"Illegal nullable str variables: {invalid_pair}, expect None or non-empty str"
raise ParamError(message=msg)


def validate_str(var: Any) -> bool:
"""check if a variable is legal non-empty str"""
return var and isinstance(var, str)


def is_legal_address(addr: Any) -> bool:
if not isinstance(addr, str):
return False
Expand Down Expand Up @@ -65,7 +86,7 @@ def is_legal_index_size(index_size: Any) -> bool:


def is_legal_table_name(table_name: Any) -> bool:
return table_name and isinstance(table_name, str)
return validate_str(table_name)


def is_legal_db_name(db_name: Any) -> bool:
Expand Down
12 changes: 5 additions & 7 deletions pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,17 +596,15 @@ def delete(
check_pass_param(collection_name=collection_name, timeout=timeout)
try:
req = Prepare.delete_request(
collection_name,
partition_name,
expression,
consistency_level=kwargs.get("consistency_level", 0),
param_name=kwargs.pop("param_name", None),
collection_name=collection_name,
filter=expression,
partition_name=partition_name,
consistency_level=kwargs.pop("consistency_level", 0),
**kwargs,
)
future = self._stub.Delete.future(req, timeout=timeout)

if kwargs.get("_async", False):
cb = kwargs.get("_callback")
cb = kwargs.pop("_callback")
czs007 marked this conversation as resolved.
Show resolved Hide resolved
f = MutationFuture(future, cb, timeout=timeout, **kwargs)
f.add_callback(ts_utils.update_ts_on_mutation(collection_name))
return f
Expand Down
28 changes: 10 additions & 18 deletions pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pymilvus.orm.schema import CollectionSchema
from pymilvus.orm.types import infer_dtype_by_scalar_data

from . import __version__, blob, entity_helper, ts_utils, utils
from . import __version__, blob, check, entity_helper, ts_utils, utils
from .check import check_pass_param, is_legal_collection_properties
from .constants import (
DEFAULT_CONSISTENCY_LEVEL,
Expand Down Expand Up @@ -734,29 +734,21 @@ def batch_upsert_param(
def delete_request(
cls,
collection_name: str,
partition_name: str,
expr: str,
consistency_level: Optional[Union[int, str]],
filter: str,
partition_name: Optional[str] = None,
consistency_level: Optional[Union[int, str]] = None,
**kwargs,
):
def check_str(instr: str, prefix: str):
if instr is None:
raise ParamError(message=f"{prefix} cannot be None")
if not isinstance(instr, str):
raise ParamError(message=f"{prefix} value {instr} is illegal")
if len(instr) == 0:
raise ParamError(message=f"{prefix} cannot be empty")

check_str(collection_name, "collection_name")
if partition_name is not None and partition_name != "":
check_str(partition_name, "partition_name")
param_name = kwargs.get("param_name", "expr")
check_str(expr, param_name)
check.validate_strs(
collection_name=collection_name,
filter=filter,
)
check.validate_nullable_strs(partition_name=partition_name)

return milvus_types.DeleteRequest(
collection_name=collection_name,
partition_name=partition_name,
expr=expr,
expr=filter,
consistency_level=get_consistency_level(consistency_level),
expr_template_values=cls.prepare_expression_template(kwargs.get("expr_params", {})),
)
Expand Down
43 changes: 22 additions & 21 deletions pymilvus/milvus_client/milvus_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,8 @@ def search(
limit=limit,
output_fields=output_fields,
partition_names=partition_names,
timeout=timeout,
expr_params=kwargs.pop("filter_params", {}),
timeout=timeout,
**kwargs,
)
except Exception as ex:
Expand Down Expand Up @@ -543,10 +543,10 @@ def delete(
collection_name: str,
ids: Optional[Union[list, str, int]] = None,
timeout: Optional[float] = None,
filter: Optional[str] = "",
XuanYang-cn marked this conversation as resolved.
Show resolved Hide resolved
partition_name: Optional[str] = "",
filter: Optional[str] = None,
partition_name: Optional[str] = None,
**kwargs,
) -> Dict:
) -> Dict[str, int]:
"""Delete entries in the collection by their pk or by filter.

Starting from version 2.3.2, Milvus no longer includes the primary keys in the result
Expand All @@ -558,14 +558,17 @@ def delete(
Milvus(previous 2.3.2) is not empty, the list of primary keys is still returned.

Args:
ids (list, str, int): The pk's to delete. Depending on pk_field type it can be int
or str or alist of either. Default to None.
filter(str, optional): A filter to use for the deletion. Defaults to empty.
ids (list, str, int, optional): The pk's to delete.
Depending on pk_field type it can be int or str or a list of either.
Default to None.
filter(str, optional): A filter to use for the deletion. Defaults to none.
timeout (int, optional): Timeout to use, overides the client level assigned at init.
Defaults to None.

Note: You need to passin either ids or filter, and they cannot be used at the same time.

Returns:
Dict: Number of rows that were deleted.
Dict: with key 'deleted_count' and value number of rows that were deleted.
"""
pks = kwargs.get("pks", [])
if isinstance(pks, (int, str)):
Expand All @@ -589,35 +592,32 @@ def delete(
msg = f"wrong type of argument ids, expect list, int or str, got '{type(ids).__name__}'"
raise TypeError(msg)

# validate ambiguous delete filter param before describe collection rpc
if filter and len(pks) > 0:
raise ParamError(message=ExceptionsMessage.AmbiguousDeleteFilterParam)

expr = ""
conn = self._get_connection()
if pks:
if len(pks) > 0:
try:
schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs)
except Exception as ex:
logger.error("Failed to describe collection: %s", collection_name)
raise ex from ex

expr = self._pack_pks_expr(schema_dict, pks)

if filter:
if expr:
raise ParamError(message=ExceptionsMessage.AmbiguousDeleteFilterParam)

else:
if not isinstance(filter, str):
raise DataTypeNotMatchException(message=ExceptionsMessage.ExprType % type(filter))

expr = filter

ret_pks = []
try:
res = conn.delete(
collection_name,
expr,
partition_name,
timeout=timeout,
param_name="filter or ids",
collection_name=collection_name,
expression=expr,
partition_name=partition_name,
expr_params=kwargs.pop("filter_params", {}),
timeout=timeout,
**kwargs,
)
if res.primary_keys:
Expand All @@ -626,6 +626,7 @@ def delete(
logger.error("Failed to delete primary keys in collection: %s", collection_name)
raise ex from ex

# compatible with deletions that returns primary keys
if ret_pks:
return ret_pks

Expand Down
18 changes: 15 additions & 3 deletions tests/test_prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@


class TestPrepare:
@pytest.mark.parametrize("coll_name", [None, "", -1, 1.1, []])
@pytest.mark.parametrize("expr", [None, "", -1, 1.1, []])
def test_delete_request_wrong_coll_name(self, coll_name: str, expr: str):
with pytest.raises(MilvusException):
Prepare.delete_request(coll_name, expr, None, 0)

@pytest.mark.parametrize("part_name", [])
def test_delete_request_wrong_part_name(self, part_name):
with pytest.raises(MilvusException):
Prepare.delete_request("coll", "id>1", part_name, 0)


def test_search_requests_with_expr_offset(self):
fields = [
FieldSchema("pk", DataType.INT64, is_primary=True),
Expand Down Expand Up @@ -42,7 +54,7 @@ def test_search_requests_with_expr_offset(self):
params = json.loads(p.value)
if PAGE_RETAIN_ORDER_FIELD in params:
page_retain_order_exists = True
assert params[PAGE_RETAIN_ORDER_FIELD] == True
assert params[PAGE_RETAIN_ORDER_FIELD] is True

assert offset_exists is True
assert page_retain_order_exists is True
Expand Down Expand Up @@ -112,7 +124,7 @@ def test_get_schema_from_collection_schema(self):

c_schema = Prepare.get_schema_from_collection_schema("random", schema)

assert c_schema.enable_dynamic_field == False
assert c_schema.enable_dynamic_field is False
assert c_schema.name == "random"
assert len(c_schema.fields) == 2
assert c_schema.fields[0].name == "field_vector"
Expand Down Expand Up @@ -190,7 +202,7 @@ def test_row_insert_param_with_auto_id(self):
]

Prepare.row_insert_param("", rows, "", fields_info=schema.to_dict()["fields"], enable_dynamic=True)

def test_row_insert_param_with_none(self):
import numpy as np
rng = np.random.default_rng(seed=19530)
Expand Down