diff --git a/domain/domain.go b/domain/domain.go index db857f314e9ed..d1a445345cc9b 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -191,7 +191,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return nil, false, 0, nil, err } // fetch the commit timestamp of the schema diff - schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion) + schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion, startTS) if err != nil { logutil.BgLogger().Warn("failed to get schema version", zap.Error(err), zap.Int64("version", neededSchemaVersion)) schemaTs = 0 @@ -259,18 +259,18 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i } // Returns the timestamp of a schema version, which is the commit timestamp of the schema diff -func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64) (int64, error) { +func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64, startTS uint64) (int64, error) { tikvStore, ok := do.Store().(helper.Storage) if ok { - helper := helper.NewHelper(tikvStore) - data, err := helper.GetMvccByEncodedKey(m.EncodeSchemaDiffKey(version)) + newHelper := helper.NewHelper(tikvStore) + mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(m.EncodeSchemaDiffKey(version), startTS) if err != nil { return 0, err } - if data == nil || data.Info == nil || len(data.Info.Writes) == 0 { + if mvccResp == nil || mvccResp.Info == nil || len(mvccResp.Info.Writes) == 0 { return 0, errors.Errorf("There is no Write MVCC info for the schema version") } - return int64(data.Info.Writes[0].CommitTs), nil + return int64(mvccResp.Info.Writes[0].CommitTs), nil } return 0, errors.Errorf("cannot get store from domain") } diff --git a/store/helper/helper.go b/store/helper/helper.go index 28276d050953b..c7f4ba1e66532 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -94,9 +94,12 @@ func NewHelper(store Storage) *Helper { } } -// GetMvccByEncodedKey get the MVCC value by the specific encoded key. -func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { - bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil) +// MaxBackoffTimeoutForMvccGet is a derived value from previous implementation possible experiencing value 5000ms. +const MaxBackoffTimeoutForMvccGet = 5000 + +// GetMvccByEncodedKeyWithTS get the MVCC value by the specific encoded key, if lock is encountered it would be resolved. +func (h *Helper) GetMvccByEncodedKeyWithTS(encodedKey kv.Key, startTS uint64) (*kvrpcpb.MvccGetByKeyResponse, error) { + bo := tikv.NewBackofferWithVars(context.Background(), MaxBackoffTimeoutForMvccGet, nil) tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey}) for { keyLocation, err := h.RegionCache.LocateKey(bo, encodedKey) @@ -106,7 +109,7 @@ func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyRe kvResp, err := h.Store.SendReq(bo, tikvReq, keyLocation.Region, time.Minute) if err != nil { - logutil.BgLogger().Info("get MVCC by encoded key failed", + logutil.BgLogger().Warn("get MVCC by encoded key failed", zap.Stringer("encodeKey", encodedKey), zap.Reflect("region", keyLocation.Region), zap.Stringer("keyLocation", keyLocation), @@ -114,6 +117,7 @@ func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyRe zap.Error(err)) return nil, errors.Trace(err) } + regionErr, err := kvResp.GetRegionError() if err != nil { return nil, errors.Trace(err) @@ -124,9 +128,10 @@ func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyRe } continue } + mvccResp := kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse) if errMsg := mvccResp.GetError(); errMsg != "" { - logutil.BgLogger().Info("get MVCC by encoded key failed", + logutil.BgLogger().Warn("get MVCC by encoded key failed", zap.Stringer("encodeKey", encodedKey), zap.Reflect("region", keyLocation.Region), zap.Stringer("keyLocation", keyLocation), @@ -134,10 +139,59 @@ func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyRe zap.String("error", errMsg)) return nil, errors.New(errMsg) } + if mvccResp.Info == nil { + errMsg := "Invalid mvcc response result, the info field is nil" + logutil.BgLogger().Warn(errMsg, + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp)) + return nil, errors.New(errMsg) + } + + // Try to resolve the lock and retry mvcc get again if the input startTS is a valid value. + if startTS > 0 && mvccResp.Info.GetLock() != nil { + lockInfo := mvccResp.Info.GetLock() + lock := &txnlock.Lock{ + Key: []byte(encodedKey), + Primary: lockInfo.GetPrimary(), + TxnID: lockInfo.GetStartTs(), + TTL: lockInfo.GetTtl(), + TxnSize: lockInfo.GetTxnSize(), + LockType: lockInfo.GetType(), + UseAsyncCommit: lockInfo.GetUseAsyncCommit(), + LockForUpdateTS: lockInfo.GetForUpdateTs(), + } + // Disable for read to avoid async resolve. + resolveLocksOpts := txnlock.ResolveLocksOptions{ + CallerStartTS: startTS, + Locks: []*txnlock.Lock{lock}, + Lite: true, + ForRead: false, + Detail: nil, + } + resolveLockRes, err := h.Store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLocksOpts) + if err != nil { + return nil, err + } + msBeforeExpired := resolveLockRes.TTL + if msBeforeExpired > 0 { + if err = bo.BackoffWithCfgAndMaxSleep(tikv.BoTxnLock(), int(msBeforeExpired), + errors.Errorf("resolve lock fails lock: %v", lock)); err != nil { + return nil, err + } + } + continue + } return mvccResp, nil } } +// GetMvccByEncodedKey get the MVCC value by the specific encoded key. +func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { + return h.GetMvccByEncodedKeyWithTS(encodedKey, 0) +} + // MvccKV wraps the key's mvcc info in tikv. type MvccKV struct { Key string `json:"key"`