Skip to content

Commit

Permalink
TSO microservice discovery fallback path shouldn't call FindGroupByKe…
Browse files Browse the repository at this point in the history
…yspaceID (tikv#6473)

close tikv#6472

TSO microservice discovery fallback path shouldn't call FindGroupByKeyspaceID

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing authored and rleungx committed Aug 2, 2023
1 parent 6c27fd9 commit 0e1f37f
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 44 deletions.
85 changes: 58 additions & 27 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pd
import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -68,7 +69,7 @@ func (k *keyspaceGroupSvcDiscovery) update(
keyspaceGroup *tsopb.KeyspaceGroup,
newPrimaryAddr string,
secondaryAddrs, addrs []string,
) (oldPrimaryAddr string, primarySwitched bool) {
) (oldPrimaryAddr string, primarySwitched, secondaryChanged bool) {
k.Lock()
defer k.Unlock()

Expand All @@ -79,10 +80,13 @@ func (k *keyspaceGroupSvcDiscovery) update(
k.primaryAddr = newPrimaryAddr
}

if !reflect.DeepEqual(k.secondaryAddrs, secondaryAddrs) {
k.secondaryAddrs = secondaryAddrs
secondaryChanged = true
}

k.group = keyspaceGroup
k.secondaryAddrs = secondaryAddrs
k.addrs = addrs

return
}

Expand Down Expand Up @@ -413,16 +417,43 @@ func (c *tsoServiceDiscovery) updateMember() error {
log.Error("[tso] failed to get tso server", errs.ZapError(err))
return err
}
keyspaceGroup, err := c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout)
if err != nil {
if c.tsoServerDiscovery.countFailure() {
log.Error("[tso] failed to find the keyspace group", errs.ZapError(err))

var keyspaceGroup *tsopb.KeyspaceGroup
if len(tsoServerAddr) > 0 {
keyspaceGroup, err = c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout)
if err != nil {
if c.tsoServerDiscovery.countFailure() {
log.Error("[tso] failed to find the keyspace group", errs.ZapError(err))
}
return err
}
c.tsoServerDiscovery.resetFailure()
} else {
// There is no error but no tso server address found, which means
// the server side hasn't been upgraded to the version that
// processes and returns GetClusterInfoResponse.TsoUrls. In this case,
// we fall back to the old way of discovering the tso primary addresses
// from etcd directly.
log.Warn("[tso] no tso server address found,"+
" fallback to the legacy path to discover from etcd directly",
zap.String("discovery-key", c.defaultDiscoveryKey))
addrs, err := c.discoverWithLegacyPath()
if err != nil {
return err
}
if len(addrs) == 0 {
return errors.New("no tso server address found")
}
members := make([]*tsopb.KeyspaceGroupMember, 0, len(addrs))
for _, addr := range addrs {
members = append(members, &tsopb.KeyspaceGroupMember{Address: addr})
}
members[0].IsPrimary = true
keyspaceGroup = &tsopb.KeyspaceGroup{
Id: defaultKeySpaceGroupID,
Members: members,
}
return err
}
c.tsoServerDiscovery.resetFailure()

log.Info("[tso] update keyspace group", zap.String("keyspace-group", keyspaceGroup.String()))

// Initialize the serving addresses from the returned keyspace group info.
primaryAddr := ""
Expand All @@ -449,12 +480,17 @@ func (c *tsoServiceDiscovery) updateMember() error {
}
}

oldPrimary, primarySwitched := c.keyspaceGroupSD.update(keyspaceGroup, primaryAddr, secondaryAddrs, addrs)
oldPrimary, primarySwitched, secondaryChanged :=
c.keyspaceGroupSD.update(keyspaceGroup, primaryAddr, secondaryAddrs, addrs)
if primarySwitched {
if err := c.afterPrimarySwitched(oldPrimary, primaryAddr); err != nil {
return err
}
}
if primarySwitched || secondaryChanged {
log.Info("[tso] updated keyspace group service discovery info",
zap.String("keyspace-group-service", keyspaceGroup.String()))
}

