Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: add set handler for balancer and alloc node for default keyspace group #6342

Merged
merged 16 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 117 additions & 34 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ import (
)

const (
defaultBalancerPolicy = balancer.PolicyRoundRobin
allocNodeTimeout = 1 * time.Second
allocNodeInterval = 10 * time.Millisecond
defaultBalancerPolicy = balancer.PolicyRoundRobin
allocNodesToKeyspaceGroupsInterval = 1 * time.Second
allocNodesTimeout = 1 * time.Second
allocNodesInterval = 10 * time.Millisecond
// TODO: move it to etcdutil
watchEtcdChangeRetryInterval = 1 * time.Second
maxRetryTimes = 25
Expand Down Expand Up @@ -71,8 +72,6 @@ type GroupManager struct {
// tsoServiceEndKey is the end key of TSO service in etcd.
tsoServiceEndKey string

policy balancer.Policy

// TODO: add user kind with different balancer
// when we ensure where the correspondence between tso node and user kind will be found
nodesBalancer balancer.Balancer[string]
Expand All @@ -89,14 +88,15 @@ func NewKeyspaceGroupManager(ctx context.Context, store endpoint.KeyspaceGroupSt
groups[endpoint.UserKind(i)] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse))
}
return &GroupManager{
ctx: ctx,
cancel: cancel,
store: store,
client: client,
tsoServiceKey: key,
tsoServiceEndKey: clientv3.GetPrefixRangeEnd(key) + "/",
policy: defaultBalancerPolicy,
groups: groups,
ctx: ctx,
cancel: cancel,
store: store,
client: client,
tsoServiceKey: key,
tsoServiceEndKey: clientv3.GetPrefixRangeEnd(key) + "/",
groups: groups,
nodesBalancer: balancer.GenByPolicy[string](defaultBalancerPolicy),
serviceRegistryMap: make(map[string]string),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see #6346

}
}

Expand All @@ -114,6 +114,14 @@ func (m *GroupManager) Bootstrap(ctx context.Context) error {

m.Lock()
defer m.Unlock()

// If the etcd client is not nil, start the watch loop.
if m.client != nil {
m.wg.Add(2)
go m.startWatchLoop(ctx)
go m.allocNodesToAllKeyspaceGroups()
}

// Ignore the error if default keyspace group already exists in the storage (e.g. PD restart/recover).
err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{defaultKeyspaceGroup}, false)
if err != nil && err != ErrKeyspaceGroupExists {
Expand All @@ -130,13 +138,6 @@ func (m *GroupManager) Bootstrap(ctx context.Context) error {
m.groups[userKind].Put(group)
}

// If the etcd client is not nil, start the watch loop.
if m.client != nil {
m.nodesBalancer = balancer.GenByPolicy[string](m.policy)
m.serviceRegistryMap = make(map[string]string)
m.wg.Add(1)
go m.startWatchLoop(ctx)
}
return nil
}

Expand All @@ -146,6 +147,45 @@ func (m *GroupManager) Close() {
m.wg.Wait()
}

func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
defer logutil.LogPanic()
defer m.wg.Done()
ticker := time.NewTicker(allocNodesToKeyspaceGroupsInterval)
defer ticker.Stop()
for {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need timeout for it?

select {
case <-m.ctx.Done():
return
case <-ticker.C:
}
countOfNodes := m.GetNodesCount()
if countOfNodes < utils.KeyspaceGroupDefaultReplicaCount {
continue
}
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
if err != nil {
log.Error("failed to load the all keyspace group", zap.Error(err))
continue
}
withError := false
for _, group := range groups {
if len(group.Members) < utils.KeyspaceGroupDefaultReplicaCount {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.KeyspaceGroupDefaultReplicaCount)
if err != nil {
withError = true
log.Error("failed to alloc nodes for keyspace group", zap.Error(err))
continue
}
group.Members = nodes
}
}
if !withError {
// all keyspace groups have equal or more than default replica count
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Continue or return?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return. It will only occur when load keyspace group. In the future, we support scale-out and update nodes in balancer, it will update always. cc @binshi-bing

Copy link
Contributor

@binshi-bing binshi-bing Apr 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have done all the work here, then we can return. The problem is that it seems that we have a severe bug -- when utils.KeyspaceGroupDefaultReplicaCount is 2 and there are two tso nodes just registered with the third tso node upcoming, this function will proceed to load all keyspace groups and allocate 2 nodes to all of them. After the third TSO node being registered, it won't be assigned any keyspace group. This is common case when we to create a new cluster including api service and tso service, where we first create API nodes then gradually add tso nodes.

Because of this reason, I prefer to let operator manually call balance api, after all tso nodes are registered, to assign tso nodes to the keyspace groups whose member count is less than utils.KeyspaceGroupDefaultReplicaCount instead of doing this job in group manager's bootstrap.

@lhy1024, @rleungx, @JmPotato let's discuss on Monday.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhy1024 , after some thoughts, let's keep your way, because we'll mostly setup 2 tso nodes for now and we can also use move keyspace group api to change the distribution.

}
}
}

