From 5d62787565f7d238f386fd702e89a1921060f545 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Wed, 11 Dec 2024 16:17:27 +0800 Subject: [PATCH] tso/local: remove local tso completely (#8864) close tikv/pd#8802 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/client.go | 16 ++--- client/clients/tso/client.go | 16 ++--- pkg/mcs/tso/server/config.go | 5 +- pkg/mcs/tso/server/config_test.go | 5 -- pkg/mcs/tso/server/server.go | 9 --- pkg/storage/endpoint/service_middleware.go | 4 +- pkg/storage/endpoint/tso.go | 6 +- pkg/tso/allocator_manager.go | 30 ---------- pkg/tso/global_allocator.go | 22 +------ pkg/tso/keyspace_group_manager.go | 6 +- pkg/tso/keyspace_group_manager_test.go | 1 - pkg/tso/testutil.go | 1 - pkg/tso/tso.go | 58 ++++--------------- pkg/utils/keypath/key_path.go | 14 ----- pkg/utils/tsoutil/tso_dispatcher.go | 14 ++--- pkg/utils/tsoutil/tso_request.go | 16 +++-- server/config/config.go | 7 +-- server/server.go | 5 -- tests/cluster.go | 5 -- .../mcs/tso/keyspace_group_manager_test.go | 2 - 20 files changed, 53 insertions(+), 189 deletions(-) diff --git a/client/client.go b/client/client.go index c271f10591d..519fd478bb3 100644 --- a/client/client.go +++ b/client/client.go @@ -509,10 +509,10 @@ func (c *client) GetTSAsync(ctx context.Context) tso.TSFuture { return c.inner.dispatchTSORequestWithRetry(ctx) } -// GetLocalTSAsync implements the TSOClient interface. -// -// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the -// parameters passed in, this method will default to returning the global TSO. +// Deprecated: the Local TSO feature has been deprecated. Regardless of the +// parameters passed, the behavior of this interface will be equivalent to +// `GetTSAsync`. If you want to use a separately deployed TSO service, +// please refer to the deployment of the TSO microservice. func (c *client) GetLocalTSAsync(ctx context.Context, _ string) tso.TSFuture { return c.GetTSAsync(ctx) } @@ -523,10 +523,10 @@ func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err return resp.Wait() } -// GetLocalTS implements the TSOClient interface. -// -// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the -// parameters passed in, this method will default to returning the global TSO. +// Deprecated: the Local TSO feature has been deprecated. Regardless of the +// parameters passed, the behavior of this interface will be equivalent to +// `GetTS`. If you want to use a separately deployed TSO service, +// please refer to the deployment of the TSO microservice. func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logical int64, err error) { return c.GetTS(ctx) } diff --git a/client/clients/tso/client.go b/client/clients/tso/client.go index 9c7075fe3bb..68e2163d191 100644 --- a/client/clients/tso/client.go +++ b/client/clients/tso/client.go @@ -56,15 +56,15 @@ type Client interface { // the TSO microservice. GetMinTS(ctx context.Context) (int64, int64, error) - // GetLocalTS gets a local timestamp from PD or TSO microservice. - // - // Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the - // parameters passed in, this method will default to returning the global TSO. + // Deprecated: the Local TSO feature has been deprecated. Regardless of the + // parameters passed, the behavior of this interface will be equivalent to + // `GetTS`. If you want to use a separately deployed TSO service, + // please refer to the deployment of the TSO microservice. GetLocalTS(ctx context.Context, _ string) (int64, int64, error) - // GetLocalTSAsync gets a local timestamp from PD or TSO microservice, without block the caller. - // - // Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the - // parameters passed in, this method will default to returning the global TSO. + // Deprecated: the Local TSO feature has been deprecated. Regardless of the + // parameters passed, the behavior of this interface will be equivalent to + // `GetTSAsync`. If you want to use a separately deployed TSO service, + // please refer to the deployment of the TSO microservice. GetLocalTSAsync(ctx context.Context, _ string) TSFuture } diff --git a/pkg/mcs/tso/server/config.go b/pkg/mcs/tso/server/config.go index 0973042b912..2aaa54114da 100644 --- a/pkg/mcs/tso/server/config.go +++ b/pkg/mcs/tso/server/config.go @@ -64,10 +64,7 @@ type Config struct { // the primary/leader again. Etcd only supports seconds TTL, so here is second too. LeaderLease int64 `toml:"lease" json:"lease"` - // EnableLocalTSO is used to enable the Local TSO Allocator feature, - // which allows the PD server to generate Local TSO for certain DC-level transactions. - // To make this feature meaningful, user has to set the "zone" label for the PD server - // to indicate which DC this PD belongs to. + // Deprecated EnableLocalTSO bool `toml:"enable-local-tso" json:"enable-local-tso"` // TSOSaveInterval is the interval to save timestamp. diff --git a/pkg/mcs/tso/server/config_test.go b/pkg/mcs/tso/server/config_test.go index 2bd27a67492..2bafec30aa9 100644 --- a/pkg/mcs/tso/server/config_test.go +++ b/pkg/mcs/tso/server/config_test.go @@ -36,7 +36,6 @@ func TestConfigBasic(t *testing.T) { re.Equal(defaultBackendEndpoints, cfg.BackendEndpoints) re.Equal(defaultListenAddr, cfg.ListenAddr) re.Equal(constant.DefaultLeaderLease, cfg.LeaderLease) - re.False(cfg.EnableLocalTSO) re.True(cfg.EnableGRPCGateway) re.Equal(defaultTSOSaveInterval, cfg.TSOSaveInterval.Duration) re.Equal(defaultTSOUpdatePhysicalInterval, cfg.TSOUpdatePhysicalInterval.Duration) @@ -48,7 +47,6 @@ func TestConfigBasic(t *testing.T) { cfg.ListenAddr = "test-listen-addr" cfg.AdvertiseListenAddr = "test-advertise-listen-addr" cfg.LeaderLease = 123 - cfg.EnableLocalTSO = true cfg.TSOSaveInterval.Duration = time.Duration(10) * time.Second cfg.TSOUpdatePhysicalInterval.Duration = time.Duration(100) * time.Millisecond cfg.MaxResetTSGap.Duration = time.Duration(1) * time.Hour @@ -58,7 +56,6 @@ func TestConfigBasic(t *testing.T) { re.Equal("test-listen-addr", cfg.GetListenAddr()) re.Equal("test-advertise-listen-addr", cfg.GetAdvertiseListenAddr()) re.Equal(int64(123), cfg.GetLeaderLease()) - re.True(cfg.EnableLocalTSO) re.Equal(time.Duration(10)*time.Second, cfg.TSOSaveInterval.Duration) re.Equal(time.Duration(100)*time.Millisecond, cfg.TSOUpdatePhysicalInterval.Duration) re.Equal(time.Duration(1)*time.Hour, cfg.MaxResetTSGap.Duration) @@ -74,7 +71,6 @@ name = "tso-test-name" data-dir = "/var/lib/tso" enable-grpc-gateway = false lease = 123 -enable-local-tso = true tso-save-interval = "10s" tso-update-physical-interval = "100ms" max-gap-reset-ts = "1h" @@ -92,7 +88,6 @@ max-gap-reset-ts = "1h" re.Equal("test-advertise-listen-addr", cfg.GetAdvertiseListenAddr()) re.Equal("/var/lib/tso", cfg.DataDir) re.Equal(int64(123), cfg.GetLeaderLease()) - re.True(cfg.EnableLocalTSO) re.Equal(time.Duration(10)*time.Second, cfg.TSOSaveInterval.Duration) re.Equal(time.Duration(100)*time.Millisecond, cfg.TSOUpdatePhysicalInterval.Duration) re.Equal(time.Duration(1)*time.Hour, cfg.MaxResetTSGap.Duration) diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 04e81c2d48e..d2974075e94 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -272,15 +272,6 @@ func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorM return s.keyspaceGroupManager.GetAllocatorManager(keyspaceGroupID) } -// IsLocalRequest checks if the forwarded host is the current host -func (*Server) IsLocalRequest(forwardedHost string) bool { - // TODO: Check if the forwarded host is the current host. - // The logic is depending on etcd service mode -- if the TSO service - // uses the embedded etcd, check against ClientUrls; otherwise check - // against the cluster membership. - return forwardedHost == "" -} - // ValidateInternalRequest checks if server is closed, which is used to validate // the gRPC communication between TSO servers internally. // TODO: Check if the sender is from the global TSO allocator diff --git a/pkg/storage/endpoint/service_middleware.go b/pkg/storage/endpoint/service_middleware.go index 3859dab4d62..35f0606f9d0 100644 --- a/pkg/storage/endpoint/service_middleware.go +++ b/pkg/storage/endpoint/service_middleware.go @@ -29,7 +29,7 @@ type ServiceMiddlewareStorage interface { var _ ServiceMiddlewareStorage = (*StorageEndpoint)(nil) -// LoadServiceMiddlewareConfig loads service middleware config from keypath.KeyspaceGroupLocalTSPath then unmarshal it to cfg. +// LoadServiceMiddlewareConfig loads service middleware config from ServiceMiddlewarePath then unmarshal it to cfg. func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg any) (bool, error) { value, err := se.Load(keypath.ServiceMiddlewarePath) if err != nil || value == "" { @@ -42,7 +42,7 @@ func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg any) (bool, error) { return true, nil } -// SaveServiceMiddlewareConfig stores marshallable cfg to the keypath.KeyspaceGroupLocalTSPath. +// SaveServiceMiddlewareConfig stores marshallable cfg to the ServiceMiddlewarePath. func (se *StorageEndpoint) SaveServiceMiddlewareConfig(cfg any) error { return se.saveJSON(keypath.ServiceMiddlewarePath, cfg) } diff --git a/pkg/storage/endpoint/tso.go b/pkg/storage/endpoint/tso.go index a656f6d2945..77841529e98 100644 --- a/pkg/storage/endpoint/tso.go +++ b/pkg/storage/endpoint/tso.go @@ -37,9 +37,9 @@ type TSOStorage interface { var _ TSOStorage = (*StorageEndpoint)(nil) -// LoadTimestamp will get all time windows of Local/Global TSOs from etcd and return the biggest one. -// For the Global TSO, loadTimestamp will get all Local and Global TSO time windows persisted in etcd and choose the biggest one. -// For the Local TSO, loadTimestamp will only get its own dc-location time window persisted before. +// LoadTimestamp will get all time windows of Global TSOs from etcd and return the biggest one. +// TODO: Due to local TSO is deprecated, maybe we do not need to load timestamp +// by prefix, we can just load the timestamp by the key. func (se *StorageEndpoint) LoadTimestamp(prefix string) (time.Time, error) { prefixEnd := clientv3.GetPrefixRangeEnd(prefix) keys, values, err := se.LoadRange(prefix, prefixEnd, 0) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 8d5589143aa..65f61e819d1 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -16,8 +16,6 @@ package tso import ( "context" - "math" - "path" "runtime/trace" "strconv" "sync" @@ -43,8 +41,6 @@ const ( checkStep = time.Minute patrolStep = time.Second defaultAllocatorLeaderLease = 3 - localTSOAllocatorEtcdPrefix = "lta" - localTSOSuffixEtcdPrefix = "lts" ) var ( @@ -217,17 +213,6 @@ func (am *AllocatorManager) getGroupIDStr() string { return strconv.FormatUint(uint64(am.kgID), 10) } -// GetTimestampPath returns the timestamp path in etcd. -func (am *AllocatorManager) GetTimestampPath() string { - if am == nil { - return "" - } - - am.mu.RLock() - defer am.mu.RUnlock() - return path.Join(am.rootPath, am.mu.allocatorGroup.allocator.GetTimestampPath()) -} - // tsoAllocatorLoop is used to run the TSO Allocator updating daemon. func (am *AllocatorManager) tsoAllocatorLoop() { defer logutil.LogPanic() @@ -254,21 +239,6 @@ func (am *AllocatorManager) GetMember() ElectionMember { return am.member } -// GetSuffixBits calculates the bits of suffix sign -// by the max number of suffix so far, -// which will be used in the TSO logical part. -func (am *AllocatorManager) GetSuffixBits() int { - am.mu.RLock() - defer am.mu.RUnlock() - return CalSuffixBits(am.mu.maxSuffix) -} - -// CalSuffixBits calculates the bits of suffix by the max suffix sign. -func CalSuffixBits(maxSuffix int32) int { - // maxSuffix + 1 because we have the Global TSO holds 0 as the suffix sign - return int(math.Ceil(math.Log2(float64(maxSuffix + 1)))) -} - // AllocatorDaemon is used to update every allocator's TSO and check whether we have // any new local allocator that needs to be set up. func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) { diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 740317c676a..2fe0df3e000 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -47,14 +47,6 @@ type Allocator interface { IsInitialize() bool // UpdateTSO is used to update the TSO in memory and the time window in etcd. UpdateTSO() error - // GetTimestampPath returns the timestamp path in etcd, which is: - // 1. for the default keyspace group: - // a. timestamp in /pd/{cluster_id}/timestamp - // b. lta/{dc-location}/timestamp in /pd/{cluster_id}/lta/{dc-location}/timestamp - // 1. for the non-default keyspace groups: - // a. {group}/gts/timestamp in /ms/{cluster_id}/tso/{group}/gta/timestamp - // b. {group}/lts/{dc-location}/timestamp in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp - GetTimestampPath() string // SetTSO sets the physical part with given TSO. It's mainly used for BR restore. // Cannot set the TSO smaller than now in any case. // if ignoreSmaller=true, if input ts is smaller than current, ignore silently, else return error @@ -68,6 +60,8 @@ type Allocator interface { } // GlobalTSOAllocator is the global single point TSO allocator. +// TODO: Local TSO allocator is deprecated now, we can update the name to +// TSOAllocator and remove the `Global` concept. type GlobalTSOAllocator struct { ctx context.Context cancel context.CancelFunc @@ -132,19 +126,9 @@ func (gta *GlobalTSOAllocator) getGroupID() uint32 { return gta.am.getGroupID() } -// GetTimestampPath returns the timestamp path in etcd. -func (gta *GlobalTSOAllocator) GetTimestampPath() string { - if gta == nil || gta.timestampOracle == nil { - return "" - } - return gta.timestampOracle.GetTimestampPath() -} - // Initialize will initialize the created global TSO allocator. func (gta *GlobalTSOAllocator) Initialize(int) error { gta.tsoAllocatorRoleGauge.Set(1) - // The suffix of a Global TSO should always be 0. - gta.timestampOracle.suffix = 0 return gta.timestampOracle.SyncTimestamp() } @@ -175,7 +159,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of cluster", errs.NotLeaderErr)) } - return gta.timestampOracle.getTS(ctx, gta.member.GetLeadership(), count, 0) + return gta.timestampOracle.getTS(ctx, gta.member.GetLeadership(), count) } // Reset is used to reset the TSO allocator. diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 86b43d0de45..bb5fb4587f7 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -66,7 +66,7 @@ const ( type state struct { syncutil.RWMutex // ams stores the allocator managers of the keyspace groups. Each keyspace group is - // assigned with an allocator manager managing its global/local tso allocators. + // assigned with an allocator manager managing its global tso allocators. // Use a fixed size array to maximize the efficiency of concurrent access to // different keyspace groups for tso service. ams [constant.MaxKeyspaceGroupCountInUse]*AllocatorManager @@ -790,8 +790,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg) am.startGlobalAllocatorLoop() log.Info("created allocator manager", - zap.Uint32("keyspace-group-id", group.ID), - zap.String("timestamp-path", am.GetTimestampPath())) + zap.Uint32("keyspace-group-id", group.ID)) kgm.Lock() group.KeyspaceLookupTable = make(map[uint32]struct{}) for _, kid := range group.Keyspaces { @@ -1517,7 +1516,6 @@ func (kgm *KeyspaceGroupManager) deletedGroupCleaner() { log.Info("delete the keyspace group tso key", zap.Uint32("keyspace-group-id", groupID)) // Clean up the remaining TSO keys. - // TODO: support the Local TSO Allocator clean up. err := kgm.tsoSvcStorage.DeleteTimestamp( keypath.TimestampPath( keypath.KeyspaceGroupGlobalTSPath(groupID), diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index dea0b00f4f0..be3d53785cd 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -87,7 +87,6 @@ func (suite *keyspaceGroupManagerTestSuite) createConfig() *TestServiceConfig { ListenAddr: addr, AdvertiseListenAddr: addr, LeaderLease: constant.DefaultLeaderLease, - LocalTSOEnabled: false, TSOUpdatePhysicalInterval: 50 * time.Millisecond, TSOSaveInterval: time.Duration(constant.DefaultLeaderLease) * time.Second, MaxResetTSGap: time.Hour * 24, diff --git a/pkg/tso/testutil.go b/pkg/tso/testutil.go index e3d04f55813..336d1414d98 100644 --- a/pkg/tso/testutil.go +++ b/pkg/tso/testutil.go @@ -29,7 +29,6 @@ type TestServiceConfig struct { ListenAddr string // Address the service listens on. AdvertiseListenAddr string // Address the service advertises to the clients. LeaderLease int64 // Leader lease. - LocalTSOEnabled bool // Whether local TSO is enabled. TSOUpdatePhysicalInterval time.Duration // Interval to update TSO in physical storage. TSOSaveInterval time.Duration // Interval to save TSO to physical storage. MaxResetTSGap time.Duration // Maximum gap to reset TSO. diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index 38a4c989093..0210c98626b 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -43,8 +43,6 @@ const ( // When a TSO's logical time reaches this limit, // the physical time will be forced to increase. maxLogical = int64(1 << 18) - // MaxSuffixBits indicates the max number of suffix bits. - MaxSuffixBits = 4 // jetLagWarningThreshold is the warning threshold of jetLag in `timestampOracle.UpdateTimestamp`. // In case of small `updatePhysicalInterval`, the `3 * updatePhysicalInterval` would also is small, // and trigger unnecessary warnings about clock offset. @@ -55,9 +53,8 @@ const ( // tsoObject is used to store the current TSO in memory with a RWMutex lock. type tsoObject struct { syncutil.RWMutex - physical time.Time - logical int64 - updateTime time.Time + physical time.Time + logical int64 } // timestampOracle is used to maintain the logic of TSO. @@ -75,7 +72,6 @@ type timestampOracle struct { tsoMux *tsoObject // last timestamp window stored in etcd lastSavedTime atomic.Value // stored as time.Time - suffix int // pre-initialized metrics metrics *tsoMetrics @@ -92,7 +88,6 @@ func (t *timestampOracle) setTSOPhysical(next time.Time, force bool) { if typeutil.SubTSOPhysicalByWallClock(next, t.tsoMux.physical) > 0 { t.tsoMux.physical = next t.tsoMux.logical = 0 - t.tsoMux.updateTime = time.Now() } } @@ -106,23 +101,17 @@ func (t *timestampOracle) getTSO() (time.Time, int64) { } // generateTSO will add the TSO's logical part with the given count and returns the new TSO result. -func (t *timestampOracle) generateTSO(ctx context.Context, count int64, suffixBits int) (physical int64, logical int64, lastUpdateTime time.Time) { +func (t *timestampOracle) generateTSO(ctx context.Context, count int64) (physical int64, logical int64) { defer trace.StartRegion(ctx, "timestampOracle.generateTSO").End() t.tsoMux.Lock() defer t.tsoMux.Unlock() if t.tsoMux.physical == typeutil.ZeroTime { - return 0, 0, typeutil.ZeroTime + return 0, 0 } physical = t.tsoMux.physical.UnixNano() / int64(time.Millisecond) t.tsoMux.logical += count logical = t.tsoMux.logical - if suffixBits > 0 && t.suffix >= 0 { - logical = t.calibrateLogical(logical, suffixBits) - } - // Return the last update time - lastUpdateTime = t.tsoMux.updateTime - t.tsoMux.updateTime = time.Now() - return physical, logical, lastUpdateTime + return physical, logical } func (t *timestampOracle) getLastSavedTime() time.Time { @@ -133,28 +122,6 @@ func (t *timestampOracle) getLastSavedTime() time.Time { return last.(time.Time) } -// Because the Local TSO in each Local TSO Allocator is independent, so they are possible -// to be the same at sometimes, to avoid this case, we need to use the logical part of the -// Local TSO to do some differentiating work. -// For example, we have three DCs: dc-1, dc-2 and dc-3. The bits of suffix is defined by -// the const suffixBits. Then, for dc-2, the suffix may be 1 because it's persisted -// in etcd with the value of 1. -// Once we get a normal TSO like this (18 bits): xxxxxxxxxxxxxxxxxx. We will make the TSO's -// low bits of logical part from each DC looks like: -// -// global: xxxxxxxxxx00000000 -// dc-1: xxxxxxxxxx00000001 -// dc-2: xxxxxxxxxx00000010 -// dc-3: xxxxxxxxxx00000011 -func (t *timestampOracle) calibrateLogical(rawLogical int64, suffixBits int) int64 { - return rawLogical< 0)) @@ -209,7 +176,7 @@ func (t *timestampOracle) SyncTimestamp() error { }) save := next.Add(t.saveInterval) start := time.Now() - if err = t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { + if err = t.storage.SaveTimestamp(keypath.TimestampPath(t.tsPath), save); err != nil { t.metrics.errSaveSyncTSEvent.Inc() return err } @@ -277,7 +244,7 @@ func (t *timestampOracle) resetUserTimestamp(leadership *election.Leadership, ts if typeutil.SubRealTimeByWallClock(t.getLastSavedTime(), nextPhysical) <= UpdateTimestampGuard { save := nextPhysical.Add(t.saveInterval) start := time.Now() - if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { + if err := t.storage.SaveTimestamp(keypath.TimestampPath(t.tsPath), save); err != nil { t.metrics.errSaveResetTSEvent.Inc() return err } @@ -287,7 +254,6 @@ func (t *timestampOracle) resetUserTimestamp(leadership *election.Leadership, ts // save into memory only if nextPhysical or nextLogical is greater. t.tsoMux.physical = nextPhysical t.tsoMux.logical = int64(nextLogical) - t.tsoMux.updateTime = time.Now() t.metrics.resetTSOOKEvent.Inc() return nil } @@ -361,10 +327,10 @@ func (t *timestampOracle) UpdateTimestamp() error { if typeutil.SubRealTimeByWallClock(t.getLastSavedTime(), next) <= UpdateTimestampGuard { save := next.Add(t.saveInterval) start := time.Now() - if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { + if err := t.storage.SaveTimestamp(keypath.TimestampPath(t.tsPath), save); err != nil { log.Warn("save timestamp failed", logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0), - zap.String("timestamp-path", t.GetTimestampPath()), + zap.String("timestamp-path", keypath.TimestampPath(t.tsPath)), zap.Error(err)) t.metrics.errSaveUpdateTSEvent.Inc() return err @@ -381,7 +347,7 @@ func (t *timestampOracle) UpdateTimestamp() error { var maxRetryCount = 10 // getTS is used to get a timestamp. -func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leadership, count uint32, suffixBits int) (pdpb.Timestamp, error) { +func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leadership, count uint32) (pdpb.Timestamp, error) { defer trace.StartRegion(ctx, "timestampOracle.getTS").End() var resp pdpb.Timestamp if count == 0 { @@ -399,7 +365,7 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized") } // Get a new TSO result with the given count - resp.Physical, resp.Logical, _ = t.generateTSO(ctx, int64(count), suffixBits) + resp.Physical, resp.Logical = t.generateTSO(ctx, int64(count)) if resp.GetPhysical() == 0 { return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory has been reset") } @@ -416,7 +382,6 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader if !leadership.Check() { return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested %s anymore", errs.NotLeaderErr)) } - resp.SuffixBits = uint32(suffixBits) return resp, nil } t.metrics.exceededMaxRetryEvent.Inc() @@ -430,6 +395,5 @@ func (t *timestampOracle) ResetTimestamp() { log.Info("reset the timestamp in memory", logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0)) t.tsoMux.physical = typeutil.ZeroTime t.tsoMux.logical = 0 - t.tsoMux.updateTime = typeutil.ZeroTime t.lastSavedTime.Store(typeutil.ZeroTime) } diff --git a/pkg/utils/keypath/key_path.go b/pkg/utils/keypath/key_path.go index 4d59fafe16f..1a56ad3330a 100644 --- a/pkg/utils/keypath/key_path.go +++ b/pkg/utils/keypath/key_path.go @@ -370,20 +370,6 @@ func TimestampPath(tsPath string) string { return path.Join(tsPath, TimestampKey) } -// FullTimestampPath returns the full timestamp path. -// 1. for the default keyspace group: -// /pd/{cluster_id}/timestamp -// 2. for the non-default keyspace groups: -// /ms/{cluster_id}/tso/{group}/gta/timestamp -func FullTimestampPath(groupID uint32) string { - rootPath := TSOSvcRootPath() - tsPath := TimestampPath(KeyspaceGroupGlobalTSPath(groupID)) - if groupID == constant.DefaultKeyspaceGroupID { - rootPath = LegacyRootPath() - } - return path.Join(rootPath, tsPath) -} - const ( registryKey = "registry" ) diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go index c4aa96274e1..86afe75e6c5 100644 --- a/pkg/utils/tsoutil/tso_dispatcher.go +++ b/pkg/utils/tsoutil/tso_dispatcher.go @@ -171,23 +171,23 @@ func (s *TSODispatcher) processRequests(forwardStream stream, requests []Request s.tsoProxyBatchSize.Observe(float64(count)) // Split the response ts := resp.GetTimestamp() - physical, logical, suffixBits := ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() + physical, logical := ts.GetPhysical(), ts.GetLogical() // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. // This is different from the logic of client batch, for example, if we have a largest ts whose logical part is 10, // count is 5, then the splitting results should be 5 and 10. - firstLogical := addLogical(logical, -int64(count), suffixBits) - return s.finishRequest(requests, physical, firstLogical, suffixBits) + firstLogical := addLogical(logical, -int64(count)) + return s.finishRequest(requests, physical, firstLogical) } // Because of the suffix, we need to shift the count before we add it to the logical part. -func addLogical(logical, count int64, suffixBits uint32) int64 { - return logical + count<