diff --git a/domain/domain.go b/domain/domain.go index e1cba50ba9fbe..24aacaef58e9d 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -180,7 +180,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return nil, false, 0, nil, err } // fetch the commit timestamp of the schema diff - schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion) + schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion, startTS) if err != nil { logutil.BgLogger().Warn("failed to get schema version", zap.Error(err), zap.Int64("version", neededSchemaVersion)) schemaTs = 0 @@ -246,18 +246,18 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i } // Returns the timestamp of a schema version, which is the commit timestamp of the schema diff -func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64) (int64, error) { +func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64, startTS uint64) (int64, error) { tikvStore, ok := do.Store().(helper.Storage) if ok { - helper := helper.NewHelper(tikvStore) - data, err := helper.GetMvccByEncodedKey(m.EncodeSchemaDiffKey(version)) + newHelper := helper.NewHelper(tikvStore) + mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(m.EncodeSchemaDiffKey(version), startTS) if err != nil { return 0, err } - if data == nil || data.Info == nil || len(data.Info.Writes) == 0 { + if mvccResp == nil || mvccResp.Info == nil || len(mvccResp.Info.Writes) == 0 { return 0, errors.Errorf("There is no Write MVCC info for the schema version") } - return int64(data.Info.Writes[0].CommitTs), nil + return int64(mvccResp.Info.Writes[0].CommitTs), nil } return 0, errors.Errorf("cannot get store from domain") } diff --git a/store/helper/helper.go b/store/helper/helper.go index 1b077dcec8843..5ff582f75e928 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -94,25 +94,112 @@ func NewHelper(store Storage) *Helper { } } -// GetMvccByEncodedKey get the MVCC value by the specific encoded key. -func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { - keyLocation, err := h.RegionCache.LocateKey(tikv.NewBackofferWithVars(context.Background(), 500, nil), encodedKey) - if err != nil { - return nil, derr.ToTiDBErr(err) - } +// MaxBackoffTimeoutForMvccGet is a derived value from previous implementation possible experiencing value 5000ms. +const MaxBackoffTimeoutForMvccGet = 5000 +// GetMvccByEncodedKeyWithTS get the MVCC value by the specific encoded key, if lock is encountered it would be resolved. +func (h *Helper) GetMvccByEncodedKeyWithTS(encodedKey kv.Key, startTS uint64) (*kvrpcpb.MvccGetByKeyResponse, error) { + bo := tikv.NewBackofferWithVars(context.Background(), MaxBackoffTimeoutForMvccGet, nil) tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey}) - kvResp, err := h.Store.SendReq(tikv.NewBackofferWithVars(context.Background(), 500, nil), tikvReq, keyLocation.Region, time.Minute) - if err != nil { - logutil.BgLogger().Info("get MVCC by encoded key failed", - zap.Stringer("encodeKey", encodedKey), - zap.Reflect("region", keyLocation.Region), - zap.Stringer("keyLocation", keyLocation), - zap.Reflect("kvResp", kvResp), - zap.Error(err)) - return nil, errors.Trace(err) + for { + keyLocation, err := h.RegionCache.LocateKey(bo, encodedKey) + if err != nil { + return nil, derr.ToTiDBErr(err) + } + kvResp, err := h.Store.SendReq(bo, tikvReq, keyLocation.Region, time.Minute) + if err != nil { + logutil.BgLogger().Warn("get MVCC by encoded key failed", + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp), + zap.Error(err)) + return nil, errors.Trace(err) + } + + regionErr, err := kvResp.GetRegionError() + if err != nil { + return nil, errors.Trace(err) + } + if regionErr != nil { + if err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())); err != nil { + return nil, err + } + continue + } + + mvccResp := kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse) + if errMsg := mvccResp.GetError(); errMsg != "" { + logutil.BgLogger().Warn("get MVCC by encoded key failed", + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp), + zap.String("error", errMsg)) + return nil, errors.New(errMsg) + } + if mvccResp.Info == nil { + errMsg := "Invalid mvcc response result, the info field is nil" + logutil.BgLogger().Warn(errMsg, + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp)) + return nil, errors.New(errMsg) + } + + // Try to resolve the lock and retry mvcc get again if the input startTS is a valid value. + if startTS > 0 && mvccResp.Info.GetLock() != nil { + latestTS, err := h.Store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + if err != nil { + logutil.BgLogger().Warn("Failed to get latest ts", zap.Error(err)) + return nil, err + } + if startTS > latestTS { + errMsg := fmt.Sprintf("Snapshot ts=%v is larger than latest allocated ts=%v, lock could not be resolved", + startTS, latestTS) + logutil.BgLogger().Warn(errMsg) + return nil, errors.New(errMsg) + } + lockInfo := mvccResp.Info.GetLock() + lock := &txnlock.Lock{ + Key: []byte(encodedKey), + Primary: lockInfo.GetPrimary(), + TxnID: lockInfo.GetStartTs(), + TTL: lockInfo.GetTtl(), + TxnSize: lockInfo.GetTxnSize(), + LockType: lockInfo.GetType(), + UseAsyncCommit: lockInfo.GetUseAsyncCommit(), + LockForUpdateTS: lockInfo.GetForUpdateTs(), + } + // Disable for read to avoid async resolve. + resolveLocksOpts := txnlock.ResolveLocksOptions{ + CallerStartTS: startTS, + Locks: []*txnlock.Lock{lock}, + Lite: true, + ForRead: false, + Detail: nil, + } + resolveLockRes, err := h.Store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLocksOpts) + if err != nil { + return nil, err + } + msBeforeExpired := resolveLockRes.TTL + if msBeforeExpired > 0 { + if err = bo.BackoffWithCfgAndMaxSleep(tikv.BoTxnLock(), int(msBeforeExpired), + errors.Errorf("resolve lock fails lock: %v", lock)); err != nil { + return nil, err + } + } + continue + } + return mvccResp, nil } - return kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse), nil +} + +// GetMvccByEncodedKey get the MVCC value by the specific encoded key. +func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { + return h.GetMvccByEncodedKeyWithTS(encodedKey, 0) } // MvccKV wraps the key's mvcc info in tikv.