From 69c73c18a438ac338c7a56311c15264182727228 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 31 Oct 2024 11:41:33 +0800 Subject: [PATCH] support config tso switch Signed-off-by: Ryan Leung --- client/pd_service_discovery.go | 3 + server/cluster/cluster.go | 7 +- server/config/config.go | 8 +- server/server.go | 2 +- tests/integrations/mcs/tso/server_test.go | 93 +++++++++++++++++++++++ 5 files changed, 106 insertions(+), 7 deletions(-) mode change 100644 => 100755 client/pd_service_discovery.go diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go old mode 100644 new mode 100755 index 872b8e0ad0b5..1c42f9b46c02 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -575,6 +575,9 @@ func (c *pdServiceDiscovery) updateServiceModeLoop() { ctx, cancel := context.WithCancel(c.ctx) defer cancel() ticker := time.NewTicker(serviceModeUpdateInterval) + failpoint.Inject("fastUpdateServiceMode", func() { + ticker.Reset(10 * time.Millisecond) + }) defer ticker.Stop() for { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 46a525a3e09d..39692f11e4ae 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -114,9 +114,6 @@ const ( heartbeatTaskRunner = "heartbeat-async" miscTaskRunner = "misc-async" logTaskRunner = "log-async" - - // TODO: make it configurable - IsTSODynamicSwitchingEnabled = false ) // Server is the interface for cluster. @@ -412,7 +409,7 @@ func (c *RaftCluster) checkSchedulingService() { // checkTSOService checks the TSO service. func (c *RaftCluster) checkTSOService() { if c.isAPIServiceMode { - if IsTSODynamicSwitchingEnabled { + if c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() { servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName) if err != nil || len(servers) == 0 { if err := c.startTSOJobsIfNeeded(); err != nil { @@ -422,7 +419,7 @@ func (c *RaftCluster) checkTSOService() { log.Info("TSO is provided by PD") c.UnsetServiceIndependent(constant.TSOServiceName) } else { - if err := c.startTSOJobsIfNeeded(); err != nil { + if err := c.stopTSOJobsIfNeeded(); err != nil { log.Error("failed to stop TSO jobs", errs.ZapError(err)) return } diff --git a/server/config/config.go b/server/config/config.go index c64ee3831b00..0b80cbebbfd8 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -854,7 +854,8 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) { // MicroServiceConfig is the configuration for micro service. type MicroServiceConfig struct { - EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"` + EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"` + EnableTSODynamicSwitching bool `toml:"enable-tso-dynamic-switching" json:"enable-tso-dynamic-switching,string"` } func (c *MicroServiceConfig) adjust(meta *configutil.ConfigMetaData) { @@ -874,6 +875,11 @@ func (c *MicroServiceConfig) IsSchedulingFallbackEnabled() bool { return c.EnableSchedulingFallback } +// IsTSODynamicSwitchingEnabled returns whether to enable TSO dynamic switching. +func (c *MicroServiceConfig) IsTSODynamicSwitchingEnabled() bool { + return c.EnableTSODynamicSwitching +} + // KeyspaceConfig is the configuration for keyspace management. type KeyspaceConfig struct { // PreAlloc contains the keyspace to be allocated during keyspace manager initialization. diff --git a/server/server.go b/server/server.go index c88871658dc7..029c85694c36 100644 --- a/server/server.go +++ b/server/server.go @@ -1411,7 +1411,7 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster { // IsServiceIndependent returns whether the service is independent. func (s *Server) IsServiceIndependent(name string) bool { if s.mode == APIServiceMode && !s.IsClosed() { - if name == constant.TSOServiceName && !cluster.IsTSODynamicSwitchingEnabled { + if name == constant.TSOServiceName && !s.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() { return true } return s.cluster.IsServiceIndependent(name) diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index aa767ecfbef6..49fb3150dadd 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -40,6 +40,7 @@ import ( "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" + "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/integrations/mcs" clientv3 "go.etcd.io/etcd/client/v3" @@ -575,3 +576,95 @@ func (suite *CommonTestSuite) TestBootstrapDefaultKeyspaceGroup() { suite.pdLeader.ResignLeader() suite.pdLeader = suite.cluster.GetServer(suite.cluster.WaitLeader()) } + +// TestTSOServiceSwitch tests the behavior of TSO service switching when `EnableTSODynamicSwitching` is enabled. +// Initially, the TSO service should be provided by PD. After starting a TSO server, the service should switch to the TSO server. +// When the TSO server is stopped, the PD should resume providing the TSO service if `EnableTSODynamicSwitching` is enabled. +// If `EnableTSODynamicSwitching` is disabled, the PD should not provide TSO service after the TSO server is stopped. +func TestTSOServiceSwitch(t *testing.T) { + re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastUpdateServiceMode", `return(true)`)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, err := tests.NewTestAPICluster(ctx, 1, + func(conf *config.Config, _ string) { + conf.MicroService.EnableTSODynamicSwitching = true + }, + ) + re.NoError(err) + defer tc.Destroy() + + err = tc.RunInitialServers() + re.NoError(err) + leaderName := tc.WaitLeader() + re.NotEmpty(leaderName) + pdLeader := tc.GetServer(leaderName) + backendEndpoints := pdLeader.GetAddr() + re.NoError(pdLeader.BootstrapCluster()) + pdClient, err := pd.NewClientWithContext(ctx, []string{backendEndpoints}, pd.SecurityOption{}) + re.NoError(err) + re.NotNil(pdClient) + defer pdClient.Close() + + var globalLastTS uint64 + // Initially, TSO service should be provided by PD + re.NoError(checkTSOMonotonic(ctx, pdClient, &globalLastTS, 10)) + + // Start TSO server + tsoCluster, err := tests.NewTestTSOCluster(ctx, 1, pdLeader.GetAddr()) + re.NoError(err) + tsoCluster.WaitForDefaultPrimaryServing(re) + + // Wait for TSO server to start and PD to detect it + time.Sleep(300 * time.Millisecond) + + // Verify PD is not providing TSO service + err = checkTSOMonotonic(ctx, pdClient, &globalLastTS, 10) + re.NoError(err) + + // Disable TSO switching + cfg := pdLeader.GetServer().GetMicroServiceConfig().Clone() + cfg.EnableTSODynamicSwitching = false + pdLeader.GetServer().SetMicroServiceConfig(*cfg) + + tsoCluster.Destroy() + + // Wait for the configuration change to take effect + time.Sleep(300 * time.Millisecond) + // Verify PD is not providing TSO service multiple times + for i := 0; i < 10; i++ { + err = checkTSOMonotonic(ctx, pdClient, &globalLastTS, 1) + re.Error(err, "TSO service should not be available") + time.Sleep(10 * time.Millisecond) + } + + // Now enable TSO switching + cfg = pdLeader.GetServer().GetMicroServiceConfig().Clone() + + cfg.EnableTSODynamicSwitching = true + pdLeader.GetServer().SetMicroServiceConfig(*cfg) + + // Wait for PD to detect the change + time.Sleep(300 * time.Millisecond) + + // Verify PD is now providing TSO service and timestamps are monotonically increasing + re.NoError(checkTSOMonotonic(ctx, pdClient, &globalLastTS, 10)) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode")) +} + +func checkTSOMonotonic(ctx context.Context, pdClient pd.Client, globalLastTS *uint64, count int) error { + fmt.Println("start to request TSO") + for i := 0; i < count; i++ { + physical, logical, err := pdClient.GetTS(ctx) + if err != nil { + return err + } + ts := (uint64(physical) << 18) + uint64(logical) + if ts <= *globalLastTS { + return fmt.Errorf("TSO is not globally increasing: last %d, current %d", globalLastTS, ts) + } + *globalLastTS = ts + } + return nil +}