Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: add set handler for balancer and alloc node for default keyspace group #6342

Merged
merged 16 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 93 additions & 18 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ import (
)

const (
defaultBalancerPolicy = balancer.PolicyRoundRobin
allocNodeTimeout = 1 * time.Second
allocNodeInterval = 10 * time.Millisecond
defaultBalancerPolicy = balancer.PolicyRoundRobin
allocNodesForDefaultKeyspaceGroupInterval = 1 * time.Second
allocNodesTimeout = 1 * time.Second
allocNodesInterval = 10 * time.Millisecond
// TODO: move it to etcdutil
watchEtcdChangeRetryInterval = 1 * time.Second
maxRetryTimes = 25
Expand Down Expand Up @@ -112,6 +113,14 @@ func (m *GroupManager) Bootstrap() error {

m.Lock()
defer m.Unlock()

// If the etcd client is not nil, start the watch loop.
if m.client != nil {
m.nodesBalancer = balancer.GenByPolicy[string](m.policy)
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
m.wg.Add(1)
go m.startWatchLoop()
}

// Ignore the error if default keyspace group already exists in the storage (e.g. PD restart/recover).
err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{defaultKeyspaceGroup}, false)
if err != nil && err != ErrKeyspaceGroupExists {
Expand All @@ -124,16 +133,16 @@ func (m *GroupManager) Bootstrap() error {
return err
}
for _, group := range groups {
if group.ID == utils.DefaultKeyspaceGroupID {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that, not just about default keyspace group, this logic applies to any keyspace group instead, i.e., if any keyspace group's member/replica count is under the desired replica count, then alloc nodes to it. The desired replica count is specified as one of parameters when creating keyspace group and the default value is 2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need to check params for set-nodes and alloc-nodes interface?

if len(group.Members) == 0 && m.client != nil {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
// The default keyspace group should have one replica at least.
m.wg.Add(1)
go m.allocNodesForDefaultKeyspaceGroup()
}
}
userKind := endpoint.StringUserKind(group.UserKind)
m.groups[userKind].Put(group)
}

// If the etcd client is not nil, start the watch loop.
if m.client != nil {
m.nodesBalancer = balancer.GenByPolicy[string](m.policy)
m.wg.Add(1)
go m.startWatchLoop()
}
return nil
}

Expand All @@ -143,6 +152,30 @@ func (m *GroupManager) Close() {
m.wg.Wait()
}

func (m *GroupManager) allocNodesForDefaultKeyspaceGroup() {
defer logutil.LogPanic()
defer m.wg.Done()
ticker := time.NewTicker(allocNodesForDefaultKeyspaceGroupInterval)
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
defer ticker.Stop()
for {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need timeout for it?

select {
case <-m.ctx.Done():
return
case <-ticker.C:
}
kg, err := m.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
replica := m.GetNodesNum()
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
if err == nil && kg != nil && len(kg.Members) >= replica {
Copy link
Contributor

@binshi-bing binshi-bing Apr 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from this line, if my understanding is correct, we want to assign a replica/member of default keyspace group to every tso node, but this isn't true. Currently tso service use this logic, because the kg.Members is empty. I remember we can specify the desired replica/member count (default value is 2 if it isn't specified) of a keyspace group when creating a keyspace group, so we should use that desired replica/member count instead of GetNodesNum() here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add all nodes for the default keyspace group, because we do not support offline nodes in keyspace balancer, but we test delete tso node primary in test. If only one replica, some test will fail.

continue
}
nodes, err := m.AllocNodesForKeyspaceGroup(utils.DefaultKeyspaceGroupID, replica)
if err == nil && len(nodes) == replica {
log.Info("alloc nodes for default keyspace group", zap.Reflect("nodes", nodes))
}
log.Warn("failed to alloc nodes for default keyspace group", zap.Error(err))
}
}

func (m *GroupManager) startWatchLoop() {
defer logutil.LogPanic()
defer m.wg.Done()
Expand All @@ -153,12 +186,9 @@ func (m *GroupManager) startWatchLoop() {
revision int64
err error
)
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
select {
case <-ctx.Done():
return
case <-time.After(retryInterval):
}
resp, err = etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey))
if err == nil {
revision = resp.Header.Revision
Expand All @@ -173,6 +203,11 @@ func (m *GroupManager) startWatchLoop() {
break
}
log.Warn("failed to get tso service addrs from etcd and will retry", zap.Error(err))
select {
case <-m.ctx.Done():
return
case <-ticker.C:
}
}
if err != nil || revision == 0 {
log.Warn("failed to get tso service addrs from etcd finally when loading", zap.Error(err))
Expand Down Expand Up @@ -221,7 +256,7 @@ func (m *GroupManager) watchServiceAddrs(ctx context.Context, revision int64) (i
for _, event := range wresp.Events {
s := &discovery.ServiceRegistryEntry{}
if err := json.Unmarshal(event.Kv.Value, s); err != nil {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
log.Warn("failed to unmarshal service registry entry", zap.Error(err))
log.Warn("failed to unmarshal service registry entry", zap.Error(err), zap.ByteString("value", event.Kv.Value))
}
switch event.Type {
case clientv3.EventTypePut:
Expand Down Expand Up @@ -590,14 +625,19 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error {

// GetNodesNum returns the number of nodes.
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
func (m *GroupManager) GetNodesNum() int {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
if m.nodesBalancer == nil {
return 0
}
return m.nodesBalancer.Len()
}

// AllocNodesForKeyspaceGroup allocates nodes for the keyspace group.
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]endpoint.KeyspaceGroupMember, error) {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithTimeout(m.ctx, allocNodeTimeout)
m.Lock()
defer m.Unlock()
ctx, cancel := context.WithTimeout(m.ctx, allocNodesTimeout)
defer cancel()
ticker := time.NewTicker(allocNodeInterval)
ticker := time.NewTicker(allocNodesInterval)
defer ticker.Stop()
nodes := make([]endpoint.KeyspaceGroupMember, 0, replica)
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
Expand All @@ -613,6 +653,9 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]end
exists[member.Address] = struct{}{}
nodes = append(nodes, member)
}
if len(exists) >= replica {
return nil
}
for len(exists) < replica {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -641,3 +684,35 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]end
}
return nodes, nil
}

