diff --git a/examples/resource_group.py b/examples/resource_group.py index b9dc078fe..cdfc46914 100644 --- a/examples/resource_group.py +++ b/examples/resource_group.py @@ -1,4 +1,4 @@ -from pymilvus import utility, connections +from pymilvus import utility, connections, DEFAULT_RESOURCE_GROUP from example import * _HOST = '127.0.0.1' @@ -15,9 +15,9 @@ # Create a Milvus connection -def create_connection(): +def create_connection(user, passwd): print(f"\nCreate connection...") - connections.connect(alias=_CONNECTION_NAME,host=_HOST, port=_PORT) + connections.connect(alias=_CONNECTION_NAME,host=_HOST, port=_PORT, user=user, password=passwd) print(f"\nList connections:") print(connections.list_connections()) @@ -52,39 +52,37 @@ def transfer_replica(source, target, collection_name, num_replica): f"transfer {num_replica} replicas in {collection_name} from {source} to {target}") utility.transfer_replica( source, target, collection_name, num_replica, using=_CONNECTION_NAME) - - -def run(): - create_connection() + +def run(): + create_connection("root", "123456") coll = create_collection(_COLLECTION_NAME, _ID_FIELD_NAME, _VECTOR_FIELD_NAME) vectors = insert(coll, 10000, _DIM) coll.flush() create_index(coll, _VECTOR_FIELD_NAME) - load_collection(coll) create_resource_group("rg") + list_resource_groups() describe_resource_group("rg") - transfer_node("__default_resource_group", "rg", 1) - describe_resource_group("__default_resource_group") + transfer_node(DEFAULT_RESOURCE_GROUP, "rg", 1) + describe_resource_group(DEFAULT_RESOURCE_GROUP) describe_resource_group("rg") + release_collection(coll) + coll.load(_resource_groups=["rg"]) + print("load finish") - transfer_node("rg", "__default_resource_group", 1) - describe_resource_group("__default_resource_group") + transfer_node("rg", DEFAULT_RESOURCE_GROUP, 1) + describe_resource_group(DEFAULT_RESOURCE_GROUP) describe_resource_group("rg") - - describe_resource_group("__default_resource_group") + describe_resource_group(DEFAULT_RESOURCE_GROUP) describe_resource_group("rg") - # transfer_replica("__default_resource_group", "rg", _COLLECTION_NAME, 1) - describe_resource_group("__default_resource_group") + transfer_replica("rg", DEFAULT_RESOURCE_GROUP, _COLLECTION_NAME, 1) + describe_resource_group(DEFAULT_RESOURCE_GROUP) describe_resource_group("rg") - drop_resource_group("rg") - release_collection(coll) drop_collection(_COLLECTION_NAME) - if __name__ == "__main__": run() diff --git a/pymilvus/__init__.py b/pymilvus/__init__.py index 8704bdef8..72e94648a 100644 --- a/pymilvus/__init__.py +++ b/pymilvus/__init__.py @@ -58,7 +58,7 @@ ) from .orm import utility -from .orm.default_config import DefaultConfig, ENV_CONNECTION_CONF +from .orm.default_config import DefaultConfig, ENV_CONNECTION_CONF, DEFAULT_RESOURCE_GROUP from .orm.search import SearchResult, Hits, Hit from .orm.schema import FieldSchema, CollectionSchema diff --git a/pymilvus/client/check.py b/pymilvus/client/check.py index 959318c49..1d503f4b5 100644 --- a/pymilvus/client/check.py +++ b/pymilvus/client/check.py @@ -214,6 +214,8 @@ def is_legal_partition_name_array(tag_array: Any) -> bool: return True +def is_legal_replica_number(replica_number: int) -> bool: + return isinstance(replica_number, int) # https://milvus.io/cn/docs/v1.0.0/metric.md#floating def is_legal_index_metric_type(index_type: str, metric_type: str) -> bool: @@ -341,6 +343,7 @@ def __init__(self) -> None: "privilege": is_legal_privilege, "operate_privilege_type": is_legal_operate_privilege_type, "properties": is_legal_collection_properties, + "replica_number": is_legal_replica_number, } def check(self, key, value): diff --git a/pymilvus/client/grpc_handler.py b/pymilvus/client/grpc_handler.py index 56dded9e7..12845b326 100644 --- a/pymilvus/client/grpc_handler.py +++ b/pymilvus/client/grpc_handler.py @@ -291,7 +291,7 @@ def drop_partition(self, collection_name, partition_name, timeout=None): raise MilvusException(response.error_code, response.reason) @retry_on_rpc_failure() - def has_partition(self, collection_name, partition_name, timeout=None): + def has_partition(self, collection_name, partition_name, timeout=None, **kwargs): check_pass_param(collection_name=collection_name, partition_name=partition_name) request = Prepare.has_partition_request(collection_name, partition_name) rf = self._stub.HasPartition.future(request, timeout=timeout) @@ -651,7 +651,7 @@ def wait_for_creating_index(self, collection_name, index_name, timeout=None, **k @retry_on_rpc_failure() def load_collection(self, collection_name, replica_number=1, timeout=None, **kwargs): - check_pass_param(collection_name=collection_name) + check_pass_param(collection_name=collection_name, replica_number=replica_number) _refresh = kwargs.get("_refresh", False) _resource_groups = kwargs.get("_resource_groups") request = Prepare.load_collection("", collection_name, replica_number, _refresh, _resource_groups) @@ -696,7 +696,10 @@ def release_collection(self, collection_name, timeout=None): @retry_on_rpc_failure() def load_partitions(self, collection_name, partition_names, replica_number=1, timeout=None, **kwargs): - check_pass_param(collection_name=collection_name, partition_name_array=partition_names) + check_pass_param( + collection_name=collection_name, + partition_name_array=partition_names, + replica_number=replica_number) _refresh = kwargs.get("_refresh", False) _resource_groups = kwargs.get("_resource_groups") request = Prepare.load_partitions("", collection_name, partition_names, replica_number, _refresh, @@ -1219,7 +1222,7 @@ def list_resource_groups(self, timeout=None, **kwargs): req = Prepare.list_resource_groups() resp = self._stub.ListResourceGroups(req, wait_for_ready=True, timeout=timeout) if resp.status.error_code != 0: - raise MilvusException(resp.error_code, resp.reason) + raise MilvusException(resp.status.error_code, resp.status.reason) return list(resp.resource_groups) @retry_on_rpc_failure() @@ -1227,7 +1230,7 @@ def describe_resource_group(self, name, timeout=None, **kwargs) -> ResourceGroup req = Prepare.describe_resource_group(name) resp = self._stub.DescribeResourceGroup(req, wait_for_ready=True, timeout=timeout) if resp.status.error_code != 0: - raise MilvusException(resp.error_code, resp.reason) + raise MilvusException(resp.status.error_code, resp.status.reason) return ResourceGroupInfo(resp.resource_group) @retry_on_rpc_failure() diff --git a/pymilvus/decorators.py b/pymilvus/decorators.py index aee37de21..d37c1e80f 100644 --- a/pymilvus/decorators.py +++ b/pymilvus/decorators.py @@ -117,8 +117,8 @@ def handler(*args, **kwargs): raise e except Exception as e: record_dict["Exception"] = str(datetime.datetime.now()) - LOGGER.error(f"Unexcepted error: [{inner_name}], {e}, ") - raise e + LOGGER.error(f"Unexpected error: [{inner_name}], {e}, ") + raise MilvusException(message=f"Unexpected error, message=<{str(e)}>") from e return handler return wrapper diff --git a/pymilvus/orm/connections.py b/pymilvus/orm/connections.py index 7c178ff65..2186fccc7 100644 --- a/pymilvus/orm/connections.py +++ b/pymilvus/orm/connections.py @@ -269,7 +269,7 @@ def connect_milvus(**kwargs): gh = GrpcHandler(**kwargs) t = kwargs.get("timeout") - timeout = t if isinstance(t, int) else DefaultConfig.DEFAULT_CONNECT_TIMEOUT + timeout = t if isinstance(t, (int, float)) else DefaultConfig.DEFAULT_CONNECT_TIMEOUT gh._wait_for_channel_ready(timeout=timeout) kwargs.pop('password') diff --git a/pymilvus/orm/default_config.py b/pymilvus/orm/default_config.py index 7f2377726..3eaafb8fd 100644 --- a/pymilvus/orm/default_config.py +++ b/pymilvus/orm/default_config.py @@ -15,7 +15,9 @@ class DefaultConfig: DEFAULT_USING = "default" DEFAULT_HOST = "localhost" DEFAULT_PORT = "19530" - DEFAULT_CONNECT_TIMEOUT = 3 + DEFAULT_CONNECT_TIMEOUT = 10 ENV_CONNECTION_CONF = "MILVUS_DEFAULT_CONNECTION" + +DEFAULT_RESOURCE_GROUP = "__default_resource_group"