Skip to content

Commit

Permalink
Load initial assignment and dynamically watch/apply keyspace groups' …
Browse files Browse the repository at this point in the history
…membership/distribution change (#6247)

ref #6232

Load initial keyspace group assignment.
Dynamically watch/apply keyspace groups' membership/distribution change.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing authored Apr 6, 2023
1 parent 9aa3e97 commit 9fcde12
Show file tree
Hide file tree
Showing 15 changed files with 974 additions and 114 deletions.
4 changes: 3 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,9 @@ func IsLeaderChange(err error) bool {
return true
}
errMsg := err.Error()
return strings.Contains(errMsg, errs.NotLeaderErr) || strings.Contains(errMsg, errs.MismatchLeaderErr)
return strings.Contains(errMsg, errs.NotLeaderErr) ||
strings.Contains(errMsg, errs.MismatchLeaderErr) ||
strings.Contains(errMsg, errs.NotServedErr)
}

func trimHTTPPrefix(str string) string {
Expand Down
14 changes: 11 additions & 3 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@ import (
)

const (
// NotLeaderErr indicates the the non-leader member received the requests which should be received by leader.
// NotLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotLeaderErr = "is not leader"
// MismatchLeaderErr indicates the the non-leader member received the requests which should be received by leader.
// MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
MismatchLeaderErr = "mismatch leader id"
RetryTimeoutErr = "retry timeout"
// NotServedErr indicates an tso node/pod received the requests for the keyspace groups which are not served by it.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
RetryTimeoutErr = "retry timeout"
)

// client errors
Expand Down
20 changes: 20 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
# AUTOGENERATED BY github.com/pingcap/errors/errdoc-gen
# YOU CAN CHANGE THE 'description'/'workaround' FIELDS IF THEM ARE IMPROPER.

["ErrLoadKeyspaceGroupsTerminated"]
error = '''
load keyspace groups terminated
'''

["ErrLoadKeyspaceGroupsTimeout"]
error = '''
load keyspace groups timeout
'''

["PD:ErrEncryptionKMS"]
error = '''
KMS error
Expand Down Expand Up @@ -731,11 +741,21 @@ error = '''
get allocator failed, %s
'''

["PD:tso:ErrGetAllocatorManager"]
error = '''
get allocator manager failed, %s
'''

["PD:tso:ErrGetLocalAllocator"]
error = '''
get local allocator failed, %s
'''

["PD:tso:ErrKeyspaceGroupIDInvalid"]
error = '''
the keyspace group id is invalid, %s
'''

["PD:tso:ErrLogicOverflow"]
error = '''
logic part overflow
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
Expand Down
32 changes: 22 additions & 10 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@ package errs
import "github.com/pingcap/errors"

const (
// NotLeaderErr indicates the the non-leader member received the requests which should be received by leader.
// NotLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the client side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotLeaderErr = "is not leader"
// MismatchLeaderErr indicates the the non-leader member received the requests which should be received by leader.
// MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the client side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
MismatchLeaderErr = "mismatch leader id"
// NotServedErr indicates an tso node/pod received the requests for the keyspace groups which are not served by it.
// Note: keep the same as the ones defined on the client side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
)

// common error in multiple packages
Expand All @@ -31,14 +39,18 @@ var (

// tso errors
var (
ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig"))
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator"))
ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig"))
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator"))
ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid"))
ErrGetAllocatorManager = errors.Normalize("get allocator manager failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocatorManager"))
ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout"))
ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated"))
)

// member errors
Expand Down
9 changes: 8 additions & 1 deletion pkg/mcs/tso/server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package server
import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/tso"
"go.uber.org/zap"
)
Expand All @@ -31,12 +32,18 @@ func newHandler(s *Server) *Handler {
}

