Skip to content

Commit

Permalink
support delete with consistency level (#1841)
Browse files Browse the repository at this point in the history
Signed-off-by: aoiasd <[email protected]>
  • Loading branch information
aoiasd authored Jan 2, 2024
1 parent f67c8c9 commit cfc5187
Show file tree
Hide file tree
Showing 11 changed files with 615 additions and 409 deletions.
7 changes: 6 additions & 1 deletion pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,12 @@ def delete(
):
check_pass_param(collection_name=collection_name)
try:
req = Prepare.delete_request(collection_name, partition_name, expression)
req = Prepare.delete_request(
collection_name,
partition_name,
expression,
consistency_level=kwargs.get("consistency_level", 0),
)
future = self._stub.Delete.future(req, timeout=timeout)

if kwargs.get("_async", False):
Expand Down
19 changes: 16 additions & 3 deletions pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

from . import blob, entity_helper, ts_utils
from .check import check_pass_param, is_legal_collection_properties
from .constants import DEFAULT_CONSISTENCY_LEVEL, GROUP_BY_FIELD, REDUCE_STOP_FOR_BEST
from .constants import (
DEFAULT_CONSISTENCY_LEVEL,
GROUP_BY_FIELD,
REDUCE_STOP_FOR_BEST,
)
from .types import DataType, PlaceholderType, get_consistency_level
from .utils import traverse_info, traverse_rows_info

Expand Down Expand Up @@ -535,7 +539,13 @@ def batch_upsert_param(
return cls._parse_batch_request(request, entities, fields_info, location)

@classmethod
def delete_request(cls, collection_name: str, partition_name: str, expr: str):
def delete_request(
cls,
collection_name: str,
partition_name: str,
expr: str,
consistency_level: Optional[Union[int, str]],
):
def check_str(instr: str, prefix: str):
if instr is None:
raise ParamError(message=f"{prefix} cannot be None")
Expand All @@ -550,7 +560,10 @@ def check_str(instr: str, prefix: str):
check_str(expr, "expr")

return milvus_types.DeleteRequest(
collection_name=collection_name, partition_name=partition_name, expr=expr
collection_name=collection_name,
partition_name=partition_name,
expr=expr,
consistency_level=get_consistency_level(consistency_level),
)

@classmethod
Expand Down
130 changes: 67 additions & 63 deletions pymilvus/grpc_gen/common_pb2.py

Large diffs are not rendered by default.

20 changes: 18 additions & 2 deletions pymilvus/grpc_gen/common_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class PlaceholderType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
BinaryVector: _ClassVar[PlaceholderType]
FloatVector: _ClassVar[PlaceholderType]
Float16Vector: _ClassVar[PlaceholderType]
BFloat16Vector: _ClassVar[PlaceholderType]
Int64: _ClassVar[PlaceholderType]
VarChar: _ClassVar[PlaceholderType]

Expand Down Expand Up @@ -277,6 +278,11 @@ class ObjectPrivilege(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
PrivilegeDropDatabase: _ClassVar[ObjectPrivilege]
PrivilegeListDatabases: _ClassVar[ObjectPrivilege]
PrivilegeFlushAll: _ClassVar[ObjectPrivilege]
PrivilegeCreatePartition: _ClassVar[ObjectPrivilege]
PrivilegeDropPartition: _ClassVar[ObjectPrivilege]
PrivilegeShowPartitions: _ClassVar[ObjectPrivilege]
PrivilegeHasPartition: _ClassVar[ObjectPrivilege]
PrivilegeGetFlushState: _ClassVar[ObjectPrivilege]

class StateCode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
Expand Down Expand Up @@ -369,6 +375,7 @@ None: PlaceholderType
BinaryVector: PlaceholderType
FloatVector: PlaceholderType
Float16Vector: PlaceholderType
BFloat16Vector: PlaceholderType
Int64: PlaceholderType
VarChar: PlaceholderType
Undefined: MsgType
Expand Down Expand Up @@ -530,6 +537,11 @@ PrivilegeCreateDatabase: ObjectPrivilege
PrivilegeDropDatabase: ObjectPrivilege
PrivilegeListDatabases: ObjectPrivilege
PrivilegeFlushAll: ObjectPrivilege
PrivilegeCreatePartition: ObjectPrivilege
PrivilegeDropPartition: ObjectPrivilege
PrivilegeShowPartitions: ObjectPrivilege
PrivilegeHasPartition: ObjectPrivilege
PrivilegeGetFlushState: ObjectPrivilege
Initializing: StateCode
Healthy: StateCode
Abnormal: StateCode
Expand All @@ -543,14 +555,18 @@ PRIVILEGE_EXT_OBJ_FIELD_NUMBER: _ClassVar[int]
privilege_ext_obj: _descriptor.FieldDescriptor

class Status(_message.Message):
__slots__ = ["error_code", "reason", "code"]
__slots__ = ["error_code", "reason", "code", "retriable", "detail"]
ERROR_CODE_FIELD_NUMBER: _ClassVar[int]
REASON_FIELD_NUMBER: _ClassVar[int]
CODE_FIELD_NUMBER: _ClassVar[int]
RETRIABLE_FIELD_NUMBER: _ClassVar[int]
DETAIL_FIELD_NUMBER: _ClassVar[int]
error_code: ErrorCode
reason: str
code: int
def __init__(self, error_code: _Optional[_Union[ErrorCode, str]] = ..., reason: _Optional[str] = ..., code: _Optional[int] = ...) -> None: ...
retriable: bool
detail: str
def __init__(self, error_code: _Optional[_Union[ErrorCode, str]] = ..., reason: _Optional[str] = ..., code: _Optional[int] = ..., retriable: bool = ..., detail: _Optional[str] = ...) -> None: ...

class KeyValuePair(_message.Message):
__slots__ = ["key", "value"]
Expand Down
626 changes: 332 additions & 294 deletions pymilvus/grpc_gen/milvus_pb2.py

Large diffs are not rendered by default.

48 changes: 46 additions & 2 deletions pymilvus/grpc_gen/milvus_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,20 @@ class CreateIndexRequest(_message.Message):
index_name: str
def __init__(self, base: _Optional[_Union[_common_pb2.MsgBase, _Mapping]] = ..., db_name: _Optional[str] = ..., collection_name: _Optional[str] = ..., field_name: _Optional[str] = ..., extra_params: _Optional[_Iterable[_Union[_common_pb2.KeyValuePair, _Mapping]]] = ..., index_name: _Optional[str] = ...) -> None: ...

class AlterIndexRequest(_message.Message):
__slots__ = ["base", "db_name", "collection_name", "index_name", "extra_params"]
BASE_FIELD_NUMBER: _ClassVar[int]
DB_NAME_FIELD_NUMBER: _ClassVar[int]
COLLECTION_NAME_FIELD_NUMBER: _ClassVar[int]
INDEX_NAME_FIELD_NUMBER: _ClassVar[int]
EXTRA_PARAMS_FIELD_NUMBER: _ClassVar[int]
base: _common_pb2.MsgBase
db_name: str
collection_name: str
index_name: str
extra_params: _containers.RepeatedCompositeFieldContainer[_common_pb2.KeyValuePair]
def __init__(self, base: _Optional[_Union[_common_pb2.MsgBase, _Mapping]] = ..., db_name: _Optional[str] = ..., collection_name: _Optional[str] = ..., index_name: _Optional[str] = ..., extra_params: _Optional[_Iterable[_Union[_common_pb2.KeyValuePair, _Mapping]]] = ...) -> None: ...

class DescribeIndexRequest(_message.Message):
__slots__ = ["base", "db_name", "collection_name", "field_name", "index_name", "timestamp"]
BASE_FIELD_NUMBER: _ClassVar[int]
Expand Down Expand Up @@ -686,20 +700,22 @@ class MutationResult(_message.Message):
def __init__(self, status: _Optional[_Union[_common_pb2.Status, _Mapping]] = ..., IDs: _Optional[_Union[_schema_pb2.IDs, _Mapping]] = ..., succ_index: _Optional[_Iterable[int]] = ..., err_index: _Optional[_Iterable[int]] = ..., acknowledged: bool = ..., insert_cnt: _Optional[int] = ..., delete_cnt: _Optional[int] = ..., upsert_cnt: _Optional[int] = ..., timestamp: _Optional[int] = ...) -> None: ...

class DeleteRequest(_message.Message):
__slots__ = ["base", "db_name", "collection_name", "partition_name", "expr", "hash_keys"]
__slots__ = ["base", "db_name", "collection_name", "partition_name", "expr", "hash_keys", "consistency_level"]
BASE_FIELD_NUMBER: _ClassVar[int]
DB_NAME_FIELD_NUMBER: _ClassVar[int]
COLLECTION_NAME_FIELD_NUMBER: _ClassVar[int]
PARTITION_NAME_FIELD_NUMBER: _ClassVar[int]
EXPR_FIELD_NUMBER: _ClassVar[int]
HASH_KEYS_FIELD_NUMBER: _ClassVar[int]
CONSISTENCY_LEVEL_FIELD_NUMBER: _ClassVar[int]
base: _common_pb2.MsgBase
db_name: str
collection_name: str
partition_name: str
expr: str
hash_keys: _containers.RepeatedScalarFieldContainer[int]
def __init__(self, base: _Optional[_Union[_common_pb2.MsgBase, _Mapping]] = ..., db_name: _Optional[str] = ..., collection_name: _Optional[str] = ..., partition_name: _Optional[str] = ..., expr: _Optional[str] = ..., hash_keys: _Optional[_Iterable[int]] = ...) -> None: ...
consistency_level: _common_pb2.ConsistencyLevel
def __init__(self, base: _Optional[_Union[_common_pb2.MsgBase, _Mapping]] = ..., db_name: _Optional[str] = ..., collection_name: _Optional[str] = ..., partition_name: _Optional[str] = ..., expr: _Optional[str] = ..., hash_keys: _Optional[_Iterable[int]] = ..., consistency_level: _Optional[_Union[_common_pb2.ConsistencyLevel, str]] = ...) -> None: ...

class SearchRequest(_message.Message):
__slots__ = ["base", "db_name", "collection_name", "partition_names", "dsl", "placeholder_group", "dsl_type", "output_fields", "search_params", "travel_timestamp", "guarantee_timestamp", "nq", "not_return_all_meta", "consistency_level", "use_default_consistency", "search_by_primary_keys"]
Expand Down Expand Up @@ -757,6 +773,34 @@ class SearchResults(_message.Message):
collection_name: str
def __init__(self, status: _Optional[_Union[_common_pb2.Status, _Mapping]] = ..., results: _Optional[_Union[_schema_pb2.SearchResultData, _Mapping]] = ..., collection_name: _Optional[str] = ...) -> None: ...

class SearchRequestV2(_message.Message):
__slots__ = ["base", "db_name", "collection_name", "partition_names", "requests", "rank_params", "travel_timestamp", "guarantee_timestamp", "not_return_all_meta", "output_fields", "consistency_level", "use_default_consistency"]
BASE_FIELD_NUMBER: _ClassVar[int]
DB_NAME_FIELD_NUMBER: _ClassVar[int]
COLLECTION_NAME_FIELD_NUMBER: _ClassVar[int]
PARTITION_NAMES_FIELD_NUMBER: _ClassVar[int]
REQUESTS_FIELD_NUMBER: _ClassVar[int]
RANK_PARAMS_FIELD_NUMBER: _ClassVar[int]
TRAVEL_TIMESTAMP_FIELD_NUMBER: _ClassVar[int]
GUARANTEE_TIMESTAMP_FIELD_NUMBER: _ClassVar[int]
NOT_RETURN_ALL_META_FIELD_NUMBER: _ClassVar[int]
OUTPUT_FIELDS_FIELD_NUMBER: _ClassVar[int]
CONSISTENCY_LEVEL_FIELD_NUMBER: _ClassVar[int]
USE_DEFAULT_CONSISTENCY_FIELD_NUMBER: _ClassVar[int]
base: _common_pb2.MsgBase
db_name: str
collection_name: str
partition_names: _containers.RepeatedScalarFieldContainer[str]
requests: _containers.RepeatedCompositeFieldContainer[SearchRequest]
rank_params: _containers.RepeatedCompositeFieldContainer[_common_pb2.KeyValuePair]
travel_timestamp: int
guarantee_timestamp: int
not_return_all_meta: bool
output_fields: _containers.RepeatedScalarFieldContainer[str]
consistency_level: _common_pb2.ConsistencyLevel
use_default_consistency: bool
def __init__(self, base: _Optional[_Union[_common_pb2.MsgBase, _Mapping]] = ..., db_name: _Optional[str] = ..., collection_name: _Optional[str] = ..., partition_names: _Optional[_Iterable[str]] = ..., requests: _Optional[_Iterable[_Union[SearchRequest, _Mapping]]] = ..., rank_params: _Optional[_Iterable[_Union[_common_pb2.KeyValuePair, _Mapping]]] = ..., travel_timestamp: _Optional[int] = ..., guarantee_timestamp: _Optional[int] = ..., not_return_all_meta: bool = ..., output_fields: _Optional[_Iterable[str]] = ..., consistency_level: _Optional[_Union[_common_pb2.ConsistencyLevel, str]] = ..., use_default_consistency: bool = ...) -> None: ...

class FlushRequest(_message.Message):
__slots__ = ["base", "db_name", "collection_names"]
BASE_FIELD_NUMBER: _ClassVar[int]
Expand Down
66 changes: 66 additions & 0 deletions pymilvus/grpc_gen/milvus_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ def __init__(self, channel):
request_serializer=milvus__pb2.CreateIndexRequest.SerializeToString,
response_deserializer=common__pb2.Status.FromString,
)
self.AlterIndex = channel.unary_unary(
'/milvus.proto.milvus.MilvusService/AlterIndex',
request_serializer=milvus__pb2.AlterIndexRequest.SerializeToString,
response_deserializer=common__pb2.Status.FromString,
)
self.DescribeIndex = channel.unary_unary(
'/milvus.proto.milvus.MilvusService/DescribeIndex',
request_serializer=milvus__pb2.DescribeIndexRequest.SerializeToString,
Expand Down Expand Up @@ -181,6 +186,11 @@ def __init__(self, channel):
request_serializer=milvus__pb2.SearchRequest.SerializeToString,
response_deserializer=milvus__pb2.SearchResults.FromString,
)
self.SearchV2 = channel.unary_unary(
'/milvus.proto.milvus.MilvusService/SearchV2',
request_serializer=milvus__pb2.SearchRequestV2.SerializeToString,
response_deserializer=milvus__pb2.SearchResults.FromString,
)
self.Flush = channel.unary_unary(
'/milvus.proto.milvus.MilvusService/Flush',
request_serializer=milvus__pb2.FlushRequest.SerializeToString,
Expand Down Expand Up @@ -570,6 +580,12 @@ def CreateIndex(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def AlterIndex(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def DescribeIndex(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
Expand Down Expand Up @@ -626,6 +642,12 @@ def Search(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def SearchV2(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def Flush(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
Expand Down Expand Up @@ -1042,6 +1064,11 @@ def add_MilvusServiceServicer_to_server(servicer, server):
request_deserializer=milvus__pb2.CreateIndexRequest.FromString,
response_serializer=common__pb2.Status.SerializeToString,
),
'AlterIndex': grpc.unary_unary_rpc_method_handler(
servicer.AlterIndex,
request_deserializer=milvus__pb2.AlterIndexRequest.FromString,
response_serializer=common__pb2.Status.SerializeToString,
),
'DescribeIndex': grpc.unary_unary_rpc_method_handler(
servicer.DescribeIndex,
request_deserializer=milvus__pb2.DescribeIndexRequest.FromString,
Expand Down Expand Up @@ -1087,6 +1114,11 @@ def add_MilvusServiceServicer_to_server(servicer, server):
request_deserializer=milvus__pb2.SearchRequest.FromString,
response_serializer=milvus__pb2.SearchResults.SerializeToString,
),
'SearchV2': grpc.unary_unary_rpc_method_handler(
servicer.SearchV2,
request_deserializer=milvus__pb2.SearchRequestV2.FromString,
response_serializer=milvus__pb2.SearchResults.SerializeToString,
),
'Flush': grpc.unary_unary_rpc_method_handler(
servicer.Flush,
request_deserializer=milvus__pb2.FlushRequest.FromString,
Expand Down Expand Up @@ -1745,6 +1777,23 @@ def CreateIndex(request,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def AlterIndex(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/milvus.proto.milvus.MilvusService/AlterIndex',
milvus__pb2.AlterIndexRequest.SerializeToString,
common__pb2.Status.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def DescribeIndex(request,
target,
Expand Down Expand Up @@ -1898,6 +1947,23 @@ def Search(request,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def SearchV2(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/milvus.proto.milvus.MilvusService/SearchV2',
milvus__pb2.SearchRequestV2.SerializeToString,
milvus__pb2.SearchResults.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def Flush(request,
target,
Expand Down
Loading

0 comments on commit cfc5187

Please sign in to comment.