Skip to content

Commit

Permalink
mcs, tso: support multi-keyspace-group and its service discovery in E…
Browse files Browse the repository at this point in the history
…2E path (tikv#6321)

ref tikv#6232

Support multi-keyspace-group in PD(TSO) client

Signed-off-by: Bin Shi <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Aug 2, 2023
1 parent ec15784 commit d826867
Show file tree
Hide file tree
Showing 36 changed files with 814 additions and 316 deletions.
23 changes: 17 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ import (
const (
// defaultKeyspaceID is the default key space id.
// Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215)
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap and reserved for users who haven't been assigned keyspace.
// 0 is reserved for the default keyspace with the name "DEFAULT". It's initialized when PD bootstraps
// and is reserved for users who haven't been assigned a keyspace.
defaultKeyspaceID = uint32(0)
// defaultKeySpaceGroupID is the default key space group id.
// We also reserved 0 for the keyspace group for the same purpose.
defaultKeySpaceGroupID = uint32(0)
)

// Region contains information of a region's meta and its peers.
Expand Down Expand Up @@ -205,6 +209,8 @@ var (
errClosing = errors.New("[pd] closing")
// errTSOLength is returned when the number of response timestamps is inconsistent with request.
errTSOLength = errors.New("[pd] tso length in rpc response is incorrect")
// errNoServiceModeReturned is returned when the response unexpectedly doesn't contain service mode info.
errNoServiceModeReturned = errors.New("[pd] no service mode returned")
)

// ClientOption configures client.
Expand Down Expand Up @@ -380,6 +386,7 @@ func (c *client) Close() {
func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
c.Lock()
defer c.Unlock()

if newMode == c.serviceMode {
return
}
Expand All @@ -396,13 +403,18 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = newTSOServiceDiscovery(c.ctx, MetaStorageClient(c),
c.GetClusterID(c.ctx), c.keyspaceID, c.svrUrls, c.tlsCfg, c.option)
newTSOSvcDiscovery = newTSOServiceDiscovery(
c.ctx, MetaStorageClient(c), c.pdSvcDiscovery,
c.GetClusterID(c.ctx), c.keyspaceID, c.tlsCfg, c.option)
// At this point, the keyspace group isn't known yet. Starts from the default keyspace group,
// and will be updated later.
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
newTSOSvcDiscovery, &tsoTSOStreamBuilderFactory{})
if err := newTSOSvcDiscovery.Init(); err != nil {
log.Error("[pd] failed to initialize tso service discovery. keep the current service mode",
zap.Strings("svr-urls", c.svrUrls), zap.String("current-mode", c.serviceMode.String()), zap.Error(err))
zap.Strings("svr-urls", c.svrUrls),
zap.String("current-mode", c.serviceMode.String()),
zap.Error(err))
return
}
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
Expand Down Expand Up @@ -602,11 +614,10 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
req.clientCtx = c.ctx
tsoClient := c.getTSOClient()
req.start = time.Now()
req.keyspaceID = c.keyspaceID
req.dcLocation = dcLocation

if tsoClient == nil {
req.done <- errs.ErrClientGetTSO
req.done <- errs.ErrClientGetTSO.FastGenByArgs("tso client is nil")
return req
}

Expand Down
6 changes: 3 additions & 3 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ func TestUpdateURLs(t *testing.T) {
cli := &pdServiceDiscovery{option: newOption()}
cli.urls.Store([]string{})
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetURLs())
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetURLs())
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members)
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2], members[0]}), cli.GetURLs())
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2], members[0]}), cli.GetServiceURLs())
}