// ResetTS resets the ts with specified tso.
// TODO: Support multiple keyspace groups.
func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error {
log.Info("reset-ts",
zap.Uint64("new-ts", ts),
zap.Bool("ignore-smaller", ignoreSmaller),
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck))
tsoAllocator, err := h.s.GetTSOAllocatorManager().GetAllocator(tso.GlobalDCLocation)
tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(mcsutils.DefaultKeySpaceGroupID)
if err != nil {
log.Error("failed to get allocator manager", errs.ZapError(err))
return err
}
tsoAllocator, err := tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
if err != nil {
return err
}
Expand Down
43 changes: 33 additions & 10 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ type Server struct {

// Callback functions for different stages
// startCallbacks will be called after the server is started.
startCallbacks []func()
startCallbacks []func()

// for service registry
serviceID *discovery.ServiceRegistryEntry
serviceRegister *discovery.ServiceRegister
}

Expand Down Expand Up @@ -199,14 +202,30 @@ func (s *Server) AddStartCallback(callbacks ...func()) {

// IsServing implements basicserver. It returns whether the server is the leader
// if there is embedded etcd, or the primary otherwise.
// TODO: support multiple keyspace groups
func (s *Server) IsServing() bool {
return atomic.LoadInt64(&s.isRunning) == 1 && s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID).IsLeader()
if atomic.LoadInt64(&s.isRunning) == 0 {
return false
}

member, err := s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID)
if err != nil {
log.Error("failed to get election member", errs.ZapError(err))
return false
}
return member.IsLeader()
}

// GetLeaderListenUrls gets service endpoints from the leader in election group.
// The entry at the index 0 is the primary's service endpoint.
func (s *Server) GetLeaderListenUrls() []string {
return s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID).GetLeaderListenUrls()
member, err := s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID)
if err != nil {
log.Error("failed to get election member", errs.ZapError(err))
return nil
}

return member.GetLeaderListenUrls()
}

// AddServiceReadyCallback implements basicserver.
Expand All @@ -229,8 +248,8 @@ func (s *Server) IsClosed() bool {
}

// GetTSOAllocatorManager returns the manager of TSO Allocator.
func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager {
return s.keyspaceGroupManager.GetAllocatorManager(mcsutils.DefaultKeySpaceGroupID)
func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorManager, error) {
return s.keyspaceGroupManager.GetAllocatorManager(keyspaceGroupID)
}

// IsLocalRequest checks if the forwarded host is the current host
Expand Down Expand Up @@ -416,11 +435,16 @@ func (s *Server) startServer() (err error) {
}

s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
defaultKsgStorageTSRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
legacySvcRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.etcdClient, s.listenURL.Host, defaultKsgStorageTSRootPath, tsoSvcRootPath, s.cfg)
s.keyspaceGroupManager.Initialize()
s.serverLoopCtx, s.serviceID, s.etcdClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg)
// The param `false` means that we don't initialize the keyspace group manager
// by loading the keyspace group meta from etcd.
if err := s.keyspaceGroupManager.Initialize(false); err != nil {
return err
}

s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.service = &Service{Server: s}
Expand Down Expand Up @@ -448,8 +472,7 @@ func (s *Server) startServer() (err error) {
}

// Server has started.
entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
serializedEntry, err := entry.Serialize()
serializedEntry, err := s.serviceID.Serialize()
if err != nil {
return err
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package endpoint
import (
"fmt"
"path"
"regexp"
"strconv"
"strings"

Expand Down Expand Up @@ -239,6 +240,22 @@ func KeyspaceGroupIDPath(id uint32) string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id))
}

// ExtractKeyspaceGroupIDFromPath extracts keyspace group id from the given path, which contains
// the pattern of `tso/keyspace_groups/membership/(\d{5})$`.
func ExtractKeyspaceGroupIDFromPath(path string) (uint32, error) {
pattern := strings.Join([]string{KeyspaceGroupIDPrefix(), `(\d{5})$`}, "/")
re := regexp.MustCompile(pattern)
match := re.FindStringSubmatch(path)
if match == nil {
return 0, fmt.Errorf("invalid keyspace group id path: %s", path)
}
id, err := strconv.ParseUint(match[1], 10, 32)
if err != nil {
return 0, fmt.Errorf("failed to parse keyspace group ID: %v", err)
}
return uint32(id), nil
}

// encodeKeyspaceGroupID from uint32 to string.
func encodeKeyspaceGroupID(groupID uint32) string {
return fmt.Sprintf("%05d", groupID)
Expand Down
48 changes: 48 additions & 0 deletions pkg/storage/endpoint/key_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,51 @@ func BenchmarkRegionPath(b *testing.B) {
_ = RegionPath(uint64(i))
}
}

