Skip to content

Commit

Permalink
support config tso switch
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Oct 31, 2024
1 parent de92fc5 commit 69c73c1
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 7 deletions.
3 changes: 3 additions & 0 deletions client/pd_service_discovery.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,9 @@ func (c *pdServiceDiscovery) updateServiceModeLoop() {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
ticker := time.NewTicker(serviceModeUpdateInterval)
failpoint.Inject("fastUpdateServiceMode", func() {
ticker.Reset(10 * time.Millisecond)
})
defer ticker.Stop()

for {
Expand Down
7 changes: 2 additions & 5 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ const (
heartbeatTaskRunner = "heartbeat-async"
miscTaskRunner = "misc-async"
logTaskRunner = "log-async"

// TODO: make it configurable
IsTSODynamicSwitchingEnabled = false
)

// Server is the interface for cluster.
Expand Down Expand Up @@ -412,7 +409,7 @@ func (c *RaftCluster) checkSchedulingService() {
// checkTSOService checks the TSO service.
func (c *RaftCluster) checkTSOService() {
if c.isAPIServiceMode {
if IsTSODynamicSwitchingEnabled {
if c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() {
servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
if err != nil || len(servers) == 0 {
if err := c.startTSOJobsIfNeeded(); err != nil {
Expand All @@ -422,7 +419,7 @@ func (c *RaftCluster) checkTSOService() {
log.Info("TSO is provided by PD")
c.UnsetServiceIndependent(constant.TSOServiceName)
} else {
if err := c.startTSOJobsIfNeeded(); err != nil {
if err := c.stopTSOJobsIfNeeded(); err != nil {
log.Error("failed to stop TSO jobs", errs.ZapError(err))
return
}
Expand Down
8 changes: 7 additions & 1 deletion server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,8 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {

// MicroServiceConfig is the configuration for micro service.
type MicroServiceConfig struct {
EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"`
EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"`
EnableTSODynamicSwitching bool `toml:"enable-tso-dynamic-switching" json:"enable-tso-dynamic-switching,string"`
}

func (c *MicroServiceConfig) adjust(meta *configutil.ConfigMetaData) {
Expand All @@ -874,6 +875,11 @@ func (c *MicroServiceConfig) IsSchedulingFallbackEnabled() bool {
return c.EnableSchedulingFallback
}

// IsTSODynamicSwitchingEnabled returns whether to enable TSO dynamic switching.
func (c *MicroServiceConfig) IsTSODynamicSwitchingEnabled() bool {
return c.EnableTSODynamicSwitching
}

// KeyspaceConfig is the configuration for keyspace management.
type KeyspaceConfig struct {
// PreAlloc contains the keyspace to be allocated during keyspace manager initialization.
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1411,7 +1411,7 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster {
// IsServiceIndependent returns whether the service is independent.
func (s *Server) IsServiceIndependent(name string) bool {
if s.mode == APIServiceMode && !s.IsClosed() {
if name == constant.TSOServiceName && !cluster.IsTSODynamicSwitchingEnabled {
if name == constant.TSOServiceName && !s.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() {
return true
}
return s.cluster.IsServiceIndependent(name)
Expand Down
93 changes: 93 additions & 0 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -575,3 +576,95 @@ func (suite *CommonTestSuite) TestBootstrapDefaultKeyspaceGroup() {
suite.pdLeader.ResignLeader()
suite.pdLeader = suite.cluster.GetServer(suite.cluster.WaitLeader())
}

// TestTSOServiceSwitch tests the behavior of TSO service switching when `EnableTSODynamicSwitching` is enabled.
// Initially, the TSO service should be provided by PD. After starting a TSO server, the service should switch to the TSO server.
// When the TSO server is stopped, the PD should resume providing the TSO service if `EnableTSODynamicSwitching` is enabled.
// If `EnableTSODynamicSwitching` is disabled, the PD should not provide TSO service after the TSO server is stopped.
func TestTSOServiceSwitch(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastUpdateServiceMode", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

tc, err := tests.NewTestAPICluster(ctx, 1,
func(conf *config.Config, _ string) {
conf.MicroService.EnableTSODynamicSwitching = true
},
)
re.NoError(err)
defer tc.Destroy()

err = tc.RunInitialServers()
re.NoError(err)
leaderName := tc.WaitLeader()
re.NotEmpty(leaderName)
pdLeader := tc.GetServer(leaderName)
backendEndpoints := pdLeader.GetAddr()
re.NoError(pdLeader.BootstrapCluster())
pdClient, err := pd.NewClientWithContext(ctx, []string{backendEndpoints}, pd.SecurityOption{})
re.NoError(err)
re.NotNil(pdClient)
defer pdClient.Close()

var globalLastTS uint64
// Initially, TSO service should be provided by PD
re.NoError(checkTSOMonotonic(ctx, pdClient, &globalLastTS, 10))

// Start TSO server
tsoCluster, err := tests.NewTestTSOCluster(ctx, 1, pdLeader.GetAddr())
re.NoError(err)
tsoCluster.WaitForDefaultPrimaryServing(re)

// Wait for TSO server to start and PD to detect it
time.Sleep(300 * time.Millisecond)

// Verify PD is not providing TSO service
err = checkTSOMonotonic(ctx, pdClient, &globalLastTS, 10)
re.NoError(err)

// Disable TSO switching
cfg := pdLeader.GetServer().GetMicroServiceConfig().Clone()
cfg.EnableTSODynamicSwitching = false
pdLeader.GetServer().SetMicroServiceConfig(*cfg)

tsoCluster.Destroy()

// Wait for the configuration change to take effect
time.Sleep(300 * time.Millisecond)
// Verify PD is not providing TSO service multiple times
for i := 0; i < 10; i++ {
err = checkTSOMonotonic(ctx, pdClient, &globalLastTS, 1)
re.Error(err, "TSO service should not be available")
time.Sleep(10 * time.Millisecond)
}

// Now enable TSO switching
cfg = pdLeader.GetServer().GetMicroServiceConfig().Clone()

cfg.EnableTSODynamicSwitching = true
pdLeader.GetServer().SetMicroServiceConfig(*cfg)

// Wait for PD to detect the change
time.Sleep(300 * time.Millisecond)

// Verify PD is now providing TSO service and timestamps are monotonically increasing
re.NoError(checkTSOMonotonic(ctx, pdClient, &globalLastTS, 10))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode"))
}

func checkTSOMonotonic(ctx context.Context, pdClient pd.Client, globalLastTS *uint64, count int) error {
fmt.Println("start to request TSO")
for i := 0; i < count; i++ {
physical, logical, err := pdClient.GetTS(ctx)
if err != nil {
return err
}
ts := (uint64(physical) << 18) + uint64(logical)
if ts <= *globalLastTS {
return fmt.Errorf("TSO is not globally increasing: last %d, current %d", globalLastTS, ts)
}
*globalLastTS = ts
}
return nil
}

0 comments on commit 69c73c1

Please sign in to comment.