Skip to content

Commit

Permalink
Sync latest codes from master
Browse files Browse the repository at this point in the history
- fix describe/list rg err msg (milvus-io#1291)
- fix load collection (milvus-io#1293)
- Change default connect timeout to 10s (milvus-io#1294)
- add param check (milvus-io#1297)
- Support rename collection API (milvus-io#1299)
- expose rg info in replica (milvus-io#1305)
- fix check string max len (milvus-io#1304)
- Support upsert (milvus-io#1303)
- expose rg info (milvus-io#1307)
- Add ignore_growing option for query and search (milvus-io#1286)
- refine rg api (milvus-io#1310)
- Fix some error (milvus-io#1312)
- Add GPU index types (milvus-io#1314)

Signed-off-by: yangxuan <[email protected]>

Co-authored-by: Wei Liu <[email protected]>
Co-authored-by: jaime <[email protected]>
Co-authored-by: Ievgen Zapolskyi <[email protected]>
Co-authored-by: lixinguo <[email protected]>
Co-authored-by: aoiasd <[email protected]>
Co-authored-by: Yudong Cai <[email protected]>
  • Loading branch information
6 people authored and XuanYang-cn committed Mar 1, 2023
1 parent 51fb7c1 commit f36c75c
Show file tree
Hide file tree
Showing 23 changed files with 450 additions and 182 deletions.
2 changes: 2 additions & 0 deletions docs/source/api/collection.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ Methods
+---------------------------------------------------------------+--------------------------------------------------------------------------+
| `search() <#pymilvus.Collection.search>`_ | Vector similarity search with an optional boolean expression as filters. |
+---------------------------------------------------------------+--------------------------------------------------------------------------+
| `upsert() <#pymilvus.Collection.upsert>`_ | Upsert data of collection. |
+---------------------------------------------------------------+--------------------------------------------------------------------------+
| `query() <#pymilvus.Collection.query>`_ | Query with a set of criteria. |
+---------------------------------------------------------------+--------------------------------------------------------------------------+
| `partition() <#pymilvus.Collection.partition>`_ | Return the partition corresponding to name. |
Expand Down
2 changes: 2 additions & 0 deletions docs/source/api/partition.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ Methods
+--------------------------------------------+--------------------------------------------------------------------------+
| `delete() <#pymilvus.Partition.delete>`_ | Delete entities with an expression condition. |
+--------------------------------------------+--------------------------------------------------------------------------+
| `upsert() <#pymilvus.Collection.upsert>`_ |Upsert data of collection. |
+--------------------------------------------+--------------------------------------------------------------------------+
| `search() <#pymilvus.Partition.search>`_ | Vector similarity search with an optional boolean expression as filters. |
+--------------------------------------------+--------------------------------------------------------------------------+
| `query() <#pymilvus.Partition.query>`_ | Query with a set of criteria. |
Expand Down
23 changes: 21 additions & 2 deletions examples/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
default_float_vec_field_name = "float_vector"
default_binary_vec_field_name = "binary_vector"


all_index_types = [
"FLAT",
"IVF_FLAT",
Expand Down Expand Up @@ -55,7 +54,6 @@
{"nlist": 128}
]


default_index = {"index_type": "IVF_FLAT", "params": {"nlist": 128}, "metric_type": "L2"}
default_binary_index = {"index_type": "BIN_FLAT", "params": {"nlist": 1024}, "metric_type": "JACCARD"}

Expand Down Expand Up @@ -300,6 +298,25 @@ def alias_cases():
alias_cases()


def test_rename_collection():
connections.connect(alias="default")
schema = CollectionSchema(fields=[
FieldSchema("int64", DataType.INT64, description="int64", is_primary=True),
FieldSchema("float_vector", DataType.FLOAT_VECTOR, is_primary=False, dim=128),
])

old_collection = "old_collection"
new_collection = "new_collection"
Collection(old_collection, schema=schema)

print("\nlist collections:")
print(utility.list_collections())
assert utility.has_collection(old_collection)

utility.rename_collection(old_collection, new_collection)
assert utility.has_collection(new_collection)


if __name__ == "__main__":
print("test collection and get an existing collection")
name = test_create_collection()
Expand All @@ -317,4 +334,6 @@ def alias_cases():
test_specify_primary_key()
print("test alias")
test_alias()
print("test rename collection")
test_rename_collection()
print("test end")
36 changes: 17 additions & 19 deletions examples/resource_group.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from pymilvus import utility, connections
from pymilvus import utility, connections, DEFAULT_RESOURCE_GROUP
from example import *

_HOST = '127.0.0.1'
Expand All @@ -15,9 +15,9 @@

# Create a Milvus connection

def create_connection():
def create_connection(user, passwd):
print(f"\nCreate connection...")
connections.connect(alias=_CONNECTION_NAME,host=_HOST, port=_PORT)
connections.connect(alias=_CONNECTION_NAME,host=_HOST, port=_PORT, user=user, password=passwd)
print(f"\nList connections:")
print(connections.list_connections())

Expand Down Expand Up @@ -52,39 +52,37 @@ def transfer_replica(source, target, collection_name, num_replica):
f"transfer {num_replica} replicas in {collection_name} from {source} to {target}")
utility.transfer_replica(
source, target, collection_name, num_replica, using=_CONNECTION_NAME)


def run():
create_connection()

def run():
create_connection("root", "123456")
coll = create_collection(_COLLECTION_NAME, _ID_FIELD_NAME, _VECTOR_FIELD_NAME)
vectors = insert(coll, 10000, _DIM)
coll.flush()
create_index(coll, _VECTOR_FIELD_NAME)
load_collection(coll)

create_resource_group("rg")
list_resource_groups()
describe_resource_group("rg")
transfer_node("__default_resource_group", "rg", 1)
describe_resource_group("__default_resource_group")
transfer_node(DEFAULT_RESOURCE_GROUP, "rg", 1)
describe_resource_group(DEFAULT_RESOURCE_GROUP)
describe_resource_group("rg")
release_collection(coll)
coll.load(_resource_groups=["rg"])
print("load finish")

transfer_node("rg", "__default_resource_group", 1)
describe_resource_group("__default_resource_group")
transfer_node("rg", DEFAULT_RESOURCE_GROUP, 1)
describe_resource_group(DEFAULT_RESOURCE_GROUP)
describe_resource_group("rg")


describe_resource_group("__default_resource_group")
describe_resource_group(DEFAULT_RESOURCE_GROUP)
describe_resource_group("rg")
# transfer_replica("__default_resource_group", "rg", _COLLECTION_NAME, 1)
describe_resource_group("__default_resource_group")
transfer_replica("rg", DEFAULT_RESOURCE_GROUP, _COLLECTION_NAME, 1)
describe_resource_group(DEFAULT_RESOURCE_GROUP)
describe_resource_group("rg")


drop_resource_group("rg")

release_collection(coll)
drop_collection(_COLLECTION_NAME)


if __name__ == "__main__":
run()
2 changes: 1 addition & 1 deletion pymilvus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
)

from .orm import utility
from .orm.default_config import DefaultConfig, ENV_CONNECTION_CONF
from .orm.default_config import DefaultConfig, ENV_CONNECTION_CONF, DEFAULT_RESOURCE_GROUP

from .orm.search import SearchResult, Hits, Hit
from .orm.schema import FieldSchema, CollectionSchema
Expand Down
12 changes: 6 additions & 6 deletions pymilvus/client/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,16 @@ def is_legal_replica_number(replica_number: int) -> bool:

# https://milvus.io/cn/docs/v1.0.0/metric.md#floating
def is_legal_index_metric_type(index_type: str, metric_type: str) -> bool:
if index_type not in ("FLAT",
if index_type not in ("GPU_FLAT"
"GPU_IVF_FLAT"
"GPU_IVF_SQ8"
"GPU_IVF_PQ"
"FLAT",
"IVF_FLAT",
"IVF_SQ8",
# "IVF_SQ8_HYBRID",
"IVF_PQ",
"HNSW",
# "NSG",
"ANNOY",
"RHNSW_FLAT",
"RHNSW_PQ",
"RHNSW_SQ",
"AUTOINDEX",
"DISKANN"):
return False
Expand Down Expand Up @@ -344,6 +343,7 @@ def __init__(self) -> None:
"operate_privilege_type": is_legal_operate_privilege_type,
"properties": is_legal_collection_properties,
"replica_number": is_legal_replica_number,
"resource_group_name": is_legal_table_name,
}

def check(self, key, value):
Expand Down
5 changes: 3 additions & 2 deletions pymilvus/client/entity_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ def check_str_arr(str_arr, max_len):
for s in str_arr:
if not isinstance(s, str):
raise ParamError(message=f"expect string input, got: {type(s)}")
if len(s) >= max_len:
raise ParamError(message=f"invalid input, length of string exceeds max length. length: {len(s)}, max length: {max_len}")
if len(s) > max_len:
raise ParamError(message=f"invalid input, length of string exceeds max length. length: {len(s)}, "
f"max length: {max_len}")


def entity_to_str_arr(entity, field_info, check=True):
Expand Down
71 changes: 59 additions & 12 deletions pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,17 @@ def list_collections(self, timeout=None):

raise MilvusException(status.error_code, status.reason)

@retry_on_rpc_failure()
def rename_collections(self, old_name=None, new_name=None, timeout=None):
check_pass_param(collection_name=new_name)
check_pass_param(collection_name=old_name)
request = Prepare().rename_collections_request(old_name, new_name)
rf = self._stub.RenameCollection.future(request, timeout=timeout)
response = rf.result()

if response.error_code != 0:
raise MilvusException(response.error_code, response.reason)

@retry_on_rpc_failure()
def create_partition(self, collection_name, partition_name, timeout=None):
check_pass_param(collection_name=collection_name, partition_name=partition_name)
Expand All @@ -291,7 +302,7 @@ def drop_partition(self, collection_name, partition_name, timeout=None):
raise MilvusException(response.error_code, response.reason)

@retry_on_rpc_failure()
def has_partition(self, collection_name, partition_name, timeout=None):
def has_partition(self, collection_name, partition_name, timeout=None, **kwargs):
check_pass_param(collection_name=collection_name, partition_name=partition_name)
request = Prepare.has_partition_request(collection_name, partition_name)
rf = self._stub.HasPartition.future(request, timeout=timeout)
Expand Down Expand Up @@ -342,22 +353,28 @@ def get_partition_stats(self, collection_name, partition_name, timeout=None, **k

raise MilvusException(status.error_code, status.reason)

def _prepare_batch_insert_request(self, collection_name, entities, partition_name=None, timeout=None, **kwargs):
insert_param = kwargs.get('insert_param', None)
def _prepare_batch_insert_or_upsert_request(self, collection_name, entities, partition_name=None, timeout=None, isInsert=True, **kwargs):
param = kwargs.get('insert_param', None)

if not isInsert:
param = kwargs.get('upsert_param', None)

if insert_param and not isinstance(insert_param, milvus_types.RowBatch):
raise ParamError(message="The value of key 'insert_param' is invalid")
if param and not isinstance(param, milvus_types.RowBatch):
if isInsert:
raise ParamError(message="The value of key 'insert_param' is invalid")
raise ParamError(message="The value of key 'upsert_param' is invalid")
if not isinstance(entities, list):
raise ParamError(message="None entities, please provide valid entities.")

collection_schema = kwargs.get("schema", None)
if not collection_schema:
collection_schema = self.describe_collection(collection_name, timeout=timeout, **kwargs)
collection_schema = self.describe_collection(
collection_name, timeout=timeout, **kwargs)

fields_info = collection_schema["fields"]

request = insert_param if insert_param \
else Prepare.batch_insert_param(collection_name, entities, partition_name, fields_info)
request = param if param \
else Prepare.batch_insert_or_upsert_param(collection_name, entities, partition_name, fields_info, isInsert)

return request

Expand All @@ -367,7 +384,8 @@ def batch_insert(self, collection_name, entities, partition_name=None, timeout=N
raise ParamError(message="Invalid binary vector data exists")

try:
request = self._prepare_batch_insert_request(collection_name, entities, partition_name, timeout, **kwargs)
request = self._prepare_batch_insert_or_upsert_request(
collection_name, entities, partition_name, timeout, **kwargs)
rf = self._stub.Insert.future(request, timeout=timeout)
if kwargs.get("_async", False) is True:
cb = kwargs.get("_callback", None)
Expand Down Expand Up @@ -412,6 +430,34 @@ def delete(self, collection_name, expression, partition_name=None, timeout=None,
return MutationFuture(None, None, err)
raise err

@retry_on_rpc_failure()
def upsert(self, collection_name, entities, partition_name=None, timeout=None, **kwargs):
if not check_invalid_binary_vector(entities):
raise ParamError(message="Invalid binary vector data exists")

try:
request = self._prepare_batch_insert_or_upsert_request(
collection_name, entities, partition_name, timeout, False, **kwargs)
rf = self._stub.Upsert.future(request, timeout=timeout)
if kwargs.get("_async", False) is True:
cb = kwargs.get("_callback", None)
f = MutationFuture(rf, cb, timeout=timeout, **kwargs)
f.add_callback(ts_utils.update_ts_on_mutation(collection_name))
return f

response = rf.result()
if response.status.error_code == 0:
m = MutationResult(response)
ts_utils.update_collection_ts(collection_name, m.timestamp)
return m

raise MilvusException(
response.status.error_code, response.status.reason)
except Exception as err:
if kwargs.get("_async", False):
return MutationFuture(None, None, err)
raise err

def _execute_search_requests(self, requests, timeout=None, **kwargs):
auto_id = kwargs.get("auto_id", True)

Expand Down Expand Up @@ -1032,7 +1078,8 @@ def get_replicas(self, collection_name, timeout=None, **kwargs) -> Replica:
groups = []
for replica in response.replicas:
shards = [Shard(s.dm_channel_name, s.node_ids, s.leaderID) for s in replica.shard_replicas]
groups.append(Group(replica.replicaID, shards, replica.node_ids))
groups.append(Group(replica.replicaID, shards, replica.node_ids, replica.resource_group_name,
replica.num_outbound_node))

return Replica(groups)

Expand Down Expand Up @@ -1222,15 +1269,15 @@ def list_resource_groups(self, timeout=None, **kwargs):
req = Prepare.list_resource_groups()
resp = self._stub.ListResourceGroups(req, wait_for_ready=True, timeout=timeout)
if resp.status.error_code != 0:
raise MilvusException(resp.error_code, resp.reason)
raise MilvusException(resp.status.error_code, resp.status.reason)
return list(resp.resource_groups)

@retry_on_rpc_failure()
def describe_resource_group(self, name, timeout=None, **kwargs) -> ResourceGroupInfo:
req = Prepare.describe_resource_group(name)
resp = self._stub.DescribeResourceGroup(req, wait_for_ready=True, timeout=timeout)
if resp.status.error_code != 0:
raise MilvusException(resp.error_code, resp.reason)
raise MilvusException(resp.status.error_code, resp.status.reason)
return ResourceGroupInfo(resp.resource_group)

@retry_on_rpc_failure()
Expand Down
Loading

0 comments on commit f36c75c

Please sign in to comment.