From d2bdfd51b71646dd7c78e437ee40458bef6649cd Mon Sep 17 00:00:00 2001 From: disksing Date: Thu, 13 May 2021 17:49:39 +0800 Subject: [PATCH] store/tikv: move transaction options out to /kv (#24619) --- ddl/backfilling.go | 2 +- ddl/column.go | 3 +- ddl/index.go | 5 +-- executor/adapter.go | 3 +- executor/analyze.go | 18 ++++---- executor/batch_point_get.go | 12 ++--- executor/insert.go | 5 +-- executor/insert_common.go | 5 +-- executor/point_get.go | 14 +++--- executor/replace.go | 5 +-- executor/simple.go | 5 +-- executor/update.go | 5 +-- kv/mock_test.go | 3 +- kv/option.go | 62 ++++++++++++++++++++++++++ meta/meta.go | 5 +-- session/session.go | 30 ++++++------- sessionctx/binloginfo/binloginfo.go | 3 +- store/driver/txn/snapshot.go | 22 ++++----- store/driver/txn/txn_driver.go | 48 ++++++++++---------- store/tikv/kv/option.go | 48 -------------------- store/tikv/tests/snapshot_fail_test.go | 2 - 21 files changed, 153 insertions(+), 152 deletions(-) create mode 100644 kv/option.go diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 56512eec6ab65..ed279c68675cf 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -677,7 +677,7 @@ func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version ver := kv.Version{Ver: version} snap := store.GetSnapshot(ver) - snap.SetOption(tikvstore.Priority, priority) + snap.SetOption(kv.Priority, priority) it, err := snap.Iter(firstKey, upperBound) if err != nil { diff --git a/ddl/column.go b/ddl/column.go index 18c23b4d9c45a..e18c0c2d37e7b 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -37,7 +37,6 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" - tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -1346,7 +1345,7 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { taskCtx.addedCount = 0 taskCtx.scanCount = 0 - txn.SetOption(tikvstore.Priority, w.priority) + txn.SetOption(kv.Priority, w.priority) rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange) if err != nil { diff --git a/ddl/index.go b/ddl/index.go index f11a595aa8fb3..b1b4303d7a0f1 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/store/tikv" - tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" @@ -1117,7 +1116,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { taskCtx.addedCount = 0 taskCtx.scanCount = 0 - txn.SetOption(tikvstore.Priority, w.priority) + txn.SetOption(kv.Priority, w.priority) idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange) if err != nil { @@ -1329,7 +1328,7 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { taskCtx.addedCount = 0 taskCtx.scanCount = 0 - txn.SetOption(tikvstore.Priority, w.priority) + txn.SetOption(kv.Priority, w.priority) idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange) if err != nil { diff --git a/executor/adapter.go b/executor/adapter.go index 44d00cd1efa1e..784696996cb94 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -43,7 +43,6 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" tikverr "github.com/pingcap/tidb/store/tikv/error" - tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -629,7 +628,7 @@ func UpdateForUpdateTS(seCtx sessionctx.Context, newForUpdateTS uint64) error { newForUpdateTS = version.Ver } seCtx.GetSessionVars().TxnCtx.SetForUpdateTS(newForUpdateTS) - txn.SetOption(tikvstore.SnapshotTS, seCtx.GetSessionVars().TxnCtx.GetForUpdateTS()) + txn.SetOption(kv.SnapshotTS, seCtx.GetSessionVars().TxnCtx.GetForUpdateTS()) return nil } diff --git a/executor/analyze.go b/executor/analyze.go index 9cf9c75b1261c..fec55d870bf95 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -1121,9 +1121,9 @@ func (e *AnalyzeFastExec) activateTxnForRowCount() (rollbackFn func() error, err return nil, errors.Trace(err) } } - txn.SetOption(tikvstore.Priority, kv.PriorityLow) - txn.SetOption(tikvstore.IsolationLevel, kv.RC) - txn.SetOption(tikvstore.NotFillCache, true) + txn.SetOption(kv.Priority, kv.PriorityLow) + txn.SetOption(kv.IsolationLevel, kv.RC) + txn.SetOption(kv.NotFillCache, true) return rollbackFn, nil } @@ -1322,7 +1322,7 @@ func (e *AnalyzeFastExec) handleScanIter(iter kv.Iterator) (scanKeysSize int, er func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err error) { snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion) if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { - snapshot.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower) + snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower) } for _, t := range e.scanTasks { iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey)) @@ -1341,11 +1341,11 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) { defer e.wg.Done() snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion) - snapshot.SetOption(tikvstore.NotFillCache, true) - snapshot.SetOption(tikvstore.IsolationLevel, kv.RC) - snapshot.SetOption(tikvstore.Priority, kv.PriorityLow) + snapshot.SetOption(kv.NotFillCache, true) + snapshot.SetOption(kv.IsolationLevel, kv.RC) + snapshot.SetOption(kv.Priority, kv.PriorityLow) if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { - snapshot.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower) + snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower) } rander := rand.New(rand.NewSource(e.randSeed)) @@ -1356,7 +1356,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) { lower, upper := step-uint32(2*math.Sqrt(float64(step))), step step = uint32(rander.Intn(int(upper-lower))) + lower } - snapshot.SetOption(tikvstore.SampleStep, step) + snapshot.SetOption(kv.SampleStep, step) kvMap := make(map[string][]byte) var iter kv.Iterator iter, *err = snapshot.Iter(kv.Key(task.StartKey), kv.Key(task.EndKey)) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index f3ec18106fb21..23debe37404ee 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -113,17 +113,17 @@ func (e *BatchPointGetExec) Open(context.Context) error { e.stats = &runtimeStatsWithSnapshot{ SnapshotRuntimeStats: snapshotStats, } - snapshot.SetOption(tikvstore.CollectRuntimeStats, snapshotStats) + snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { - snapshot.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower) + snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower) } - snapshot.SetOption(tikvstore.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) + snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness - snapshot.SetOption(tikvstore.IsStalenessReadOnly, isStaleness) + snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness) if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope { - snapshot.SetOption(tikvstore.MatchStoreLabels, []*metapb.StoreLabel{ + snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{ { Key: placement.DCLabelKey, Value: e.ctx.GetSessionVars().TxnCtx.TxnScope, @@ -149,7 +149,7 @@ func (e *BatchPointGetExec) Open(context.Context) error { // Close implements the Executor interface. func (e *BatchPointGetExec) Close() error { if e.runtimeStats != nil && e.snapshot != nil { - e.snapshot.DelOption(tikvstore.CollectRuntimeStats) + e.snapshot.DelOption(kv.CollectRuntimeStats) } e.inited = 0 e.index = 0 diff --git a/executor/insert.go b/executor/insert.go index e8fdb9da3444e..178aefed5fb8b 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" - tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -215,8 +214,8 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D if e.collectRuntimeStatsEnabled() { if snapshot := txn.GetSnapshot(); snapshot != nil { - snapshot.SetOption(tikvstore.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) - defer snapshot.DelOption(tikvstore.CollectRuntimeStats) + snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) + defer snapshot.DelOption(kv.CollectRuntimeStats) } } prefetchStart := time.Now() diff --git a/executor/insert_common.go b/executor/insert_common.go index 10fc6cb9edc59..258e873db89db 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -34,7 +34,6 @@ import ( "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/store/tikv" - tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" @@ -1049,8 +1048,8 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D } if e.collectRuntimeStatsEnabled() { if snapshot := txn.GetSnapshot(); snapshot != nil { - snapshot.SetOption(tikvstore.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) - defer snapshot.DelOption(tikvstore.CollectRuntimeStats) + snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) + defer snapshot.DelOption(kv.CollectRuntimeStats) } } prefetchStart := time.Now() diff --git a/executor/point_get.go b/executor/point_get.go index c34987b7f0c1d..8857a4d253fd0 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -144,17 +144,17 @@ func (e *PointGetExecutor) Open(context.Context) error { e.stats = &runtimeStatsWithSnapshot{ SnapshotRuntimeStats: snapshotStats, } - e.snapshot.SetOption(tikvstore.CollectRuntimeStats, snapshotStats) + e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { - e.snapshot.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower) + e.snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower) } - e.snapshot.SetOption(tikvstore.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) + e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness - e.snapshot.SetOption(tikvstore.IsStalenessReadOnly, isStaleness) + e.snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness) if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope { - e.snapshot.SetOption(tikvstore.MatchStoreLabels, []*metapb.StoreLabel{ + e.snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{ { Key: placement.DCLabelKey, Value: e.ctx.GetSessionVars().TxnCtx.TxnScope, @@ -167,7 +167,7 @@ func (e *PointGetExecutor) Open(context.Context) error { // Close implements the Executor interface. func (e *PointGetExecutor) Close() error { if e.runtimeStats != nil && e.snapshot != nil { - e.snapshot.DelOption(tikvstore.CollectRuntimeStats) + e.snapshot.DelOption(kv.CollectRuntimeStats) } if e.idxInfo != nil && e.tblInfo != nil { actRows := int64(0) @@ -391,7 +391,7 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error) } func (e *PointGetExecutor) verifyTxnScope() error { - txnScope := e.txn.GetOption(tikvstore.TxnScope).(string) + txnScope := e.txn.GetOption(kv.TxnScope).(string) if txnScope == "" || txnScope == oracle.GlobalTxnScope { return nil } diff --git a/executor/replace.go b/executor/replace.go index 20af75fe4a0ae..8f35be4d05dbd 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/stmtctx" - tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -221,8 +220,8 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error { if e.collectRuntimeStatsEnabled() { if snapshot := txn.GetSnapshot(); snapshot != nil { - snapshot.SetOption(tikvstore.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) - defer snapshot.DelOption(tikvstore.CollectRuntimeStats) + snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) + defer snapshot.DelOption(kv.CollectRuntimeStats) } } prefetchStart := time.Now() diff --git a/executor/simple.go b/executor/simple.go index 24cb857aec3d5..74063b2429c06 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -40,7 +40,6 @@ import ( "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" - tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/oracle" tikvutil "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/types" @@ -606,10 +605,10 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error { return err } if e.ctx.GetSessionVars().TxnCtx.IsPessimistic { - txn.SetOption(tikvstore.Pessimistic, true) + txn.SetOption(kv.Pessimistic, true) } if s.CausalConsistencyOnly { - txn.SetOption(tikvstore.GuaranteeLinearizability, false) + txn.SetOption(kv.GuaranteeLinearizability, false) } return nil } diff --git a/executor/update.go b/executor/update.go index b8c7e2a985142..7c4b07ab8e6f6 100644 --- a/executor/update.go +++ b/executor/update.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/tidb/kv" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/store/tikv" - tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -261,7 +260,7 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) { if e.collectRuntimeStatsEnabled() { txn, err := e.ctx.Txn(false) if err == nil && txn.GetSnapshot() != nil { - txn.GetSnapshot().SetOption(tikvstore.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) + txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) } } for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ { @@ -408,7 +407,7 @@ func (e *UpdateExec) Close() error { if e.runtimeStats != nil && e.stats != nil { txn, err := e.ctx.Txn(false) if err == nil && txn.GetSnapshot() != nil { - txn.GetSnapshot().DelOption(tikvstore.CollectRuntimeStats) + txn.GetSnapshot().DelOption(kv.CollectRuntimeStats) } } return e.children[0].Close() diff --git a/kv/mock_test.go b/kv/mock_test.go index 45e45d5941251..eba059e763f82 100644 --- a/kv/mock_test.go +++ b/kv/mock_test.go @@ -17,7 +17,6 @@ import ( "context" . "github.com/pingcap/check" - tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -35,7 +34,7 @@ func (s testMockSuite) TestInterface(c *C) { snapshot := storage.GetSnapshot(version) _, err = snapshot.BatchGet(context.Background(), []Key{Key("abc"), Key("def")}) c.Check(err, IsNil) - snapshot.SetOption(tikvstore.Priority, PriorityNormal) + snapshot.SetOption(Priority, PriorityNormal) transaction, err := storage.Begin() c.Check(err, IsNil) diff --git a/kv/option.go b/kv/option.go new file mode 100644 index 0000000000000..5b04dfba06c95 --- /dev/null +++ b/kv/option.go @@ -0,0 +1,62 @@ +// Copyright 2021 PingCAP, Inc. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +// Transaction options +const ( + // BinlogInfo contains the binlog data and client. + BinlogInfo int = iota + 1 + // SchemaChecker is used for checking schema-validity. + SchemaChecker + // IsolationLevel sets isolation level for current transaction. The default level is SI. + IsolationLevel + // Priority marks the priority of this transaction. + Priority + // NotFillCache makes this request do not touch the LRU cache of the underlying storage. + NotFillCache + // SyncLog decides whether the WAL(write-ahead log) of this request should be synchronized. + SyncLog + // KeyOnly retrieve only keys, it can be used in scan now. + KeyOnly + // Pessimistic is defined for pessimistic lock + Pessimistic + // SnapshotTS is defined to set snapshot ts. + SnapshotTS + // Set replica read + ReplicaRead + // Set task ID + TaskID + // InfoSchema is schema version used by txn startTS. + InfoSchema + // CollectRuntimeStats is used to enable collect runtime stats. + CollectRuntimeStats + // SchemaAmender is used to amend mutations for pessimistic transactions + SchemaAmender + // SampleStep skips 'SampleStep - 1' number of keys after each returned key. + SampleStep + // CommitHook is a callback function called right after the transaction gets committed + CommitHook + // EnableAsyncCommit indicates whether async commit is enabled + EnableAsyncCommit + // Enable1PC indicates whether one-phase commit is enabled + Enable1PC + // GuaranteeLinearizability indicates whether to guarantee linearizability at the cost of an extra tso request before prewrite + GuaranteeLinearizability + // TxnScope indicates which @@txn_scope this transaction will work with. + TxnScope + // StalenessReadOnly indicates whether the transaction is staleness read only transaction + IsStalenessReadOnly + // MatchStoreLabels indicates the labels the store should be matched + MatchStoreLabels +) diff --git a/meta/meta.go b/meta/meta.go index 2682ed5b47d1e..3f76d2948e9b1 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" - tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/structure" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" @@ -94,8 +93,8 @@ type Meta struct { // NewMeta creates a Meta in transaction txn. // If the current Meta needs to handle a job, jobListKey is the type of the job's list. func NewMeta(txn kv.Transaction, jobListKeys ...JobListKeyType) *Meta { - txn.SetOption(tikvstore.Priority, kv.PriorityHigh) - txn.SetOption(tikvstore.SyncLog, struct{}{}) + txn.SetOption(kv.Priority, kv.PriorityHigh) + txn.SetOption(kv.SyncLog, struct{}{}) t := structure.NewStructure(txn, txn, mMetaPrefix) listKey := DefaultJobListKey if len(jobListKeys) != 0 { diff --git a/session/session.go b/session/session.go index 19312e5fc391e..8fc7c4c37eac8 100644 --- a/session/session.go +++ b/session/session.go @@ -497,7 +497,7 @@ func (s *session) doCommit(ctx context.Context) error { }, Client: s.sessionVars.BinlogClient, } - s.txn.SetOption(tikvstore.BinlogInfo, info) + s.txn.SetOption(kv.BinlogInfo, info) } } @@ -508,22 +508,22 @@ func (s *session) doCommit(ctx context.Context) error { physicalTableIDs = append(physicalTableIDs, id) } // Set this option for 2 phase commit to validate schema lease. - s.txn.SetOption(tikvstore.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.GetInfoSchema().SchemaMetaVersion(), physicalTableIDs)) - s.txn.SetOption(tikvstore.InfoSchema, s.sessionVars.TxnCtx.InfoSchema) - s.txn.SetOption(tikvstore.CommitHook, func(info string, _ error) { s.sessionVars.LastTxnInfo = info }) + s.txn.SetOption(kv.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.GetInfoSchema().SchemaMetaVersion(), physicalTableIDs)) + s.txn.SetOption(kv.InfoSchema, s.sessionVars.TxnCtx.InfoSchema) + s.txn.SetOption(kv.CommitHook, func(info string, _ error) { s.sessionVars.LastTxnInfo = info }) if s.GetSessionVars().EnableAmendPessimisticTxn { - s.txn.SetOption(tikvstore.SchemaAmender, NewSchemaAmenderForTikvTxn(s)) + s.txn.SetOption(kv.SchemaAmender, NewSchemaAmenderForTikvTxn(s)) } - s.txn.SetOption(tikvstore.EnableAsyncCommit, s.GetSessionVars().EnableAsyncCommit) - s.txn.SetOption(tikvstore.Enable1PC, s.GetSessionVars().Enable1PC) + s.txn.SetOption(kv.EnableAsyncCommit, s.GetSessionVars().EnableAsyncCommit) + s.txn.SetOption(kv.Enable1PC, s.GetSessionVars().Enable1PC) // priority of the sysvar is lower than `start transaction with causal consistency only` - if val := s.txn.GetOption(tikvstore.GuaranteeLinearizability); val == nil || val.(bool) { + if val := s.txn.GetOption(kv.GuaranteeLinearizability); val == nil || val.(bool) { // We needn't ask the TiKV client to guarantee linearizability for auto-commit transactions // because the property is naturally holds: // We guarantee the commitTS of any transaction must not exceed the next timestamp from the TSO. // An auto-commit transaction fetches its startTS from the TSO so its commitTS > its startTS > the commitTS // of any previously committed transactions. - s.txn.SetOption(tikvstore.GuaranteeLinearizability, + s.txn.SetOption(kv.GuaranteeLinearizability, s.GetSessionVars().TxnCtx.IsExplicit && s.GetSessionVars().GuaranteeLinearizability) } @@ -1883,7 +1883,7 @@ func (s *session) Txn(active bool) (kv.Transaction, error) { } s.sessionVars.TxnCtx.StartTS = s.txn.StartTS() if s.sessionVars.TxnCtx.IsPessimistic { - s.txn.SetOption(tikvstore.Pessimistic, true) + s.txn.SetOption(kv.Pessimistic, true) } if !s.sessionVars.IsAutocommit() { s.sessionVars.SetInTxn(true) @@ -1891,7 +1891,7 @@ func (s *session) Txn(active bool) (kv.Transaction, error) { s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable() s.txn.SetVars(s.sessionVars.KVVars) if s.sessionVars.GetReplicaRead().IsFollowerRead() { - s.txn.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower) + s.txn.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower) } } return &s.txn, nil @@ -1955,7 +1955,7 @@ func (s *session) NewTxn(ctx context.Context) error { } txn.SetVars(s.sessionVars.KVVars) if s.GetSessionVars().GetReplicaRead().IsFollowerRead() { - txn.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower) + txn.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower) } s.txn.changeInvalidToValid(txn) is := domain.GetDomain(s).InfoSchema() @@ -2763,7 +2763,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error { func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error { if s.txn.Valid() { txnID := s.txn.StartTS() - txnScope := s.txn.GetOption(tikvstore.TxnScope).(string) + txnScope := s.txn.GetOption(kv.TxnScope).(string) err := s.CommitTxn(ctx) if err != nil { return err @@ -2803,8 +2803,8 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc return s.NewTxn(ctx) } txn.SetVars(s.sessionVars.KVVars) - txn.SetOption(tikvstore.IsStalenessReadOnly, true) - txn.SetOption(tikvstore.TxnScope, txnScope) + txn.SetOption(kv.IsStalenessReadOnly, true) + txn.SetOption(kv.TxnScope, txnScope) s.txn.changeInvalidToValid(txn) is := domain.GetDomain(s).InfoSchema() s.sessionVars.TxnCtx = &variable.TransactionContext{ diff --git a/sessionctx/binloginfo/binloginfo.go b/sessionctx/binloginfo/binloginfo.go index 58313505e1c8e..163c22e4a6fb7 100644 --- a/sessionctx/binloginfo/binloginfo.go +++ b/sessionctx/binloginfo/binloginfo.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" - tikvstore "github.com/pingcap/tidb/store/tikv/kv" driver "github.com/pingcap/tidb/types/parser_driver" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tipb/go-binlog" @@ -295,7 +294,7 @@ func SetDDLBinlog(client *pumpcli.PumpsClient, txn kv.Transaction, jobID int64, }, Client: client, } - txn.SetOption(tikvstore.BinlogInfo, info) + txn.SetOption(kv.BinlogInfo, info) } const specialPrefix = `/*T! ` diff --git a/store/driver/txn/snapshot.go b/store/driver/txn/snapshot.go index a6a7d752a72fd..405067f5e082b 100644 --- a/store/driver/txn/snapshot.go +++ b/store/driver/txn/snapshot.go @@ -66,33 +66,33 @@ func (s *tikvSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) { func (s *tikvSnapshot) SetOption(opt int, val interface{}) { switch opt { - case tikvstore.IsolationLevel: + case kv.IsolationLevel: level := getTiKVIsolationLevel(val.(kv.IsoLevel)) s.KVSnapshot.SetIsolationLevel(level) - case tikvstore.Priority: + case kv.Priority: s.KVSnapshot.SetPriority(getTiKVPriority(val.(int))) - case tikvstore.NotFillCache: + case kv.NotFillCache: s.KVSnapshot.SetNotFillCache(val.(bool)) - case tikvstore.SnapshotTS: + case kv.SnapshotTS: s.KVSnapshot.SetSnapshotTS(val.(uint64)) - case tikvstore.ReplicaRead: + case kv.ReplicaRead: s.KVSnapshot.SetReplicaRead(val.(tikvstore.ReplicaReadType)) - case tikvstore.SampleStep: + case kv.SampleStep: s.KVSnapshot.SetSampleStep(val.(uint32)) - case tikvstore.TaskID: + case kv.TaskID: s.KVSnapshot.SetTaskID(val.(uint64)) - case tikvstore.CollectRuntimeStats: + case kv.CollectRuntimeStats: s.KVSnapshot.SetRuntimeStats(val.(*tikv.SnapshotRuntimeStats)) - case tikvstore.IsStalenessReadOnly: + case kv.IsStalenessReadOnly: s.KVSnapshot.SetIsStatenessReadOnly(val.(bool)) - case tikvstore.MatchStoreLabels: + case kv.MatchStoreLabels: s.KVSnapshot.SetMatchStoreLabels(val.([]*metapb.StoreLabel)) } } func (s *tikvSnapshot) DelOption(opt int) { switch opt { - case tikvstore.CollectRuntimeStats: + case kv.CollectRuntimeStats: s.KVSnapshot.SetRuntimeStats(nil) } } diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 0cd51a4480ee1..4d5ce77034312 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -126,51 +126,51 @@ func (txn *tikvTxn) GetUnionStore() kv.UnionStore { func (txn *tikvTxn) SetOption(opt int, val interface{}) { switch opt { - case tikvstore.BinlogInfo: + case kv.BinlogInfo: txn.SetBinlogExecutor(&binlogExecutor{ txn: txn.KVTxn, binInfo: val.(*binloginfo.BinlogInfo), // val cannot be other type. }) - case tikvstore.SchemaChecker: + case kv.SchemaChecker: txn.SetSchemaLeaseChecker(val.(tikv.SchemaLeaseChecker)) - case tikvstore.IsolationLevel: + case kv.IsolationLevel: level := getTiKVIsolationLevel(val.(kv.IsoLevel)) txn.KVTxn.GetSnapshot().SetIsolationLevel(level) - case tikvstore.Priority: + case kv.Priority: txn.KVTxn.SetPriority(getTiKVPriority(val.(int))) - case tikvstore.NotFillCache: + case kv.NotFillCache: txn.KVTxn.GetSnapshot().SetNotFillCache(val.(bool)) - case tikvstore.SyncLog: + case kv.SyncLog: txn.EnableForceSyncLog() - case tikvstore.Pessimistic: + case kv.Pessimistic: txn.SetPessimistic(val.(bool)) - case tikvstore.SnapshotTS: + case kv.SnapshotTS: txn.KVTxn.GetSnapshot().SetSnapshotTS(val.(uint64)) - case tikvstore.ReplicaRead: + case kv.ReplicaRead: txn.KVTxn.GetSnapshot().SetReplicaRead(val.(tikvstore.ReplicaReadType)) - case tikvstore.TaskID: + case kv.TaskID: txn.KVTxn.GetSnapshot().SetTaskID(val.(uint64)) - case tikvstore.InfoSchema: + case kv.InfoSchema: txn.SetSchemaVer(val.(tikv.SchemaVer)) - case tikvstore.CollectRuntimeStats: + case kv.CollectRuntimeStats: txn.KVTxn.GetSnapshot().SetRuntimeStats(val.(*tikv.SnapshotRuntimeStats)) - case tikvstore.SchemaAmender: + case kv.SchemaAmender: txn.SetSchemaAmender(val.(tikv.SchemaAmender)) - case tikvstore.SampleStep: + case kv.SampleStep: txn.KVTxn.GetSnapshot().SetSampleStep(val.(uint32)) - case tikvstore.CommitHook: + case kv.CommitHook: txn.SetCommitCallback(val.(func(string, error))) - case tikvstore.EnableAsyncCommit: + case kv.EnableAsyncCommit: txn.SetEnableAsyncCommit(val.(bool)) - case tikvstore.Enable1PC: + case kv.Enable1PC: txn.SetEnable1PC(val.(bool)) - case tikvstore.GuaranteeLinearizability: + case kv.GuaranteeLinearizability: txn.SetCausalConsistency(!val.(bool)) - case tikvstore.TxnScope: + case kv.TxnScope: txn.SetScope(val.(string)) - case tikvstore.IsStalenessReadOnly: + case kv.IsStalenessReadOnly: txn.KVTxn.GetSnapshot().SetIsStatenessReadOnly(val.(bool)) - case tikvstore.MatchStoreLabels: + case kv.MatchStoreLabels: txn.KVTxn.GetSnapshot().SetMatchStoreLabels(val.([]*metapb.StoreLabel)) default: txn.KVTxn.SetOption(opt, val) @@ -179,9 +179,9 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { func (txn *tikvTxn) GetOption(opt int) interface{} { switch opt { - case tikvstore.GuaranteeLinearizability: + case kv.GuaranteeLinearizability: return !txn.KVTxn.IsCasualConsistency() - case tikvstore.TxnScope: + case kv.TxnScope: return txn.KVTxn.GetScope() default: return txn.KVTxn.GetOption(opt) @@ -190,7 +190,7 @@ func (txn *tikvTxn) GetOption(opt int) interface{} { func (txn *tikvTxn) DelOption(opt int) { switch opt { - case tikvstore.CollectRuntimeStats: + case kv.CollectRuntimeStats: txn.KVTxn.GetSnapshot().SetRuntimeStats(nil) default: txn.KVTxn.DelOption(opt) diff --git a/store/tikv/kv/option.go b/store/tikv/kv/option.go index bac9316d41773..7bd36733a568d 100644 --- a/store/tikv/kv/option.go +++ b/store/tikv/kv/option.go @@ -13,54 +13,6 @@ package kv -// Transaction options -const ( - // BinlogInfo contains the binlog data and client. - BinlogInfo int = iota + 1 - // SchemaChecker is used for checking schema-validity. - SchemaChecker - // IsolationLevel sets isolation level for current transaction. The default level is SI. - IsolationLevel - // Priority marks the priority of this transaction. - Priority - // NotFillCache makes this request do not touch the LRU cache of the underlying storage. - NotFillCache - // SyncLog decides whether the WAL(write-ahead log) of this request should be synchronized. - SyncLog - // KeyOnly retrieve only keys, it can be used in scan now. - KeyOnly - // Pessimistic is defined for pessimistic lock - Pessimistic - // SnapshotTS is defined to set snapshot ts. - SnapshotTS - // Set replica read - ReplicaRead - // Set task ID - TaskID - // InfoSchema is schema version used by txn startTS. - InfoSchema - // CollectRuntimeStats is used to enable collect runtime stats. - CollectRuntimeStats - // SchemaAmender is used to amend mutations for pessimistic transactions - SchemaAmender - // SampleStep skips 'SampleStep - 1' number of keys after each returned key. - SampleStep - // CommitHook is a callback function called right after the transaction gets committed - CommitHook - // EnableAsyncCommit indicates whether async commit is enabled - EnableAsyncCommit - // Enable1PC indicates whether one-phase commit is enabled - Enable1PC - // GuaranteeLinearizability indicates whether to guarantee linearizability at the cost of an extra tso request before prewrite - GuaranteeLinearizability - // TxnScope indicates which @@txn_scope this transaction will work with. - TxnScope - // StalenessReadOnly indicates whether the transaction is staleness read only transaction - IsStalenessReadOnly - // MatchStoreLabels indicates the labels the store should be matched - MatchStoreLabels -) - // Priority value for transaction priority. // TODO: remove after BR update. const ( diff --git a/store/tikv/tests/snapshot_fail_test.go b/store/tikv/tests/snapshot_fail_test.go index 9892061c44b8d..ed812b4f46e00 100644 --- a/store/tikv/tests/snapshot_fail_test.go +++ b/store/tikv/tests/snapshot_fail_test.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/tidb/store/mockstore/unistore" "github.com/pingcap/tidb/store/tikv" tikverr "github.com/pingcap/tidb/store/tikv/error" - "github.com/pingcap/tidb/store/tikv/kv" ) type testSnapshotFailSuite struct { @@ -151,7 +150,6 @@ func (s *testSnapshotFailSuite) TestRetryMaxTsPointGetSkipLock(c *C) { c.Assert(err, IsNil) err = txn.Set([]byte("k2"), []byte("v2")) c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) txn.SetEnableAsyncCommit(true) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", "return"), IsNil)