diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 0760bd1fbb5..324c8e5cad5 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -28,7 +28,9 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core/storelimit" mcs "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" @@ -377,3 +379,122 @@ func (suite *serverTestSuite) TestForwardRegionHeartbeat() { reflect.DeepEqual(region.GetDownPeers(), downPeers) && reflect.DeepEqual(region.GetPendingPeers(), pendingPeers) }) } + +func (suite *serverTestSuite) TestStoreLimit() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + + oc := tc.GetPrimaryServer().GetCluster().GetCoordinator().GetOperatorController() + leaderServer := suite.pdLeader.GetServer() + conf := leaderServer.GetReplicationConfig().Clone() + conf.MaxReplicas = 1 + leaderServer.SetReplicationConfig(*conf) + grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr()) + for i := uint64(1); i <= 2; i++ { + resp, err := grpcPDClient.PutStore( + context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Store: &metapb.Store{ + Id: i, + Address: fmt.Sprintf("mock://%d", i), + State: metapb.StoreState_Up, + Version: "7.0.0", + }, + }, + ) + re.NoError(err) + re.Empty(resp.GetHeader().GetError()) + } + + stream, err := grpcPDClient.RegionHeartbeat(suite.ctx) + re.NoError(err) + for i := uint64(2); i <= 10; i++ { + peers := []*metapb.Peer{{Id: i, StoreId: 1}} + regionReq := &pdpb.RegionHeartbeatRequest{ + Header: testutil.NewRequestHeader(suite.pdLeader.GetClusterID()), + Region: &metapb.Region{ + Id: i, + Peers: peers, + StartKey: []byte(fmt.Sprintf("t%d", i)), + EndKey: []byte(fmt.Sprintf("t%d", i+1)), + }, + Leader: peers[0], + ApproximateSize: 10 * units.MiB, + } + err = stream.Send(regionReq) + re.NoError(err) + } + + leaderServer.GetRaftCluster().SetStoreLimit(1, storelimit.AddPeer, 60) + leaderServer.GetRaftCluster().SetStoreLimit(1, storelimit.RemovePeer, 60) + leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.AddPeer, 60) + leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.RemovePeer, 60) + // There is a time window between setting store limit in API service side and capturing the change in scheduling service. + waitSyncFinish(re, tc, storelimit.AddPeer, 60) + for i := uint64(1); i <= 5; i++ { + op := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) + checkOperatorSuccess(re, oc, op) + } + op := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) + checkOperatorFail(re, oc, op) + + leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.AddPeer, 120) + waitSyncFinish(re, tc, storelimit.AddPeer, 120) + for i := uint64(1); i <= 10; i++ { + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) + checkOperatorSuccess(re, oc, op) + } + leaderServer.GetRaftCluster().SetAllStoresLimit(storelimit.AddPeer, 60) + waitSyncFinish(re, tc, storelimit.AddPeer, 60) + for i := uint64(1); i <= 5; i++ { + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) + checkOperatorSuccess(re, oc, op) + } + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) + checkOperatorFail(re, oc, op) + + leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.RemovePeer, 60) + waitSyncFinish(re, tc, storelimit.RemovePeer, 60) + for i := uint64(1); i <= 5; i++ { + op := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + checkOperatorSuccess(re, oc, op) + } + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + checkOperatorFail(re, oc, op) + + leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.RemovePeer, 120) + waitSyncFinish(re, tc, storelimit.RemovePeer, 120) + for i := uint64(1); i <= 10; i++ { + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + checkOperatorSuccess(re, oc, op) + } + leaderServer.GetRaftCluster().SetAllStoresLimit(storelimit.RemovePeer, 60) + waitSyncFinish(re, tc, storelimit.RemovePeer, 60) + for i := uint64(1); i <= 5; i++ { + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + checkOperatorSuccess(re, oc, op) + } + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + checkOperatorFail(re, oc, op) +} + +func checkOperatorSuccess(re *require.Assertions, oc *operator.Controller, op *operator.Operator) { + re.True(oc.AddOperator(op)) + re.True(oc.RemoveOperator(op)) + re.True(op.IsEnd()) + re.Equal(op, oc.GetOperatorStatus(op.RegionID()).Operator) +} + +func checkOperatorFail(re *require.Assertions, oc *operator.Controller, op *operator.Operator) { + re.False(oc.AddOperator(op)) + re.False(oc.RemoveOperator(op)) +} + +func waitSyncFinish(re *require.Assertions, tc *tests.TestSchedulingCluster, typ storelimit.Type, expectedLimit float64) { + testutil.Eventually(re, func() bool { + return tc.GetPrimaryServer().GetPersistConfig().GetStoreLimitByType(2, typ) == expectedLimit + }) +}