func TestExtractKeyspaceGroupIDFromPath(t *testing.T) {
re := require.New(t)

rightCases := []struct {
path string
id uint32
}{
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00000", id: 0},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00001", id: 1},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345", id: 12345},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/99999", id: 99999},
{path: "tso/keyspace_groups/membership/00000", id: 0},
{path: "tso/keyspace_groups/membership/00001", id: 1},
{path: "tso/keyspace_groups/membership/12345", id: 12345},
{path: "tso/keyspace_groups/membership/99999", id: 99999},
}

for _, tt := range rightCases {
id, err := ExtractKeyspaceGroupIDFromPath(tt.path)
re.Equal(tt.id, id)
re.NoError(err)
}

wrongCases := []struct {
path string
}{
{path: ""},
{path: "00001"},
{path: "xxx/keyspace_groups/membership/00001"},
{path: "tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/xxx/keyspace_groups/membership/00001"},
{path: "/pd/{cluster_id}/tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/123456"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/1234a"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345a"},
}

for _, tt := range wrongCases {
_, err := ExtractKeyspaceGroupIDFromPath(tt.path)
re.Error(err)
}
}
10 changes: 9 additions & 1 deletion pkg/storage/endpoint/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,19 @@ import (
"go.etcd.io/etcd/clientv3"
)

// KeyspaceGroupMember defines an election member which campaigns for the primary of the keyspace group.
type KeyspaceGroupMember struct {
Address string `json:"address"`
}

// KeyspaceGroup is the keyspace group.
type KeyspaceGroup struct {
ID uint32 `json:"id"`
UserKind string `json:"user-kind"`
// TODO: add `Members` field
// Members are the election members which campaign for the primary of the keyspace group.
Members []KeyspaceGroupMember `json:"members"`
// Keyspaces are the keyspace IDs which belong to the keyspace group.
Keyspaces []uint32 `json:"keyspaces"`
}

// KeyspaceGroupStorage is the interface for keyspace group storage.
Expand Down
5 changes: 3 additions & 2 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,8 @@ func (am *AllocatorManager) close() {
log.Info("closed the allocator manager")
}

func (am *AllocatorManager) getMember() *ElectionMember {
return &am.member
func (am *AllocatorManager) getMember() ElectionMember {
return am.member
}

// SetLocalTSOConfig receives the zone label of this PD server and write it into etcd as dc-location
Expand Down Expand Up @@ -1072,6 +1072,7 @@ func (am *AllocatorManager) HandleRequest(dcLocation string, count uint32) (pdpb
err := errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found, generate timestamp failed", dcLocation))
return pdpb.Timestamp{}, err
}

return allocatorGroup.allocator.GenerateTSO(count)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)

// Have dc-locations configured in the cluster, use the Global TSO generation way.
// (whit synchronization with other Local TSO Allocators)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(gta.ctx)
defer cancel()
for i := 0; i < maxRetryCount; i++ {
var (
Expand Down Expand Up @@ -237,7 +237,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
skipCheck = true
goto SETTING_PHASE
}
// Is skipCheck is false and globalTSOResp remains the same, it means the estimatedTSO is valide.
// Is skipCheck is false and globalTSOResp remains the same, it means the estimatedTSO is valid.
if !skipCheck && tsoutil.CompareTimestamp(&globalTSOResp, estimatedMaxTSO) == 0 {
tsoCounter.WithLabelValues("global_tso_estimate", gta.timestampOracle.dcLocation).Inc()
}
Expand Down Expand Up @@ -309,7 +309,7 @@ type syncResp struct {

// SyncMaxTS is used to sync MaxTS with all Local TSO Allocator leaders in dcLocationMap.
// If maxTSO is the biggest TSO among all Local TSO Allocators, it will be written into
// each allocator and remines the same after the synchronization.
// each allocator and remains the same after the synchronization.
// If not, it will be replaced with the new max Local TSO and return.
func (gta *GlobalTSOAllocator) SyncMaxTS(
ctx context.Context,
Expand Down
Loading

0 comments on commit 9fcde12

Please sign in to comment.