From 784a0829c1ac03b38efc2a3de16c251521493a98 Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 17 May 2021 21:44:58 +0800 Subject: [PATCH] kv: move TxnScope into kv Signed-off-by: disksing --- ddl/db_integration_test.go | 3 +- ddl/delete_range.go | 3 +- ddl/reorg.go | 3 +- distsql/request_builder.go | 9 ++- domain/domain.go | 3 +- domain/domain_test.go | 4 +- domain/infosync/info.go | 2 +- executor/batch_point_get.go | 3 +- executor/executor_test.go | 4 +- executor/point_get.go | 5 +- executor/stale_txn_test.go | 3 +- kv/fault_injection_test.go | 3 +- kv/kv.go | 2 +- kv/mock_test.go | 3 +- kv/txn_scope_var.go | 73 +++++++++++++++++++++++ meta/meta_test.go | 3 +- server/server.go | 6 +- session/pessimistic_test.go | 4 +- session/schema_amender_test.go | 3 +- session/session.go | 5 +- session/session_test.go | 25 ++++---- sessionctx/variable/session.go | 10 ++-- sessionctx/variable/sysvar.go | 13 ++-- store/gcworker/gc_worker.go | 6 +- store/mockstore/mockcopr/executor_test.go | 2 +- store/store_test.go | 3 +- store/tikv/extract_start_ts_test.go | 6 +- store/tikv/oracle/oracle.go | 47 --------------- util/mock/context.go | 3 +- 29 files changed, 135 insertions(+), 124 deletions(-) create mode 100644 kv/txn_scope_var.go diff --git a/ddl/db_integration_test.go b/ddl/db_integration_test.go index 405ada57f15ec..0c2f851da4413 100644 --- a/ddl/db_integration_test.go +++ b/ddl/db_integration_test.go @@ -41,7 +41,6 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/tikv/mockstore/cluster" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -1343,7 +1342,7 @@ func getMaxTableHandle(ctx *testMaxTableRowIDContext, store kv.Storage) (kv.Hand c := ctx.c d := ctx.d tbl := ctx.tbl - curVer, err := store.CurrentVersion(oracle.GlobalTxnScope) + curVer, err := store.CurrentVersion(kv.GlobalTxnScope) c.Assert(err, IsNil) maxHandle, emptyTable, err := d.GetTableMaxHandle(curVer.Ver, tbl.(table.PhysicalTable)) c.Assert(err, IsNil) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index e64c122d41e4d..1aeb5ab0354da 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" @@ -451,7 +450,7 @@ func doBatchInsert(s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint // getNowTS gets the current timestamp, in TSO. func getNowTSO(ctx sessionctx.Context) (uint64, error) { - currVer, err := ctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) + currVer, err := ctx.GetStore().CurrentVersion(kv.GlobalTxnScope) if err != nil { return 0, errors.Trace(err) } diff --git a/ddl/reorg.go b/ddl/reorg.go index fbe42573dbbf7..37bfe82d4ce3d 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -534,7 +533,7 @@ func getTableRange(d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, prior } func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) { - ver, err = store.CurrentVersion(oracle.GlobalTxnScope) + ver, err = store.CurrentVersion(kv.GlobalTxnScope) if err != nil { return ver, errors.Trace(err) } else if ver.Ver <= 0 { diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 69a6da548ec60..aced1a71aaa7b 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" @@ -236,7 +235,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req } builder.txnScope = sv.TxnCtx.TxnScope builder.IsStaleness = sv.TxnCtx.IsStaleness - if builder.IsStaleness && builder.txnScope != oracle.GlobalTxnScope { + if builder.IsStaleness && builder.txnScope != kv.GlobalTxnScope { builder.MatchStoreLabels = []*metapb.StoreLabel{ { Key: placement.DCLabelKey, @@ -279,9 +278,9 @@ func (builder *RequestBuilder) SetFromInfoSchema(is infoschema.InfoSchema) *Requ func (builder *RequestBuilder) verifyTxnScope() error { if builder.txnScope == "" { - builder.txnScope = oracle.GlobalTxnScope + builder.txnScope = kv.GlobalTxnScope } - if builder.txnScope == oracle.GlobalTxnScope || builder.is == nil { + if builder.txnScope == kv.GlobalTxnScope || builder.is == nil { return nil } visitPhysicalTableID := make(map[int64]struct{}) @@ -600,7 +599,7 @@ func CommonHandleRangesToKVRanges(sc *stmtctx.StatementContext, tids []int64, ra // VerifyTxnScope verify whether the txnScope and visited physical table break the leader rule's dcLocation. func VerifyTxnScope(txnScope string, physicalTableID int64, is infoschema.InfoSchema) bool { - if txnScope == "" || txnScope == oracle.GlobalTxnScope { + if txnScope == "" || txnScope == kv.GlobalTxnScope { return true } bundle, ok := is.BundleByName(placement.GroupID(physicalTableID)) diff --git a/domain/domain.go b/domain/domain.go index 44f6df1aa9086..a22d647066ea1 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -48,7 +48,6 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/store/tikv" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/telemetry" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" @@ -336,7 +335,7 @@ func (do *Domain) Reload() error { defer do.m.Unlock() startTime := time.Now() - ver, err := do.store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := do.store.CurrentVersion(kv.GlobalTxnScope) if err != nil { return err } diff --git a/domain/domain_test.go b/domain/domain_test.go index 82a583866aad3..51e0948d30715 100644 --- a/domain/domain_test.go +++ b/domain/domain_test.go @@ -347,7 +347,7 @@ func (*testSuite) TestT(c *C) { // for schemaValidator schemaVer := dom.SchemaValidator.(*schemaValidator).LatestSchemaVersion() - ver, err := store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := store.CurrentVersion(kv.GlobalTxnScope) c.Assert(err, IsNil) ts := ver.Ver @@ -360,7 +360,7 @@ func (*testSuite) TestT(c *C) { c.Assert(succ, Equals, ResultSucc) time.Sleep(ddlLease) - ver, err = store.CurrentVersion(oracle.GlobalTxnScope) + ver, err = store.CurrentVersion(kv.GlobalTxnScope) c.Assert(err, IsNil) ts = ver.Ver _, succ = dom.SchemaValidator.Check(ts, schemaVer, nil) diff --git a/domain/infosync/info.go b/domain/infosync/info.go index cbd76d4e6f266..be8d80246e96b 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -554,7 +554,7 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) { pl := is.manager.ShowProcessList() // Calculate the lower limit of the start timestamp to avoid extremely old transaction delaying GC. - currentVer, err := store.CurrentVersion(oracle.GlobalTxnScope) + currentVer, err := store.CurrentVersion(kv.GlobalTxnScope) if err != nil { logutil.BgLogger().Error("update minStartTS failed", zap.Error(err)) return diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 2137884c69745..726603a0ff88f 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -30,7 +30,6 @@ import ( driver "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -122,7 +121,7 @@ func (e *BatchPointGetExec) Open(context.Context) error { snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness) - if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope { + if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope { snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{ { Key: placement.DCLabelKey, diff --git a/executor/executor_test.go b/executor/executor_test.go index 7fa0d7b0d10bd..fe56e798fa1d9 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2693,11 +2693,11 @@ func (s *testSuiteP2) TestHistoryRead(c *C) { // SnapshotTS Is not updated if check failed. c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0)) - curVer1, _ := s.store.CurrentVersion(oracle.GlobalTxnScope) + curVer1, _ := s.store.CurrentVersion(kv.GlobalTxnScope) time.Sleep(time.Millisecond) snapshotTime := time.Now() time.Sleep(time.Millisecond) - curVer2, _ := s.store.CurrentVersion(oracle.GlobalTxnScope) + curVer2, _ := s.store.CurrentVersion(kv.GlobalTxnScope) tk.MustExec("insert history_read values (2)") tk.MustQuery("select * from history_read").Check(testkit.Rows("1", "2")) tk.MustExec("set @@tidb_snapshot = '" + snapshotTime.Format("2006-01-02 15:04:05.999999") + "'") diff --git a/executor/point_get.go b/executor/point_get.go index fc8326555bf01..dccb72bdebb17 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -32,7 +32,6 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -153,7 +152,7 @@ func (e *PointGetExecutor) Open(context.Context) error { e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness e.snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness) - if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope { + if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope { e.snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{ { Key: placement.DCLabelKey, @@ -392,7 +391,7 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error) func (e *PointGetExecutor) verifyTxnScope() error { txnScope := e.txn.GetOption(kv.TxnScope).(string) - if txnScope == "" || txnScope == oracle.GlobalTxnScope { + if txnScope == "" || txnScope == kv.GlobalTxnScope { return nil } var tblID int64 diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 7cf235bd3c0f7..64b334b15bf94 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -20,6 +20,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/placement" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/testkit" ) @@ -76,7 +77,7 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) { preSQL: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`, sql: "begin", IsStaleness: false, - txnScope: oracle.GlobalTxnScope, + txnScope: kv.GlobalTxnScope, zone: "", }, } diff --git a/kv/fault_injection_test.go b/kv/fault_injection_test.go index 33b6535214b2c..c2041fec186af 100644 --- a/kv/fault_injection_test.go +++ b/kv/fault_injection_test.go @@ -19,7 +19,6 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/parser/terror" - "github.com/pingcap/tidb/store/tikv/oracle" ) type testFaultInjectionSuite struct{} @@ -35,7 +34,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) { storage := NewInjectedStore(newMockStorage(), &cfg) txn, err := storage.Begin() c.Assert(err, IsNil) - _, err = storage.BeginWithOption(TransactionOption{}.SetTxnScope(oracle.GlobalTxnScope).SetStartTs(0)) + _, err = storage.BeginWithOption(TransactionOption{}.SetTxnScope(GlobalTxnScope).SetStartTs(0)) c.Assert(err, IsNil) ver := Version{Ver: 1} snap := storage.GetSnapshot(ver) diff --git a/kv/kv.go b/kv/kv.go index 572fe104024bc..4453d3f862665 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -352,7 +352,7 @@ type TransactionOption struct { // DefaultTransactionOption creates a default TransactionOption, ie. Work in GlobalTxnScope and get start ts when got used func DefaultTransactionOption() TransactionOption { - return TransactionOption{TxnScope: oracle.GlobalTxnScope} + return TransactionOption{TxnScope: GlobalTxnScope} } // SetMaxPrevSec set maxPrevSec diff --git a/kv/mock_test.go b/kv/mock_test.go index eba059e763f82..e09c291d5de95 100644 --- a/kv/mock_test.go +++ b/kv/mock_test.go @@ -17,7 +17,6 @@ import ( "context" . "github.com/pingcap/check" - "github.com/pingcap/tidb/store/tikv/oracle" ) var _ = Suite(testMockSuite{}) @@ -29,7 +28,7 @@ func (s testMockSuite) TestInterface(c *C) { storage := newMockStorage() storage.GetClient() storage.UUID() - version, err := storage.CurrentVersion(oracle.GlobalTxnScope) + version, err := storage.CurrentVersion(GlobalTxnScope) c.Check(err, IsNil) snapshot := storage.GetSnapshot(version) _, err = snapshot.BatchGet(context.Background(), []Key{Key("abc"), Key("def")}) diff --git a/kv/txn_scope_var.go b/kv/txn_scope_var.go new file mode 100644 index 0000000000000..941fdaff5f26f --- /dev/null +++ b/kv/txn_scope_var.go @@ -0,0 +1,73 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/store/tikv/oracle" +) + +// TxnScopeVar indicates the used txnScope for oracle +type TxnScopeVar struct { + // varValue indicates the value of @@txn_scope, which can only be `global` or `local` + varValue string + // txnScope indicates the value which the tidb-server holds to request tso to pd + txnScope string +} + +// GetTxnScopeVar gets TxnScopeVar from config +func GetTxnScopeVar() TxnScopeVar { + isGlobal, location := config.GetTxnScopeFromConfig() + if isGlobal { + return NewGlobalTxnScopeVar() + } + return NewLocalTxnScopeVar(location) +} + +// NewGlobalTxnScopeVar creates a Global TxnScopeVar +func NewGlobalTxnScopeVar() TxnScopeVar { + return newTxnScopeVar(GlobalTxnScope, GlobalTxnScope) +} + +// NewLocalTxnScopeVar creates a Local TxnScopeVar with given real txnScope value. +func NewLocalTxnScopeVar(txnScope string) TxnScopeVar { + return newTxnScopeVar(LocalTxnScope, txnScope) +} + +// GetVarValue returns the value of @@txn_scope which can only be `global` or `local` +func (t TxnScopeVar) GetVarValue() string { + return t.varValue +} + +// GetTxnScope returns the value of the tidb-server holds to request tso to pd. +func (t TxnScopeVar) GetTxnScope() string { + return t.txnScope +} + +func newTxnScopeVar(varValue string, txnScope string) TxnScopeVar { + return TxnScopeVar{ + varValue: varValue, + txnScope: txnScope, + } +} + +// Transaction scopes constants. +const ( + // GlobalTxnScope is synced with PD's define of global scope. + // If we want to remove the dependency on store/tikv here, we need to map + // the two GlobalTxnScopes in the driver layer. + GlobalTxnScope = oracle.GlobalTxnScope + // LocalTxnScope indicates the transaction should use local ts. + LocalTxnScope = "local" +) diff --git a/meta/meta_test.go b/meta/meta_test.go index 590e85fc2a21e..4ba54f1935a3a 100644 --- a/meta/meta_test.go +++ b/meta/meta_test.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/store/mockstore" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/testleak" . "github.com/pingcap/tidb/util/testutil" ) @@ -291,7 +290,7 @@ func (s *testSuite) TestSnapshot(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) - ver1, _ := store.CurrentVersion(oracle.GlobalTxnScope) + ver1, _ := store.CurrentVersion(kv.GlobalTxnScope) time.Sleep(time.Millisecond) txn, _ = store.Begin() m = meta.NewMeta(txn) diff --git a/server/server.go b/server/server.go index 29f5307895cc2..935abbb7bc693 100644 --- a/server/server.go +++ b/server/server.go @@ -54,11 +54,11 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/plugin" "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/fastrand" @@ -311,9 +311,9 @@ func setSSLVariable(ca, key, cert string) { func setTxnScope() { variable.SetSysVar("txn_scope", func() string { if isGlobal, _ := config.GetTxnScopeFromConfig(); isGlobal { - return oracle.GlobalTxnScope + return kv.GlobalTxnScope } - return oracle.LocalTxnScope + return kv.LocalTxnScope }()) } diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 72853d86208a9..2e8c01c75577b 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -2029,14 +2029,14 @@ func (s *testPessimisticSuite) TestSelectForUpdateConflictRetry(c *C) { tsCh := make(chan uint64) go func() { tk3.MustExec("update tk set c2 = c2 + 1 where c1 = 1") - lastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + lastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: kv.GlobalTxnScope}) c.Assert(err, IsNil) tsCh <- lastTS tk3.MustExec("commit") tsCh <- lastTS }() // tk2LastTS should be its forUpdateTS - tk2LastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + tk2LastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: kv.GlobalTxnScope}) c.Assert(err, IsNil) tk2.MustExec("commit") diff --git a/session/schema_amender_test.go b/session/schema_amender_test.go index ca05f4a74dbff..eda0e5e387e05 100644 --- a/session/schema_amender_test.go +++ b/session/schema_amender_test.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -426,7 +425,7 @@ func (s *testSchemaAmenderSuite) TestAmendCollectAndGenMutations(c *C) { } c.Assert(err, IsNil) } - curVer, err := se.store.CurrentVersion(oracle.GlobalTxnScope) + curVer, err := se.store.CurrentVersion(kv.GlobalTxnScope) c.Assert(err, IsNil) se.sessionVars.TxnCtx.SetForUpdateTS(curVer.Ver + 1) mutationVals, err := txn.BatchGet(ctx, checkKeys) diff --git a/session/session.go b/session/session.go index 78a60a6ebaecf..84dae9e13e340 100644 --- a/session/session.go +++ b/session/session.go @@ -69,7 +69,6 @@ import ( "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/store/tikv" - "github.com/pingcap/tidb/store/tikv/oracle" tikvutil "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/telemetry" @@ -2928,9 +2927,9 @@ func (s *session) checkPlacementPolicyBeforeCommit() error { // Get the txnScope of the transaction we're going to commit. txnScope := s.GetSessionVars().TxnCtx.TxnScope if txnScope == "" { - txnScope = oracle.GlobalTxnScope + txnScope = kv.GlobalTxnScope } - if txnScope != oracle.GlobalTxnScope { + if txnScope != kv.GlobalTxnScope { is := s.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema) deltaMap := s.GetSessionVars().TxnCtx.TableDeltaMap for physicalTableID := range deltaMap { diff --git a/session/session_test.go b/session/session_test.go index f7267e3a13259..2d3ca0309c18d 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -52,7 +52,6 @@ import ( "github.com/pingcap/tidb/store/mockstore/mockcopr" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/mockstore/cluster" - "github.com/pingcap/tidb/store/tikv/oracle" tikvutil "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -2075,7 +2074,7 @@ func (s *testSchemaSerialSuite) TestLoadSchemaFailed(c *C) { _, err = tk1.Exec("commit") c.Check(err, NotNil) - ver, err := s.store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := s.store.CurrentVersion(kv.GlobalTxnScope) c.Assert(err, IsNil) c.Assert(ver, NotNil) @@ -3338,26 +3337,26 @@ func (s *testSessionSerialSuite) TestSetTxnScope(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) // assert default value result := tk.MustQuery("select @@txn_scope;") - result.Check(testkit.Rows(oracle.GlobalTxnScope)) - c.Assert(tk.Se.GetSessionVars().CheckAndGetTxnScope(), Equals, oracle.GlobalTxnScope) + result.Check(testkit.Rows(kv.GlobalTxnScope)) + c.Assert(tk.Se.GetSessionVars().CheckAndGetTxnScope(), Equals, kv.GlobalTxnScope) // assert set sys variable tk.MustExec("set @@session.txn_scope = 'local';") result = tk.MustQuery("select @@txn_scope;") - result.Check(testkit.Rows(oracle.GlobalTxnScope)) - c.Assert(tk.Se.GetSessionVars().CheckAndGetTxnScope(), Equals, oracle.GlobalTxnScope) + result.Check(testkit.Rows(kv.GlobalTxnScope)) + c.Assert(tk.Se.GetSessionVars().CheckAndGetTxnScope(), Equals, kv.GlobalTxnScope) failpoint.Disable("github.com/pingcap/tidb/store/tikv/config/injectTxnScope") failpoint.Enable("github.com/pingcap/tidb/store/tikv/config/injectTxnScope", `return("bj")`) defer failpoint.Disable("github.com/pingcap/tidb/store/tikv/config/injectTxnScope") tk = testkit.NewTestKitWithInit(c, s.store) // assert default value result = tk.MustQuery("select @@txn_scope;") - result.Check(testkit.Rows(oracle.LocalTxnScope)) + result.Check(testkit.Rows(kv.LocalTxnScope)) c.Assert(tk.Se.GetSessionVars().CheckAndGetTxnScope(), Equals, "bj") // assert set sys variable tk.MustExec("set @@session.txn_scope = 'global';") result = tk.MustQuery("select @@txn_scope;") - result.Check(testkit.Rows(oracle.GlobalTxnScope)) - c.Assert(tk.Se.GetSessionVars().CheckAndGetTxnScope(), Equals, oracle.GlobalTxnScope) + result.Check(testkit.Rows(kv.GlobalTxnScope)) + c.Assert(tk.Se.GetSessionVars().CheckAndGetTxnScope(), Equals, kv.GlobalTxnScope) // assert set invalid txn_scope err := tk.ExecToErr("set @@txn_scope='foo'") @@ -3414,9 +3413,9 @@ PARTITION BY RANGE (c) ( setBundle("p1", "dc-2") // set txn_scope to global - tk.MustExec(fmt.Sprintf("set @@session.txn_scope = '%s';", oracle.GlobalTxnScope)) + tk.MustExec(fmt.Sprintf("set @@session.txn_scope = '%s';", kv.GlobalTxnScope)) result := tk.MustQuery("select @@txn_scope;") - result.Check(testkit.Rows(oracle.GlobalTxnScope)) + result.Check(testkit.Rows(kv.GlobalTxnScope)) // test global txn auto commit tk.MustExec("insert into t1 (c) values (1)") // write dc-1 with global scope @@ -3427,7 +3426,7 @@ PARTITION BY RANGE (c) ( tk.MustExec("begin") txn, err := tk.Se.Txn(true) c.Assert(err, IsNil) - c.Assert(tk.Se.GetSessionVars().TxnCtx.TxnScope, Equals, oracle.GlobalTxnScope) + c.Assert(tk.Se.GetSessionVars().TxnCtx.TxnScope, Equals, kv.GlobalTxnScope) c.Assert(txn.Valid(), IsTrue) tk.MustExec("insert into t1 (c) values (1)") // write dc-1 with global scope result = tk.MustQuery("select * from t1") // read dc-1 and dc-2 with global scope @@ -3441,7 +3440,7 @@ PARTITION BY RANGE (c) ( tk.MustExec("begin") txn, err = tk.Se.Txn(true) c.Assert(err, IsNil) - c.Assert(tk.Se.GetSessionVars().TxnCtx.TxnScope, Equals, oracle.GlobalTxnScope) + c.Assert(tk.Se.GetSessionVars().TxnCtx.TxnScope, Equals, kv.GlobalTxnScope) c.Assert(txn.Valid(), IsTrue) tk.MustExec("insert into t1 (c) values (101)") // write dc-2 with global scope result = tk.MustQuery("select * from t1") // read dc-1 and dc-2 with global scope diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index c474e7905fa7b..d38881a8ddcfa 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -800,7 +800,7 @@ type SessionVars struct { PartitionPruneMode atomic2.String // TxnScope indicates the scope of the transactions. It should be `global` or equal to `dc-location` in configuration. - TxnScope oracle.TxnScope + TxnScope kv.TxnScopeVar // EnabledRateLimitAction indicates whether enabled ratelimit action during coprocessor EnabledRateLimitAction bool @@ -863,12 +863,12 @@ func (s *SessionVars) IsMPPEnforced() bool { // CheckAndGetTxnScope will return the transaction scope we should use in the current session. func (s *SessionVars) CheckAndGetTxnScope() string { if s.InRestrictedSQL { - return oracle.GlobalTxnScope + return kv.GlobalTxnScope } - if s.TxnScope.GetVarValue() == oracle.LocalTxnScope { + if s.TxnScope.GetVarValue() == kv.LocalTxnScope { return s.TxnScope.GetTxnScope() } - return oracle.GlobalTxnScope + return kv.GlobalTxnScope } // UseDynamicPartitionPrune indicates whether use new dynamic partition prune. @@ -1055,7 +1055,7 @@ func NewSessionVars() *SessionVars { EnableAlterPlacement: DefTiDBEnableAlterPlacement, EnableAmendPessimisticTxn: DefTiDBEnableAmendPessimisticTxn, PartitionPruneMode: *atomic2.NewString(DefTiDBPartitionPruneMode), - TxnScope: oracle.GetTxnScope(), + TxnScope: kv.GetTxnScopeVar(), EnabledRateLimitAction: DefTiDBEnableRateLimitAction, EnableAsyncCommit: DefTiDBEnableAsyncCommit, Enable1PC: DefTiDBEnable1PC, diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 99c3da8233d68..f239c00ef94b0 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/logutil" @@ -734,15 +733,15 @@ var defaultSysVars = []*SysVar{ /* TiDB specific variables */ {Scope: ScopeSession, Name: TiDBTxnScope, Value: func() string { if isGlobal, _ := config.GetTxnScopeFromConfig(); isGlobal { - return oracle.GlobalTxnScope + return kv.GlobalTxnScope } - return oracle.LocalTxnScope + return kv.LocalTxnScope }(), SetSession: func(s *SessionVars, val string) error { switch val { - case oracle.GlobalTxnScope: - s.TxnScope = oracle.NewGlobalTxnScope() - case oracle.LocalTxnScope: - s.TxnScope = oracle.GetTxnScope() + case kv.GlobalTxnScope: + s.TxnScope = kv.NewGlobalTxnScopeVar() + case kv.LocalTxnScope: + s.TxnScope = kv.GetTxnScopeVar() default: return ErrWrongValueForVar.GenWithStack("@@txn_scope value should be global or local") } diff --git a/store/gcworker/gc_worker.go b/store/gcworker/gc_worker.go index b408f279be98a..b5b42df1838b9 100644 --- a/store/gcworker/gc_worker.go +++ b/store/gcworker/gc_worker.go @@ -74,7 +74,7 @@ type GCWorker struct { // NewGCWorker creates a GCWorker instance. func NewGCWorker(store kv.Storage, pdClient pd.Client) (*GCWorker, error) { - ver, err := store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := store.CurrentVersion(kv.GlobalTxnScope) if err != nil { return nil, errors.Trace(err) } @@ -429,7 +429,7 @@ func (w *GCWorker) calcSafePointByMinStartTS(ctx context.Context, safePoint uint } func (w *GCWorker) getOracleTime() (time.Time, error) { - currentVer, err := w.store.CurrentVersion(oracle.GlobalTxnScope) + currentVer, err := w.store.CurrentVersion(kv.GlobalTxnScope) if err != nil { return time.Time{}, errors.Trace(err) } @@ -1984,7 +1984,7 @@ type MockGCWorker struct { // NewMockGCWorker creates a MockGCWorker instance ONLY for test. func NewMockGCWorker(store kv.Storage) (*MockGCWorker, error) { - ver, err := store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := store.CurrentVersion(kv.GlobalTxnScope) if err != nil { return nil, errors.Trace(err) } diff --git a/store/mockstore/mockcopr/executor_test.go b/store/mockstore/mockcopr/executor_test.go index af9ac45beae96..7437b8d995997 100644 --- a/store/mockstore/mockcopr/executor_test.go +++ b/store/mockstore/mockcopr/executor_test.go @@ -83,7 +83,7 @@ func (s *testExecutorSuite) TestResolvedLargeTxnLocks(c *C) { tk.MustExec("insert into t values (1, 1)") o := s.store.GetOracle() - tso, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + tso, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: kv.GlobalTxnScope}) c.Assert(err, IsNil) key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1)) diff --git a/store/store_test.go b/store/store_test.go index 627a214badee7..3f4a44cecc189 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -26,7 +26,6 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/testleak" ) @@ -543,7 +542,7 @@ func (s *testKVSuite) TestDBClose(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) - ver, err := store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := store.CurrentVersion(kv.GlobalTxnScope) c.Assert(err, IsNil) c.Assert(kv.MaxVersion.Cmp(ver), Equals, 1) diff --git a/store/tikv/extract_start_ts_test.go b/store/tikv/extract_start_ts_test.go index b392ca365cde8..8b3efcccd9553 100644 --- a/store/tikv/extract_start_ts_test.go +++ b/store/tikv/extract_start_ts_test.go @@ -42,7 +42,7 @@ func (s *extractStartTsSuite) SetUpTest(c *C) { labels: []*metapb.StoreLabel{ { Key: DCLabelKey, - Value: oracle.LocalTxnScope, + Value: "local1", }, }, } @@ -78,7 +78,7 @@ func (s *extractStartTsSuite) TestExtractStartTs(c *C) { // MinStartTS setted, global {101, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, // MinStartTS setted, local - {102, kv.TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, + {102, kv.TransactionOption{TxnScope: "local1", StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, // MaxPrevSec setted // however we need to add more cases to check the behavior when it fall backs to MinStartTS setted // see `TestMaxPrevSecFallback` @@ -105,7 +105,7 @@ func (s *extractStartTsSuite) TestMaxPrevSecFallback(c *C) { option kv.TransactionOption }{ {0x8000000000000001, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, - {0x8000000000000002, kv.TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {0x8000000000000002, kv.TransactionOption{TxnScope: "local1", StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, } for _, cs := range cases { result, _ := extractStartTs(s.store, cs.option) diff --git a/store/tikv/oracle/oracle.go b/store/tikv/oracle/oracle.go index daf00c66814ca..1b08129d412aa 100644 --- a/store/tikv/oracle/oracle.go +++ b/store/tikv/oracle/oracle.go @@ -18,7 +18,6 @@ import ( "time" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/store/tikv/config" "github.com/pingcap/tidb/store/tikv/logutil" "go.uber.org/zap" ) @@ -45,57 +44,11 @@ type Future interface { Wait() (uint64, error) } -// TxnScope indicates the used txnScope for oracle -type TxnScope struct { - // varValue indicates the value of @@txn_scope, which can only be `global` or `local` - varValue string - // txnScope indicates the value which the tidb-server holds to request tso to pd - txnScope string -} - -// GetTxnScope gets oracle.TxnScope from config -func GetTxnScope() TxnScope { - isGlobal, location := config.GetTxnScopeFromConfig() - if isGlobal { - return NewGlobalTxnScope() - } - return NewLocalTxnScope(location) -} - -// NewGlobalTxnScope creates a Global TxnScope -func NewGlobalTxnScope() TxnScope { - return newTxnScope(GlobalTxnScope, GlobalTxnScope) -} - -// NewLocalTxnScope creates a Local TxnScope with given real txnScope value. -func NewLocalTxnScope(txnScope string) TxnScope { - return newTxnScope(LocalTxnScope, txnScope) -} - -// GetVarValue returns the value of @@txn_scope which can only be `global` or `local` -func (t TxnScope) GetVarValue() string { - return t.varValue -} - -// GetTxnScope returns the value of the tidb-server holds to request tso to pd. -func (t TxnScope) GetTxnScope() string { - return t.txnScope -} - -func newTxnScope(varValue string, txnScope string) TxnScope { - return TxnScope{ - varValue: varValue, - txnScope: txnScope, - } -} - const ( physicalShiftBits = 18 logicalBits = (1 << physicalShiftBits) - 1 // GlobalTxnScope is the default transaction scope for a Oracle service. GlobalTxnScope = "global" - // LocalTxnScope indicates the local txn scope for a Oracle service. - LocalTxnScope = "local" ) // ComposeTS creates a ts from physical and logical parts. diff --git a/util/mock/context.go b/util/mock/context.go index d6a5f1d913902..f234359809230 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/kvcache" @@ -204,7 +203,7 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error { return nil } if c.Store != nil { - txn, err := c.Store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(oracle.GlobalTxnScope).SetStartTs(startTS)) + txn, err := c.Store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(kv.GlobalTxnScope).SetStartTs(startTS)) if err != nil { return errors.Trace(err) }