Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso/local: remove local tso completely #8864

Merged
merged 10 commits into from
Dec 11, 2024
16 changes: 8 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,10 +509,10 @@ func (c *client) GetTSAsync(ctx context.Context) tso.TSFuture {
return c.inner.dispatchTSORequestWithRetry(ctx)
}

// GetLocalTSAsync implements the TSOClient interface.
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
// Deprecated: the Local TSO feature has been deprecated. Regardless of the
// parameters passed, the behavior of this interface will be equivalent to
// `GetTSAsync`. If you want to use a separately deployed TSO service,
// please refer to the deployment of the TSO microservice.
func (c *client) GetLocalTSAsync(ctx context.Context, _ string) tso.TSFuture {
return c.GetTSAsync(ctx)
}
Expand All @@ -523,10 +523,10 @@ func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err
return resp.Wait()
}

// GetLocalTS implements the TSOClient interface.
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
// Deprecated: the Local TSO feature has been deprecated. Regardless of the
// parameters passed, the behavior of this interface will be equivalent to
// `GetTS`. If you want to use a separately deployed TSO service,
// please refer to the deployment of the TSO microservice.
func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logical int64, err error) {
return c.GetTS(ctx)
}
Expand Down
16 changes: 8 additions & 8 deletions client/clients/tso/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ type Client interface {
// the TSO microservice.
GetMinTS(ctx context.Context) (int64, int64, error)

// GetLocalTS gets a local timestamp from PD or TSO microservice.
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
// Deprecated: the Local TSO feature has been deprecated. Regardless of the
// parameters passed, the behavior of this interface will be equivalent to
// `GetTS`. If you want to use a separately deployed TSO service,
// please refer to the deployment of the TSO microservice.
GetLocalTS(ctx context.Context, _ string) (int64, int64, error)
// GetLocalTSAsync gets a local timestamp from PD or TSO microservice, without block the caller.
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
// Deprecated: the Local TSO feature has been deprecated. Regardless of the
// parameters passed, the behavior of this interface will be equivalent to
// `GetTSAsync`. If you want to use a separately deployed TSO service,
// please refer to the deployment of the TSO microservice.
GetLocalTSAsync(ctx context.Context, _ string) TSFuture
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/mcs/tso/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ type Config struct {
// the primary/leader again. Etcd only supports seconds TTL, so here is second too.
LeaderLease int64 `toml:"lease" json:"lease"`

// EnableLocalTSO is used to enable the Local TSO Allocator feature,
// which allows the PD server to generate Local TSO for certain DC-level transactions.
// To make this feature meaningful, user has to set the "zone" label for the PD server
// to indicate which DC this PD belongs to.
// Deprecated
EnableLocalTSO bool `toml:"enable-local-tso" json:"enable-local-tso"`

// TSOSaveInterval is the interval to save timestamp.
Expand Down
5 changes: 0 additions & 5 deletions pkg/mcs/tso/server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func TestConfigBasic(t *testing.T) {
re.Equal(defaultBackendEndpoints, cfg.BackendEndpoints)
re.Equal(defaultListenAddr, cfg.ListenAddr)
re.Equal(constant.DefaultLeaderLease, cfg.LeaderLease)
re.False(cfg.EnableLocalTSO)
re.True(cfg.EnableGRPCGateway)
re.Equal(defaultTSOSaveInterval, cfg.TSOSaveInterval.Duration)
re.Equal(defaultTSOUpdatePhysicalInterval, cfg.TSOUpdatePhysicalInterval.Duration)
Expand All @@ -48,7 +47,6 @@ func TestConfigBasic(t *testing.T) {
cfg.ListenAddr = "test-listen-addr"
cfg.AdvertiseListenAddr = "test-advertise-listen-addr"
cfg.LeaderLease = 123
cfg.EnableLocalTSO = true
cfg.TSOSaveInterval.Duration = time.Duration(10) * time.Second
cfg.TSOUpdatePhysicalInterval.Duration = time.Duration(100) * time.Millisecond
cfg.MaxResetTSGap.Duration = time.Duration(1) * time.Hour
Expand All @@ -58,7 +56,6 @@ func TestConfigBasic(t *testing.T) {
re.Equal("test-listen-addr", cfg.GetListenAddr())
re.Equal("test-advertise-listen-addr", cfg.GetAdvertiseListenAddr())
re.Equal(int64(123), cfg.GetLeaderLease())
re.True(cfg.EnableLocalTSO)
re.Equal(time.Duration(10)*time.Second, cfg.TSOSaveInterval.Duration)
re.Equal(time.Duration(100)*time.Millisecond, cfg.TSOUpdatePhysicalInterval.Duration)
re.Equal(time.Duration(1)*time.Hour, cfg.MaxResetTSGap.Duration)
Expand All @@ -74,7 +71,6 @@ name = "tso-test-name"
data-dir = "/var/lib/tso"
enable-grpc-gateway = false
lease = 123
enable-local-tso = true
tso-save-interval = "10s"
tso-update-physical-interval = "100ms"
max-gap-reset-ts = "1h"
Expand All @@ -92,7 +88,6 @@ max-gap-reset-ts = "1h"
re.Equal("test-advertise-listen-addr", cfg.GetAdvertiseListenAddr())
re.Equal("/var/lib/tso", cfg.DataDir)
re.Equal(int64(123), cfg.GetLeaderLease())
re.True(cfg.EnableLocalTSO)
re.Equal(time.Duration(10)*time.Second, cfg.TSOSaveInterval.Duration)
re.Equal(time.Duration(100)*time.Millisecond, cfg.TSOUpdatePhysicalInterval.Duration)
re.Equal(time.Duration(1)*time.Hour, cfg.MaxResetTSGap.Duration)
Expand Down
9 changes: 0 additions & 9 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,6 @@ func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorM
return s.keyspaceGroupManager.GetAllocatorManager(keyspaceGroupID)
}

// IsLocalRequest checks if the forwarded host is the current host
func (*Server) IsLocalRequest(forwardedHost string) bool {
rleungx marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Check if the forwarded host is the current host.
// The logic is depending on etcd service mode -- if the TSO service
// uses the embedded etcd, check against ClientUrls; otherwise check
// against the cluster membership.
return forwardedHost == ""
}

// ValidateInternalRequest checks if server is closed, which is used to validate
// the gRPC communication between TSO servers internally.
// TODO: Check if the sender is from the global TSO allocator
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/endpoint/service_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type ServiceMiddlewareStorage interface {

var _ ServiceMiddlewareStorage = (*StorageEndpoint)(nil)

// LoadServiceMiddlewareConfig loads service middleware config from keypath.KeyspaceGroupLocalTSPath then unmarshal it to cfg.
// LoadServiceMiddlewareConfig loads service middleware config from ServiceMiddlewarePath then unmarshal it to cfg.
func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg any) (bool, error) {
value, err := se.Load(keypath.ServiceMiddlewarePath)
if err != nil || value == "" {
Expand All @@ -42,7 +42,7 @@ func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg any) (bool, error) {
return true, nil
}

// SaveServiceMiddlewareConfig stores marshallable cfg to the keypath.KeyspaceGroupLocalTSPath.
// SaveServiceMiddlewareConfig stores marshallable cfg to the ServiceMiddlewarePath.
func (se *StorageEndpoint) SaveServiceMiddlewareConfig(cfg any) error {
return se.saveJSON(keypath.ServiceMiddlewarePath, cfg)
}
6 changes: 3 additions & 3 deletions pkg/storage/endpoint/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ type TSOStorage interface {

var _ TSOStorage = (*StorageEndpoint)(nil)

// LoadTimestamp will get all time windows of Local/Global TSOs from etcd and return the biggest one.
// For the Global TSO, loadTimestamp will get all Local and Global TSO time windows persisted in etcd and choose the biggest one.
// For the Local TSO, loadTimestamp will only get its own dc-location time window persisted before.
// LoadTimestamp will get all time windows of Global TSOs from etcd and return the biggest one.
// TODO: Due to local TSO is deprecated, maybe we do not need to load timestamp
// by prefix, we can just load the timestamp by the key.
func (se *StorageEndpoint) LoadTimestamp(prefix string) (time.Time, error) {
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
Expand Down
30 changes: 0 additions & 30 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ package tso

import (
"context"
"math"
"path"
"runtime/trace"
"strconv"
"sync"
Expand All @@ -43,8 +41,6 @@ const (
checkStep = time.Minute
patrolStep = time.Second
defaultAllocatorLeaderLease = 3
localTSOAllocatorEtcdPrefix = "lta"
localTSOSuffixEtcdPrefix = "lts"
)

var (
Expand Down Expand Up @@ -217,17 +213,6 @@ func (am *AllocatorManager) getGroupIDStr() string {
return strconv.FormatUint(uint64(am.kgID), 10)
}

// GetTimestampPath returns the timestamp path in etcd.
func (am *AllocatorManager) GetTimestampPath() string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Member Author

@okJiang okJiang Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetTimestampPath is only used to print log when created allocator manager now. I think

log.Info("created allocator manager",
		zap.Uint32("keyspace-group-id", group.ID))

is enough, do not have to know the TimestampPath.

Copy link
Member

@rleungx rleungx Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is important and we also use it in the test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you say this comment?

	// GetTimestampPath returns the timestamp path in etcd, which is:
	// 1. for the default keyspace group:
	//     a. timestamp in /pd/{cluster_id}/timestamp
	//     b. lta/{dc-location}/timestamp in /pd/{cluster_id}/lta/{dc-location}/timestamp
	// 1. for the non-default keyspace groups:
	//     a. {group}/gts/timestamp in /ms/{cluster_id}/tso/{group}/gta/timestamp
	//     b. {group}/lts/{dc-location}/timestamp in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp
	GetTimestampPath() string

It is outdated. lta/{dc-location}/timestamp in /pd/{cluster_id}/lta/{dc-location}/timestamp and {group}/lts/{dc-location}/timestamp in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp are removed. We can see the left in

// KeyspaceGroupGlobalTSPath constructs the timestampOracle path prefix for Global TSO, which is:
// 1. for the default keyspace group:
// "" in /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
func KeyspaceGroupGlobalTSPath(groupID uint32) string {
if groupID == constant.DefaultKeyspaceGroupID {
return ""
}
return path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. BTW, I found we still have some comments mentioning KeyspaceGroupLocalTSPath, please also remove it.

if am == nil {
return ""
}

am.mu.RLock()
defer am.mu.RUnlock()
return path.Join(am.rootPath, am.mu.allocatorGroup.allocator.GetTimestampPath())
}

// tsoAllocatorLoop is used to run the TSO Allocator updating daemon.
func (am *AllocatorManager) tsoAllocatorLoop() {
defer logutil.LogPanic()
Expand All @@ -254,21 +239,6 @@ func (am *AllocatorManager) GetMember() ElectionMember {
return am.member
}

// GetSuffixBits calculates the bits of suffix sign
// by the max number of suffix so far,
// which will be used in the TSO logical part.
func (am *AllocatorManager) GetSuffixBits() int {
am.mu.RLock()
defer am.mu.RUnlock()
return CalSuffixBits(am.mu.maxSuffix)
}

// CalSuffixBits calculates the bits of suffix by the max suffix sign.
func CalSuffixBits(maxSuffix int32) int {
// maxSuffix + 1 because we have the Global TSO holds 0 as the suffix sign
return int(math.Ceil(math.Log2(float64(maxSuffix + 1))))
}

// AllocatorDaemon is used to update every allocator's TSO and check whether we have
// any new local allocator that needs to be set up.
func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
Expand Down
22 changes: 3 additions & 19 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,6 @@ type Allocator interface {
IsInitialize() bool
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
UpdateTSO() error
// GetTimestampPath returns the timestamp path in etcd, which is:
// 1. for the default keyspace group:
// a. timestamp in /pd/{cluster_id}/timestamp
// b. lta/{dc-location}/timestamp in /pd/{cluster_id}/lta/{dc-location}/timestamp
// 1. for the non-default keyspace groups:
// a. {group}/gts/timestamp in /ms/{cluster_id}/tso/{group}/gta/timestamp
// b. {group}/lts/{dc-location}/timestamp in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp
GetTimestampPath() string
// SetTSO sets the physical part with given TSO. It's mainly used for BR restore.
// Cannot set the TSO smaller than now in any case.
// if ignoreSmaller=true, if input ts is smaller than current, ignore silently, else return error
Expand All @@ -68,6 +60,8 @@ type Allocator interface {
}

// GlobalTSOAllocator is the global single point TSO allocator.
// TODO: Local TSO allocator is deprecated now, we can update the name to
// TSOAllocator and remove the `Global` concept.
type GlobalTSOAllocator struct {
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -132,19 +126,9 @@ func (gta *GlobalTSOAllocator) getGroupID() uint32 {
return gta.am.getGroupID()
}

// GetTimestampPath returns the timestamp path in etcd.
func (gta *GlobalTSOAllocator) GetTimestampPath() string {
if gta == nil || gta.timestampOracle == nil {
return ""
}
return gta.timestampOracle.GetTimestampPath()
}

// Initialize will initialize the created global TSO allocator.
func (gta *GlobalTSOAllocator) Initialize(int) error {
gta.tsoAllocatorRoleGauge.Set(1)
// The suffix of a Global TSO should always be 0.
gta.timestampOracle.suffix = 0
return gta.timestampOracle.SyncTimestamp()
}

Expand Down Expand Up @@ -175,7 +159,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of cluster", errs.NotLeaderErr))
}

return gta.timestampOracle.getTS(ctx, gta.member.GetLeadership(), count, 0)
return gta.timestampOracle.getTS(ctx, gta.member.GetLeadership(), count)
}

// Reset is used to reset the TSO allocator.
Expand Down
6 changes: 2 additions & 4 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ const (
type state struct {
syncutil.RWMutex
// ams stores the allocator managers of the keyspace groups. Each keyspace group is
// assigned with an allocator manager managing its global/local tso allocators.
// assigned with an allocator manager managing its global tso allocators.
// Use a fixed size array to maximize the efficiency of concurrent access to
// different keyspace groups for tso service.
ams [constant.MaxKeyspaceGroupCountInUse]*AllocatorManager
Expand Down Expand Up @@ -790,8 +790,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg)
am.startGlobalAllocatorLoop()
log.Info("created allocator manager",
zap.Uint32("keyspace-group-id", group.ID),
zap.String("timestamp-path", am.GetTimestampPath()))
zap.Uint32("keyspace-group-id", group.ID))
kgm.Lock()
group.KeyspaceLookupTable = make(map[uint32]struct{})
for _, kid := range group.Keyspaces {
Expand Down Expand Up @@ -1517,7 +1516,6 @@ func (kgm *KeyspaceGroupManager) deletedGroupCleaner() {
log.Info("delete the keyspace group tso key",
zap.Uint32("keyspace-group-id", groupID))
// Clean up the remaining TSO keys.
// TODO: support the Local TSO Allocator clean up.
err := kgm.tsoSvcStorage.DeleteTimestamp(
keypath.TimestampPath(
keypath.KeyspaceGroupGlobalTSPath(groupID),
Expand Down
1 change: 0 additions & 1 deletion pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func (suite *keyspaceGroupManagerTestSuite) createConfig() *TestServiceConfig {
ListenAddr: addr,
AdvertiseListenAddr: addr,
LeaderLease: constant.DefaultLeaderLease,
LocalTSOEnabled: false,
TSOUpdatePhysicalInterval: 50 * time.Millisecond,
TSOSaveInterval: time.Duration(constant.DefaultLeaderLease) * time.Second,
MaxResetTSGap: time.Hour * 24,
Expand Down
1 change: 0 additions & 1 deletion pkg/tso/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type TestServiceConfig struct {
ListenAddr string // Address the service listens on.
AdvertiseListenAddr string // Address the service advertises to the clients.
LeaderLease int64 // Leader lease.
LocalTSOEnabled bool // Whether local TSO is enabled.
TSOUpdatePhysicalInterval time.Duration // Interval to update TSO in physical storage.
TSOSaveInterval time.Duration // Interval to save TSO to physical storage.
MaxResetTSGap time.Duration // Maximum gap to reset TSO.
Expand Down
Loading
Loading