func (m *GroupManager) startWatchLoop(parentCtx context.Context) {
defer logutil.LogPanic()
defer m.wg.Done()
Expand All @@ -156,12 +196,9 @@ func (m *GroupManager) startWatchLoop(parentCtx context.Context) {
revision int64
err error
)
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
select {
case <-ctx.Done():
return
case <-time.After(retryInterval):
}
resp, err = etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey))
if err == nil {
revision = resp.Header.Revision + 1
Expand All @@ -177,6 +214,11 @@ func (m *GroupManager) startWatchLoop(parentCtx context.Context) {
break
}
log.Warn("failed to get tso service addrs from etcd and will retry", zap.Error(err))
select {
case <-m.ctx.Done():
return
case <-ticker.C:
}
}
if err != nil || revision == 0 {
log.Warn("failed to get tso service addrs from etcd finally when loading", zap.Error(err))
Expand Down Expand Up @@ -603,18 +645,23 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error {
return nil
}

// GetNodesNum returns the number of nodes.
func (m *GroupManager) GetNodesNum() int {
// GetNodesCount returns the count of nodes.
func (m *GroupManager) GetNodesCount() int {
if m.nodesBalancer == nil {
return 0
}
return m.nodesBalancer.Len()
}

// AllocNodesForKeyspaceGroup allocates nodes for the keyspace group.
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]endpoint.KeyspaceGroupMember, error) {
ctx, cancel := context.WithTimeout(m.ctx, allocNodeTimeout)
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount int) ([]endpoint.KeyspaceGroupMember, error) {
m.Lock()
defer m.Unlock()
ctx, cancel := context.WithTimeout(m.ctx, allocNodesTimeout)
defer cancel()
ticker := time.NewTicker(allocNodeInterval)
ticker := time.NewTicker(allocNodesInterval)
defer ticker.Stop()
nodes := make([]endpoint.KeyspaceGroupMember, 0, replica)
nodes := make([]endpoint.KeyspaceGroupMember, 0, desiredReplicaCount)
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
Expand All @@ -628,14 +675,17 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]end
exists[member.Address] = struct{}{}
nodes = append(nodes, member)
}
for len(exists) < replica {
if len(exists) >= desiredReplicaCount {
return nil
}
for len(exists) < desiredReplicaCount {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
}
num := m.GetNodesNum()
if num < replica || num == 0 { // double check
countOfNodes := m.GetNodesCount()
if countOfNodes < desiredReplicaCount || countOfNodes == 0 { // double check
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need countOfNodes == 0 when countOfNodes >= desiredReplicaCount?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider meeting offline node

return ErrNoAvailableNode
}
addr := m.nodesBalancer.Next()
Expand All @@ -654,5 +704,38 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]end
if err != nil {
return nil, err
}
log.Info("alloc nodes for keyspace group", zap.Uint32("id", id), zap.Reflect("nodes", nodes))
return nodes, nil
}

// SetNodesForKeyspaceGroup sets the nodes for the keyspace group.
func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error {
m.Lock()
defer m.Unlock()
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
if kg == nil {
return ErrKeyspaceGroupNotExists
}
members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel here we have more work to do in the future, such as sanity check and service availability check for the nodes switched to before actually refreshing the member list. Please ignore this comment for now.

for _, node := range nodes {
members = append(members, endpoint.KeyspaceGroupMember{Address: node})
}
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
})
}

