Skip to content

Commit

Permalink
*: fix the split problem caused by no enough replicas (tikv#6555)
Browse files Browse the repository at this point in the history
close tikv#6550

Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Aug 2, 2023
1 parent f0fb9b1 commit 4b23e72
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 6 deletions.
8 changes: 7 additions & 1 deletion pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
ticker = time.NewTicker(time.Millisecond * 100)
})
defer ticker.Stop()
log.Info("start to alloc nodes to all keyspace groups")
for {
select {
case <-ctx.Done():
Expand All @@ -181,6 +182,10 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
log.Error("failed to load all keyspace groups", zap.Error(err))
continue
}
// if the default keyspace is not initialized, we should wait for the default keyspace to be initialized.
if len(groups) == 0 {
continue
}
withError := false
for _, group := range groups {
if len(group.Members) < utils.DefaultKeyspaceGroupReplicaCount {
Expand All @@ -195,14 +200,15 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
}
if !withError {
// all keyspace groups have equal or more than default replica count
log.Info("all keyspace groups have equal or more than default replica count, stop to alloc node")
return
}
}
}

func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID uint64) {
tsoServiceKey := discovery.TSOPath(clusterID)
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey) + "/"
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey)

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
Expand Down
File renamed without changes.
7 changes: 4 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,13 +556,14 @@ func (s *Server) Run() error {
if err := s.startEtcd(s.ctx); err != nil {
return err
}
failpoint.Inject("delayStartServer", func() {
time.Sleep(2 * time.Second)
})

if err := s.startServer(s.ctx); err != nil {
return err
}

failpoint.Inject("delayStartServerLoop", func() {
time.Sleep(2 * time.Second)
})
s.startServerLoop(s.ctx)

return nil
Expand Down
33 changes: 33 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/keyspace"
tsoserver "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/swaggerserver"
Expand Down Expand Up @@ -85,6 +86,38 @@ func NewTestAPIServer(ctx context.Context, cfg *config.Config) (*TestServer, err
return createTestServer(ctx, cfg, []string{utils.APIServiceName})
}

// StartSingleTSOTestServer creates and starts a tso server with default config for testing.
func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tsoserver.Server, func(), error) {
cfg := tsoserver.NewConfig()
cfg.BackendEndpoints = backendEndpoints
cfg.ListenAddr = listenAddrs
cfg, err := tsoserver.GenerateConfig(cfg)
re.NoError(err)
// Setup the logger.
err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err != nil {
return nil, nil, err
}
zapLogOnce.Do(func() {
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
})
re.NoError(err)
return NewTSOTestServer(ctx, cfg)
}

// NewTSOTestServer creates a tso server with given config for testing.
func NewTSOTestServer(ctx context.Context, cfg *tsoserver.Config) (*tsoserver.Server, testutil.CleanupFunc, error) {
s := tsoserver.CreateServer(ctx, cfg)
if err := s.Run(); err != nil {
return nil, nil, err
}
cleanup := func() {
s.Close()
os.RemoveAll(cfg.DataDir)
}
return s, cleanup, nil
}

func createTestServer(ctx context.Context, cfg *config.Config, services []string) (*TestServer, error) {
err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map

func TestTSOServerStartFirst(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServer", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -170,5 +170,5 @@ func TestTSOServerStartFirst(t *testing.T) {
re.Len(group.Keyspaces, 2)
re.Len(group.Members, 2)

re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServer"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
}

0 comments on commit 4b23e72

Please sign in to comment.