Skip to content

Commit

Permalink
Add API interface to obtain the TSO keyspace group member info
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Apr 24, 2023
1 parent 7350cd2 commit 47dc10b
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 54 deletions.
70 changes: 55 additions & 15 deletions pkg/mcs/tso/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,22 @@ import (
"github.com/gin-contrib/gzip"
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
tsoserver "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/unrolled/render"
"go.uber.org/zap"
)

// APIPathPrefix is the prefix of the API path.
const APIPathPrefix = "/tso/api/v1"
const (
// APIPathPrefix is the prefix of the API path.
APIPathPrefix = "/tso/api/v1"
)

var (
once sync.Once
Expand All @@ -46,14 +52,14 @@ var (
func init() {
tsoserver.SetUpRestHandler = func(srv *tsoserver.Service) (http.Handler, apiutil.APIServiceGroup) {
s := NewService(srv)
return s.handler(), apiServiceGroup
return s.apiHandlerEngine, apiServiceGroup
}
}

// Service is the tso service.
type Service struct {
apiHandlerEngine *gin.Engine
baseEndpoint *gin.RouterGroup
root *gin.RouterGroup

srv *tsoserver.Service
rd *render.Render
Expand All @@ -77,30 +83,64 @@ func NewService(srv *tsoserver.Service) *Service {
apiHandlerEngine.Use(cors.Default())
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression))
apiHandlerEngine.Use(func(c *gin.Context) {
c.Set("service", srv)
c.Set(multiservicesapi.ServiceContextKey, srv)
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
apiHandlerEngine.GET("metrics", utils.PromHandler())
endpoint := apiHandlerEngine.Group(APIPathPrefix)
root := apiHandlerEngine.Group(APIPathPrefix)
s := &Service{
srv: srv,
apiHandlerEngine: apiHandlerEngine,
baseEndpoint: endpoint,
root: root,
rd: createIndentRender(),
}
s.RegisterRouter()
s.RegisterAdminRouter()
s.RegisterKeyspaceGroupRouter()
return s
}

// RegisterRouter registers the router of the service.
func (s *Service) RegisterRouter() {
// RegisterAdminRouter registers the router of the TSO admin handler.
func (s *Service) RegisterAdminRouter() {
router := s.root.Group("admin")
tsoAdminHandler := tso.NewAdminHandler(s.srv.GetHandler(), s.rd)
s.baseEndpoint.POST("/admin/reset-ts", gin.WrapF(tsoAdminHandler.ResetTS))
router.POST("/admin/reset-ts", gin.WrapF(tsoAdminHandler.ResetTS))
}

func (s *Service) handler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s.apiHandlerEngine.ServeHTTP(w, r)
})
// RegisterKeyspaceGroupRouter registers the router of the TSO keyspace group handler.
func (s *Service) RegisterKeyspaceGroupRouter() {
router := s.root.Group("keyspace-groups")
router.GET("/members", GetKeyspaceGroupMembers)
}

// KeyspaceGroupMember contains the keyspace group and its member information.
type KeyspaceGroupMember struct {
Group *endpoint.KeyspaceGroup
Member *tsopb.Participant
IsPrimary bool `json:"is_primary"`
PrimaryID uint64 `json:"primary_id"`
}

// GetKeyspaceGroupMembers gets the keyspace group members that the TSO service is serving.
func GetKeyspaceGroupMembers(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service)
kgm := svr.GetKeyspaceGroupManager()
keyspaceGroups := kgm.GetKeyspaceGroups()
members := make(map[uint32]*KeyspaceGroupMember, len(keyspaceGroups))
for id, group := range keyspaceGroups {
am, err := kgm.GetAllocatorManager(id)
if err != nil {
log.Error("failed to get allocator manager",
zap.Uint32("keyspace-group-id", id), zap.Error(err))
continue
}
member := am.GetMember()
members[id] = &KeyspaceGroupMember{
Group: group,
Member: member.GetMember().(*tsopb.Participant),
IsPrimary: member.IsLeader(),
PrimaryID: member.GetLeaderID(),
}
}
c.IndentedJSON(http.StatusOK, members)
}
5 changes: 5 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ func (s *Server) IsClosed() bool {
return atomic.LoadInt64(&s.isRunning) == 0
}

// GetKeyspaceGroupManager returns the manager of keyspace group.
func (s *Server) GetKeyspaceGroupManager() *tso.KeyspaceGroupManager {
return s.keyspaceGroupManager
}

// GetTSOAllocatorManager returns the manager of TSO Allocator.
func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorManager, error) {
return s.keyspaceGroupManager.GetAllocatorManager(keyspaceGroupID)
Expand Down
29 changes: 27 additions & 2 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ type KeyspaceGroupManager struct {
loadKeyspaceGroupsTimeout time.Duration
loadKeyspaceGroupsBatchSize int64
loadFromEtcdMaxRetryTimes int

// groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry.
groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
Expand Down Expand Up @@ -228,6 +231,7 @@ func NewKeyspaceGroupManager(
loadKeyspaceGroupsTimeout: defaultLoadKeyspaceGroupsTimeout,
loadKeyspaceGroupsBatchSize: defaultLoadKeyspaceGroupsBatchSize,
loadFromEtcdMaxRetryTimes: defaultLoadFromEtcdMaxRetryTimes,
groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup),
}
kgm.legacySvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil)
Expand Down Expand Up @@ -500,6 +504,11 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (
}
}
}
// Retry the groups that are not initialized successfully before.
for id, group := range kgm.groupUpdateRetryList {
delete(kgm.groupUpdateRetryList, id)
kgm.updateKeyspaceGroup(group)
}
revision = wresp.Header.Revision + 1
}

Expand Down Expand Up @@ -566,13 +575,15 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
if group.IsSplitTarget() {
splitSource := group.SplitSource()
log.Info("keyspace group is in split",
zap.Uint32("keyspace-group-id", group.ID),
zap.Uint32("target", group.ID),
zap.Uint32("source", splitSource))
splitSourceAM, _ := kgm.getKeyspaceGroupMeta(splitSource)
if splitSourceAM == nil {
// TODO: guarantee that the split source keyspace group is initialized before.
log.Error("the split source keyspace group is not initialized",
zap.Uint32("target", group.ID),
zap.Uint32("source", splitSource))
// Put the group into the retry list to retry later.
kgm.groupUpdateRetryList[group.ID] = group
return
}
participant.SetCampaignChecker(func(leadership *election.Leadership) bool {
Expand Down Expand Up @@ -731,6 +742,20 @@ func (kgm *KeyspaceGroupManager) GetElectionMember(
return am.GetMember(), nil
}

// GetKeyspaceGroups returns all keyspace groups managed by the current keyspace group manager.
func (kgm *KeyspaceGroupManager) GetKeyspaceGroups() map[uint32]*endpoint.KeyspaceGroup {
kgm.RLock()
defer kgm.RUnlock()
keyspaceGroups := make(map[uint32]*endpoint.KeyspaceGroup)
for _, keyspaceGroupID := range kgm.keyspaceLookupTable {
if _, ok := keyspaceGroups[keyspaceGroupID]; ok {
continue
}
keyspaceGroups[keyspaceGroupID] = kgm.kgs[keyspaceGroupID]
}
return keyspaceGroups
}

// HandleTSORequest forwards TSO allocation requests to correct TSO Allocators of the given keyspace group.
func (kgm *KeyspaceGroupManager) HandleTSORequest(
keyspaceID, keyspaceGroupID uint32,
Expand Down
Loading

0 comments on commit 47dc10b

Please sign in to comment.