Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Apr 19, 2023
1 parent c932b73 commit 8b049dd
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 15 deletions.
33 changes: 18 additions & 15 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,10 @@ func (m *GroupManager) Bootstrap() error {
}
for _, group := range groups {
if group.ID == utils.DefaultKeyspaceGroupID {
if len(group.Members) == 0 {
if len(group.Members) == 0 && m.client != nil {
// The default keyspace group should have one replica at least.
m.wg.Add(1)
go m.allocNodesForDefaultKeyspaceGroup(1)
go m.allocNodesForDefaultKeyspaceGroup()
}
}
userKind := endpoint.StringUserKind(group.UserKind)
Expand All @@ -152,26 +152,27 @@ func (m *GroupManager) Close() {
m.wg.Wait()
}

func (m *GroupManager) allocNodesForDefaultKeyspaceGroup(replica int) {
func (m *GroupManager) allocNodesForDefaultKeyspaceGroup() {
defer logutil.LogPanic()
defer m.wg.Done()
ticker := time.NewTicker(allocNodesForDefaultKeyspaceGroupInterval)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
}
kg, err := m.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
replica := m.GetNodesNum()
if err == nil && kg != nil && len(kg.Members) >= replica {
return
continue
}
nodes, err := m.AllocNodesForKeyspaceGroup(utils.DefaultKeyspaceGroupID, replica)
if err == nil && len(nodes) == replica {
log.Info("alloc nodes for default keyspace group", zap.Reflect("nodes", nodes))
return
}
log.Warn("failed to alloc nodes for default keyspace group", zap.Error(err))
select {
case <-m.ctx.Done():
return
case <-ticker.C:
}
}
}

Expand All @@ -185,12 +186,9 @@ func (m *GroupManager) startWatchLoop() {
revision int64
err error
)
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
select {
case <-ctx.Done():
return
case <-time.After(retryInterval):
}
resp, err = etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey))
if err == nil {
revision = resp.Header.Revision
Expand All @@ -205,6 +203,11 @@ func (m *GroupManager) startWatchLoop() {
break
}
log.Warn("failed to get tso service addrs from etcd and will retry", zap.Error(err))
select {
case <-m.ctx.Done():
return
case <-ticker.C:
}
}
if err != nil || revision == 0 {
log.Warn("failed to get tso service addrs from etcd finally when loading", zap.Error(err))
Expand Down
3 changes: 3 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ func (s *Server) IsKeyspaceServing(keyspaceID, keyspaceGroupID uint32) bool {
log.Error("failed to get election member", errs.ZapError(err))
return false
}
if member == nil {
return false
}
return member.IsLeader()
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,9 @@ func (kgm *KeyspaceGroupManager) GetElectionMember(
if err != nil {
return nil, err
}
if am == nil {
return nil, nil
}
return am.getMember(), nil
}

Expand Down

0 comments on commit 8b049dd

Please sign in to comment.