diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 7bd8de7bc28..7966e772e68 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -16,6 +16,7 @@ package keyspace import ( "context" + "strconv" "time" "github.com/pingcap/errors" @@ -23,6 +24,7 @@ import ( "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/id" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/slice" @@ -72,7 +74,8 @@ type Manager struct { ctx context.Context // config is the configurations of the manager. config Config - kgm *GroupManager + // kgm is the keyspace group manager of the server. + kgm *GroupManager } // CreateKeyspaceRequest represents necessary arguments to create a keyspace. @@ -86,7 +89,9 @@ type CreateKeyspaceRequest struct { } // NewKeyspaceManager creates a Manager of keyspace related data. -func NewKeyspaceManager(store endpoint.KeyspaceStorage, +func NewKeyspaceManager( + ctx context.Context, + store endpoint.KeyspaceStorage, cluster schedule.Cluster, idAllocator id.Allocator, config Config, @@ -97,7 +102,7 @@ func NewKeyspaceManager(store endpoint.KeyspaceStorage, idAllocator: idAllocator, store: store, cluster: cluster, - ctx: context.TODO(), + ctx: ctx, config: config, kgm: kgm, } @@ -540,7 +545,18 @@ func (manager *Manager) LoadRangeKeyspace(startID uint32, limit int) ([]*keyspac if startID > spaceIDMax { return nil, errors.Errorf("startID of the scan %d exceeds spaceID Max %d", startID, spaceIDMax) } - return manager.store.LoadRangeKeyspace(startID, limit) + var ( + keyspaces []*keyspacepb.KeyspaceMeta + err error + ) + err = manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error { + keyspaces, err = manager.store.LoadRangeKeyspace(txn, startID, limit) + return err + }) + if err != nil { + return nil, err + } + return keyspaces, nil } // allocID allocate a new keyspace id. @@ -555,3 +571,67 @@ func (manager *Manager) allocID() (uint32, error) { } return id32, nil } + +// PatrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups. +func (manager *Manager) PatrolKeyspaceAssignment() error { + // TODO: since the number of keyspaces might be large, we should consider to assign them in batches. + return manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error { + defaultKeyspaceGroup, err := manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID) + if err != nil { + return err + } + if defaultKeyspaceGroup == nil { + return errors.Errorf("default keyspace group %d not found", utils.DefaultKeyspaceGroupID) + } + if defaultKeyspaceGroup.IsSplitting() { + return ErrKeyspaceGroupInSplit + } + keyspaces, err := manager.store.LoadRangeKeyspace(txn, DefaultKeyspaceID, 0) + if err != nil { + return err + } + var ( + assigned = false + keyspaceIDsToUnlock = make([]uint32, 0, len(keyspaces)) + ) + defer func() { + for _, id := range keyspaceIDsToUnlock { + manager.metaLock.Unlock(id) + } + }() + for _, ks := range keyspaces { + if ks == nil { + continue + } + manager.metaLock.Lock(ks.Id) + if ks.Config == nil { + ks.Config = make(map[string]string, 1) + } else { + // If the keyspace already has a group ID, skip it. + _, ok := ks.Config[TSOKeyspaceGroupIDKey] + if ok { + manager.metaLock.Unlock(ks.Id) + continue + } + } + // Unlock the keyspace meta lock after the whole txn. + keyspaceIDsToUnlock = append(keyspaceIDsToUnlock, ks.Id) + // If the keyspace doesn't have a group ID, assign it to the default keyspace group. + if !slice.Contains(defaultKeyspaceGroup.Keyspaces, ks.Id) { + defaultKeyspaceGroup.Keyspaces = append(defaultKeyspaceGroup.Keyspaces, ks.Id) + assigned = true + } + ks.Config[TSOKeyspaceGroupIDKey] = strconv.FormatUint(uint64(utils.DefaultKeyspaceGroupID), 10) + err = manager.store.SaveKeyspaceMeta(txn, ks) + if err != nil { + log.Error("[keyspace] failed to save keyspace meta during patrol", + zap.Uint32("ID", ks.Id), zap.Error(err)) + return err + } + } + if assigned { + return manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup) + } + return nil + }) +} diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go index f1ef85711fd..f3d6d9a8971 100644 --- a/pkg/keyspace/keyspace_test.go +++ b/pkg/keyspace/keyspace_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" @@ -60,7 +61,7 @@ func (suite *keyspaceTestSuite) SetupTest() { store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) allocator := mockid.NewIDAllocator() kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0) - suite.manager = NewKeyspaceManager(store, nil, allocator, &mockConfig{}, kgm) + suite.manager = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm) suite.NoError(kgm.Bootstrap()) suite.NoError(suite.manager.Bootstrap()) } @@ -354,3 +355,30 @@ func updateKeyspaceConfig(re *require.Assertions, manager *Manager, name string, oldMeta = updatedMeta } } + +func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() { + re := suite.Require() + // Create a keyspace without any keyspace group. + now := time.Now().Unix() + err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{ + Id: 111, + Name: "111", + State: keyspacepb.KeyspaceState_ENABLED, + CreatedAt: now, + StateChangedAt: now, + }) + re.NoError(err) + // Check if the keyspace is not attached to the default group. + defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) + re.NoError(err) + re.NotNil(defaultKeyspaceGroup) + re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(111)) + // Patrol the keyspace assignment. + err = suite.manager.PatrolKeyspaceAssignment() + re.NoError(err) + // Check if the keyspace is attached to the default group. + defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) + re.NoError(err) + re.NotNil(defaultKeyspaceGroup) + re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(111)) +} diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 9ea65e92b44..b65a68011e5 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -61,15 +61,9 @@ type GroupManager struct { // groups is the cache of keyspace group related information. // user kind -> keyspace group groups map[endpoint.UserKind]*indexedHeap - // patrolKeyspaceAssignmentOnce is used to indicate whether we have patrolled all keyspaces - // and assign them to the keyspace groups. - patrolKeyspaceAssignmentOnce bool // store is the storage for keyspace group related information. - store interface { - endpoint.KeyspaceGroupStorage - endpoint.KeyspaceStorage - } + store endpoint.KeyspaceGroupStorage client *clientv3.Client @@ -88,10 +82,7 @@ type GroupManager struct { // NewKeyspaceGroupManager creates a Manager of keyspace group related data. func NewKeyspaceGroupManager( ctx context.Context, - store interface { - endpoint.KeyspaceGroupStorage - endpoint.KeyspaceStorage - }, + store endpoint.KeyspaceGroupStorage, client *clientv3.Client, clusterID uint64, ) *GroupManager { @@ -170,38 +161,6 @@ func (m *GroupManager) Close() { m.wg.Wait() } -// patrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups. -func (m *GroupManager) patrolKeyspaceAssignment() error { - m.Lock() - defer m.Unlock() - if m.patrolKeyspaceAssignmentOnce { - return nil - } - keyspaces, err := m.store.LoadRangeKeyspace(utils.DefaultKeyspaceID, 0) - if err != nil { - return err - } - config, err := m.getKeyspaceConfigByKindLocked(endpoint.Basic) - if err != nil { - return err - } - for _, ks := range keyspaces { - if ks == nil { - continue - } - groupID, err := strconv.ParseUint(config[TSOKeyspaceGroupIDKey], 10, 64) - if err != nil { - return err - } - err = m.updateKeyspaceForGroupLocked(endpoint.Basic, groupID, ks.GetId(), opAdd) - if err != nil { - return err - } - } - m.patrolKeyspaceAssignmentOnce = true - return nil -} - func (m *GroupManager) allocNodesToAllKeyspaceGroups() { defer logutil.LogPanic() defer m.wg.Done() @@ -602,14 +561,10 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse // SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID. // And the keyspaces in the old keyspace group will be moved to the new keyspace group. func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint32, keyspaces []uint32) error { - err := m.patrolKeyspaceAssignment() - if err != nil { - return err - } var splitSourceKg, splitTargetKg *endpoint.KeyspaceGroup m.Lock() defer m.Unlock() - if err = m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { // Load the old keyspace group first. splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitSourceID) if err != nil { diff --git a/pkg/keyspace/tso_keyspace_group_test.go b/pkg/keyspace/tso_keyspace_group_test.go index cfe035578e2..214c10bea7f 100644 --- a/pkg/keyspace/tso_keyspace_group_test.go +++ b/pkg/keyspace/tso_keyspace_group_test.go @@ -20,7 +20,6 @@ import ( "testing" "time" - "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/mock/mockcluster" @@ -48,7 +47,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() { suite.kgm = NewKeyspaceGroupManager(suite.ctx, store, nil, 0) idAllocator := mockid.NewIDAllocator() cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions()) - suite.kg = NewKeyspaceManager(store, cluster, idAllocator, &mockConfig{}, suite.kgm) + suite.kg = NewKeyspaceManager(suite.ctx, store, cluster, idAllocator, &mockConfig{}, suite.kgm) suite.NoError(suite.kgm.Bootstrap()) } @@ -321,31 +320,3 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() { err = suite.kgm.SplitKeyspaceGroupByID(2, 5, []uint32{111, 222, 444}) re.ErrorIs(err, ErrKeyspaceNotInKeyspaceGroup) } - -func (suite *keyspaceGroupTestSuite) TestPatrolKeyspaceAssignment() { - re := suite.Require() - // Force the patrol to run once. - suite.kgm.patrolKeyspaceAssignmentOnce = false - // Create a keyspace group without any keyspace. - err := suite.kgm.CreateKeyspaceGroups([]*endpoint.KeyspaceGroup{ - { - ID: uint32(1), - UserKind: endpoint.Basic.String(), - Members: make([]endpoint.KeyspaceGroupMember, 2), - }, - }) - re.NoError(err) - // Create a keyspace without any keyspace group. - now := time.Now().Unix() - err = suite.kg.saveNewKeyspace(&keyspacepb.KeyspaceMeta{ - Id: 111, - Name: "111", - State: keyspacepb.KeyspaceState_ENABLED, - CreatedAt: now, - StateChangedAt: now, - }) - re.NoError(err) - // Split to see if the keyspace is attached to the group. - err = suite.kgm.SplitKeyspaceGroupByID(1, 2, []uint32{111}) - re.NoError(err) -} diff --git a/pkg/storage/endpoint/keyspace.go b/pkg/storage/endpoint/keyspace.go index 7aa82e8985b..09733ad59c1 100644 --- a/pkg/storage/endpoint/keyspace.go +++ b/pkg/storage/endpoint/keyspace.go @@ -41,7 +41,7 @@ type KeyspaceStorage interface { SaveKeyspaceID(txn kv.Txn, id uint32, name string) error LoadKeyspaceID(txn kv.Txn, name string) (bool, uint32, error) // LoadRangeKeyspace loads no more than limit keyspaces starting at startID. - LoadRangeKeyspace(startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error) + LoadRangeKeyspace(txn kv.Txn, startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error) RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error } @@ -104,10 +104,10 @@ func (se *StorageEndpoint) RunInTxn(ctx context.Context, f func(txn kv.Txn) erro // LoadRangeKeyspace loads keyspaces starting at startID. // limit specifies the limit of loaded keyspaces. -func (se *StorageEndpoint) LoadRangeKeyspace(startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error) { +func (se *StorageEndpoint) LoadRangeKeyspace(txn kv.Txn, startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error) { startKey := KeyspaceMetaPath(startID) endKey := clientv3.GetPrefixRangeEnd(KeyspaceMetaPrefix()) - keys, values, err := se.LoadRange(startKey, endKey, limit) + keys, values, err := txn.LoadRange(startKey, endKey, limit) if err != nil { return nil, err } diff --git a/pkg/storage/keyspace_test.go b/pkg/storage/keyspace_test.go index 152d71afb5c..1236d1b34bd 100644 --- a/pkg/storage/keyspace_test.go +++ b/pkg/storage/keyspace_test.go @@ -72,9 +72,8 @@ func TestSaveLoadKeyspace(t *testing.T) { func TestLoadRangeKeyspaces(t *testing.T) { re := require.New(t) storage := NewStorageWithMemoryBackend() - - // Store test keyspace meta. keyspaces := makeTestKeyspaces() + // Store test keyspace meta. err := storage.RunInTxn(context.TODO(), func(txn kv.Txn) error { for _, keyspace := range keyspaces { re.NoError(storage.SaveKeyspaceMeta(txn, keyspace)) @@ -82,21 +81,26 @@ func TestLoadRangeKeyspaces(t *testing.T) { return nil }) re.NoError(err) + // Test load range keyspaces. + err = storage.RunInTxn(context.TODO(), func(txn kv.Txn) error { + // Load all keyspaces. + loadedKeyspaces, err := storage.LoadRangeKeyspace(txn, keyspaces[0].GetId(), 0) + re.NoError(err) + re.ElementsMatch(keyspaces, loadedKeyspaces) - // Load all keyspaces. - loadedKeyspaces, err := storage.LoadRangeKeyspace(keyspaces[0].GetId(), 0) - re.NoError(err) - re.ElementsMatch(keyspaces, loadedKeyspaces) + // Load keyspaces with id >= second test keyspace's id. + loadedKeyspaces2, err := storage.LoadRangeKeyspace(txn, keyspaces[1].GetId(), 0) + re.NoError(err) + re.ElementsMatch(keyspaces[1:], loadedKeyspaces2) - // Load keyspaces with id >= second test keyspace's id. - loadedKeyspaces2, err := storage.LoadRangeKeyspace(keyspaces[1].GetId(), 0) - re.NoError(err) - re.ElementsMatch(keyspaces[1:], loadedKeyspaces2) + // Load keyspace with the smallest id. + loadedKeyspace3, err := storage.LoadRangeKeyspace(txn, 1, 1) + re.NoError(err) + re.ElementsMatch(keyspaces[:1], loadedKeyspace3) - // Load keyspace with the smallest id. - loadedKeyspace3, err := storage.LoadRangeKeyspace(1, 1) + return nil + }) re.NoError(err) - re.ElementsMatch(keyspaces[:1], loadedKeyspace3) } func makeTestKeyspaces() []*keyspacepb.KeyspaceMeta { diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index a9f7d9d1395..2f9832066a5 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -17,6 +17,7 @@ package handlers import ( "net/http" "strconv" + "sync" "github.com/gin-gonic/gin" "github.com/pkg/errors" @@ -140,6 +141,11 @@ type SplitKeyspaceGroupByIDParams struct { Keyspaces []uint32 `json:"keyspaces"` } +var patrolKeyspaceAssignmentState struct { + sync.RWMutex + patrolled bool +} + // SplitKeyspaceGroupByID splits keyspace group by ID into a new keyspace group with the given new ID. // And the keyspaces in the old keyspace group will be moved to the new keyspace group. func SplitKeyspaceGroupByID(c *gin.Context) { @@ -164,8 +170,22 @@ func SplitKeyspaceGroupByID(c *gin.Context) { } svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) - manager := svr.GetKeyspaceGroupManager() - err = manager.SplitKeyspaceGroupByID(id, splitParams.NewID, splitParams.Keyspaces) + patrolKeyspaceAssignmentState.Lock() + if !patrolKeyspaceAssignmentState.patrolled { + // Patrol keyspace assignment before splitting keyspace group. + manager := svr.GetKeyspaceManager() + err = manager.PatrolKeyspaceAssignment() + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + patrolKeyspaceAssignmentState.Unlock() + return + } + patrolKeyspaceAssignmentState.patrolled = true + } + patrolKeyspaceAssignmentState.Unlock() + // Split keyspace group. + groupManager := svr.GetKeyspaceGroupManager() + err = groupManager.SplitKeyspaceGroupByID(id, splitParams.NewID, splitParams.Keyspaces) if err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return diff --git a/server/server.go b/server/server.go index e83831364c7..e61d91c0c7a 100644 --- a/server/server.go +++ b/server/server.go @@ -443,7 +443,7 @@ func (s *Server) startServer(ctx context.Context) error { if s.IsAPIServiceMode() { s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client, s.clusterID) } - s.keyspaceManager = keyspace.NewKeyspaceManager(s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager) + s.keyspaceManager = keyspace.NewKeyspaceManager(s.ctx, s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager) s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, s.cluster) // initial hot_region_storage in here. s.hotRegionStorage, err = storage.NewHotRegionsStorage(