Skip to content

Commit

Permalink
bootstrap default keyspace group in the tso service (#6306)
Browse files Browse the repository at this point in the history
ref #6232

Changes:

1. Introduce the initialization logic of the default keyspace group.
- If the default keyspace group isn't configured in the etcd, every tso node/pod should initialize it and join the election for the primary of this group.
- If the default keyspace group is configured in the etcd, the tso nodes/pods which are assigned with this group will initialize it and join the election for the primary of this group.

2. Introduce the keyspace group membership restriction -- default keyspace always belongs to default keyspace group.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing authored Apr 14, 2023
1 parent 4201238 commit 0e16783
Show file tree
Hide file tree
Showing 10 changed files with 333 additions and 130 deletions.
14 changes: 8 additions & 6 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,14 @@ func NewKeyspaceGroupManager(ctx context.Context, store endpoint.KeyspaceGroupSt

// Bootstrap saves default keyspace group info and init group mapping in the memory.
func (m *GroupManager) Bootstrap() error {
// Force the membership restriction that the default keyspace must belong to default keyspace group.
// Have no information to specify the distribution of the default keyspace group replicas, so just
// leave the replica/member list empty. The TSO service will assign the default keyspace group replica
// to every tso node/pod by default.
defaultKeyspaceGroup := &endpoint.KeyspaceGroup{
ID: utils.DefaultKeySpaceGroupID,
UserKind: endpoint.Basic.String(),
ID: utils.DefaultKeyspaceGroupID,
UserKind: endpoint.Basic.String(),
Keyspaces: []uint32{utils.DefaultKeyspaceID},
}

m.Lock()
Expand All @@ -71,11 +76,8 @@ func (m *GroupManager) Bootstrap() error {
return err
}

userKind := endpoint.StringUserKind(defaultKeyspaceGroup.UserKind)
m.groups[userKind].Put(defaultKeyspaceGroup)

// Load all the keyspace groups from the storage and add to the respective userKind groups.
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeySpaceGroupID, 0)
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) er
zap.Uint64("new-ts", ts),
zap.Bool("ignore-smaller", ignoreSmaller),
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck))
tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(mcsutils.DefaultKeySpaceGroupID)
tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(mcsutils.DefaultKeyspaceGroupID)
if err != nil {
log.Error("failed to get allocator manager", errs.ZapError(err))
return err
Expand Down
9 changes: 4 additions & 5 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (s *Server) IsServing() bool {
}

member, err := s.keyspaceGroupManager.GetElectionMember(
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeySpaceGroupID)
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
if err != nil {
log.Error("failed to get election member", errs.ZapError(err))
return false
Expand All @@ -221,7 +221,7 @@ func (s *Server) IsServing() bool {
// The entry at the index 0 is the primary's service endpoint.
func (s *Server) GetLeaderListenUrls() []string {
member, err := s.keyspaceGroupManager.GetElectionMember(
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeySpaceGroupID)
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
if err != nil {
log.Error("failed to get election member", errs.ZapError(err))
return nil
Expand Down Expand Up @@ -436,15 +436,14 @@ func (s *Server) startServer() (err error) {
return err
}

// Initialize the TSO service.
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
legacySvcRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.etcdClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg)
// The param `false` means that we don't initialize the keyspace group manager
// by loading the keyspace group meta from etcd.
if err := s.keyspaceGroupManager.Initialize(false); err != nil {
if err := s.keyspaceGroupManager.Initialize(); err != nil {
return err
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ const (

// DefaultKeyspaceID is the default key space id.
// Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215)
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap and reserved for users who haven't been assigned keyspace.
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap
// and reserved for users who haven't been assigned keyspace.
DefaultKeyspaceID = uint32(0)

// DefaultKeySpaceGroupID is the default key space group id.
// DefaultKeyspaceGroupID is the default key space group id.
// We also reserved 0 for the keyspace group for the same purpose.
DefaultKeySpaceGroupID = uint32(0)
DefaultKeyspaceGroupID = uint32(0)

// APIServiceName is the name of api server.
APIServiceName = "api"
Expand Down
114 changes: 84 additions & 30 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,15 @@ func (s *state) getAMWithMembershipCheck(
if kgid, ok := s.keyspaceLookupTable[keyspaceID]; ok {
return nil, kgid, genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID)
}
return nil, keyspaceGroupID, errs.ErrKeyspaceNotAssigned.FastGenByArgs(keyspaceID)

if keyspaceGroupID != mcsutils.DefaultKeyspaceGroupID {
return nil, keyspaceGroupID, errs.ErrKeyspaceNotAssigned.FastGenByArgs(keyspaceID)
}

// The keyspace doesn't belong to any keyspace group, so return the default keyspace group.
// It's for migrating the existing keyspaces which have no keyspace group assigned, so the
// the default keyspace group is used to serve the keyspaces.
return s.ams[mcsutils.DefaultKeyspaceGroupID], mcsutils.DefaultKeyspaceGroupID, nil
}

// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host.
Expand Down Expand Up @@ -222,23 +230,12 @@ func NewKeyspaceGroupManager(
}

// Initialize this KeyspaceGroupManager
func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error {
// Initialize the default keyspace group if not loading from storage
if !loadFromStorage {
group := &endpoint.KeyspaceGroup{
ID: mcsutils.DefaultKeySpaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}},
Keyspaces: []uint32{mcsutils.DefaultKeyspaceID},
}
kgm.updateKeyspaceGroup(group)
return nil
}

func (kgm *KeyspaceGroupManager) Initialize() error {
// Load the initial keyspace group assignment from storage with time limit
done := make(chan struct{}, 1)
ctx, cancel := context.WithCancel(kgm.ctx)
go kgm.checkInitProgress(ctx, cancel, done)
watchStartRevision, err := kgm.initAssignment(ctx)
watchStartRevision, defaultKGConfigured, err := kgm.initAssignment(ctx)
done <- struct{}{}
if err != nil {
log.Error("failed to initialize keyspace group manager", errs.ZapError(err))
Expand All @@ -247,6 +244,12 @@ func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error {
return err
}

// Initialize the default keyspace group if it isn't configured in the storage.
if !defaultKGConfigured {
keyspaces := []uint32{mcsutils.DefaultKeyspaceID}
kgm.initDefaultKeysapceGroup(keyspaces)
}

// Watch/apply keyspace group membership/distribution meta changes dynamically.
kgm.wg.Add(1)
go kgm.startKeyspaceGroupsMetaWatchLoop(watchStartRevision)
Expand Down Expand Up @@ -284,14 +287,26 @@ func (kgm *KeyspaceGroupManager) checkInitProgress(ctx context.Context, cancel c
<-done
}

func (kgm *KeyspaceGroupManager) initDefaultKeysapceGroup(keyspaces []uint32) {
log.Info("initializing default keyspace group",
zap.Int("keyspaces-length", len(keyspaces)))

group := &endpoint.KeyspaceGroup{
ID: mcsutils.DefaultKeyspaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}},
Keyspaces: keyspaces,
}
kgm.updateKeyspaceGroup(group)
}

// initAssignment loads initial keyspace group assignment from storage and initialize the group manager.
func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, error) {
// Return watchStartRevision, the start revision for watching keyspace group membership/distribution change.
func (kgm *KeyspaceGroupManager) initAssignment(
ctx context.Context,
) (watchStartRevision int64, defaultKGConfigured bool, err error) {
var (
// The start revision for watching keyspace group membership/distribution change
watchStartRevision int64
groups []*endpoint.KeyspaceGroup
more bool
err error
keyspaceGroupsLoaded uint32
revision int64
)
Expand All @@ -300,7 +315,7 @@ func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, err
for {
revision, groups, more, err = kgm.loadKeyspaceGroups(ctx, keyspaceGroupsLoaded, kgm.loadKeyspaceGroupsBatchSize)
if err != nil {
return 0, err
return
}

keyspaceGroupsLoaded += uint32(len(groups))
Expand All @@ -313,10 +328,15 @@ func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, err
for _, group := range groups {
select {
case <-ctx.Done():
return watchStartRevision, errs.ErrLoadKeyspaceGroupsTerminated
err = errs.ErrLoadKeyspaceGroupsTerminated
return
default:
}

if group.ID == mcsutils.DefaultKeyspaceGroupID {
defaultKGConfigured = true
}

kgm.updateKeyspaceGroup(group)
}

Expand All @@ -326,7 +346,7 @@ func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, err
}

log.Info("loaded keyspace groups", zap.Uint32("keyspace-groups-loaded", keyspaceGroupsLoaded))
return watchStartRevision, nil
return
}

// loadKeyspaceGroups loads keyspace groups from the start ID with limit.
Expand Down Expand Up @@ -441,7 +461,7 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (
return revision, wresp.Err()
}
for _, event := range wresp.Events {
id, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(event.Kv.Key))
groupID, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(event.Kv.Key))
if err != nil {
log.Warn("failed to extract keyspace group ID from the key path",
zap.String("key-path", string(event.Kv.Key)), zap.Error(err))
Expand All @@ -453,12 +473,20 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (
group := &endpoint.KeyspaceGroup{}
if err := json.Unmarshal(event.Kv.Value, group); err != nil {
log.Warn("failed to unmarshal keyspace group",
zap.Uint32("keysapce-group-id", id),
zap.Uint32("keysapce-group-id", groupID),
zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause()))
}
kgm.updateKeyspaceGroup(group)
case clientv3.EventTypeDelete:
kgm.deleteKeyspaceGroup(id)
if groupID == mcsutils.DefaultKeyspaceGroupID {
keyspaces := kgm.kgs[groupID].Keyspaces
kgm.deleteKeyspaceGroup(groupID)
log.Warn("removed default keyspace group meta config from the storage. " +
"now every tso node/pod will initialize it")
kgm.initDefaultKeysapceGroup(keyspaces)
} else {
kgm.deleteKeyspaceGroup(groupID)
}
}
}
revision = wresp.Header.Revision
Expand All @@ -473,6 +501,11 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (
}

func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool {
// If the default keyspace group isn't assigned to any tso node/pod, assign it to everyone.
if group.ID == mcsutils.DefaultKeyspaceGroupID && len(group.Members) == 0 {
return true
}

for _, member := range group.Members {
if member.Address == kgm.tsoServiceID.ServiceAddr {
return true
Expand Down Expand Up @@ -516,7 +549,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
tsRootPath string
storage *endpoint.StorageEndpoint
)
if group.ID == mcsutils.DefaultKeySpaceGroupID {
if group.ID == mcsutils.DefaultKeyspaceGroupID {
tsRootPath = kgm.legacySvcRootPath
storage = kgm.legacySvcStorage
} else {
Expand All @@ -536,6 +569,10 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
kgm.ams[group.ID] = am
kgm.Unlock()
} else {
if group.ID == mcsutils.DefaultKeyspaceGroupID {
log.Info("resign default keyspace group membership",
zap.Any("default-keyspace-group", group))
}
// Not assigned to me. If this host/pod owns this keyspace group, it should resign.
kgm.deleteKeyspaceGroup(group.ID)
}
Expand All @@ -554,7 +591,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
return newKeyspaces[i] < newKeyspaces[j]
})

// Mostly, the membership has no change, so we optimize for this case.
// Mostly, the membership has no change, so optimize for this case.
sameMembership := true
if oldLen != newLen {
sameMembership = false
Expand All @@ -571,10 +608,10 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
defer kgm.Unlock()

if sameMembership {
// The keyspace group membership is not changed, so we reuse the old one.
// The keyspace group membership is not changed. Reuse the old one.
newGroup.KeyspaceLookupTable = oldGroup.KeyspaceLookupTable
} else {
// The keyspace group membership is changed, so we update the keyspace lookup table.
// The keyspace group membership is changed. Update the keyspace lookup table.
newGroup.KeyspaceLookupTable = make(map[uint32]struct{})
for i, j := 0, 0; i < oldLen || j < newLen; {
if i < oldLen && j < newLen && oldKeyspaces[i] == newKeyspaces[j] {
Expand All @@ -590,12 +627,31 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
j++
}
}
if groupID == mcsutils.DefaultKeyspaceGroupID {
if _, ok := newGroup.KeyspaceLookupTable[mcsutils.DefaultKeyspaceID]; !ok {
log.Warn("default keyspace is not in default keyspace group. add it back")
kgm.keyspaceLookupTable[mcsutils.DefaultKeyspaceID] = groupID
newGroup.KeyspaceLookupTable[mcsutils.DefaultKeyspaceID] = struct{}{}
newGroup.Keyspaces = make([]uint32, 1+len(newKeyspaces))
newGroup.Keyspaces[0] = mcsutils.DefaultKeyspaceID
copy(newGroup.Keyspaces[1:], newKeyspaces)
}
} else {
if _, ok := newGroup.KeyspaceLookupTable[mcsutils.DefaultKeyspaceID]; ok {
log.Warn("default keyspace is in non-default keyspace group. remove it")
kgm.keyspaceLookupTable[mcsutils.DefaultKeyspaceID] = mcsutils.DefaultKeyspaceGroupID
delete(newGroup.KeyspaceLookupTable, mcsutils.DefaultKeyspaceID)
newGroup.Keyspaces = newKeyspaces[1:]
}
}
}
kgm.kgs[groupID] = newGroup
}

// deleteKeyspaceGroup deletes the given keyspace group.
func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) {
log.Info("delete keyspace group", zap.Uint32("keyspace-group-id", groupID))

kgm.Lock()
defer kgm.Unlock()

Expand All @@ -618,8 +674,6 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) {
am.close()
kgm.ams[groupID] = nil
}

log.Info("deleted keyspace group", zap.Uint32("keyspace-group-id", groupID))
}

// GetAllocatorManager returns the AllocatorManager of the given keyspace group
Expand Down
Loading

0 comments on commit 0e16783

Please sign in to comment.