Skip to content

Commit

Permalink
domain: add resolve lock logic for mvcc get key loading schema diff (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 24, 2023
1 parent a495716 commit 5a6b36c
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 22 deletions.
12 changes: 6 additions & 6 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
return nil, false, 0, nil, err
}
// fetch the commit timestamp of the schema diff
schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion)
schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion, startTS)
if err != nil {
logutil.BgLogger().Warn("failed to get schema version", zap.Error(err), zap.Int64("version", neededSchemaVersion))
schemaTs = 0
Expand Down Expand Up @@ -246,18 +246,18 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
}

// Returns the timestamp of a schema version, which is the commit timestamp of the schema diff
func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64) (int64, error) {
func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64, startTS uint64) (int64, error) {
tikvStore, ok := do.Store().(helper.Storage)
if ok {
helper := helper.NewHelper(tikvStore)
data, err := helper.GetMvccByEncodedKey(m.EncodeSchemaDiffKey(version))
newHelper := helper.NewHelper(tikvStore)
mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(m.EncodeSchemaDiffKey(version), startTS)
if err != nil {
return 0, err
}
if data == nil || data.Info == nil || len(data.Info.Writes) == 0 {
if mvccResp == nil || mvccResp.Info == nil || len(mvccResp.Info.Writes) == 0 {
return 0, errors.Errorf("There is no Write MVCC info for the schema version")
}
return int64(data.Info.Writes[0].CommitTs), nil
return int64(mvccResp.Info.Writes[0].CommitTs), nil
}
return 0, errors.Errorf("cannot get store from domain")
}
Expand Down
119 changes: 103 additions & 16 deletions store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,25 +94,112 @@ func NewHelper(store Storage) *Helper {
}
}

// GetMvccByEncodedKey get the MVCC value by the specific encoded key.
func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) {
keyLocation, err := h.RegionCache.LocateKey(tikv.NewBackofferWithVars(context.Background(), 500, nil), encodedKey)
if err != nil {
return nil, derr.ToTiDBErr(err)
}
// MaxBackoffTimeoutForMvccGet is a derived value from previous implementation possible experiencing value 5000ms.
const MaxBackoffTimeoutForMvccGet = 5000

// GetMvccByEncodedKeyWithTS get the MVCC value by the specific encoded key, if lock is encountered it would be resolved.
func (h *Helper) GetMvccByEncodedKeyWithTS(encodedKey kv.Key, startTS uint64) (*kvrpcpb.MvccGetByKeyResponse, error) {
bo := tikv.NewBackofferWithVars(context.Background(), MaxBackoffTimeoutForMvccGet, nil)
tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey})
kvResp, err := h.Store.SendReq(tikv.NewBackofferWithVars(context.Background(), 500, nil), tikvReq, keyLocation.Region, time.Minute)
if err != nil {
logutil.BgLogger().Info("get MVCC by encoded key failed",
zap.Stringer("encodeKey", encodedKey),
zap.Reflect("region", keyLocation.Region),
zap.Stringer("keyLocation", keyLocation),
zap.Reflect("kvResp", kvResp),
zap.Error(err))
return nil, errors.Trace(err)
for {
keyLocation, err := h.RegionCache.LocateKey(bo, encodedKey)
if err != nil {
return nil, derr.ToTiDBErr(err)
}
kvResp, err := h.Store.SendReq(bo, tikvReq, keyLocation.Region, time.Minute)
if err != nil {
logutil.BgLogger().Warn("get MVCC by encoded key failed",
zap.Stringer("encodeKey", encodedKey),
zap.Reflect("region", keyLocation.Region),
zap.Stringer("keyLocation", keyLocation),
zap.Reflect("kvResp", kvResp),
zap.Error(err))
return nil, errors.Trace(err)
}

regionErr, err := kvResp.GetRegionError()
if err != nil {
return nil, errors.Trace(err)
}
if regionErr != nil {
if err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())); err != nil {
return nil, err
}
continue
}

mvccResp := kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse)
if errMsg := mvccResp.GetError(); errMsg != "" {
logutil.BgLogger().Warn("get MVCC by encoded key failed",
zap.Stringer("encodeKey", encodedKey),
zap.Reflect("region", keyLocation.Region),
zap.Stringer("keyLocation", keyLocation),
zap.Reflect("kvResp", kvResp),
zap.String("error", errMsg))
return nil, errors.New(errMsg)
}
if mvccResp.Info == nil {
errMsg := "Invalid mvcc response result, the info field is nil"
logutil.BgLogger().Warn(errMsg,
zap.Stringer("encodeKey", encodedKey),
zap.Reflect("region", keyLocation.Region),
zap.Stringer("keyLocation", keyLocation),
zap.Reflect("kvResp", kvResp))
return nil, errors.New(errMsg)
}

// Try to resolve the lock and retry mvcc get again if the input startTS is a valid value.
if startTS > 0 && mvccResp.Info.GetLock() != nil {
latestTS, err := h.Store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
logutil.BgLogger().Warn("Failed to get latest ts", zap.Error(err))
return nil, err
}
if startTS > latestTS {
errMsg := fmt.Sprintf("Snapshot ts=%v is larger than latest allocated ts=%v, lock could not be resolved",
startTS, latestTS)
logutil.BgLogger().Warn(errMsg)
return nil, errors.New(errMsg)
}
lockInfo := mvccResp.Info.GetLock()
lock := &txnlock.Lock{
Key: []byte(encodedKey),
Primary: lockInfo.GetPrimary(),
TxnID: lockInfo.GetStartTs(),
TTL: lockInfo.GetTtl(),
TxnSize: lockInfo.GetTxnSize(),
LockType: lockInfo.GetType(),
UseAsyncCommit: lockInfo.GetUseAsyncCommit(),
LockForUpdateTS: lockInfo.GetForUpdateTs(),
}
// Disable for read to avoid async resolve.
resolveLocksOpts := txnlock.ResolveLocksOptions{
CallerStartTS: startTS,
Locks: []*txnlock.Lock{lock},
Lite: true,
ForRead: false,
Detail: nil,
}
resolveLockRes, err := h.Store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLocksOpts)
if err != nil {
return nil, err
}
msBeforeExpired := resolveLockRes.TTL
if msBeforeExpired > 0 {
if err = bo.BackoffWithCfgAndMaxSleep(tikv.BoTxnLock(), int(msBeforeExpired),
errors.Errorf("resolve lock fails lock: %v", lock)); err != nil {
return nil, err
}
}
continue
}
return mvccResp, nil
}
return kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse), nil
}

// GetMvccByEncodedKey get the MVCC value by the specific encoded key.
func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) {
return h.GetMvccByEncodedKeyWithTS(encodedKey, 0)
}

// MvccKV wraps the key's mvcc info in tikv.
Expand Down

0 comments on commit 5a6b36c

Please sign in to comment.