Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: add resolve lock logic for mvcc get key loading schema diff #48282

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
return nil, false, 0, nil, err
}
// fetch the commit timestamp of the schema diff
schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion)
schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion, startTS)
if err != nil {
logutil.BgLogger().Warn("failed to get schema version", zap.Error(err), zap.Int64("version", neededSchemaVersion))
schemaTs = 0
Expand Down Expand Up @@ -259,18 +259,18 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
}

// Returns the timestamp of a schema version, which is the commit timestamp of the schema diff
func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64) (int64, error) {
func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64, startTS uint64) (int64, error) {
tikvStore, ok := do.Store().(helper.Storage)
if ok {
helper := helper.NewHelper(tikvStore)
data, err := helper.GetMvccByEncodedKey(m.EncodeSchemaDiffKey(version))
newHelper := helper.NewHelper(tikvStore)
mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(m.EncodeSchemaDiffKey(version), startTS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just call another GetMvccByEncodedKey(lock.primary) to get the commit-ts? Resolving lock and retring makes GetMvcc complex and heavy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible the primary key is not committed or the ongoing internal transaction is invisible to the input start_ts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides, it's better to use snapshot interfaces instead of the mvcc interfaces, I've filed an issue about it #48283

if err != nil {
return 0, err
}
if data == nil || data.Info == nil || len(data.Info.Writes) == 0 {
if mvccResp == nil || mvccResp.Info == nil || len(mvccResp.Info.Writes) == 0 {
return 0, errors.Errorf("There is no Write MVCC info for the schema version")
}
return int64(data.Info.Writes[0].CommitTs), nil
return int64(mvccResp.Info.Writes[0].CommitTs), nil
}
return 0, errors.Errorf("cannot get store from domain")
}
Expand Down
63 changes: 58 additions & 5 deletions store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,11 @@ func NewHelper(store Storage) *Helper {
}
}

// GetMvccByEncodedKey get the MVCC value by the specific encoded key.
func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) {
bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil)
// GetMvccByEncodedKeyWithTS get the MVCC value by the specific encoded key, if lock is encountered it would be resolved.
func (h *Helper) GetMvccByEncodedKeyWithTS(encodedKey kv.Key, startTS uint64) (*kvrpcpb.MvccGetByKeyResponse, error) {
// A derived value from previous implementation possible experiencing value 5000ms.
MaxBackoffTimeout := 5000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extract a const?

bo := tikv.NewBackofferWithVars(context.Background(), MaxBackoffTimeout, nil)
tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey})
for {
keyLocation, err := h.RegionCache.LocateKey(bo, encodedKey)
Expand All @@ -106,14 +108,15 @@ func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyRe

kvResp, err := h.Store.SendReq(bo, tikvReq, keyLocation.Region, time.Minute)
if err != nil {
logutil.BgLogger().Info("get MVCC by encoded key failed",
logutil.BgLogger().Warn("get MVCC by encoded key failed",
zap.Stringer("encodeKey", encodedKey),
zap.Reflect("region", keyLocation.Region),
zap.Stringer("keyLocation", keyLocation),
zap.Reflect("kvResp", kvResp),
zap.Error(err))
return nil, errors.Trace(err)
}

regionErr, err := kvResp.GetRegionError()
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -124,20 +127,70 @@ func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyRe
}
continue
}

mvccResp := kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse)
if errMsg := mvccResp.GetError(); errMsg != "" {
logutil.BgLogger().Info("get MVCC by encoded key failed",
logutil.BgLogger().Warn("get MVCC by encoded key failed",
zap.Stringer("encodeKey", encodedKey),
zap.Reflect("region", keyLocation.Region),
zap.Stringer("keyLocation", keyLocation),
zap.Reflect("kvResp", kvResp),
zap.String("error", errMsg))
return nil, errors.New(errMsg)
}
if mvccResp.Info == nil {
errMsg := "Invalid mvcc response result, the info field is nil"
logutil.BgLogger().Warn(errMsg,
zap.Stringer("encodeKey", encodedKey),
zap.Reflect("region", keyLocation.Region),
zap.Stringer("keyLocation", keyLocation),
zap.Reflect("kvResp", kvResp))
return nil, errors.New(errMsg)
}

// Try to resolve the lock and retry mvcc get again if the input startTS is a valid value.
if startTS > 0 && mvccResp.Info.GetLock() != nil {
lockInfo := mvccResp.Info.GetLock()
lock := &txnlock.Lock{
Key: []byte(encodedKey),
Primary: lockInfo.GetPrimary(),
TxnID: lockInfo.GetStartTs(),
TTL: lockInfo.GetTtl(),
TxnSize: lockInfo.GetTxnSize(),
LockType: lockInfo.GetType(),
UseAsyncCommit: lockInfo.GetUseAsyncCommit(),
LockForUpdateTS: lockInfo.GetForUpdateTs(),
}
// Disable for read to avoid async resolve.
resolveLocksOpts := txnlock.ResolveLocksOptions{
CallerStartTS: startTS,
Locks: []*txnlock.Lock{lock},
Lite: true,
ForRead: false,
Detail: nil,
}
resolveLockRes, err := h.Store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLocksOpts)
if err != nil {
return nil, err
}
msBeforeExpired := resolveLockRes.TTL
if msBeforeExpired > 0 {
if err = bo.BackoffWithCfgAndMaxSleep(tikv.BoTxnLock(), int(msBeforeExpired),
errors.Errorf("resolve lock fails lock: %v", lock)); err != nil {
return nil, err
}
}
continue
}
return mvccResp, nil
}
}

// GetMvccByEncodedKey get the MVCC value by the specific encoded key.
func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) {
return h.GetMvccByEncodedKeyWithTS(encodedKey, 0)
}

// MvccKV wraps the key's mvcc info in tikv.
type MvccKV struct {
Key string `json:"key"`
Expand Down