// SetNodesForKeyspaceGroup sets the nodes for the keyspace group.
func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error {
m.Lock()
defer m.Unlock()
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
if kg == nil {
return ErrKeyspaceGroupNotExists
}
members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel here we have more work to do in the future, such as sanity check and service availability check for the nodes switched to before actually refreshing the member list. Please ignore this comment for now.

for _, node := range nodes {
members = append(members, endpoint.KeyspaceGroupMember{Address: node})
}
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
})
}

// IsExistNode checks if the node exists.
func (m *GroupManager) IsExistNode(addr string) bool {
nodes := m.nodesBalancer.GetAll()
for _, node := range nodes {
if node == addr {
return true
}
}
return false
}
3 changes: 3 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ func (s *Server) IsKeyspaceServing(keyspaceID, keyspaceGroupID uint32) bool {
log.Error("failed to get election member", errs.ZapError(err))
return false
}
if member == nil {
return false
}
return member.IsLeader()
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,9 @@ func (kgm *KeyspaceGroupManager) GetElectionMember(
if err != nil {
return nil, err
}
if am == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will this happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check if it is primary after kgm.deleteKeyspaceGroup with default keyspace group

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean delete the default keyspace group?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the default keyspace group no longer contain this member

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it actually goes here, it will panic I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be removed after @binshi-bing has fixed

return nil, nil
}
return am.getMember(), nil
}

Expand Down
61 changes: 55 additions & 6 deletions server/apiv2/handlers/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) {
router.GET("", GetKeyspaceGroups)
router.GET("/:id", GetKeyspaceGroupByID)
router.DELETE("/:id", DeleteKeyspaceGroupByID)
router.POST("/:id/alloc", AllocNodeForKeyspaceGroup)
router.POST("/:id/alloc", AllocNodesForKeyspaceGroup)
router.POST("/:id/nodes", SetNodesForKeyspaceGroup)
router.POST("/:id/split", SplitKeyspaceGroupByID)
router.DELETE("/:id/split", FinishSplitKeyspaceByID)
}
Expand Down Expand Up @@ -190,21 +191,21 @@ func FinishSplitKeyspaceByID(c *gin.Context) {
c.JSON(http.StatusOK, nil)
}

// AllocNodeForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups.
type AllocNodeForKeyspaceGroupParams struct {
// AllocNodesForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups.
type AllocNodesForKeyspaceGroupParams struct {
Replica int `json:"replica"`
}

// AllocNodeForKeyspaceGroup allocates nodes for keyspace group.
func AllocNodeForKeyspaceGroup(c *gin.Context) {
// AllocNodesForKeyspaceGroup allocates nodes for keyspace group.
func AllocNodesForKeyspaceGroup(c *gin.Context) {
id, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return
}
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
allocParams := &AllocNodeForKeyspaceGroupParams{}
allocParams := &AllocNodesForKeyspaceGroupParams{}
err = c.BindJSON(allocParams)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
Expand Down Expand Up @@ -232,6 +233,54 @@ func AllocNodeForKeyspaceGroup(c *gin.Context) {
c.JSON(http.StatusOK, nodes)
}

// SetNodesForKeyspaceGroupParams defines the params for setting nodes for keyspace groups.
// Notes: it should be used carefully.
type SetNodesForKeyspaceGroupParams struct {
Nodes []string `json:"nodes"`
}

// SetNodesForKeyspaceGroup sets nodes for keyspace group.
func SetNodesForKeyspaceGroup(c *gin.Context) {
id, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return
}
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
setParams := &SetNodesForKeyspaceGroupParams{}
err = c.BindJSON(setParams)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
// check keyspace group whether exist
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
keyspaceGroup, err := manager.GetKeyspaceGroupByID(id)
if err != nil || keyspaceGroup == nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "keyspace group does not exist")
return
}
// check nodes whether empty
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
if len(setParams.Nodes) == 0 {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid empty nodes")
return
}
// check nodes whether exist
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
for _, node := range setParams.Nodes {
if !manager.IsExistNode(node) {
c.AbortWithStatusJSON(http.StatusBadRequest, "node does not exist")
return
}
}
// set nodes
err = manager.SetNodesForKeyspaceGroup(id, setParams.Nodes)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
c.JSON(http.StatusOK, nil)
}

func validateKeyspaceGroupID(c *gin.Context) (uint32, error) {
id, err := strconv.ParseUint(c.Param("id"), 10, 64)
if err != nil {
Expand Down
Loading