Skip to content

Commit

Permalink
enhance: Enable setting the replica number and resource group during …
Browse files Browse the repository at this point in the history
…collection creation (#34403) (#34561)

issue: #30040
pr: #34403

---------

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Aug 6, 2024
1 parent eebd980 commit 2ac1bf7
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 29 deletions.
41 changes: 33 additions & 8 deletions internal/querycoordv2/meta/coordinator_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,48 @@ func (broker *CoordinatorBroker) DescribeDatabase(ctx context.Context, dbName st

// try to get database level replica_num and resource groups, return (resource_groups, replica_num, error)
func (broker *CoordinatorBroker) GetCollectionLoadInfo(ctx context.Context, collectionID UniqueID) ([]string, int64, error) {
// to do by weiliu1031: querycoord should cache mappings: collectionID->dbName
collectionInfo, err := broker.DescribeCollection(ctx, collectionID)
if err != nil {
return nil, 0, err
}

dbInfo, err := broker.DescribeDatabase(ctx, collectionInfo.GetDbName())
replicaNum, err := common.CollectionLevelReplicaNumber(collectionInfo.GetProperties())
if err != nil {
return nil, 0, err
log.Warn("failed to get collection level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
} else if replicaNum > 0 {
log.Info("get collection level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum))
}
replicaNum, err := common.DatabaseLevelReplicaNumber(dbInfo.GetProperties())

rgs, err := common.CollectionLevelResourceGroups(collectionInfo.GetProperties())
if err != nil {
return nil, 0, err
log.Warn("failed to get collection level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
} else if len(rgs) > 0 {
log.Info("get collection level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs))
}
rgs, err := common.DatabaseLevelResourceGroups(dbInfo.GetProperties())
if err != nil {
return nil, 0, err

if replicaNum <= 0 || len(rgs) == 0 {
dbInfo, err := broker.DescribeDatabase(ctx, collectionInfo.GetDbName())
if err != nil {
return nil, 0, err
}

if replicaNum <= 0 {
replicaNum, err = common.DatabaseLevelReplicaNumber(dbInfo.GetProperties())
if err != nil {
log.Warn("failed to get database level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
} else if replicaNum > 0 {
log.Info("get database level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum))
}
}

if len(rgs) == 0 {
rgs, err = common.DatabaseLevelResourceGroups(dbInfo.GetProperties())
if err != nil {
log.Warn("failed to get database level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
} else if len(rgs) > 0 {
log.Info("get database level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs))
}
}
}

return rgs, replicaNum, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/meta/coordinator_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func (s *CoordinatorBrokerRootCoordSuite) TestGetCollectionLoadInfo() {
Properties: []*commonpb.KeyValuePair{},
}, nil)
_, _, err := s.broker.GetCollectionLoadInfo(ctx, 1)
s.Error(err)
s.NoError(err)
s.resetMock()
})
}
Expand Down
20 changes: 9 additions & 11 deletions internal/querycoordv2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,20 +217,18 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
}

if req.GetReplicaNumber() <= 0 || len(req.GetResourceGroups()) == 0 {
// when replica number or resource groups is not set, use database level config
// when replica number or resource groups is not set, use pre-defined load config
rgs, replicas, err := s.broker.GetCollectionLoadInfo(ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to get data base level load info", zap.Error(err))
}

if req.GetReplicaNumber() <= 0 {
log.Info("load collection use database level replica number", zap.Int64("databaseLevelReplicaNum", replicas))
req.ReplicaNumber = int32(replicas)
}
log.Warn("failed to get pre-defined load info", zap.Error(err))
} else {
if req.GetReplicaNumber() <= 0 && replicas > 0 {
req.ReplicaNumber = int32(replicas)
}

if len(req.GetResourceGroups()) == 0 {
log.Info("load collection use database level resource groups", zap.Strings("databaseLevelResourceGroups", rgs))
req.ResourceGroups = rgs
if len(req.GetResourceGroups()) == 0 && len(rgs) > 0 {
req.ResourceGroups = rgs
}
}
}

Expand Down
39 changes: 39 additions & 0 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ const (
DatabaseDiskQuotaKey = "database.diskQuota.mb"
DatabaseMaxCollectionsKey = "database.max.collections"
DatabaseForceDenyWritingKey = "database.force.deny.writing"

// collection level load properties
CollectionReplicaNumber = "collection.replica.number"
CollectionResourceGroups = "collection.resource_groups"
)

// common properties
Expand Down Expand Up @@ -288,3 +292,38 @@ func DatabaseLevelResourceGroups(kvs []*commonpb.KeyValuePair) ([]string, error)

return nil, fmt.Errorf("database property not found: %s", DatabaseResourceGroups)
}

func CollectionLevelReplicaNumber(kvs []*commonpb.KeyValuePair) (int64, error) {
for _, kv := range kvs {
if kv.Key == CollectionReplicaNumber {
replicaNum, err := strconv.ParseInt(kv.Value, 10, 64)
if err != nil {
return 0, fmt.Errorf("invalid collection property: [key=%s] [value=%s]", kv.Key, kv.Value)
}

return replicaNum, nil
}
}

return 0, fmt.Errorf("collection property not found: %s", CollectionReplicaNumber)
}

func CollectionLevelResourceGroups(kvs []*commonpb.KeyValuePair) ([]string, error) {
for _, kv := range kvs {
if kv.Key == CollectionResourceGroups {
invalidPropValue := fmt.Errorf("invalid collection property: [key=%s] [value=%s]", kv.Key, kv.Value)
if len(kv.Value) == 0 {
return nil, invalidPropValue
}

rgs := strings.Split(kv.Value, ",")
if len(rgs) == 0 {
return nil, invalidPropValue
}

return rgs, nil
}
}

return nil, fmt.Errorf("collection property not found: %s", CollectionReplicaNumber)
}
187 changes: 180 additions & 7 deletions tests/integration/replicas/load/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,29 +54,29 @@ func (s *LoadTestSuite) SetupSuite() {
s.Require().NoError(s.SetupEmbedEtcd())
}

func (s *LoadTestSuite) loadCollection(collectionName string, replica int, rgs []string) {
func (s *LoadTestSuite) loadCollection(collectionName string, db string, replica int, rgs []string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// load
loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
DbName: db,
CollectionName: collectionName,
ReplicaNumber: int32(replica),
ResourceGroups: rgs,
})
s.NoError(err)
s.True(merr.Ok(loadStatus))
s.WaitForLoad(ctx, collectionName)
s.WaitForLoadWithDB(ctx, db, collectionName)
}

func (s *LoadTestSuite) releaseCollection(collectionName string) {
func (s *LoadTestSuite) releaseCollection(db, collectionName string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// load
status, err := s.Cluster.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{
DbName: dbName,
DbName: db,
CollectionName: collectionName,
})
s.NoError(err)
Expand Down Expand Up @@ -171,15 +171,188 @@ func (s *LoadTestSuite) TestLoadWithDatabaseLevelConfig() {
s.Len(resp1.GetProperties(), 2)

// load collection without specified replica and rgs
s.loadCollection(collectionName, 0, nil)
s.loadCollection(collectionName, dbName, 0, nil)
resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
DbName: dbName,
CollectionName: collectionName,
})
s.NoError(err)
s.True(merr.Ok(resp2.Status))
s.Len(resp2.GetReplicas(), 3)
s.releaseCollection(collectionName)
s.releaseCollection(dbName, collectionName)
}

func (s *LoadTestSuite) TestLoadWithPredefineCollectionLevelConfig() {
ctx := context.Background()

// prepare resource groups
rgNum := 3
rgs := make([]string, 0)
for i := 0; i < rgNum; i++ {
rgs = append(rgs, fmt.Sprintf("rg_%d", i))
s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
ResourceGroup: rgs[i],
Config: &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},

TransferFrom: []*rgpb.ResourceGroupTransfer{
{
ResourceGroup: meta.DefaultResourceGroupName,
},
},
TransferTo: []*rgpb.ResourceGroupTransfer{
{
ResourceGroup: meta.DefaultResourceGroupName,
},
},
},
})
}

resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
s.Len(resp.GetResourceGroups(), rgNum+1)

for i := 1; i < rgNum; i++ {
s.Cluster.AddQueryNode()
}

s.Eventually(func() bool {
matchCounter := 0
for _, rg := range rgs {
resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{
ResourceGroup: rg,
})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
if len(resp1.ResourceGroup.Nodes) == 1 {
matchCounter += 1
}
}
return matchCounter == rgNum
}, 30*time.Second, time.Second)

