Skip to content

Commit

Permalink
Fix "non-default keyspace groups use the same timestamp path by mista…
Browse files Browse the repository at this point in the history
…ke" (tikv#6457)

close tikv#6453, close tikv#6465

The tso servers are loading keyspace groups asynchronously. Make sure all keyspace groups
are available for serving tso requests from corresponding keyspaces by querying
IsKeyspaceServing(keyspaceID, the Desired KeyspaceGroupID). if use default keyspace group id
in the query, it will always return true as the keyspace will be served by default keyspace group
before the keyspace groups are loaded.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing authored and rleungx committed Aug 2, 2023
1 parent be42b78 commit 20571e4
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 50 deletions.
34 changes: 27 additions & 7 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ import (

const (
// GlobalDCLocation is the Global TSO Allocator's DC location label.
GlobalDCLocation = "global"
checkStep = time.Minute
patrolStep = time.Second
defaultAllocatorLeaderLease = 3
localTSOAllocatorEtcdPrefix = "lta"
localTSOSuffixEtcdPrefix = "lts"
GlobalDCLocation = "global"
checkStep = time.Minute
patrolStep = time.Second
defaultAllocatorLeaderLease = 3
globalTSOAllocatorEtcdPrefix = "gta"
localTSOAllocatorEtcdPrefix = "lta"
localTSOSuffixEtcdPrefix = "lts"
)

var (
Expand Down Expand Up @@ -273,6 +274,23 @@ func (am *AllocatorManager) setUpLocalAllocator(parentCtx context.Context, dcLoc
go am.allocatorLeaderLoop(parentCtx, localTSOAllocator)
}

// GetTimestampPath returns the timestamp path in etcd for the given DCLocation.
func (am *AllocatorManager) GetTimestampPath(dcLocation string) string {
if am == nil {
return ""
}
if len(dcLocation) == 0 {
dcLocation = GlobalDCLocation
}

am.mu.RLock()
defer am.mu.RUnlock()
if allocatorGroup, exist := am.mu.allocatorGroups[dcLocation]; exist {
return path.Join(am.rootPath, allocatorGroup.allocator.GetTimestampPath())
}
return ""
}

// tsoAllocatorLoop is used to run the TSO Allocator updating daemon.
func (am *AllocatorManager) tsoAllocatorLoop() {
defer logutil.LogPanic()
Expand Down Expand Up @@ -750,7 +768,9 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
}
if err := ag.allocator.UpdateTSO(); err != nil {
log.Warn("failed to update allocator's timestamp",
zap.String("dc-location", ag.dcLocation), errs.ZapError(err))
zap.String("dc-location", ag.dcLocation),
zap.String("name", am.member.Name()),
errs.ZapError(err))
am.ResetAllocatorGroup(ag.dcLocation)
return
}
Expand Down
29 changes: 28 additions & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"path"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -45,6 +46,14 @@ type Allocator interface {
IsInitialize() bool
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
UpdateTSO() error
// GetTimestampPath returns the timestamp path in etcd, which is:
// 1. for the default keyspace group:
// a. timestamp in /pd/{cluster_id}/timestamp
// b. lta/{dc-location}/timestamp in /pd/{cluster_id}/lta/{dc-location}/timestamp
// 1. for the non-default keyspace groups:
// a. {group}/gts/timestamp in /ms/{cluster_id}/tso/{group}/gta/timestamp
// b. {group}/lts/{dc-location}/timestamp in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp
GetTimestampPath() string
// SetTSO sets the physical part with given TSO. It's mainly used for BR restore.
// Cannot set the TSO smaller than now in any case.
// if ignoreSmaller=true, if input ts is smaller than current, ignore silently, else return error
Expand Down Expand Up @@ -80,6 +89,16 @@ func NewGlobalTSOAllocator(
am *AllocatorManager,
startGlobalLeaderLoop bool,
) Allocator {
// Construct the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// "" in /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
tsPath := ""
if am.kgID != mcsutils.DefaultKeyspaceGroupID {
tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), globalTSOAllocatorEtcdPrefix)
}

ctx, cancel := context.WithCancel(ctx)
gta := &GlobalTSOAllocator{
ctx: ctx,
Expand All @@ -89,7 +108,7 @@ func NewGlobalTSOAllocator(
timestampOracle: &timestampOracle{
client: am.member.GetLeadership().GetClient(),
rootPath: am.rootPath,
ltsPath: "",
tsPath: tsPath,
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
Expand Down Expand Up @@ -127,6 +146,14 @@ func (gta *GlobalTSOAllocator) getSyncRTT() int64 {
return syncRTT.(int64)
}

// GetTimestampPath returns the timestamp path in etcd.
func (gta *GlobalTSOAllocator) GetTimestampPath() string {
if gta == nil || gta.timestampOracle == nil {
return ""
}
return gta.timestampOracle.GetTimestampPath()
}

func (gta *GlobalTSOAllocator) estimateMaxTS(count uint32, suffixBits int) (*pdpb.Timestamp, bool, error) {
physical, logical, lastUpdateTime := gta.timestampOracle.generateTSO(int64(count), 0)
if physical == 0 {
Expand Down
7 changes: 5 additions & 2 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ type KeyspaceGroupManager struct {
// 1. The path for keyspace group primary election. Format: "/ms/{cluster_id}/tso/{group}/primary"
// 2. The path for LoadTimestamp/SaveTimestamp in the storage endpoint for all the non-default
// keyspace groups.
// Key: /ms/{cluster_id}/tso/{group}/gts/timestamp
// Key: /ms/{cluster_id}/tso/{group}/gta/timestamp
// Value: ts(time.Time)
// Key: /ms/{cluster_id}/tso/{group}/lts/{dc-location}/timestamp
// Key: /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp
// Value: ts(time.Time)
// Note: The {group} is 5 digits integer with leading zeros.
tsoSvcRootPath string
Expand Down Expand Up @@ -425,6 +425,9 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
}
// Initialize all kinds of maps.
am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true)
log.Info("created allocator manager",
zap.Uint32("keyspace-group-id", group.ID),
zap.String("timestamp-path", am.GetTimestampPath("")))
kgm.Lock()
group.KeyspaceLookupTable = make(map[uint32]struct{})
for _, kid := range group.Keyspaces {
Expand Down
24 changes: 22 additions & 2 deletions pkg/tso/local_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand All @@ -43,7 +44,7 @@ type LocalTSOAllocator struct {
// for election use, notice that the leadership that member holds is
// the leadership for PD leader. Local TSO Allocator's leadership is for the
// election of Local TSO Allocator leader among several PD servers and
// Local TSO Allocator only use member's some etcd and pbpd.Member info.
// Local TSO Allocator only use member's some etcd and pdpb.Member info.
// So it's not conflicted.
rootPath string
allocatorLeader atomic.Value // stored as *pdpb.Member
Expand All @@ -55,13 +56,24 @@ func NewLocalTSOAllocator(
leadership *election.Leadership,
dcLocation string,
) Allocator {
// Construct the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// lta/{dc-location} in /pd/{cluster_id}/lta/{dc-location}/timestamp
// 2. for the non-default keyspace groups:
// {group}/lta/{dc-location} in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp
var tsPath string
if am.kgID == utils.DefaultKeyspaceGroupID {
tsPath = path.Join(localTSOAllocatorEtcdPrefix, dcLocation)
} else {
tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), localTSOAllocatorEtcdPrefix, dcLocation)
}
return &LocalTSOAllocator{
allocatorManager: am,
leadership: leadership,
timestampOracle: &timestampOracle{
client: leadership.GetClient(),
rootPath: am.rootPath,
ltsPath: path.Join(localTSOAllocatorEtcdPrefix, dcLocation),
tsPath: tsPath,
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
Expand All @@ -73,6 +85,14 @@ func NewLocalTSOAllocator(
}
}

// GetTimestampPath returns the timestamp path in etcd.
func (lta *LocalTSOAllocator) GetTimestampPath() string {
if lta == nil || lta.timestampOracle == nil {
return ""
}
return lta.timestampOracle.GetTimestampPath()
}

// GetDCLocation returns the local allocator's dc-location.
func (lta *LocalTSOAllocator) GetDCLocation() string {
return lta.timestampOracle.dcLocation
Expand Down
27 changes: 18 additions & 9 deletions pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ type tsoObject struct {
type timestampOracle struct {
client *clientv3.Client
rootPath string
// When ltsPath is empty, it means that it is a global timestampOracle.
ltsPath string
// When tsPath is empty, it means that it is a global timestampOracle.
tsPath string
storage endpoint.TSOStorage
// TODO: remove saveInterval
saveInterval time.Duration
Expand Down Expand Up @@ -141,8 +141,9 @@ func (t *timestampOracle) calibrateLogical(rawLogical int64, suffixBits int) int
return rawLogical<<suffixBits + int64(t.suffix)
}

func (t *timestampOracle) getTimestampPath() string {
return path.Join(t.ltsPath, timestampKey)
// GetTimestampPath returns the timestamp path in etcd.
func (t *timestampOracle) GetTimestampPath() string {
return path.Join(t.tsPath, timestampKey)
}

// SyncTimestamp is used to synchronize the timestamp.
Expand All @@ -153,7 +154,7 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error {
time.Sleep(time.Second)
})

last, err := t.storage.LoadTimestamp(t.ltsPath)
last, err := t.storage.LoadTimestamp(t.tsPath)
if err != nil {
return err
}
Expand All @@ -174,7 +175,7 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error {
}

save := next.Add(t.saveInterval)
if err = t.storage.SaveTimestamp(t.getTimestampPath(), save); err != nil {
if err = t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil {
tsoCounter.WithLabelValues("err_save_sync_ts", t.dcLocation).Inc()
return err
}
Expand Down Expand Up @@ -241,7 +242,7 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi
// save into etcd only if nextPhysical is close to lastSavedTime
if typeutil.SubRealTimeByWallClock(t.lastSavedTime.Load().(time.Time), nextPhysical) <= UpdateTimestampGuard {
save := nextPhysical.Add(t.saveInterval)
if err := t.storage.SaveTimestamp(t.getTimestampPath(), save); err != nil {
if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil {
tsoCounter.WithLabelValues("err_save_reset_ts", t.dcLocation).Inc()
return err
}
Expand Down Expand Up @@ -286,7 +287,11 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error

jetLag := typeutil.SubRealTimeByWallClock(now, prevPhysical)
if jetLag > 3*t.updatePhysicalInterval && jetLag > jetLagWarningThreshold {
log.Warn("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prevPhysical), zap.Time("now", now), zap.Duration("update-physical-interval", t.updatePhysicalInterval))
log.Warn("clock offset",
zap.Duration("jet-lag", jetLag),
zap.Time("prev-physical", prevPhysical),
zap.Time("now", now),
zap.Duration("update-physical-interval", t.updatePhysicalInterval))
tsoCounter.WithLabelValues("slow_save", t.dcLocation).Inc()
}

Expand All @@ -313,7 +318,11 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error
// The time window needs to be updated and saved to etcd.
if typeutil.SubRealTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= UpdateTimestampGuard {
save := next.Add(t.saveInterval)
if err := t.storage.SaveTimestamp(t.getTimestampPath(), save); err != nil {
if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil {
log.Warn("save timestamp failed",
zap.String("dc-location", t.dcLocation),
zap.String("timestamp-path", t.GetTimestampPath()),
zap.Error(err))
tsoCounter.WithLabelValues("err_save_update_ts", t.dcLocation).Inc()
return err
}
Expand Down
35 changes: 28 additions & 7 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package tso

import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -134,6 +136,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp
}
}

// Create a client for each keyspace and make sure they can successfully discover the service
// provided by the default keyspace group.
keyspaceIDs := []uint32{0, 1, 2, 3, 1000}
clients := mcs.WaitForMultiKeyspacesTSOAvailable(
suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()})
Expand All @@ -148,6 +152,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe
// on a tso server.
re := suite.Require()

// Create keyspace groups.
params := []struct {
keyspaceGroupID uint32
keyspaceIDs []uint32
Expand Down Expand Up @@ -176,15 +181,29 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe
})
}

// Wait until all keyspace groups are ready.
testutil.Eventually(re, func() bool {
for _, param := range params {
for _, keyspaceID := range param.keyspaceIDs {
served := false
for _, server := range suite.tsoCluster.GetServers() {
if server.IsKeyspaceServing(keyspaceID, param.keyspaceGroupID) {
tam, err := server.GetTSOAllocatorManager(param.keyspaceGroupID)
am, err := server.GetTSOAllocatorManager(param.keyspaceGroupID)
re.NoError(err)
re.NotNil(tam)
re.NotNil(am)

// Make sure every keyspace group is using the right timestamp path
// for loading/saving timestamp from/to etcd.
var timestampPath string
clusterID := strconv.FormatUint(suite.pdLeaderServer.GetClusterID(), 10)
if param.keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID {
timestampPath = fmt.Sprintf("/pd/%s/timestamp", clusterID)
} else {
timestampPath = fmt.Sprintf("/ms/%s/tso/%05d/gta/timestamp",
clusterID, param.keyspaceGroupID)
}
re.Equal(timestampPath, am.GetTimestampPath(""))

served = true
}
}
Expand All @@ -196,6 +215,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe
return true
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

// Create a client for each keyspace and make sure they can successfully discover the service
// provided by the corresponding keyspace group.
keyspaceIDs := make([]uint32, 0)
for _, param := range params {
keyspaceIDs = append(keyspaceIDs, param.keyspaceIDs...)
Expand Down Expand Up @@ -254,20 +275,20 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
splitTS, err = suite.requestTSO(re, 1, 222, 2)
return err == nil && tsoutil.CompareTimestamp(&splitTS, &pdpb.Timestamp{}) > 0
})
splitTS, err = suite.requestTSO(re, 1, 222, 2)
re.Greater(tsoutil.CompareTimestamp(&splitTS, &ts), 0)
// Finish the split.
handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2)
}

func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO(
re *require.Assertions,
count, keyspaceID, keyspaceGroupID uint32,
) (pdpb.Timestamp, error) {
primary := suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, keyspaceGroupID)
tam, err := primary.GetTSOAllocatorManager(keyspaceGroupID)
kgm := primary.GetKeyspaceGroupManager()
re.NotNil(kgm)
ts, _, err := kgm.HandleTSORequest(keyspaceID, keyspaceGroupID, tsopkg.GlobalDCLocation, count)
re.NoError(err)
re.NotNil(tam)
return tam.HandleRequest(tsopkg.GlobalDCLocation, count)
return ts, err
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection() {
Expand Down
Loading

0 comments on commit 20571e4

Please sign in to comment.