From 20571e4a625762fe6c563b4bc44a44b2af32895a Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Tue, 16 May 2023 10:51:17 +0800 Subject: [PATCH] Fix "non-default keyspace groups use the same timestamp path by mistake" (#6457) close tikv/pd#6453, close tikv/pd#6465 The tso servers are loading keyspace groups asynchronously. Make sure all keyspace groups are available for serving tso requests from corresponding keyspaces by querying IsKeyspaceServing(keyspaceID, the Desired KeyspaceGroupID). if use default keyspace group id in the query, it will always return true as the keyspace will be served by default keyspace group before the keyspace groups are loaded. Signed-off-by: Bin Shi --- pkg/tso/allocator_manager.go | 34 ++++++++++--- pkg/tso/global_allocator.go | 29 ++++++++++- pkg/tso/keyspace_group_manager.go | 7 ++- pkg/tso/local_allocator.go | 24 ++++++++- pkg/tso/tso.go | 27 ++++++---- .../mcs/tso/keyspace_group_manager_test.go | 35 ++++++++++--- tests/integrations/tso/client_test.go | 51 +++++++++++-------- tests/integrations/tso/testutil.go | 2 +- 8 files changed, 159 insertions(+), 50 deletions(-) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index a780e7da74e..f2381e9e7d8 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -46,12 +46,13 @@ import ( const ( // GlobalDCLocation is the Global TSO Allocator's DC location label. 
- GlobalDCLocation = "global" - checkStep = time.Minute - patrolStep = time.Second - defaultAllocatorLeaderLease = 3 - localTSOAllocatorEtcdPrefix = "lta" - localTSOSuffixEtcdPrefix = "lts" + GlobalDCLocation = "global" + checkStep = time.Minute + patrolStep = time.Second + defaultAllocatorLeaderLease = 3 + globalTSOAllocatorEtcdPrefix = "gta" + localTSOAllocatorEtcdPrefix = "lta" + localTSOSuffixEtcdPrefix = "lts" ) var ( @@ -273,6 +274,23 @@ func (am *AllocatorManager) setUpLocalAllocator(parentCtx context.Context, dcLoc go am.allocatorLeaderLoop(parentCtx, localTSOAllocator) } +// GetTimestampPath returns the timestamp path in etcd for the given DCLocation. +func (am *AllocatorManager) GetTimestampPath(dcLocation string) string { + if am == nil { + return "" + } + if len(dcLocation) == 0 { + dcLocation = GlobalDCLocation + } + + am.mu.RLock() + defer am.mu.RUnlock() + if allocatorGroup, exist := am.mu.allocatorGroups[dcLocation]; exist { + return path.Join(am.rootPath, allocatorGroup.allocator.GetTimestampPath()) + } + return "" +} + // tsoAllocatorLoop is used to run the TSO Allocator updating daemon. 
func (am *AllocatorManager) tsoAllocatorLoop() { defer logutil.LogPanic() @@ -750,7 +768,9 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) { } if err := ag.allocator.UpdateTSO(); err != nil { log.Warn("failed to update allocator's timestamp", - zap.String("dc-location", ag.dcLocation), errs.ZapError(err)) + zap.String("dc-location", ag.dcLocation), + zap.String("name", am.member.Name()), + errs.ZapError(err)) am.ResetAllocatorGroup(ag.dcLocation) return } diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 0227f2c1a64..1d961fd1b95 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "path" "sync" "sync/atomic" "time" @@ -45,6 +46,14 @@ type Allocator interface { IsInitialize() bool // UpdateTSO is used to update the TSO in memory and the time window in etcd. UpdateTSO() error + // GetTimestampPath returns the timestamp path in etcd, which is: + // 1. for the default keyspace group: + // a. timestamp in /pd/{cluster_id}/timestamp + // b. lta/{dc-location}/timestamp in /pd/{cluster_id}/lta/{dc-location}/timestamp + // 2. for the non-default keyspace groups: + // a. {group}/gta/timestamp in /ms/{cluster_id}/tso/{group}/gta/timestamp + // b. {group}/lta/{dc-location}/timestamp in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp + GetTimestampPath() string // SetTSO sets the physical part with given TSO. It's mainly used for BR restore. // Cannot set the TSO smaller than now in any case. // if ignoreSmaller=true, if input ts is smaller than current, ignore silently, else return error @@ -80,6 +89,16 @@ func NewGlobalTSOAllocator( am *AllocatorManager, startGlobalLeaderLoop bool, ) Allocator { + // Construct the timestampOracle path prefix, which is: + // 1. for the default keyspace group: + // "" in /pd/{cluster_id}/timestamp + // 2. 
for the non-default keyspace groups: + // {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp + tsPath := "" + if am.kgID != mcsutils.DefaultKeyspaceGroupID { + tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), globalTSOAllocatorEtcdPrefix) + } + ctx, cancel := context.WithCancel(ctx) gta := &GlobalTSOAllocator{ ctx: ctx, @@ -89,7 +108,7 @@ func NewGlobalTSOAllocator( timestampOracle: &timestampOracle{ client: am.member.GetLeadership().GetClient(), rootPath: am.rootPath, - ltsPath: "", + tsPath: tsPath, storage: am.storage, saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, @@ -127,6 +146,14 @@ func (gta *GlobalTSOAllocator) getSyncRTT() int64 { return syncRTT.(int64) } +// GetTimestampPath returns the timestamp path in etcd. +func (gta *GlobalTSOAllocator) GetTimestampPath() string { + if gta == nil || gta.timestampOracle == nil { + return "" + } + return gta.timestampOracle.GetTimestampPath() +} + func (gta *GlobalTSOAllocator) estimateMaxTS(count uint32, suffixBits int) (*pdpb.Timestamp, bool, error) { physical, logical, lastUpdateTime := gta.timestampOracle.generateTSO(int64(count), 0) if physical == 0 { diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 7038ef8e373..4d32ae92c80 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -186,9 +186,9 @@ type KeyspaceGroupManager struct { // 1. The path for keyspace group primary election. Format: "/ms/{cluster_id}/tso/{group}/primary" // 2. The path for LoadTimestamp/SaveTimestamp in the storage endpoint for all the non-default // keyspace groups. - // Key: /ms/{cluster_id}/tso/{group}/gts/timestamp + // Key: /ms/{cluster_id}/tso/{group}/gta/timestamp // Value: ts(time.Time) - // Key: /ms/{cluster_id}/tso/{group}/lts/{dc-location}/timestamp + // Key: /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp // Value: ts(time.Time) // Note: The {group} is 5 digits integer with leading zeros. 
tsoSvcRootPath string @@ -425,6 +425,9 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro } // Initialize all kinds of maps. am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true) + log.Info("created allocator manager", + zap.Uint32("keyspace-group-id", group.ID), + zap.String("timestamp-path", am.GetTimestampPath(""))) kgm.Lock() group.KeyspaceLookupTable = make(map[uint32]struct{}) for _, kid := range group.Keyspaces { diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index c3bb4c02aad..9c2867966bc 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -43,7 +44,7 @@ type LocalTSOAllocator struct { // for election use, notice that the leadership that member holds is // the leadership for PD leader. Local TSO Allocator's leadership is for the // election of Local TSO Allocator leader among several PD servers and - // Local TSO Allocator only use member's some etcd and pbpd.Member info. + // Local TSO Allocator only use member's some etcd and pdpb.Member info. // So it's not conflicted. rootPath string allocatorLeader atomic.Value // stored as *pdpb.Member @@ -55,13 +56,24 @@ func NewLocalTSOAllocator( leadership *election.Leadership, dcLocation string, ) Allocator { + // Construct the timestampOracle path prefix, which is: + // 1. for the default keyspace group: + // lta/{dc-location} in /pd/{cluster_id}/lta/{dc-location}/timestamp + // 2. 
for the non-default keyspace groups: + // {group}/lta/{dc-location} in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp + var tsPath string + if am.kgID == utils.DefaultKeyspaceGroupID { + tsPath = path.Join(localTSOAllocatorEtcdPrefix, dcLocation) + } else { + tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), localTSOAllocatorEtcdPrefix, dcLocation) + } return &LocalTSOAllocator{ allocatorManager: am, leadership: leadership, timestampOracle: &timestampOracle{ client: leadership.GetClient(), rootPath: am.rootPath, - ltsPath: path.Join(localTSOAllocatorEtcdPrefix, dcLocation), + tsPath: tsPath, storage: am.storage, saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, @@ -73,6 +85,14 @@ func NewLocalTSOAllocator( } } +// GetTimestampPath returns the timestamp path in etcd. +func (lta *LocalTSOAllocator) GetTimestampPath() string { + if lta == nil || lta.timestampOracle == nil { + return "" + } + return lta.timestampOracle.GetTimestampPath() +} + // GetDCLocation returns the local allocator's dc-location. func (lta *LocalTSOAllocator) GetDCLocation() string { return lta.timestampOracle.dcLocation diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index 525385b42eb..aa1a424d8cd 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -62,8 +62,8 @@ type tsoObject struct { type timestampOracle struct { client *clientv3.Client rootPath string - // When ltsPath is empty, it means that it is a global timestampOracle. - ltsPath string + // When tsPath is empty, it means that it is a global timestampOracle. 
+ tsPath string storage endpoint.TSOStorage // TODO: remove saveInterval saveInterval time.Duration @@ -141,8 +141,9 @@ func (t *timestampOracle) calibrateLogical(rawLogical int64, suffixBits int) int return rawLogical< 3*t.updatePhysicalInterval && jetLag > jetLagWarningThreshold { - log.Warn("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prevPhysical), zap.Time("now", now), zap.Duration("update-physical-interval", t.updatePhysicalInterval)) + log.Warn("clock offset", + zap.Duration("jet-lag", jetLag), + zap.Time("prev-physical", prevPhysical), + zap.Time("now", now), + zap.Duration("update-physical-interval", t.updatePhysicalInterval)) tsoCounter.WithLabelValues("slow_save", t.dcLocation).Inc() } @@ -313,7 +318,11 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error // The time window needs to be updated and saved to etcd. if typeutil.SubRealTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= UpdateTimestampGuard { save := next.Add(t.saveInterval) - if err := t.storage.SaveTimestamp(t.getTimestampPath(), save); err != nil { + if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { + log.Warn("save timestamp failed", + zap.String("dc-location", t.dcLocation), + zap.String("timestamp-path", t.GetTimestampPath()), + zap.Error(err)) tsoCounter.WithLabelValues("err_save_update_ts", t.dcLocation).Inc() return err } diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 799fccd42e4..4b650bf1e25 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -16,6 +16,8 @@ package tso import ( "context" + "fmt" + "strconv" "strings" "sync" "testing" @@ -134,6 +136,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp } } + // Create a client for each keyspace and make sure they can successfully 
discover the service + // provided by the default keyspace group. keyspaceIDs := []uint32{0, 1, 2, 3, 1000} clients := mcs.WaitForMultiKeyspacesTSOAvailable( suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()}) @@ -148,6 +152,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe // on a tso server. re := suite.Require() + // Create keyspace groups. params := []struct { keyspaceGroupID uint32 keyspaceIDs []uint32 @@ -176,15 +181,29 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe }) } + // Wait until all keyspace groups are ready. testutil.Eventually(re, func() bool { for _, param := range params { for _, keyspaceID := range param.keyspaceIDs { served := false for _, server := range suite.tsoCluster.GetServers() { if server.IsKeyspaceServing(keyspaceID, param.keyspaceGroupID) { - tam, err := server.GetTSOAllocatorManager(param.keyspaceGroupID) + am, err := server.GetTSOAllocatorManager(param.keyspaceGroupID) re.NoError(err) - re.NotNil(tam) + re.NotNil(am) + + // Make sure every keyspace group is using the right timestamp path + // for loading/saving timestamp from/to etcd. + var timestampPath string + clusterID := strconv.FormatUint(suite.pdLeaderServer.GetClusterID(), 10) + if param.keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID { + timestampPath = fmt.Sprintf("/pd/%s/timestamp", clusterID) + } else { + timestampPath = fmt.Sprintf("/ms/%s/tso/%05d/gta/timestamp", + clusterID, param.keyspaceGroupID) + } + re.Equal(timestampPath, am.GetTimestampPath("")) + served = true } } @@ -196,6 +215,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe return true }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + // Create a client for each keyspace and make sure they can successfully discover the service + // provided by the corresponding keyspace group. 
keyspaceIDs := make([]uint32, 0) for _, param := range params { keyspaceIDs = append(keyspaceIDs, param.keyspaceIDs...) @@ -254,9 +275,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { splitTS, err = suite.requestTSO(re, 1, 222, 2) return err == nil && tsoutil.CompareTimestamp(&splitTS, &pdpb.Timestamp{}) > 0 }) + splitTS, err = suite.requestTSO(re, 1, 222, 2) re.Greater(tsoutil.CompareTimestamp(&splitTS, &ts), 0) - // Finish the split. - handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2) } func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO( @@ -264,10 +284,11 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO( count, keyspaceID, keyspaceGroupID uint32, ) (pdpb.Timestamp, error) { primary := suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, keyspaceGroupID) - tam, err := primary.GetTSOAllocatorManager(keyspaceGroupID) + kgm := primary.GetKeyspaceGroupManager() + re.NotNil(kgm) + ts, _, err := kgm.HandleTSORequest(keyspaceID, keyspaceGroupID, tsopkg.GlobalDCLocation, count) re.NoError(err) - re.NotNil(tam) - return tam.HandleRequest(tsopkg.GlobalDCLocation, count) + return ts, err } func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection() { diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 861aac9acd4..c57e3a032d1 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -114,6 +114,10 @@ func (suite *tsoClientTestSuite) SetupSuite() { {2, []uint32{2}}, } + for _, keyspaceGroup := range suite.keyspaceGroups { + suite.keyspaceIDs = append(suite.keyspaceIDs, keyspaceGroup.keyspaceIDs...) + } + for _, param := range suite.keyspaceGroups { if param.keyspaceGroupID == 0 { // we have already created default keyspace group, so we can skip it. 
@@ -133,16 +137,22 @@ func (suite *tsoClientTestSuite) SetupSuite() { }) } - for _, keyspaceGroup := range suite.keyspaceGroups { - suite.keyspaceIDs = append(suite.keyspaceIDs, keyspaceGroup.keyspaceIDs...) - } + suite.waitForAllKeyspaceGroupsInServing(re) + } +} - // Make sure all keyspace groups are available. - testutil.Eventually(re, func() bool { - for _, keyspaceID := range suite.keyspaceIDs { +func (suite *tsoClientTestSuite) waitForAllKeyspaceGroupsInServing(re *require.Assertions) { + // The tso servers load keyspace groups asynchronously. Make sure all keyspace groups + // are available for serving tso requests from their corresponding keyspaces by querying + // IsKeyspaceServing(keyspaceID, desired keyspace group ID). If the default keyspace group ID + // is used in the query, it always returns true, because the keyspace is served by the default + // keyspace group until the keyspace groups are loaded. + testutil.Eventually(re, func() bool { + for _, keyspaceGroup := range suite.keyspaceGroups { + for _, keyspaceID := range keyspaceGroup.keyspaceIDs { served := false for _, server := range suite.tsoCluster.GetServers() { - if server.IsKeyspaceServing(keyspaceID, mcsutils.DefaultKeyspaceGroupID) { + if server.IsKeyspaceServing(keyspaceID, keyspaceGroup.keyspaceGroupID) { served = true break } @@ -151,14 +161,14 @@ func (suite *tsoClientTestSuite) SetupSuite() { return false } } - return true - }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + } + return true + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) - // Create clients and make sure they all have discovered the tso service. 
+ suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable( + suite.ctx, re, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ",")) + re.Equal(len(suite.keyspaceIDs), len(suite.clients)) } func (suite *tsoClientTestSuite) TearDownSuite() { @@ -245,9 +255,8 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() { // TestGetMinTS tests the correctness of GetMinTS. func (suite *tsoClientTestSuite) TestGetMinTS() { - // Skip this test for the time being due to https://github.com/tikv/pd/issues/6453 - // TODO: fix it #6453 - suite.T().SkipNow() + re := suite.Require() + suite.waitForAllKeyspaceGroupsInServing(re) var wg sync.WaitGroup wg.Add(tsoRequestConcurrencyNumber * len(suite.clients)) @@ -258,9 +267,9 @@ func (suite *tsoClientTestSuite) TestGetMinTS() { var lastMinTS uint64 for j := 0; j < tsoRequestRound; j++ { physical, logical, err := client.GetMinTS(suite.ctx) - suite.NoError(err) + re.NoError(err) minTS := tsoutil.ComposeTS(physical, logical) - suite.Less(lastMinTS, minTS) + re.Less(lastMinTS, minTS) lastMinTS = minTS // Now we check whether the returned ts is the minimum one @@ -268,9 +277,9 @@ func (suite *tsoClientTestSuite) TestGetMinTS() { // less than the new timestamps of all keyspace groups. for _, client := range suite.clients { physical, logical, err := client.GetTS(suite.ctx) - suite.NoError(err) + re.NoError(err) ts := tsoutil.ComposeTS(physical, logical) - suite.Less(minTS, ts) + re.Less(minTS, ts) } } }(client) diff --git a/tests/integrations/tso/testutil.go b/tests/integrations/tso/testutil.go index 55e24fd1e58..2a4e5eabd90 100644 --- a/tests/integrations/tso/testutil.go +++ b/tests/integrations/tso/testutil.go @@ -22,7 +22,7 @@ import ( const ( serverCount = 3 tsoRequestConcurrencyNumber = 5 - tsoRequestRound = 30 + tsoRequestRound = 300 tsoCount = 10 )