const testClientURL = "tmp://test.url:5255"
Expand Down
25 changes: 13 additions & 12 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,19 @@ const (

// client errors
var (
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetLeader = errors.Normalize("get leader from %v error", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID"))
)

// grpcutil errors
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a h1:PWkMSJSDaOuLNKCV84K3tQ9stZuZPN8E148jRPD9TcA=
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E=
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
111 changes: 85 additions & 26 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ const (
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
)

// serviceType identifies which kind of service DiscoverMicroservice should look up.
type serviceType int

const (
// apiService selects the API service; DiscoverMicroservice answers it with the
// discovery's own service URLs.
apiService serviceType = iota
// tsoService selects the TSO service; DiscoverMicroservice answers it with the
// TSO URLs reported in the cluster info.
tsoService
)

// ServiceDiscovery defines the general interface for service discovery on a quorum-based cluster
// or a primary/secondary configured cluster.
type ServiceDiscovery interface {
Expand All @@ -50,8 +57,14 @@ type ServiceDiscovery interface {
Close()
// GetClusterID returns the ID of the cluster
GetClusterID() uint64
// GetURLs returns the URLs of the servers.
GetURLs() []string
// GetKeyspaceID returns the ID of the keyspace
GetKeyspaceID() uint32
// GetKeyspaceGroupID returns the ID of the keyspace group
GetKeyspaceGroupID() uint32
// DiscoverMicroservice discovers the microservice with the specified type and returns the server urls.
DiscoverMicroservice(svcType serviceType) []string
// GetServiceURLs returns the URLs of the servers providing the service
GetServiceURLs() []string
// GetServingEndpointClientConn returns the grpc client connection of the serving endpoint
// which is the leader in a quorum-based cluster or the primary in a primary/secondary
// configured cluster.
Expand Down Expand Up @@ -174,7 +187,9 @@ func (c *pdServiceDiscovery) Init() error {
}
log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))

c.updateServiceMode()
if err := c.checkServiceModeChanged(); err != nil {
log.Warn("[pd] failed to check service mode and will check later", zap.Error(err))
}

c.wg.Add(2)
go c.updateMemberLoop()
Expand Down Expand Up @@ -218,7 +233,7 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
failpoint.Continue()
})
if err := c.updateMember(); err != nil {
log.Error("[pd] failed to update member", zap.Strings("urls", c.GetURLs()), errs.ZapError(err))
log.Error("[pd] failed to update member", zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err))
}
}
}
Expand All @@ -240,7 +255,11 @@ func (c *pdServiceDiscovery) updateServiceModeLoop() {
return
case <-ticker.C:
}
c.updateServiceMode()
if err := c.checkServiceModeChanged(); err != nil {
log.Error("[pd] failed to update service mode",
zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err))
c.ScheduleCheckMemberChanged() // check if the leader changed
}
}
}

Expand All @@ -263,13 +282,50 @@ func (c *pdServiceDiscovery) GetClusterID() uint64 {
return c.clusterID
}

// GetURLs returns the URLs of the servers.
// GetKeyspaceID returns the ID of the keyspace.
// For pdServiceDiscovery this is always defaultKeyspaceID.
func (c *pdServiceDiscovery) GetKeyspaceID() uint32 {
// PD/API service only supports the default keyspace
return defaultKeyspaceID
}

// GetKeyspaceGroupID returns the ID of the keyspace group.
// For pdServiceDiscovery this is always defaultKeySpaceGroupID.
func (c *pdServiceDiscovery) GetKeyspaceGroupID() uint32 {
// PD/API service only supports the default keyspace group
return defaultKeySpaceGroupID
}

// DiscoverMicroservice discovers the microservice with the specified type and returns the server urls.
//
// For apiService it returns the result of GetServiceURLs. For tsoService it fetches the
// cluster info from the address returned by getLeaderAddr and returns the TSO URLs found
// there; nil is returned when no leader address is known, when the request fails, or when
// no cluster info comes back. Every failure is logged rather than returned.
//
// It panics if svcType is not one of the declared serviceType values.
func (c *pdServiceDiscovery) DiscoverMicroservice(svcType serviceType) (urls []string) {
	switch svcType {
	case apiService:
		urls = c.GetServiceURLs()
	case tsoService:
		leaderAddr := c.getLeaderAddr()
		if len(leaderAddr) == 0 {
			log.Error("[pd] failed to get leader addr")
			return nil
		}
		clusterInfo, err := c.getClusterInfo(c.ctx, leaderAddr, c.option.timeout)
		if err != nil {
			log.Error("[pd] failed to get cluster info",
				zap.String("leader-addr", leaderAddr), errs.ZapError(err))
			return nil
		}
		// checkServiceModeChanged treats a nil cluster info as possible for the same
		// call, so guard here as well instead of dereferencing a nil pointer.
		if clusterInfo == nil {
			log.Error("[pd] got empty cluster info",
				zap.String("leader-addr", leaderAddr))
			return nil
		}
		urls = clusterInfo.TsoUrls
	default:
		// An unknown service type is a programming error, not a runtime condition.
		panic("invalid service type")
	}

	return urls
}