// IsExistNode checks if the node exists.
func (m *GroupManager) IsExistNode(addr string) bool {
nodes := m.nodesBalancer.GetAll()
for _, node := range nodes {
if node == addr {
return true
}
}
return false
}
3 changes: 3 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ func (s *Server) IsKeyspaceServing(keyspaceID, keyspaceGroupID uint32) bool {
log.Error("failed to get election member", errs.ZapError(err))
return false
}
if member == nil {
return false
}
return member.IsLeader()
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,7 @@ const (
// MaxKeyspaceGroupCountInUse is a much more reasonable value of the max count in the
// foreseen future, and the former is just for extensibility in theory.
MaxKeyspaceGroupCountInUse = uint32(4096)

// KeyspaceGroupDefaultReplicaCount is the default replica count of keyspace group.
KeyspaceGroupDefaultReplicaCount = 2
)
3 changes: 3 additions & 0 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,9 @@ func (kgm *KeyspaceGroupManager) GetElectionMember(
if err != nil {
return nil, err
}
if am == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will this happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check if it is primary after kgm.deleteKeyspaceGroup with default keyspace group

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean delete the default keyspace group?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the default keyspace group no longer contain this member

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it actually goes here, it will panic I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be removed after @binshi-bing has fixed

return nil, nil
}
return am.GetMember(), nil
}

Expand Down
65 changes: 57 additions & 8 deletions server/apiv2/handlers/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) {
router.GET("", GetKeyspaceGroups)
router.GET("/:id", GetKeyspaceGroupByID)
router.DELETE("/:id", DeleteKeyspaceGroupByID)
router.POST("/:id/alloc", AllocNodeForKeyspaceGroup)
router.POST("/:id/alloc", AllocNodesForKeyspaceGroup)
router.POST("/:id/nodes", SetNodesForKeyspaceGroup)
router.POST("/:id/split", SplitKeyspaceGroupByID)
router.DELETE("/:id/split", FinishSplitKeyspaceByID)
}
Expand Down Expand Up @@ -190,28 +191,28 @@ func FinishSplitKeyspaceByID(c *gin.Context) {
c.JSON(http.StatusOK, nil)
}

// AllocNodeForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups.
type AllocNodeForKeyspaceGroupParams struct {
// AllocNodesForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups.
type AllocNodesForKeyspaceGroupParams struct {
Replica int `json:"replica"`
}

// AllocNodeForKeyspaceGroup allocates nodes for keyspace group.
func AllocNodeForKeyspaceGroup(c *gin.Context) {
// AllocNodesForKeyspaceGroup allocates nodes for keyspace group.
func AllocNodesForKeyspaceGroup(c *gin.Context) {
id, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return
}
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
allocParams := &AllocNodeForKeyspaceGroupParams{}
allocParams := &AllocNodesForKeyspaceGroupParams{}
err = c.BindJSON(allocParams)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
if manager.GetNodesNum() < allocParams.Replica || allocParams.Replica < 1 {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid replica, should be in [1, nodes_num]")
if manager.GetNodesCount() < allocParams.Replica || allocParams.Replica < utils.KeyspaceGroupDefaultReplicaCount {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid replica, should be in [2, nodes_num]")
return
}
keyspaceGroup, err := manager.GetKeyspaceGroupByID(id)
Expand All @@ -232,6 +233,54 @@ func AllocNodeForKeyspaceGroup(c *gin.Context) {
c.JSON(http.StatusOK, nodes)
}

// SetNodesForKeyspaceGroupParams defines the params for setting nodes for keyspace groups.
// Notes: it should be used carefully.
type SetNodesForKeyspaceGroupParams struct {
Nodes []string `json:"nodes"`
}

// SetNodesForKeyspaceGroup sets nodes for keyspace group.
func SetNodesForKeyspaceGroup(c *gin.Context) {
id, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return
}
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
setParams := &SetNodesForKeyspaceGroupParams{}
err = c.BindJSON(setParams)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
// check if keyspace group exists
keyspaceGroup, err := manager.GetKeyspaceGroupByID(id)
if err != nil || keyspaceGroup == nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "keyspace group does not exist")
return
}
// check if nodes is less than default replica count
if len(setParams.Nodes) < utils.KeyspaceGroupDefaultReplicaCount {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid num of nodes")
return
}
// check if node exists
for _, node := range setParams.Nodes {
if !manager.IsExistNode(node) {
c.AbortWithStatusJSON(http.StatusBadRequest, "node does not exist")
return
}
}
// set nodes
err = manager.SetNodesForKeyspaceGroup(id, setParams.Nodes)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
c.JSON(http.StatusOK, nil)
}

func validateKeyspaceGroupID(c *gin.Context) (uint32, error) {
id, err := strconv.ParseUint(c.Param("id"), 10, 64)
if err != nil {
Expand Down
Loading