Skip to content

Commit

Permalink
sync rg changes
Browse files Browse the repository at this point in the history
  fix describe/list rg err msg (milvus-io#1291)

  fix load collection (milvus-io#1293)

  add param check (milvus-io#1297)

  expose rg info in replica (milvus-io#1305)

  refine rg api (milvus-io#1310)

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 committed Feb 22, 2023
1 parent 9e4b20f commit a1525be
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 21 deletions.
8 changes: 8 additions & 0 deletions examples/resource_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ def transfer_replica(source, target, collection_name, num_replica):
utility.transfer_replica(
source, target, collection_name, num_replica, using=_CONNECTION_NAME)


def run():
create_connection("root", "123456")
coll = create_collection(_COLLECTION_NAME, _ID_FIELD_NAME, _VECTOR_FIELD_NAME)
vectors = insert(coll, 10000, _DIM)
coll.flush()
create_index(coll, _VECTOR_FIELD_NAME)
load_collection(coll)

create_resource_group("rg")
list_resource_groups()
Expand All @@ -74,6 +76,12 @@ def run():
describe_resource_group(DEFAULT_RESOURCE_GROUP)
describe_resource_group("rg")

describe_resource_group(DEFAULT_RESOURCE_GROUP)
describe_resource_group("rg")
transfer_replica("rg", DEFAULT_RESOURCE_GROUP, _COLLECTION_NAME, 1)
describe_resource_group(DEFAULT_RESOURCE_GROUP)
describe_resource_group("rg")

describe_resource_group(DEFAULT_RESOURCE_GROUP)
describe_resource_group("rg")
transfer_replica("rg", DEFAULT_RESOURCE_GROUP, _COLLECTION_NAME, 1)
Expand Down
1 change: 1 addition & 0 deletions pymilvus/client/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ def __init__(self) -> None:
"operate_privilege_type": is_legal_operate_privilege_type,
"properties": is_legal_collection_properties,
"replica_number": is_legal_replica_number,
"resource_group_name": is_legal_table_name,
}

def check(self, key, value):
Expand Down
3 changes: 2 additions & 1 deletion pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,8 @@ def get_replicas(self, collection_name, timeout=None, **kwargs) -> Replica:
groups = []
for replica in response.replicas:
shards = [Shard(s.dm_channel_name, s.node_ids, s.leaderID) for s in replica.shard_replicas]
groups.append(Group(replica.replicaID, shards, replica.node_ids))
groups.append(Group(replica.replicaID, shards, replica.node_ids, replica.resource_group_name,
replica.num_outbound_node))

return Replica(groups)

Expand Down
7 changes: 7 additions & 0 deletions pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,10 +822,12 @@ def get_server_version(cls):

@classmethod
def create_resource_group(cls, name):
check_pass_param(resource_group_name=name)
return milvus_types.CreateResourceGroupRequest(resource_group=name)

@classmethod
def drop_resource_group(cls, name):
check_pass_param(resource_group_name=name)
return milvus_types.DropResourceGroupRequest(resource_group=name)

@classmethod
Expand All @@ -834,16 +836,21 @@ def list_resource_groups(cls):

@classmethod
def describe_resource_group(cls, name):
check_pass_param(resource_group_name=name)
return milvus_types.DescribeResourceGroupRequest(resource_group=name)

@classmethod
def transfer_node(cls, source, target, num_node):
check_pass_param(resource_group_name=source)
check_pass_param(resource_group_name=target)
return milvus_types.TransferNodeRequest(source_resource_group=source,
target_resource_group=target,
num_node=num_node)

@classmethod
def transfer_replica(cls, source, target, collection_name, num_replica):
check_pass_param(resource_group_name=source)
check_pass_param(resource_group_name=target)
return milvus_types.TransferReplicaRequest(source_resource_group=source,
target_resource_group=target,
collection_name=collection_name,
Expand Down
11 changes: 10 additions & 1 deletion pymilvus/client/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,12 @@ def shard_leader(self) -> int:


class Group:
def __init__(self, group_id: int, shards: list, group_nodes: list):
def __init__(self, group_id: int, shards: list, group_nodes: list, resource_group: str, num_outbound_node: dict):
self._id = group_id
self._shards = shards
self._group_nodes = tuple(group_nodes)
self._resource_group = resource_group
self._num_outbound_node = num_outbound_node

def __repr__(self) -> str:
s = f"Group: <group_id:{self.id}>, <group_nodes:{self.group_nodes}>, <shards:{self.shards}>"
Expand All @@ -332,6 +334,13 @@ def group_nodes(self):
def shards(self):
return self._shards

@property
def resource_group(self):
return self._resource_group

@property
def num_outbound_node(self):
return self._num_outbound_node

class Replica:
"""
Expand Down
34 changes: 17 additions & 17 deletions pymilvus/orm/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -1000,39 +1000,39 @@ def list_resource_groups(using="default", timeout=None):
return _get_connection(using).list_resource_groups(timeout)


def transfer_node(source, target, num_node, using="default", timeout=None):
def transfer_node(source_group, target_group, num_nodes, using="default", timeout=None):
"""transfer num_node from source resource group to target resource_group
:param source: source resource group name
:type source: str
:param target: target resource group name
:type target: str
:param num_node: transfer node num
:type num_node: int
:param source_group: source resource group name
:type source_group: str
:param target_group: target resource group name
:type target_group: str
:param num_nodes: transfer node num
:type num_nodes: int
:example:
>>> from pymilvus import connections, utility
>>> connections.connect()
>>> rgs = utility.transfer_node(source, target, num_node)
>>> rgs = utility.transfer_node(source_group, target_group, num_nodes)
"""
return _get_connection(using).transfer_node(source, target, num_node, timeout)
return _get_connection(using).transfer_node(source_group, target_group, num_nodes, timeout)


def transfer_replica(source, target, collection_name, num_replica, using="default", timeout=None):
def transfer_replica(source_group, target_group, collection_name, num_replicas, using="default", timeout=None):
"""transfer num_replica from source resource group to target resource group
:param source: source resource group name
:type source: str
:param target: target resource group name
:type target: str
:param source_group: source resource group name
:type source_group: str
:param target_group: target resource group name
:type target_group: str
:param collection_name: collection name which replica belong to
:type collection_name: str
:param num_replica: transfer replica num
:type num_replica: int
:param num_replicas: transfer replica num
:type num_replicas: int
:example:
>>> from pymilvus import connections, utility
>>> connections.connect()
>>> rgs = utility.transfer_replica(source, target, collection_name, num_replica)
"""
return _get_connection(using).transfer_replica(source, target, collection_name, num_replica, timeout)
return _get_connection(using).transfer_replica(source_group, target_group, collection_name, num_replicas, timeout)
4 changes: 2 additions & 2 deletions tests/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.

from pymilvus import DataType
from pymilvus import DataType, DEFAULT_RESOURCE_GROUP
from pymilvus.exceptions import InvalidConsistencyLevel
from pymilvus.client.types import (
get_consistency_level, ConsistencyLevel,
Expand Down Expand Up @@ -128,7 +128,7 @@ def test_shard(self):
assert s.shard_leader == 1
print(s)

g = Group(2, [s], [1, 2, 3])
g = Group(2, [s], [1, 2, 3], DEFAULT_RESOURCE_GROUP, {})
assert g.id == 2
assert g.shards == [s]
assert g.group_nodes == (1, 2, 3)
Expand Down

0 comments on commit a1525be

Please sign in to comment.