Skip to content

Commit

Permalink
Add keyspace group info in the timestamp fallback log in the client.
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Jun 21, 2023
1 parent d9d9184 commit 55f6604
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
26 changes: 18 additions & 8 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ type tsoDispatcher struct {
}

type lastTSO struct {
physical int64
logical int64
keyspaceGroupID uint32
physical int64
logical int64
}

const (
Expand Down Expand Up @@ -708,7 +709,7 @@ func (c *tsoClient) processRequests(

requests := tbc.getCollectedRequests()
count := int64(len(requests))
physical, logical, suffixBits, err := stream.processRequests(
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID(),
dcLocation, requests, tbc.batchStartTime)
if err != nil {
Expand All @@ -717,33 +718,42 @@ func (c *tsoClient) processRequests(
}
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
c.compareAndSwapTS(dcLocation, physical, firstLogical, suffixBits, count)
c.compareAndSwapTS(dcLocation, respKeyspaceGroupID, physical, firstLogical, suffixBits, count)
c.finishRequest(requests, physical, firstLogical, suffixBits, nil)
return nil
}

func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical int64, suffixBits uint32, count int64) {
func (c *tsoClient) compareAndSwapTS(
dcLocation string, respKeyspaceGroupID uint32,
physical, firstLogical int64, suffixBits uint32, count int64,
) {
largestLogical := tsoutil.AddLogical(firstLogical, count-1, suffixBits)
lastTSOInterface, loaded := c.lastTSMap.LoadOrStore(dcLocation, &lastTSO{
physical: physical,
keyspaceGroupID: respKeyspaceGroupID,
physical: physical,
// Save the largest logical part here
logical: largestLogical,
})
if !loaded {
return
}
lastTSOPointer := lastTSOInterface.(*lastTSO)
lastKeyspaceGroupID := lastTSOPointer.keyspaceGroupID
lastPhysical := lastTSOPointer.physical
lastLogical := lastTSOPointer.logical
// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10].
if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) {
panic(errors.Errorf(
"%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). keyspace: %d, keyspace group: %d",
"%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). "+
"last keyspace group: %d, keyspace in request: %d, "+
"keyspace group in request: %d, keyspace group in response: %d",
dcLocation, physical, firstLogical, lastPhysical, lastLogical,
c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID()))
lastKeyspaceGroupID, c.svcDiscovery.GetKeyspaceID(),
c.svcDiscovery.GetKeyspaceGroupID(), respKeyspaceGroupID))
}
lastTSOPointer.keyspaceGroupID = respKeyspaceGroupID
lastTSOPointer.physical = physical
// Same as above, we save the largest logical part here.
lastTSOPointer.logical = largestLogical
Expand Down
8 changes: 5 additions & 3 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type tsoStream interface {
processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
requests []*tsoRequest, batchStartTime time.Time,
) (physical, logical int64, suffixBits uint32, err error)
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error)
}

type pdTSOStream struct {
Expand All @@ -111,7 +111,7 @@ type pdTSOStream struct {

func (s *pdTSOStream) processRequests(
clusterID uint64, _, _ uint32, dcLocation string, requests []*tsoRequest, batchStartTime time.Time,
) (physical, logical int64, suffixBits uint32, err error) {
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
count := int64(len(requests))
req := &pdpb.TsoRequest{
Expand Down Expand Up @@ -149,6 +149,7 @@ func (s *pdTSOStream) processRequests(
}

ts := resp.GetTimestamp()
respKeyspaceGroupID = defaultKeySpaceGroupID
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
return
}
Expand All @@ -160,7 +161,7 @@ type tsoTSOStream struct {
func (s *tsoTSOStream) processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
requests []*tsoRequest, batchStartTime time.Time,
) (physical, logical int64, suffixBits uint32, err error) {
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
count := int64(len(requests))
req := &tsopb.TsoRequest{
Expand Down Expand Up @@ -200,6 +201,7 @@ func (s *tsoTSOStream) processRequests(
}

ts := resp.GetTimestamp()
respKeyspaceGroupID = resp.GetHeader().GetKeyspaceGroupId()
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
return
}

0 comments on commit 55f6604

Please sign in to comment.