Skip to content

Commit

Permalink
mcs, tso: handle null keyspace (tikv#6476)
Browse files Browse the repository at this point in the history
ref tikv#5895

For API V1 and legacy path (NewClientWithContext w/o keyspace id/name),
using Null Keypsace ID (uint32max) instead of default keyspace id and
make sure it can be served by the default keyspace group's timeline. Modifying test accordingly.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing authored and rleungx committed Aug 2, 2023
1 parent 0e1f37f commit 62ec3a1
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 47 deletions.
59 changes: 49 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ import (
const (
// defaultKeyspaceID is the default key space id.
// Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215)
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap
// and reserved for users who haven't been assigned keyspace.
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized
// when PD bootstrap and reserved for users who haven't been assigned keyspace.
defaultKeyspaceID = uint32(0)
maxKeyspaceID = uint32(0xFFFFFF)
// nullKeyspaceID is used for api v1 or legacy path where is keyspace agnostic.
nullKeyspaceID = uint32(0xFFFFFFFF)
// defaultKeySpaceGroupID is the default key space group id.
// We also reserved 0 for the keyspace group for the same purpose.
defaultKeySpaceGroupID = uint32(0)
Expand Down Expand Up @@ -303,17 +306,37 @@ type SecurityOption struct {
}

// NewClient creates a PD client.
func NewClient(svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
func NewClient(
svrAddrs []string, security SecurityOption, opts ...ClientOption,
) (Client, error) {
return NewClientWithContext(context.Background(), svrAddrs, security, opts...)
}

// NewClientWithContext creates a PD client with context. This API uses the default keyspace id 0.
func NewClientWithContext(ctx context.Context, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
return NewClientWithKeyspace(ctx, defaultKeyspaceID, svrAddrs, security, opts...)
func NewClientWithContext(
ctx context.Context, svrAddrs []string,
security SecurityOption, opts ...ClientOption,
) (Client, error) {
return createClientWithKeyspace(ctx, nullKeyspaceID, svrAddrs, security, opts...)
}

// NewClientWithKeyspace creates a client with context and the specified keyspace id.
func NewClientWithKeyspace(ctx context.Context, keyspaceID uint32, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
func NewClientWithKeyspace(
ctx context.Context, keyspaceID uint32, svrAddrs []string,
security SecurityOption, opts ...ClientOption,
) (Client, error) {
if keyspaceID < defaultKeyspaceID || keyspaceID > maxKeyspaceID {
return nil, errors.Errorf("invalid keyspace id %d. It must be in the range of [%d, %d]",
keyspaceID, defaultKeyspaceID, maxKeyspaceID)
}
return createClientWithKeyspace(ctx, keyspaceID, svrAddrs, security, opts...)
}

// createClientWithKeyspace creates a client with context and the specified keyspace id.
func createClientWithKeyspace(
ctx context.Context, keyspaceID uint32, svrAddrs []string,
security SecurityOption, opts ...ClientOption,
) (Client, error) {
tlsCfg := &tlsutil.TLSConfig{
CAPath: security.CAPath,
CertPath: security.CertPath,
Expand All @@ -340,7 +363,9 @@ func NewClientWithKeyspace(ctx context.Context, keyspaceID uint32, svrAddrs []st
opt(c)
}

c.pdSvcDiscovery = newPDServiceDiscovery(clientCtx, clientCancel, &c.wg, c.setServiceMode, c.svrUrls, c.tlsCfg, c.option)
c.pdSvcDiscovery = newPDServiceDiscovery(
clientCtx, clientCancel, &c.wg, c.setServiceMode,
keyspaceID, c.svrUrls, c.tlsCfg, c.option)
if err := c.setup(); err != nil {
c.cancel()
return nil, err
Expand All @@ -350,8 +375,17 @@ func NewClientWithKeyspace(ctx context.Context, keyspaceID uint32, svrAddrs []st
}

// NewClientWithKeyspaceName creates a client with context and the specified keyspace name.
func NewClientWithKeyspaceName(ctx context.Context, keyspace string, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
log.Info("[pd] create pd client with endpoints and keyspace", zap.Strings("pd-address", svrAddrs), zap.String("keyspace", keyspace))
func NewClientWithKeyspaceName(
ctx context.Context, keyspace string, svrAddrs []string,
security SecurityOption, opts ...ClientOption,
) (Client, error) {
log.Info("[pd] create pd client with endpoints and keyspace",
zap.Strings("pd-address", svrAddrs), zap.String("keyspace", keyspace))

// if keyspace is empty, fall back to the legacy API
if len(keyspace) == 0 {
return NewClientWithContext(ctx, svrAddrs, security, opts...)
}

tlsCfg := &tlsutil.TLSConfig{
CAPath: security.CAPath,
Expand All @@ -378,14 +412,19 @@ func NewClientWithKeyspaceName(ctx context.Context, keyspace string, svrAddrs []
opt(c)
}

c.pdSvcDiscovery = newPDServiceDiscovery(clientCtx, clientCancel, &c.wg, c.setServiceMode, c.svrUrls, c.tlsCfg, c.option)
// Create a PD service discovery with null keyspace id, then query the real id wth the keyspace name,
// finally update the keyspace id to the PD service discovery for the following interactions.
c.pdSvcDiscovery = newPDServiceDiscovery(
clientCtx, clientCancel, &c.wg, c.setServiceMode, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option)
if err := c.setup(); err != nil {
c.cancel()
return nil, err
}
if err := c.initRetry(c.loadKeyspaceMeta, keyspace); err != nil {
return nil, err
}
c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)

return c, nil
}

Expand Down
15 changes: 12 additions & 3 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type ServiceDiscovery interface {
GetClusterID() uint64
// GetKeyspaceID returns the ID of the keyspace
GetKeyspaceID() uint32
// SetKeyspaceID sets the ID of the keyspace
SetKeyspaceID(keyspaceID uint32)
// GetKeyspaceGroupID returns the ID of the keyspace group
GetKeyspaceGroupID() uint32
// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls.
Expand Down Expand Up @@ -147,7 +149,8 @@ type pdServiceDiscovery struct {
cancel context.CancelFunc
closeOnce sync.Once

tlsCfg *tlsutil.TLSConfig
keyspaceID uint32
tlsCfg *tlsutil.TLSConfig
// Client option.
option *option
}
Expand All @@ -157,6 +160,7 @@ func newPDServiceDiscovery(
ctx context.Context, cancel context.CancelFunc,
wg *sync.WaitGroup,
serviceModeUpdateCb func(pdpb.ServiceMode),
keyspaceID uint32,
urls []string, tlsCfg *tlsutil.TLSConfig, option *option,
) *pdServiceDiscovery {
pdsd := &pdServiceDiscovery{
Expand All @@ -165,6 +169,7 @@ func newPDServiceDiscovery(
cancel: cancel,
wg: wg,
serviceModeUpdateCb: serviceModeUpdateCb,
keyspaceID: keyspaceID,
tlsCfg: tlsCfg,
option: option,
}
Expand Down Expand Up @@ -288,8 +293,12 @@ func (c *pdServiceDiscovery) GetClusterID() uint64 {

// GetKeyspaceID returns the ID of the keyspace
func (c *pdServiceDiscovery) GetKeyspaceID() uint32 {
// PD/API service only supports the default keyspace
return defaultKeyspaceID
return c.keyspaceID
}

// SetKeyspaceID sets the ID of the keyspace
func (c *pdServiceDiscovery) SetKeyspaceID(keyspaceID uint32) {
c.keyspaceID = keyspaceID
}

// GetKeyspaceGroupID returns the ID of the keyspace group
Expand Down
5 changes: 5 additions & 0 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ func (c *tsoServiceDiscovery) GetKeyspaceID() uint32 {
return c.keyspaceID
}

// SetKeyspaceID sets the ID of the keyspace
func (c *tsoServiceDiscovery) SetKeyspaceID(keyspaceID uint32) {
c.keyspaceID = keyspaceID
}

// GetKeyspaceGroupID returns the ID of the keyspace group. If the keyspace group is unknown,
// it returns the default keyspace group ID.
func (c *tsoServiceDiscovery) GetKeyspaceGroupID() uint32 {
Expand Down
3 changes: 2 additions & 1 deletion pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ const (
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap
// and reserved for users who haven't been assigned keyspace.
DefaultKeyspaceID = uint32(0)

// NullKeyspaceID is used for api v1 or legacy path where is keyspace agnostic.
NullKeyspaceID = uint32(0xFFFFFFFF)
// DefaultKeyspaceGroupID is the default key space group id.
// We also reserved 0 for the keyspace group for the same purpose.
DefaultKeyspaceGroupID = uint32(0)
Expand Down
21 changes: 13 additions & 8 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,34 +395,39 @@ func (suite *keyspaceGroupManagerTestSuite) TestGetKeyspaceGroupMetaWithCheck()
err = mgr.Initialize()
re.NoError(err)

// Should be able to get AM for keyspace 0, 1, 2 in keyspace group 0.
am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(0, 0)
// Should be able to get AM for the default/null keyspace and keyspace 1, 2 in keyspace group 0.
am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(mcsutils.DefaultKeyspaceID, 0)
re.NoError(err)
re.Equal(uint32(0), kgid)
re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid)
re.NotNil(am)
re.NotNil(kg)
am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(mcsutils.NullKeyspaceID, 0)
re.NoError(err)
re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid)
re.NotNil(am)
re.NotNil(kg)
am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(1, 0)
re.NoError(err)
re.Equal(uint32(0), kgid)
re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid)
re.NotNil(am)
re.NotNil(kg)
am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(2, 0)
re.NoError(err)
re.Equal(uint32(0), kgid)
re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid)
re.NotNil(am)
re.NotNil(kg)
// Should still succeed even keyspace 3 isn't explicitly assigned to any
// keyspace group. It will be assigned to the default keyspace group.
am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(3, 0)
re.NoError(err)
re.Equal(uint32(0), kgid)
re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid)
re.NotNil(am)
re.NotNil(kg)
// Should succeed and get the meta of keyspace group 0, because keyspace 0
// belongs to group 0, though the specified group 1 doesn't exist.
am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(0, 1)
am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(mcsutils.DefaultKeyspaceID, 1)
re.NoError(err)
re.Equal(uint32(0), kgid)
re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid)
re.NotNil(am)
re.NotNil(kg)
// Should fail because keyspace 3 isn't explicitly assigned to any keyspace
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/mcs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ require (
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.2
github.com/tikv/pd v0.0.0-00010101000000-000000000000
github.com/tikv/pd/client v0.0.0-00010101000000-000000000000
Expand Down Expand Up @@ -120,6 +119,7 @@ require (
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect
github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 // indirect
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/prometheus/client_golang v1.11.1 // indirect
Expand Down
21 changes: 7 additions & 14 deletions tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
bs "github.com/tikv/pd/pkg/basicserver"
Expand Down Expand Up @@ -139,19 +138,13 @@ func WaitForPrimaryServing(re *require.Assertions, serverMap map[string]bs.Serve
}

// WaitForTSOServiceAvailable waits for the pd client being served by the tso server side
func WaitForTSOServiceAvailable(ctx context.Context, pdClient pd.Client) error {
var err error
for i := 0; i < 30; i++ {
if _, _, err := pdClient.GetTS(ctx); err == nil {
return nil
}
select {
case <-ctx.Done():
return err
case <-time.After(100 * time.Millisecond):
}
}
return errors.WithStack(err)
func WaitForTSOServiceAvailable(
ctx context.Context, re *require.Assertions, client pd.Client,
) {
testutil.Eventually(re, func() bool {
_, _, err := client.GetTS(ctx)
return err == nil
})
}

// CheckMultiKeyspacesTSO checks the correctness of TSO for multiple keyspaces.
Expand Down
16 changes: 8 additions & 8 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,20 +495,20 @@ func (suite *APIServerForwardTestSuite) checkUnavailableTSO() {
}

func (suite *APIServerForwardTestSuite) checkAvailableTSO() {
err := mcs.WaitForTSOServiceAvailable(suite.ctx, suite.pdClient)
suite.NoError(err)
re := suite.Require()
mcs.WaitForTSOServiceAvailable(suite.ctx, re, suite.pdClient)
// try to get ts
_, _, err = suite.pdClient.GetTS(suite.ctx)
suite.NoError(err)
_, _, err := suite.pdClient.GetTS(suite.ctx)
re.NoError(err)
// try to update gc safe point
min, err := suite.pdClient.UpdateServiceGCSafePoint(context.Background(), "a", 1000, 1)
suite.NoError(err)
suite.Equal(uint64(0), min)
re.NoError(err)
re.Equal(uint64(0), min)
// try to set external ts
ts, err := suite.pdClient.GetExternalTimestamp(suite.ctx)
suite.NoError(err)
re.NoError(err)
err = suite.pdClient.SetExternalTimestamp(suite.ctx, ts+1)
suite.NoError(err)
re.NoError(err)
}

type CommonTestSuite struct {
Expand Down
8 changes: 6 additions & 2 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ func (suite *tsoClientTestSuite) SetupSuite() {
if suite.legacy {
client, err := pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{})
re.NoError(err)
suite.keyspaceIDs = append(suite.keyspaceIDs, 0)
innerClient, ok := client.(interface{ GetServiceDiscovery() pd.ServiceDiscovery })
re.True(ok)
re.Equal(mcsutils.NullKeyspaceID, innerClient.GetServiceDiscovery().GetKeyspaceID())
re.Equal(mcsutils.DefaultKeyspaceGroupID, innerClient.GetServiceDiscovery().GetKeyspaceGroupID())
mcs.WaitForTSOServiceAvailable(suite.ctx, re, client)
suite.clients = make([]pd.Client, 0)
suite.clients = append(suite.clients, client)
} else {
Expand All @@ -111,7 +115,7 @@ func (suite *tsoClientTestSuite) SetupSuite() {
keyspaceGroupID uint32
keyspaceIDs []uint32
}{
{0, []uint32{0, 10}},
{0, []uint32{mcsutils.DefaultKeyspaceID, 10}},
{1, []uint32{1, 11}},
{2, []uint32{2}},
}
Expand Down

0 comments on commit 62ec3a1

Please sign in to comment.