Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix the split problem caused by not enough replicas #6555

Merged
merged 1 commit into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,11 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
ticker = time.NewTicker(time.Millisecond * 100)
})
defer ticker.Stop()
log.Info("start to alloc nodes to all keyspace groups")
for {
select {
case <-m.ctx.Done():
log.Info("stop to alloc nodes to all keyspace groups")
return
case <-ticker.C:
}
Expand All @@ -170,6 +172,10 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
log.Error("failed to load all keyspace groups", zap.Error(err))
continue
}
// if the default keyspace is not initialized, we should wait for the default keyspace to be initialized.
if len(groups) == 0 {
continue
}
withError := false
for _, group := range groups {
if len(group.Members) < utils.KeyspaceGroupDefaultReplicaCount {
Expand All @@ -184,14 +190,15 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
}
if !withError {
// all keyspace groups have equal or more than default replica count
log.Info("all keyspace groups have equal or more than default replica count, stop to alloc node")
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
}

func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID uint64) {
tsoServiceKey := discovery.TSOPath(clusterID)
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey) + "/"
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey)

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// Discover is used to get all the service instances of the specified service name.
func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, error) {
key := discoveryPath(clusterID, serviceName) + "/"
endKey := clientv3.GetPrefixRangeEnd(key) + "/"
endKey := clientv3.GetPrefixRangeEnd(key)

withRange := clientv3.WithRange(endKey)
resp, err := etcdutil.EtcdKVGet(cli, key, withRange)
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/discovery/registry_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

// ServiceRegistryEntry is the registry entry of a service
type ServiceRegistryEntry struct {
ServiceAddr string `json:"serviceAddr"`
ServiceAddr string `json:"service-addr"`
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please pay attention to it.

}

// Serialize this service registry entry
Expand Down
7 changes: 4 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,13 +559,14 @@ func (s *Server) Run() error {
if err := s.startEtcd(s.ctx); err != nil {
return err
}
failpoint.Inject("delayStartServer", func() {
time.Sleep(2 * time.Second)
})

if err := s.startServer(s.ctx); err != nil {
return err
}

failpoint.Inject("delayStartServerLoop", func() {
time.Sleep(2 * time.Second)
})
s.startServerLoop(s.ctx)

return nil
Expand Down
33 changes: 33 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/keyspace"
tsoserver "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/swaggerserver"
Expand Down Expand Up @@ -83,6 +84,38 @@ func NewTestAPIServer(ctx context.Context, cfg *config.Config) (*TestServer, err
return createTestServer(ctx, cfg, []string{utils.APIServiceName})
}

// StartSingleTSOTestServer creates and starts a tso server with default config for testing.
//
// backendEndpoints and listenAddrs are written to the BackendEndpoints and
// ListenAddr fields of a fresh tsoserver config before it is passed through
// tsoserver.GenerateConfig. A config generation failure is asserted through re,
// while a logger setup failure is returned to the caller with a nil server and
// a nil cleanup function. On success the results are exactly those of
// NewTSOTestServer.
func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tsoserver.Server, func(), error) {
	cfg := tsoserver.NewConfig()
	cfg.BackendEndpoints = backendEndpoints
	cfg.ListenAddr = listenAddrs
	cfg, err := tsoserver.GenerateConfig(cfg)
	re.NoError(err)
	// Setup the logger.
	if err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog); err != nil {
		return nil, nil, err
	}
	// The global logger is replaced at most once, guarded by zapLogOnce.
	zapLogOnce.Do(func() {
		log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
	})
	return NewTSOTestServer(ctx, cfg)
}

// NewTSOTestServer creates a tso server with given config for testing.
//
// The server is created from cfg and run immediately. On success it returns
// the server together with a cleanup function that closes the server and
// removes cfg.DataDir. If running the server fails, only the error is returned.
func NewTSOTestServer(ctx context.Context, cfg *tsoserver.Config) (*tsoserver.Server, testutil.CleanupFunc, error) {
	srv := tsoserver.CreateServer(ctx, cfg)
	err := srv.Run()
	if err != nil {
		return nil, nil, err
	}
	return srv, func() {
		srv.Close()
		os.RemoveAll(cfg.DataDir)
	}, nil
}

func createTestServer(ctx context.Context, cfg *config.Config, services []string) (*TestServer, error) {
err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map

func TestTSOServerStartFirst(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServer", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -171,5 +171,5 @@ func TestTSOServerStartFirst(t *testing.T) {
re.Len(group.Keyspaces, 2)
re.Len(group.Members, 2)

re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServer"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
}
45 changes: 45 additions & 0 deletions tests/pdctl/keyspace/keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/pdctl"
handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers"
Expand Down Expand Up @@ -82,3 +88,42 @@ func TestKeyspaceGroup(t *testing.T) {
re.Equal(uint32(2), keyspaceGroup.ID)
re.Equal(keyspaceGroup.Keyspaces, []uint32{222, 333})
}

// TestSplitKeyspaceGroup starts a three-node API cluster with two pre-allocated
// keyspaces plus two TSO servers, bootstraps the cluster, and then retries the
// pdctl `keyspace-group split 0 1 2` command until its output contains
// "Success".
func TestSplitKeyspaceGroup(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
	re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
	tc, err := tests.NewTestAPICluster(ctx, 3, func(conf *config.Config, serverName string) {
		conf.Keyspace.PreAlloc = []string{"keyspace_a", "keyspace_b"}
	})
	re.NoError(err)
	err = tc.RunInitialServers()
	re.NoError(err)
	pdAddr := tc.GetConfig().GetClientURL()

	// Check the error before deferring the cleanup: on failure the cleanup
	// function is nil, and a deferred nil call would panic while the test is
	// exiting, hiding the original error.
	_, tsoServerCleanup1, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc())
	re.NoError(err)
	defer tsoServerCleanup1()
	_, tsoServerCleanup2, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc())
	re.NoError(err)
	defer tsoServerCleanup2()
	cmd := pdctlCmd.GetRootCmd()

	// NOTE(review): presumably this sleep covers the delay injected by the
	// delayStartServerLoop failpoint — confirm against server.Run.
	time.Sleep(2 * time.Second)
	tc.WaitLeader()
	leaderServer := tc.GetServer(tc.GetLeader())
	re.NoError(leaderServer.BootstrapCluster())

	// split keyspace group.
	testutil.Eventually(re, func() bool {
		args := []string{"-u", pdAddr, "keyspace-group", "split", "0", "1", "2"}
		output, err := pdctl.ExecuteCommand(cmd, args...)
		re.NoError(err)
		return strings.Contains(string(output), "Success")
	})

	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
	re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
}