s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
DBName: dbName,
Dim: dim,
CollectionName: collectionName,
ChannelNum: 1,
SegmentNum: 3,
RowNumPerSegment: 2000,
ReplicaNumber: 3,
ResourceGroups: rgs,
})

// load collection without specified replica and rgs
s.loadCollection(collectionName, dbName, 0, nil)
resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
DbName: dbName,
CollectionName: collectionName,
})
s.NoError(err)
s.True(merr.Ok(resp2.Status))
s.Len(resp2.GetReplicas(), 3)
s.releaseCollection(dbName, collectionName)
}

func (s *LoadTestSuite) TestLoadWithPredefineDatabaseLevelConfig() {
ctx := context.Background()

// prepare resource groups
rgNum := 3
rgs := make([]string, 0)
for i := 0; i < rgNum; i++ {
rgs = append(rgs, fmt.Sprintf("rg_%d", i))
s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
ResourceGroup: rgs[i],
Config: &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},

TransferFrom: []*rgpb.ResourceGroupTransfer{
{
ResourceGroup: meta.DefaultResourceGroupName,
},
},
TransferTo: []*rgpb.ResourceGroupTransfer{
{
ResourceGroup: meta.DefaultResourceGroupName,
},
},
},
})
}

resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
s.Len(resp.GetResourceGroups(), rgNum+1)

for i := 1; i < rgNum; i++ {
s.Cluster.AddQueryNode()
}

s.Eventually(func() bool {
matchCounter := 0
for _, rg := range rgs {
resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{
ResourceGroup: rg,
})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
if len(resp1.ResourceGroup.Nodes) == 1 {
matchCounter += 1
}
}
return matchCounter == rgNum
}, 30*time.Second, time.Second)

newDbName := "db_load_test_with_db_level_config"
resp1, err := s.Cluster.Proxy.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{
DbName: newDbName,
Properties: []*commonpb.KeyValuePair{
{
Key: common.DatabaseReplicaNumber,
Value: "3",
},
{
Key: common.DatabaseResourceGroups,
Value: strings.Join(rgs, ","),
},
},
})
s.NoError(err)
s.True(merr.Ok(resp1))

s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
DBName: newDbName,
Dim: dim,
CollectionName: collectionName,
ChannelNum: 1,
SegmentNum: 3,
RowNumPerSegment: 2000,
})

// load collection without specified replica and rgs
s.loadCollection(collectionName, newDbName, 0, nil)
resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
DbName: newDbName,
CollectionName: collectionName,
})
s.NoError(err)
s.True(merr.Ok(resp2.Status))
s.Len(resp2.GetReplicas(), 3)
s.releaseCollection(newDbName, collectionName)
}

func TestReplicas(t *testing.T) {
Expand Down
Loading

0 comments on commit 2ac1bf7

Please sign in to comment.