Skip to content

Commit

Permalink
tests, tso: add more TSO split tests (tikv#6338)
Browse files Browse the repository at this point in the history
ref tikv#6232

Add more TSO split tests.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
2 people authored and rleungx committed Aug 2, 2023
1 parent c95ae05 commit 9363e4e
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 96 deletions.
10 changes: 5 additions & 5 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,11 @@ error = '''
init file log error, %s
'''

["PD:member:ErrCheckCampaign"]
error = '''
check campaign failed
'''

["PD:member:ErrEtcdLeaderNotFound"]
error = '''
etcd leader not found
Expand All @@ -491,11 +496,6 @@ error = '''
marshal leader failed
'''

["PD:member:ErrPreCheckCampaign"]
error = '''
pre-check campaign failed
'''

["PD:netstat:ErrNetstatTCPSocks"]
error = '''
TCP socks error
Expand Down
2 changes: 1 addition & 1 deletion pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var (
var (
ErrEtcdLeaderNotFound = errors.Normalize("etcd leader not found", errors.RFCCodeText("PD:member:ErrEtcdLeaderNotFound"))
ErrMarshalLeader = errors.Normalize("marshal leader failed", errors.RFCCodeText("PD:member:ErrMarshalLeader"))
ErrPreCheckCampaign = errors.Normalize("pre-check campaign failed", errors.RFCCodeText("PD:member:ErrPreCheckCampaign"))
ErrCheckCampaign = errors.Normalize("check campaign failed", errors.RFCCodeText("PD:member:ErrCheckCampaign"))
)

// core errors
Expand Down
9 changes: 9 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,15 @@ func (s *Server) GetLeaderListenUrls() []string {
return member.GetLeaderListenUrls()
}

// GetMember returns the election member of the given keyspace and keyspace group.
func (s *Server) GetMember(keyspaceID, keyspaceGroupID uint32) (tso.ElectionMember, error) {
member, err := s.keyspaceGroupManager.GetElectionMember(keyspaceID, keyspaceGroupID)
if err != nil {
return nil, err
}
return member, nil
}

// AddServiceReadyCallback implements basicserver.
// It adds callbacks when it's ready for providing tso service.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
Expand Down
30 changes: 21 additions & 9 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ type Participant struct {
// leader key when this participant is successfully elected as the leader of
// the group. Every write will use it to check the leadership.
memberValue string
// preCampaignChecker is called before the campaign. If it returns false, the
// campaign will be skipped.
preCampaignChecker leadershipCheckFunc
// campaignChecker is used to check whether the additional constraints for a
// campaign are satisfied. If it returns false, the campaign will fail.
campaignChecker atomic.Value // Store as leadershipCheckFunc
// lastLeaderUpdatedTime is the last time when the leader is updated.
lastLeaderUpdatedTime atomic.Value
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func (m *Participant) Client() *clientv3.Client {
// IsLeader returns whether the participant is the leader or not by checking its leadership's
// lease and leader info.
func (m *Participant) IsLeader() bool {
return m.leadership.Check() && m.GetLeader().GetId() == m.member.GetId()
return m.leadership.Check() && m.GetLeader().GetId() == m.member.GetId() && m.campaignCheck()
}

// IsLeaderElected returns true if the leader exists; otherwise false
Expand Down Expand Up @@ -181,8 +181,8 @@ func (m *Participant) GetLeadership() *election.Leadership {

// CampaignLeader is used to campaign the leadership and make it become a leader.
func (m *Participant) CampaignLeader(leaseTimeout int64) error {
if m.preCampaignChecker != nil && !m.preCampaignChecker(m.leadership) {
return errs.ErrPreCheckCampaign
if !m.campaignCheck() {
return errs.ErrCheckCampaign
}
return m.leadership.Campaign(leaseTimeout, m.MemberValue())
}
Expand Down Expand Up @@ -351,7 +351,19 @@ func (m *Participant) GetLeaderPriority(id uint64) (int, error) {
return int(priority), nil
}

// SetPreCampaignChecker sets the pre-campaign checker.
func (m *Participant) SetPreCampaignChecker(checker leadershipCheckFunc) {
m.preCampaignChecker = checker
func (m *Participant) campaignCheck() bool {
checker := m.campaignChecker.Load()
if checker == nil {
return true
}
checkerFunc, ok := checker.(leadershipCheckFunc)
if !ok || checkerFunc == nil {
return true
}
return checkerFunc(m.leadership)
}

// SetCampaignChecker sets the pre-campaign checker.
func (m *Participant) SetCampaignChecker(checker leadershipCheckFunc) {
m.campaignChecker.Store(checker)
}
3 changes: 2 additions & 1 deletion pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ func (am *AllocatorManager) close() {
log.Info("closed the allocator manager")
}

func (am *AllocatorManager) getMember() ElectionMember {
// GetMember returns the ElectionMember of this AllocatorManager.
func (am *AllocatorManager) GetMember() ElectionMember {
return am.member
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
if errors.Is(err, errs.ErrEtcdTxnConflict) {
log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully",
zap.String("campaign-tso-primary-name", gta.member.Name()))
} else if errors.Is(err, errs.ErrPreCheckCampaign) {
} else if errors.Is(err, errs.ErrCheckCampaign) {
log.Info("campaign tso primary meets error due to pre-check campaign failed, the tso keyspace group may be in split",
zap.String("campaign-tso-primary-name", gta.member.Name()))
} else {
Expand Down
8 changes: 4 additions & 4 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
zap.Uint32("source", splitSource))
return
}
participant.SetPreCampaignChecker(func(leadership *election.Leadership) bool {
return splitSourceAM.getMember().IsLeader()
participant.SetCampaignChecker(func(leadership *election.Leadership) bool {
return splitSourceAM.GetMember().IsLeader()
})
}
// Only the default keyspace group uses the legacy service root path for LoadTimestamp/SyncTimestamp.
Expand Down Expand Up @@ -673,7 +673,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
}
// Check if the split is completed.
if oldGroup.IsSplitTarget() && !newGroup.IsSplitting() {
kgm.ams[groupID].getMember().(*member.Participant).SetPreCampaignChecker(nil)
kgm.ams[groupID].GetMember().(*member.Participant).SetCampaignChecker(nil)
}
kgm.kgs[groupID] = newGroup
}
Expand Down Expand Up @@ -728,7 +728,7 @@ func (kgm *KeyspaceGroupManager) GetElectionMember(
if err != nil {
return nil, err
}
return am.getMember(), nil
return am.GetMember(), nil
}

// HandleTSORequest forwards TSO allocation requests to correct TSO Allocators of the given keyspace group.
Expand Down
43 changes: 27 additions & 16 deletions tests/integrations/mcs/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
"github.com/stretchr/testify/require"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
)

// TestCluster is a test cluster for TSO.
type TestCluster struct {
// TestTSOCluster is a test cluster for TSO.
type TestTSOCluster struct {
ctx context.Context

backendEndpoints string
Expand All @@ -35,8 +36,8 @@ type TestCluster struct {
}

// NewTestTSOCluster creates a new TSO test cluster.
func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpoints string) (tc *TestCluster, err error) {
tc = &TestCluster{
func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpoints string) (tc *TestTSOCluster, err error) {
tc = &TestTSOCluster{
ctx: ctx,
backendEndpoints: backendEndpoints,
servers: make(map[string]*tso.Server, initialServerCount),
Expand All @@ -52,7 +53,7 @@ func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpo
}

// AddServer adds a new TSO server to the test cluster.
func (tc *TestCluster) AddServer(addr string) error {
func (tc *TestTSOCluster) AddServer(addr string) error {
cfg := tso.NewConfig()
cfg.BackendEndpoints = tc.backendEndpoints
cfg.ListenAddr = addr
Expand All @@ -75,7 +76,7 @@ func (tc *TestCluster) AddServer(addr string) error {
}

// Destroy stops and destroy the test cluster.
func (tc *TestCluster) Destroy() {
func (tc *TestTSOCluster) Destroy() {
for _, cleanup := range tc.cleanupFuncs {
cleanup()
}
Expand All @@ -84,14 +85,14 @@ func (tc *TestCluster) Destroy() {
}

// DestroyServer stops and destroy the test server by the given address.
func (tc *TestCluster) DestroyServer(addr string) {
func (tc *TestTSOCluster) DestroyServer(addr string) {
tc.cleanupFuncs[addr]()
delete(tc.cleanupFuncs, addr)
delete(tc.servers, addr)
}

// GetPrimary returns the primary TSO server.
func (tc *TestCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
func (tc *TestTSOCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
for _, server := range tc.servers {
if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
return server
Expand All @@ -101,12 +102,12 @@ func (tc *TestCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Serve
}

// WaitForPrimaryServing waits for one of servers being elected to be the primary/leader of the given keyspace.
func (tc *TestCluster) WaitForPrimaryServing(re *require.Assertions, keyspaceID, keyspaceGroupID uint32) string {
var primary string
func (tc *TestTSOCluster) WaitForPrimaryServing(re *require.Assertions, keyspaceID, keyspaceGroupID uint32) *tso.Server {
var primary *tso.Server
testutil.Eventually(re, func() bool {
for name, s := range tc.servers {
if s.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
primary = name
for _, server := range tc.servers {
if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
primary = server
return true
}
}
Expand All @@ -117,12 +118,12 @@ func (tc *TestCluster) WaitForPrimaryServing(re *require.Assertions, keyspaceID,
}

// WaitForDefaultPrimaryServing waits for one of servers being elected to be the primary/leader of the default keyspace.
func (tc *TestCluster) WaitForDefaultPrimaryServing(re *require.Assertions) string {
func (tc *TestTSOCluster) WaitForDefaultPrimaryServing(re *require.Assertions) *tso.Server {
return tc.WaitForPrimaryServing(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
}

// GetServer returns the TSO server by the given address.
func (tc *TestCluster) GetServer(addr string) *tso.Server {
func (tc *TestTSOCluster) GetServer(addr string) *tso.Server {
for srvAddr, server := range tc.servers {
if srvAddr == addr {
return server
Expand All @@ -132,6 +133,16 @@ func (tc *TestCluster) GetServer(addr string) *tso.Server {
}

// GetServers returns all TSO servers.
func (tc *TestCluster) GetServers() map[string]*tso.Server {
func (tc *TestTSOCluster) GetServers() map[string]*tso.Server {
return tc.servers
}

// GetKeyspaceGroupMember converts the TSO servers to KeyspaceGroupMember and returns.
func (tc *TestTSOCluster) GetKeyspaceGroupMember() (members []endpoint.KeyspaceGroupMember) {
for _, server := range tc.servers {
members = append(members, endpoint.KeyspaceGroupMember{
Address: server.GetAddr(),
})
}
return
}
Loading

0 comments on commit 9363e4e

Please sign in to comment.