From 9f1cd12aa561b3b1928d52289000db81005c1ccf Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Wed, 8 Nov 2023 16:50:13 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #48330 Signed-off-by: ti-chi-bot --- domain/domain.go | 12 ++--- store/helper/helper.go | 108 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 6 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index f41a5dcc634b9..dfdf0e43e7ce7 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -180,7 +180,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return nil, false, 0, nil, err } // fetch the commit timestamp of the schema diff - schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion) + schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion, startTS) if err != nil { logutil.BgLogger().Warn("failed to get schema version", zap.Error(err), zap.Int64("version", neededSchemaVersion)) schemaTs = 0 @@ -246,18 +246,18 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i } // Returns the timestamp of a schema version, which is the commit timestamp of the schema diff -func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64) (int64, error) { +func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64, startTS uint64) (int64, error) { tikvStore, ok := do.Store().(helper.Storage) if ok { - helper := helper.NewHelper(tikvStore) - data, err := helper.GetMvccByEncodedKey(m.EncodeSchemaDiffKey(version)) + newHelper := helper.NewHelper(tikvStore) + mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(m.EncodeSchemaDiffKey(version), startTS) if err != nil { return 0, err } - if data == nil || data.Info == nil || len(data.Info.Writes) == 0 { + if mvccResp == nil || mvccResp.Info == nil || len(mvccResp.Info.Writes) == 0 { return 0, errors.Errorf("There is no Write MVCC info for the schema version") } - return int64(data.Info.Writes[0].CommitTs), nil + return int64(mvccResp.Info.Writes[0].CommitTs), nil } return 0, errors.Errorf("cannot get store from domain") } diff --git a/store/helper/helper.go b/store/helper/helper.go index 1b077dcec8843..84350f79da8ea 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -94,11 +94,114 @@ func NewHelper(store Storage) *Helper { } } +<<<<<<< HEAD:store/helper/helper.go // GetMvccByEncodedKey get the MVCC value by the specific encoded key. func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { keyLocation, err := h.RegionCache.LocateKey(tikv.NewBackofferWithVars(context.Background(), 500, nil), encodedKey) if err != nil { return nil, derr.ToTiDBErr(err) +======= +// MaxBackoffTimeoutForMvccGet is a derived value from previous implementation possible experiencing value 5000ms. +const MaxBackoffTimeoutForMvccGet = 5000 + +// GetMvccByEncodedKeyWithTS get the MVCC value by the specific encoded key, if lock is encountered it would be resolved. +func (h *Helper) GetMvccByEncodedKeyWithTS(encodedKey kv.Key, startTS uint64) (*kvrpcpb.MvccGetByKeyResponse, error) { + bo := tikv.NewBackofferWithVars(context.Background(), MaxBackoffTimeoutForMvccGet, nil) + tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey}) + for { + keyLocation, err := h.RegionCache.LocateKey(bo, encodedKey) + if err != nil { + return nil, derr.ToTiDBErr(err) + } + kvResp, err := h.Store.SendReq(bo, tikvReq, keyLocation.Region, time.Minute) + if err != nil { + logutil.BgLogger().Warn("get MVCC by encoded key failed", + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp), + zap.Error(err)) + return nil, errors.Trace(err) + } + + regionErr, err := kvResp.GetRegionError() + if err != nil { + return nil, errors.Trace(err) + } + if regionErr != nil { + if err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())); err != nil { + return nil, err + } + continue + } + + mvccResp := kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse) + if errMsg := mvccResp.GetError(); errMsg != "" { + logutil.BgLogger().Warn("get MVCC by encoded key failed", + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp), + zap.String("error", errMsg)) + return nil, errors.New(errMsg) + } + if mvccResp.Info == nil { + errMsg := "Invalid mvcc response result, the info field is nil" + logutil.BgLogger().Warn(errMsg, + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp)) + return nil, errors.New(errMsg) + } + + // Try to resolve the lock and retry mvcc get again if the input startTS is a valid value. + if startTS > 0 && mvccResp.Info.GetLock() != nil { + latestTS, err := h.Store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + if err != nil { + logutil.BgLogger().Warn("Failed to get latest ts", zap.Error(err)) + return nil, err + } + if startTS > latestTS { + errMsg := fmt.Sprintf("Snapshot ts=%v is larger than latest allocated ts=%v, lock could not be resolved", + startTS, latestTS) + logutil.BgLogger().Warn(errMsg) + return nil, errors.New(errMsg) + } + lockInfo := mvccResp.Info.GetLock() + lock := &txnlock.Lock{ + Key: []byte(encodedKey), + Primary: lockInfo.GetPrimary(), + TxnID: lockInfo.GetStartTs(), + TTL: lockInfo.GetTtl(), + TxnSize: lockInfo.GetTxnSize(), + LockType: lockInfo.GetType(), + UseAsyncCommit: lockInfo.GetUseAsyncCommit(), + LockForUpdateTS: lockInfo.GetForUpdateTs(), + } + // Disable for read to avoid async resolve. + resolveLocksOpts := txnlock.ResolveLocksOptions{ + CallerStartTS: startTS, + Locks: []*txnlock.Lock{lock}, + Lite: true, + ForRead: false, + Detail: nil, + } + resolveLockRes, err := h.Store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLocksOpts) + if err != nil { + return nil, err + } + msBeforeExpired := resolveLockRes.TTL + if msBeforeExpired > 0 { + if err = bo.BackoffWithCfgAndMaxSleep(tikv.BoTxnLock(), int(msBeforeExpired), + errors.Errorf("resolve lock fails lock: %v", lock)); err != nil { + return nil, err + } + } + continue + } + return mvccResp, nil +>>>>>>> 64e5ea06226 (domain: add resolve lock logic for mvcc get key loading schema diff (#48330)):pkg/store/helper/helper.go } tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey}) @@ -115,6 +218,11 @@ func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyRe return kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse), nil } +// GetMvccByEncodedKey get the MVCC value by the specific encoded key. +func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { + return h.GetMvccByEncodedKeyWithTS(encodedKey, 0) +} + // MvccKV wraps the key's mvcc info in tikv. type MvccKV struct { Key string `json:"key"` From 4d435b05e180868fae9fc610e8ad4e6d56dfa3be Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Fri, 24 Nov 2023 15:10:55 +0800 Subject: [PATCH 2/2] resolve conflicts Signed-off-by: cfzjywxk --- store/helper/helper.go | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/store/helper/helper.go b/store/helper/helper.go index 84350f79da8ea..5ff582f75e928 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -94,13 +94,6 @@ func NewHelper(store Storage) *Helper { } } -<<<<<<< HEAD:store/helper/helper.go -// GetMvccByEncodedKey get the MVCC value by the specific encoded key. -func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { - keyLocation, err := h.RegionCache.LocateKey(tikv.NewBackofferWithVars(context.Background(), 500, nil), encodedKey) - if err != nil { - return nil, derr.ToTiDBErr(err) -======= // MaxBackoffTimeoutForMvccGet is a derived value from previous implementation possible experiencing value 5000ms. const MaxBackoffTimeoutForMvccGet = 5000 @@ -201,21 +194,7 @@ func (h *Helper) GetMvccByEncodedKeyWithTS(encodedKey kv.Key, startTS uint64) (* continue } return mvccResp, nil ->>>>>>> 64e5ea06226 (domain: add resolve lock logic for mvcc get key loading schema diff (#48330)):pkg/store/helper/helper.go - } - - tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey}) - kvResp, err := h.Store.SendReq(tikv.NewBackofferWithVars(context.Background(), 500, nil), tikvReq, keyLocation.Region, time.Minute) - if err != nil { - logutil.BgLogger().Info("get MVCC by encoded key failed", - zap.Stringer("encodeKey", encodedKey), - zap.Reflect("region", keyLocation.Region), - zap.Stringer("keyLocation", keyLocation), - zap.Reflect("kvResp", kvResp), - zap.Error(err)) - return nil, errors.Trace(err) } - return kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse), nil } // GetMvccByEncodedKey get the MVCC value by the specific encoded key.