Skip to content

Commit

Permalink
Merge branch 'master' into keyspace-pick
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jul 3, 2023
2 parents be91292 + 626d1c8 commit fe8159f
Show file tree
Hide file tree
Showing 62 changed files with 2,370 additions and 506 deletions.
1 change: 1 addition & 0 deletions client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func BuildForwardContext(ctx context.Context, addr string) context.Context {
func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string, tlsCfg *tlsutil.TLSConfig, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
conn, ok := clientConns.Load(addr)
if ok {
// TODO: check the connection state.
return conn.(*grpc.ClientConn), nil
}
tlsConfig, err := tlsCfg.ToTLSConfig()
Expand Down
8 changes: 4 additions & 4 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ type tsoClient struct {
tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest
// dc-location -> deadline
tsDeadline sync.Map // Same as map[string]chan deadline
// dc-location -> *lastTSO
lastTSMap sync.Map // Same as map[string]*lastTSO
// dc-location -> *tsoInfo while the tsoInfo is the last TSO info
lastTSOInfoMap sync.Map // Same as map[string]*tsoInfo

checkTSDeadlineCh chan struct{}
checkTSODispatcherCh chan struct{}
Expand Down Expand Up @@ -209,7 +209,7 @@ func (c *tsoClient) updateTSOLocalServAddrs(allocatorMap map[string]string) erro
return err
}
c.tsoAllocators.Store(dcLocation, addr)
log.Info("[tso] switch dc tso allocator serving address",
log.Info("[tso] switch dc tso local allocator serving address",
zap.String("dc-location", dcLocation),
zap.String("new-address", addr),
zap.String("old-address", oldAddr))
Expand All @@ -227,7 +227,7 @@ func (c *tsoClient) updateTSOLocalServAddrs(allocatorMap map[string]string) erro

func (c *tsoClient) updateTSOGlobalServAddr(addr string) error {
c.tsoAllocators.Store(globalDCLocation, addr)
log.Info("[tso] switch dc tso allocator serving address",
log.Info("[tso] switch dc tso global allocator serving address",
zap.String("dc-location", globalDCLocation),
zap.String("new-address", addr))
c.scheduleCheckTSODispatcher()
Expand Down
81 changes: 56 additions & 25 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ type tsoDispatcher struct {
tsoBatchController *tsoBatchController
}

type lastTSO struct {
physical int64
logical int64
type tsoInfo struct {
tsoServer string
reqKeyspaceGroupID uint32
respKeyspaceGroupID uint32
respReceivedAt time.Time
physical int64
logical int64
}

const (
Expand Down Expand Up @@ -708,45 +712,72 @@ func (c *tsoClient) processRequests(

requests := tbc.getCollectedRequests()
count := int64(len(requests))
physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID(),
reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID,
dcLocation, requests, tbc.batchStartTime)
if err != nil {
c.finishRequest(requests, 0, 0, 0, err)
return err
}
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
c.compareAndSwapTS(dcLocation, physical, firstLogical, suffixBits, count)
curTSOInfo := &tsoInfo{
tsoServer: stream.getServerAddr(),
reqKeyspaceGroupID: reqKeyspaceGroupID,
respKeyspaceGroupID: respKeyspaceGroupID,
respReceivedAt: time.Now(),
physical: physical,
logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits),
}
c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical)
c.finishRequest(requests, physical, firstLogical, suffixBits, nil)
return nil
}

func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical int64, suffixBits uint32, count int64) {
largestLogical := tsoutil.AddLogical(firstLogical, count-1, suffixBits)
lastTSOInterface, loaded := c.lastTSMap.LoadOrStore(dcLocation, &lastTSO{
physical: physical,
// Save the largest logical part here
logical: largestLogical,
})
func (c *tsoClient) compareAndSwapTS(
dcLocation string,
curTSOInfo *tsoInfo,
physical, firstLogical int64,
) {
val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo)
if !loaded {
return
}
lastTSOPointer := lastTSOInterface.(*lastTSO)
lastPhysical := lastTSOPointer.physical
lastLogical := lastTSOPointer.logical
lastTSOInfo := val.(*tsoInfo)
if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID {
log.Info("[tso] keyspace group changed",
zap.String("dc-location", dcLocation),
zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID))
}

// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10].
if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) {
panic(errors.Errorf(
"%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). keyspace: %d, keyspace group: %d",
dcLocation, physical, firstLogical, lastPhysical, lastLogical,
c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID()))
// all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned
// last time.
if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
log.Panic("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)),
zap.String("last-tso-server", lastTSOInfo.tsoServer),
zap.String("cur-tso-server", curTSOInfo.tsoServer),
zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID),
zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt),
)
}
lastTSOPointer.physical = physical
// Same as above, we save the largest logical part here.
lastTSOPointer.logical = largestLogical
lastTSOInfo.tsoServer = curTSOInfo.tsoServer
lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID
lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID
lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt
lastTSOInfo.physical = curTSOInfo.physical
lastTSOInfo.logical = curTSOInfo.logical
}