// Even if the primary address is empty, we still updated other returned info above, including the
// keyspace group info and the secondary addresses.
Expand All @@ -470,6 +506,12 @@ func (c *tsoServiceDiscovery) updateMember() error {
func (c *tsoServiceDiscovery) findGroupByKeyspaceID(
keyspaceID uint32, tsoSrvAddr string, timeout time.Duration,
) (*tsopb.KeyspaceGroup, error) {
failpoint.Inject("unexpectedCallOfFindGroupByKeyspaceID", func(val failpoint.Value) {
keyspaceToCheck, ok := val.(int)
if ok && keyspaceID == uint32(keyspaceToCheck) {
panic("findGroupByKeyspaceID is called unexpectedly")
}
})
ctx, cancel := context.WithTimeout(c.ctx, timeout)
defer cancel()

Expand Down Expand Up @@ -526,21 +568,10 @@ func (c *tsoServiceDiscovery) getTSOServer(sd ServiceDiscovery) (string, error)
})
if len(addrs) == 0 {
// There is no error but no tso server address found, which means
// either the server side is experiencing some problems to get the
// tso primary addresses or the server hasn't been upgraded to the
// version which processes and returns GetClusterInfoResponse.TsoUrls.
// In this case, we fall back to the old way of discovering the tso
// primary addresses from etcd directly.
log.Warn("[tso] no tso server address found,"+
" fallback to the legacy path to discover from etcd directly",
zap.String("discovery-key", c.defaultDiscoveryKey))
addrs, err = c.discoverWithLegacyPath()
if err != nil {
return "", err
}
if len(addrs) == 0 {
return "", errors.New("no tso server address found")
}
// the server side hasn't been upgraded to the version that
// processes and returns GetClusterInfoResponse.TsoUrls. Return here
// and handle the fallback logic outside of this function.
return "", nil
}

log.Info("update tso server addresses", zap.Strings("addrs", addrs))
Expand Down
37 changes: 20 additions & 17 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tso

import (
"context"
"fmt"
"math"
"math/rand"
"strings"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/tikv/pd/client/testutil"
bs "github.com/tikv/pd/pkg/basicserver"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/tsoutil"
Expand Down Expand Up @@ -227,30 +229,31 @@ func (suite *tsoClientTestSuite) TestGetTSAsync() {

func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() {
re := suite.Require()
keyspaceID := uint32(1000000)
// Make sure this keyspace ID is not in use somewhere.
re.False(slice.Contains(suite.keyspaceIDs, keyspaceID))
failpointValue := fmt.Sprintf(`return(%d)`, keyspaceID)
// Simulate the case that the server has lower version than the client and returns no tso addrs
// in the GetClusterInfo RPC.
re.NoError(failpoint.Enable("github.com/tikv/pd/client/serverReturnsNoTSOAddrs", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/unexpectedCallOfFindGroupByKeyspaceID", failpointValue))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/client/serverReturnsNoTSOAddrs"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unexpectedCallOfFindGroupByKeyspaceID"))
}()
var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber)
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
client := mcs.SetupClientWithDefaultKeyspaceName(
suite.ctx, re, strings.Split(suite.backendEndpoints, ","))
var lastTS uint64
for j := 0; j < tsoRequestRound; j++ {
physical, logical, err := client.GetTS(suite.ctx)
suite.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
suite.Less(lastTS, ts)
lastTS = ts
}
}()

ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()
client := mcs.SetupClientWithKeyspaceID(
ctx, re, keyspaceID, strings.Split(suite.backendEndpoints, ","))
var lastTS uint64
for j := 0; j < tsoRequestRound; j++ {
physical, logical, err := client.GetTS(ctx)
suite.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
suite.Less(lastTS, ts)
lastTS = ts
}
wg.Wait()
}

// TestGetMinTS tests the correctness of GetMinTS.
Expand Down

0 comments on commit 0e1f37f

Please sign in to comment.