// GetServiceURLs returns the URLs of the servers.
// Besides tests, it is also read by the member update loop and by DiscoverMicroservice.
func (c *pdServiceDiscovery) GetURLs() []string {
func (c *pdServiceDiscovery) GetServiceURLs() []string {
return c.urls.Load().([]string)
}

// GetServingAddr returns the grpc client connection of the serving endpoint
// GetServingEndpointClientConn returns the grpc client connection of the serving endpoint
// which is the leader in a quorum-based cluster or the primary in a primary/secondary
// configured cluster.
func (c *pdServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn {
Expand Down Expand Up @@ -360,7 +416,7 @@ func (c *pdServiceDiscovery) initClusterID() error {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
clusterID := uint64(0)
for _, url := range c.GetURLs() {
for _, url := range c.GetServiceURLs() {
members, err := c.getMembers(ctx, url, c.option.timeout)
if err != nil || members.GetHeader() == nil {
log.Warn("[pd] failed to get cluster id", zap.String("url", url), errs.ZapError(err))
Expand All @@ -386,29 +442,32 @@ func (c *pdServiceDiscovery) initClusterID() error {
return nil
}

func (c *pdServiceDiscovery) updateServiceMode() {
func (c *pdServiceDiscovery) checkServiceModeChanged() error {
leaderAddr := c.getLeaderAddr()
if len(leaderAddr) > 0 {
clusterInfo, err := c.getClusterInfo(c.ctx, leaderAddr, c.option.timeout)
// If the method is not supported, we set it to pd mode.
if err != nil {
if len(leaderAddr) == 0 {
return errors.New("no leader found")
}

clusterInfo, err := c.getClusterInfo(c.ctx, leaderAddr, c.option.timeout)
if err != nil {
if strings.Contains(err.Error(), "Unimplemented") {
// If the method is not supported, we set it to pd mode.
// TODO: this is a hacky way to solve the compatibility issue.
// we need to remove this after all maintained version supports the method.
if strings.Contains(err.Error(), "Unimplemented") {
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
} else {
log.Warn("[pd] failed to get cluster info for the leader", zap.String("leader-addr", leaderAddr), errs.ZapError(err))
}
return
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
return nil
}
c.serviceModeUpdateCb(clusterInfo.ServiceModes[0])
} else {
log.Warn("[pd] no leader found")
return err
}
if clusterInfo == nil || len(clusterInfo.ServiceModes) == 0 {
return errors.WithStack(errNoServiceModeReturned)
}
c.serviceModeUpdateCb(clusterInfo.ServiceModes[0])
return nil
}

func (c *pdServiceDiscovery) updateMember() error {
for i, url := range c.GetURLs() {
for i, url := range c.GetServiceURLs() {
failpoint.Inject("skipFirstUpdateMember", func() {
if i == 0 {
failpoint.Continue()
Expand All @@ -424,7 +483,7 @@ func (c *pdServiceDiscovery) updateMember() error {
var errTSO error
if err == nil {
if members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 {
err = errs.ErrClientGetLeader.FastGenByArgs("leader address don't exist")
err = errs.ErrClientGetLeader.FastGenByArgs("leader address doesn't exist")
}
// Still need to update TsoAllocatorLeaders, even if there is no PD leader
errTSO = c.switchTSOAllocatorLeaders(members.GetTsoAllocatorLeaders())
Expand Down Expand Up @@ -501,7 +560,7 @@ func (c *pdServiceDiscovery) updateURLs(members []*pdpb.Member) {
}

sort.Strings(urls)
oldURLs := c.GetURLs()
oldURLs := c.GetServiceURLs()
// the url list is same.
if reflect.DeepEqual(oldURLs, urls) {
return
Expand Down
1 change: 0 additions & 1 deletion client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type tsoRequest struct {
done chan error
physical int64
logical int64
keyspaceID uint32
dcLocation string
}

Expand Down
Loading

0 comments on commit d826867

Please sign in to comment.