func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
Expand Down
11 changes: 6 additions & 5 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -121,7 +122,7 @@ type tsoServiceDiscovery struct {
metacli MetaStorageClient
apiSvcDiscovery ServiceDiscovery
clusterID uint64
keyspaceID uint32
keyspaceID atomic.Uint32

// defaultDiscoveryKey is the etcd path used for discovering the serving endpoints of
// the default keyspace group
Expand Down Expand Up @@ -165,12 +166,12 @@ func newTSOServiceDiscovery(
cancel: cancel,
metacli: metacli,
apiSvcDiscovery: apiSvcDiscovery,
keyspaceID: keyspaceID,
clusterID: clusterID,
tlsCfg: tlsCfg,
option: option,
checkMembershipCh: make(chan struct{}, 1),
}
c.keyspaceID.Store(keyspaceID)
c.keyspaceGroupSD = &keyspaceGroupSvcDiscovery{
primaryAddr: "",
secondaryAddrs: make([]string, 0),
Expand Down Expand Up @@ -269,12 +270,12 @@ func (c *tsoServiceDiscovery) GetClusterID() uint64 {

// GetKeyspaceID returns the ID of the keyspace
func (c *tsoServiceDiscovery) GetKeyspaceID() uint32 {
return c.keyspaceID
return c.keyspaceID.Load()
}

// SetKeyspaceID sets the ID of the keyspace
func (c *tsoServiceDiscovery) SetKeyspaceID(keyspaceID uint32) {
c.keyspaceID = keyspaceID
c.keyspaceID.Store(keyspaceID)
}

// GetKeyspaceGroupID returns the ID of the keyspace group. If the keyspace group is unknown,
Expand Down Expand Up @@ -426,7 +427,7 @@ func (c *tsoServiceDiscovery) updateMember() error {

var keyspaceGroup *tsopb.KeyspaceGroup
if len(tsoServerAddr) > 0 {
keyspaceGroup, err = c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout)
keyspaceGroup, err = c.findGroupByKeyspaceID(c.GetKeyspaceID(), tsoServerAddr, updateMemberTimeout)
if err != nil {
if c.tsoServerDiscovery.countFailure() {
log.Error("[tso] failed to find the keyspace group", errs.ZapError(err))
Expand Down
37 changes: 26 additions & 11 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ type tsoStreamBuilderFactory interface {
type pdTSOStreamBuilderFactory struct{}

func (f *pdTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder {
return &pdTSOStreamBuilder{client: pdpb.NewPDClient(cc)}
return &pdTSOStreamBuilder{client: pdpb.NewPDClient(cc), serverAddr: cc.Target()}
}

type tsoTSOStreamBuilderFactory struct{}

func (f *tsoTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder {
return &tsoTSOStreamBuilder{client: tsopb.NewTSOClient(cc)}
return &tsoTSOStreamBuilder{client: tsopb.NewTSOClient(cc), serverAddr: cc.Target()}
}

// TSO Stream Builder
Expand All @@ -51,7 +51,8 @@ type tsoStreamBuilder interface {
}

type pdTSOStreamBuilder struct {
client pdpb.PDClient
serverAddr string
client pdpb.PDClient
}

func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (tsoStream, error) {
Expand All @@ -61,13 +62,14 @@ func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFun
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &pdTSOStream{stream: stream}, nil
return &pdTSOStream{stream: stream, serverAddr: b.serverAddr}, nil
}
return nil, err
}

type tsoTSOStreamBuilder struct {
client tsopb.TSOClient
serverAddr string
client tsopb.TSOClient
}

func (b *tsoTSOStreamBuilder) build(
Expand All @@ -79,7 +81,7 @@ func (b *tsoTSOStreamBuilder) build(
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &tsoTSOStream{stream: stream}, nil
return &tsoTSOStream{stream: stream, serverAddr: b.serverAddr}, nil
}
return nil, err
}
Expand All @@ -98,20 +100,26 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha
// TSO Stream

type tsoStream interface {
getServerAddr() string
// processRequests processes TSO requests in streaming mode to get timestamps
processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
requests []*tsoRequest, batchStartTime time.Time,
) (physical, logical int64, suffixBits uint32, err error)
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error)
}

type pdTSOStream struct {
stream pdpb.PD_TsoClient
serverAddr string
stream pdpb.PD_TsoClient
}

func (s *pdTSOStream) getServerAddr() string {
return s.serverAddr
}

func (s *pdTSOStream) processRequests(
clusterID uint64, _, _ uint32, dcLocation string, requests []*tsoRequest, batchStartTime time.Time,
) (physical, logical int64, suffixBits uint32, err error) {
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
count := int64(len(requests))
req := &pdpb.TsoRequest{
Expand Down Expand Up @@ -149,18 +157,24 @@ func (s *pdTSOStream) processRequests(
}

ts := resp.GetTimestamp()
respKeyspaceGroupID = defaultKeySpaceGroupID
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
return
}

type tsoTSOStream struct {
stream tsopb.TSO_TsoClient
serverAddr string
stream tsopb.TSO_TsoClient
}

func (s *tsoTSOStream) getServerAddr() string {
return s.serverAddr
}

func (s *tsoTSOStream) processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
requests []*tsoRequest, batchStartTime time.Time,
) (physical, logical int64, suffixBits uint32, err error) {
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
count := int64(len(requests))
req := &tsopb.TsoRequest{
Expand Down Expand Up @@ -200,6 +214,7 @@ func (s *tsoTSOStream) processRequests(
}

ts := resp.GetTimestamp()
respKeyspaceGroupID = resp.GetHeader().GetKeyspaceGroupId()
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
return
}
17 changes: 17 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,20 @@ coverage:
# basic
target: auto
threshold: 3%

comment:
layout: "header, diff, flags"
behavior: default
require_changes: false

flag_management:
default_rules: # the rules that will be followed for any flag added, generally
carryforward: true
statuses:
- type: project
target: 85%
- type: patch
target: 85%

ignore:
- tests/** # integration test cases or tools.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,11 @@ error = '''
the keyspace group id is invalid, %s
'''

["PD:tso:ErrKeyspaceGroupIsMerging"]
error = '''
the keyspace group %d is merging
'''

["PD:tso:ErrKeyspaceGroupNotInitialized"]
error = '''
the keyspace group %d isn't initialized
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20230530111525-e4919c190b46
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5
github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/common v0.26.0
github.com/sasha-s/go-deadlock v0.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5 h1:adV4kUWI7v+v/6joR7lfjFngHhS4eiqwr4g3dHCjHtA=
github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM=
github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45 h1:/jqjj+ydlqjp144LAlHDfHtr7eWJyaNIIXX5viv0RZo=
github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
ErrKeyspaceGroupNotInitialized = errors.Normalize("the keyspace group %d isn't initialized", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupNotInitialized"))
ErrKeyspaceNotAssigned = errors.Normalize("the keyspace %d isn't assigned to any keyspace group", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned"))
ErrGetMinTS = errors.Normalize("get min ts failed, %s", errors.RFCCodeText("PD:tso:ErrGetMinTS"))
ErrKeyspaceGroupIsMerging = errors.Normalize("the keyspace group %d is merging", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIsMerging"))
)

// member errors
Expand Down
Loading

0 comments on commit fe8159f

Please sign in to comment.