From 707cb0a94273cd99df04fe1b2467618c49a6f20d Mon Sep 17 00:00:00 2001 From: Song Gao Date: Fri, 5 Mar 2021 17:56:56 +0800 Subject: [PATCH 1/8] kv: unify the BeginWith... function into one (#23130) --- kv/fault_injection.go | 6 +++--- kv/fault_injection_test.go | 2 +- kv/interface_mock_test.go | 12 +----------- kv/kv.go | 33 +++++++++++++++++++++++++++------ session/session.go | 8 ++++---- session/txn.go | 4 ++-- store/driver/tikv_driver.go | 22 ++++------------------ store/helper/helper.go | 4 +--- store/mockstore/unistore.go | 17 +++-------------- store/tikv/2pc_test.go | 4 ++-- store/tikv/kv.go | 29 +++++++++++++++++++++-------- util/mock/context.go | 2 +- util/mock/store.go | 12 ++---------- 13 files changed, 72 insertions(+), 83 deletions(-) diff --git a/kv/fault_injection.go b/kv/fault_injection.go index 95a4d3dfb781c..218ca9cbd6966 100644 --- a/kv/fault_injection.go +++ b/kv/fault_injection.go @@ -63,9 +63,9 @@ func (s *InjectedStore) Begin() (Transaction, error) { }, err } -// BeginWithStartTS creates an injected Transaction with startTS. -func (s *InjectedStore) BeginWithStartTS(txnScope string, startTS uint64) (Transaction, error) { - txn, err := s.Storage.BeginWithStartTS(txnScope, startTS) +// BeginWithOption creates an injected Transaction with given option. +func (s *InjectedStore) BeginWithOption(option TransactionOption) (Transaction, error) { + txn, err := s.Storage.BeginWithOption(option) return &InjectedTransaction{ Transaction: txn, cfg: s.cfg, diff --git a/kv/fault_injection_test.go b/kv/fault_injection_test.go index b137481c05f1a..33b6535214b2c 100644 --- a/kv/fault_injection_test.go +++ b/kv/fault_injection_test.go @@ -35,7 +35,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) { storage := NewInjectedStore(newMockStorage(), &cfg) txn, err := storage.Begin() c.Assert(err, IsNil) - _, err = storage.BeginWithStartTS(oracle.GlobalTxnScope, 0) + _, err = storage.BeginWithOption(TransactionOption{}.SetTxnScope(oracle.GlobalTxnScope).SetStartTs(0)) c.Assert(err, IsNil) ver := Version{Ver: 1} snap := storage.GetSnapshot(ver) diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 461250901233a..9ca599e262727 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -158,7 +158,7 @@ func (s *mockStorage) Begin() (Transaction, error) { return newMockTxn(), nil } -func (s *mockStorage) BeginWithTxnScope(txnScope string) (Transaction, error) { +func (s *mockStorage) BeginWithOption(option TransactionOption) (Transaction, error) { return newMockTxn(), nil } @@ -166,16 +166,6 @@ func (*mockTxn) IsPessimistic() bool { return false } -// BeginWithStartTS begins transaction with given txnScope and startTS. -func (s *mockStorage) BeginWithStartTS(txnScope string, startTS uint64) (Transaction, error) { - return s.Begin() -} - -// BeginWithExactStaleness begins transaction with given exact staleness -func (s *mockStorage) BeginWithExactStaleness(txnScope string, prevSec uint64) (Transaction, error) { - return s.Begin() -} - func (s *mockStorage) GetSnapshot(ver Version) Snapshot { return &mockSnapshot{ store: newMemDB(), diff --git a/kv/kv.go b/kv/kv.go index de2aad488cfb0..37f5f83951cdc 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -467,17 +467,38 @@ type Driver interface { Open(path string) (Storage, error) } +// TransactionOption indicates the option when beginning a transaction +type TransactionOption struct { + TxnScope string + StartTS *uint64 + PrevSec *uint64 +} + +// SetStartTs set startTS +func (to TransactionOption) SetStartTs(startTS uint64) TransactionOption { + to.StartTS = &startTS + return to +} + +// SetPrevSec set prevSec +func (to TransactionOption) SetPrevSec(prevSec uint64) TransactionOption { + to.PrevSec = &prevSec + return to +} + +// SetTxnScope set txnScope +func (to TransactionOption) SetTxnScope(txnScope string) TransactionOption { + to.TxnScope = txnScope + return to +} + // Storage defines the interface for storage. // Isolation should be at least SI(SNAPSHOT ISOLATION) type Storage interface { // Begin a global transaction Begin() (Transaction, error) - // Begin a transaction with the given txnScope (local or global) - BeginWithTxnScope(txnScope string) (Transaction, error) - // BeginWithStartTS begins transaction with given txnScope and startTS. - BeginWithStartTS(txnScope string, startTS uint64) (Transaction, error) - // BeginWithStalenessTS begins transaction with given staleness - BeginWithExactStaleness(txnScope string, prevSec uint64) (Transaction, error) + // Begin a transaction with given option + BeginWithOption(option TransactionOption) (Transaction, error) // GetSnapshot gets a snapshot that is able to read any data which data is <= ver. // if ver is MaxVersion or > current max committed version, we will use current version for this snapshot. GetSnapshot(ver Version) Snapshot diff --git a/session/session.go b/session/session.go index 0edc6ec4dc9c9..47ea068659a6d 100644 --- a/session/session.go +++ b/session/session.go @@ -1943,7 +1943,7 @@ func (s *session) NewTxn(ctx context.Context) error { zap.String("txnScope", txnScope)) } - txn, err := s.store.BeginWithTxnScope(s.sessionVars.CheckAndGetTxnScope()) + txn, err := s.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) if err != nil { return err } @@ -2736,7 +2736,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error { } // no need to get txn from txnFutureCh since txn should init with startTs - txn, err := s.store.BeginWithStartTS(s.GetSessionVars().CheckAndGetTxnScope(), startTS) + txn, err := s.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS)) if err != nil { return err } @@ -2769,12 +2769,12 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc txnScope := s.GetSessionVars().CheckAndGetTxnScope() switch option.Mode { case ast.TimestampBoundReadTimestamp: - txn, err = s.store.BeginWithStartTS(txnScope, option.StartTS) + txn, err = s.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(txnScope).SetStartTs(option.StartTS)) if err != nil { return err } case ast.TimestampBoundExactStaleness: - txn, err = s.store.BeginWithExactStaleness(txnScope, option.PrevSec) + txn, err = s.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) if err != nil { return err } diff --git a/session/txn.go b/session/txn.go index 4f7175c789477..bccf130fb988a 100644 --- a/session/txn.go +++ b/session/txn.go @@ -350,14 +350,14 @@ type txnFuture struct { func (tf *txnFuture) wait() (kv.Transaction, error) { startTS, err := tf.future.Wait() if err == nil { - return tf.store.BeginWithStartTS(tf.txnScope, startTS) + return tf.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(tf.txnScope).SetStartTs(startTS)) } else if config.GetGlobalConfig().Store == "unistore" { return nil, err } logutil.BgLogger().Warn("wait tso failed", zap.Error(err)) // It would retry get timestamp. - return tf.store.BeginWithTxnScope(tf.txnScope) + return tf.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(tf.txnScope)) } func (s *session) getTxnFuture(ctx context.Context) *txnFuture { diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index 128de806261b3..d7fe204f7afe1 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/tidb/store/gcworker" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/config" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" pd "github.com/tikv/pd/client" @@ -293,29 +292,16 @@ func (s *tikvStore) GetMemCache() kv.MemManager { // Begin a global transaction. func (s *tikvStore) Begin() (kv.Transaction, error) { - return s.BeginWithTxnScope(oracle.GlobalTxnScope) -} - -func (s *tikvStore) BeginWithTxnScope(txnScope string) (kv.Transaction, error) { - txn, err := s.KVStore.BeginWithTxnScope(txnScope) - if err != nil { - return nil, errors.Trace(err) - } - return txn_driver.NewTiKVTxn(txn), err -} - -// BeginWithStartTS begins a transaction with startTS. -func (s *tikvStore) BeginWithStartTS(txnScope string, startTS uint64) (kv.Transaction, error) { - txn, err := s.KVStore.BeginWithStartTS(txnScope, startTS) + txn, err := s.KVStore.Begin() if err != nil { return nil, errors.Trace(err) } return txn_driver.NewTiKVTxn(txn), err } -// BeginWithExactStaleness begins transaction with given staleness -func (s *tikvStore) BeginWithExactStaleness(txnScope string, prevSec uint64) (kv.Transaction, error) { - txn, err := s.KVStore.BeginWithExactStaleness(txnScope, prevSec) +// BeginWithOption begins a transaction with given option +func (s *tikvStore) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { + txn, err := s.KVStore.BeginWithOption(option) if err != nil { return nil, errors.Trace(err) } diff --git a/store/helper/helper.go b/store/helper/helper.go index 02248061947cd..3646d92cafe62 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -48,9 +48,7 @@ import ( // Methods copied from kv.Storage and tikv.Storage due to limitation of go1.13. type Storage interface { Begin() (kv.Transaction, error) - BeginWithTxnScope(txnScope string) (kv.Transaction, error) - BeginWithStartTS(txnScope string, startTS uint64) (kv.Transaction, error) - BeginWithExactStaleness(txnScope string, prevSec uint64) (kv.Transaction, error) + BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) GetSnapshot(ver kv.Version) kv.Snapshot GetClient() kv.Client GetMPPClient() kv.MPPClient diff --git a/store/mockstore/unistore.go b/store/mockstore/unistore.go index df4a2f1856d08..28a913f5ddbbf 100644 --- a/store/mockstore/unistore.go +++ b/store/mockstore/unistore.go @@ -95,20 +95,9 @@ func (s *mockStorage) Begin() (kv.Transaction, error) { return newTiKVTxn(txn, err) } -func (s *mockStorage) BeginWithTxnScope(txnScope string) (kv.Transaction, error) { - txn, err := s.KVStore.BeginWithTxnScope(txnScope) - return newTiKVTxn(txn, err) -} - -// BeginWithStartTS begins a transaction with startTS. -func (s *mockStorage) BeginWithStartTS(txnScope string, startTS uint64) (kv.Transaction, error) { - txn, err := s.KVStore.BeginWithStartTS(txnScope, startTS) - return newTiKVTxn(txn, err) -} - -// BeginWithExactStaleness begins transaction with given staleness -func (s *mockStorage) BeginWithExactStaleness(txnScope string, prevSec uint64) (kv.Transaction, error) { - txn, err := s.KVStore.BeginWithExactStaleness(txnScope, prevSec) +// BeginWithOption begins a transaction with given option +func (s *mockStorage) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { + txn, err := s.KVStore.BeginWithOption(option) return newTiKVTxn(txn, err) } diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index db1c323a755e4..a31d88355a3d8 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -610,12 +610,12 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) { // Use max.Uint64 to read the data and success. // That means the final commitTS > startTS+2, it's not the one we provide. // So we cover the rety commitTS logic. - txn1, err := s.store.BeginWithStartTS(oracle.GlobalTxnScope, committer.startTS+2) + txn1, err := s.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(oracle.GlobalTxnScope).SetStartTs(committer.startTS + 2)) c.Assert(err, IsNil) _, err = txn1.Get(bo.ctx, []byte("x")) c.Assert(kv.IsErrNotFound(err), IsTrue) - txn2, err := s.store.BeginWithStartTS(oracle.GlobalTxnScope, math.MaxUint64) + txn2, err := s.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(oracle.GlobalTxnScope).SetStartTs(math.MaxUint64)) c.Assert(err, IsNil) val, err := txn2.Get(bo.ctx, []byte("x")) c.Assert(err, IsNil) diff --git a/store/tikv/kv.go b/store/tikv/kv.go index addd7dacc74b7..a2b33e7c41b70 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -167,12 +167,25 @@ func (s *KVStore) runSafePointChecker() { // Begin a global transaction. func (s *KVStore) Begin() (*KVTxn, error) { - return s.BeginWithTxnScope(oracle.GlobalTxnScope) + return s.beginWithTxnScope(oracle.GlobalTxnScope) } -// BeginWithTxnScope begins a transaction with the given txnScope (local or -// global) -func (s *KVStore) BeginWithTxnScope(txnScope string) (*KVTxn, error) { +// BeginWithOption begins a transaction with given option +func (s *KVStore) BeginWithOption(option kv.TransactionOption) (*KVTxn, error) { + txnScope := option.TxnScope + if txnScope == "" { + txnScope = oracle.GlobalTxnScope + } + if option.StartTS != nil { + return s.beginWithStartTS(txnScope, *option.StartTS) + } else if option.PrevSec != nil { + return s.beginWithExactStaleness(txnScope, *option.PrevSec) + } + return s.beginWithTxnScope(txnScope) +} + +// beginWithTxnScope begins a transaction with the given txnScope (local or global) +func (s *KVStore) beginWithTxnScope(txnScope string) (*KVTxn, error) { txn, err := newTiKVTxn(s, txnScope) if err != nil { return nil, errors.Trace(err) @@ -180,8 +193,8 @@ func (s *KVStore) BeginWithTxnScope(txnScope string) (*KVTxn, error) { return txn, nil } -// BeginWithStartTS begins a transaction with startTS. -func (s *KVStore) BeginWithStartTS(txnScope string, startTS uint64) (*KVTxn, error) { +// beginWithStartTS begins a transaction with startTS. +func (s *KVStore) beginWithStartTS(txnScope string, startTS uint64) (*KVTxn, error) { txn, err := newTiKVTxnWithStartTS(s, txnScope, startTS, s.nextReplicaReadSeed()) if err != nil { return nil, errors.Trace(err) @@ -189,8 +202,8 @@ func (s *KVStore) BeginWithStartTS(txnScope string, startTS uint64) (*KVTxn, err return txn, nil } -// BeginWithExactStaleness begins transaction with given staleness -func (s *KVStore) BeginWithExactStaleness(txnScope string, prevSec uint64) (*KVTxn, error) { +// beginWithExactStaleness begins transaction with given staleness +func (s *KVStore) beginWithExactStaleness(txnScope string, prevSec uint64) (*KVTxn, error) { txn, err := newTiKVTxnWithExactStaleness(s, txnScope, prevSec) if err != nil { return nil, errors.Trace(err) diff --git a/util/mock/context.go b/util/mock/context.go index 4350de1d81529..046998c8c3907 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -211,7 +211,7 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error { return nil } if c.Store != nil { - txn, err := c.Store.BeginWithStartTS(oracle.GlobalTxnScope, startTS) + txn, err := c.Store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(oracle.GlobalTxnScope).SetStartTs(startTS)) if err != nil { return errors.Trace(err) } diff --git a/util/mock/store.go b/util/mock/store.go index 6139fcb9ea1e1..804f3d6a3f2d3 100644 --- a/util/mock/store.go +++ b/util/mock/store.go @@ -37,16 +37,8 @@ func (s *Store) GetOracle() oracle.Oracle { return nil } // Begin implements kv.Storage interface. func (s *Store) Begin() (kv.Transaction, error) { return nil, nil } -// BeginWithTxnScope implements kv.Storage interface. -func (s *Store) BeginWithTxnScope(txnScope string) (kv.Transaction, error) { return nil, nil } - -// BeginWithStartTS implements kv.Storage interface. -func (s *Store) BeginWithStartTS(txnScope string, startTS uint64) (kv.Transaction, error) { - return s.Begin() -} - -// BeginWithExactStaleness implements kv.Storage interface -func (s *Store) BeginWithExactStaleness(txnScope string, prevSec uint64) (kv.Transaction, error) { +// BeginWithOption implements kv.Storage interface. +func (s *Store) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { return s.Begin() } From 29908c67e793205cec045f855048ad2eab048690 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Fri, 5 Mar 2021 18:12:55 +0800 Subject: [PATCH 2/8] store, executor: support stale read for tikv RPCContext (#22176) --- distsql/request_builder.go | 10 ++++++++ executor/batch_point_get.go | 13 ++++++++++ executor/executor_test.go | 48 +++++++++++++++++++++++++++++++++++ executor/point_get.go | 12 +++++++++ kv/kv.go | 7 +++++ store/copr/coprocessor.go | 9 ++++++- store/tikv/client_helper.go | 12 +++++++-- store/tikv/region_cache.go | 16 ++++++++++++ store/tikv/region_request.go | 6 +++-- store/tikv/snapshot.go | 32 +++++++++++++++++++---- store/tikv/tikvrpc/tikvrpc.go | 7 +++++ 11 files changed, 162 insertions(+), 10 deletions(-) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 869f255e55705..310de50149eeb 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -19,6 +19,7 @@ import ( "sort" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/infoschema" @@ -235,6 +236,15 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req builder.Request.SchemaVar = sv.TxnCtx.SchemaVersion } builder.txnScope = sv.TxnCtx.TxnScope + builder.IsStaleness = sv.TxnCtx.IsStaleness + if builder.IsStaleness && builder.txnScope != oracle.GlobalTxnScope { + builder.MatchStoreLabels = []*metapb.StoreLabel{ + { + Key: placement.DCLabelKey, + Value: builder.txnScope, + }, + } + } return builder } diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 5ea0c2712ec8e..ca4416658306b 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -20,12 +20,15 @@ import ( "sync/atomic" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -113,6 +116,16 @@ func (e *BatchPointGetExec) Open(context.Context) error { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) + isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness + snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness) + if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope { + snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{ + { + Key: placement.DCLabelKey, + Value: e.ctx.GetSessionVars().TxnCtx.TxnScope, + }, + }) + } var batchGetter kv.BatchGetter = snapshot if txn.Valid() { lock := e.tblInfo.Lock diff --git a/executor/executor_test.go b/executor/executor_test.go index 43d96b1a06730..b3c917323e5d8 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -7634,6 +7634,54 @@ func (s *testSerialSuite) TestStalenessTransaction(c *C) { } } +func (s *testSerialSuite) TestStaleReadKVRequest(c *C) { + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) + defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id int primary key);") + defer tk.MustExec(`drop table if exists t`) + testcases := []struct { + name string + sql string + txnScope string + zone string + }{ + { + name: "coprocessor read", + sql: "select * from t", + txnScope: "local", + zone: "sh", + }, + { + name: "point get read", + sql: "select * from t where id = 1", + txnScope: "local", + zone: "bj", + }, + { + name: "batch point get read", + sql: "select * from t where id in (1,2,3)", + txnScope: "local", + zone: "hz", + }, + } + for _, testcase := range testcases { + c.Log(testcase.name) + tk.MustExec(fmt.Sprintf("set @@txn_scope=%v", testcase.txnScope)) + failpoint.Enable("github.com/pingcap/tidb/config/injectTxnScope", fmt.Sprintf(`return("%v")`, testcase.zone)) + failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStoreLabels", fmt.Sprintf(`return("%v")`, testcase.txnScope)) + failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag", `return(true)`) + tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:20';`) + tk.MustQuery(testcase.sql) + tk.MustExec(`commit`) + failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope") + failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStoreLabels") + failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag") + } +} + func (s *testSuite) TestStalenessAndHistoryRead(c *C) { c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) defer func() { diff --git a/executor/point_get.go b/executor/point_get.go index 91369979ed32f..0ca261b439efe 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -19,9 +19,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" @@ -148,6 +150,16 @@ func (e *PointGetExecutor) Open(context.Context) error { e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) + isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness + e.snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness) + if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope { + e.snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{ + { + Key: placement.DCLabelKey, + Value: e.ctx.GetSessionVars().TxnCtx.TxnScope, + }, + }) + } return nil } diff --git a/kv/kv.go b/kv/kv.go index 37f5f83951cdc..bcebff808ca1f 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/store/tikv/oracle" @@ -70,6 +71,8 @@ const ( TxnScope // StalenessReadOnly indicates whether the transaction is staleness read only transaction IsStalenessReadOnly + // MatchStoreLabels indicates the labels the store should be matched + MatchStoreLabels ) // Priority value for transaction priority. @@ -418,6 +421,10 @@ type Request struct { TaskID uint64 // TiDBServerID is the specified TiDB serverID to execute request. `0` means all TiDB instances. TiDBServerID uint64 + // IsStaleness indicates whether the request read staleness data + IsStaleness bool + // MatchStoreLabels indicates the labels the store should be matched + MatchStoreLabels []*metapb.StoreLabel } // ResultSubset represents a result subset from a single storage unit. diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 4b891da6f8433..90427eed8d217 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -703,7 +703,14 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *tikv.Backoffer, task *copTas if worker.Stats == nil { worker.Stats = make(map[tikvrpc.CmdType]*tikv.RPCRuntimeStats) } - resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, tikv.ReadTimeoutMedium, task.storeType, task.storeAddr) + if worker.req.IsStaleness { + req.EnableStaleRead() + } + var ops []tikv.StoreSelectorOption + if len(worker.req.MatchStoreLabels) > 0 { + ops = append(ops, tikv.WithMatchLabels(worker.req.MatchStoreLabels)) + } + resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, tikv.ReadTimeoutMedium, task.storeType, task.storeAddr, ops...) if err != nil { if task.storeType == kv.TiDB { err = worker.handleTiDBSendReqErr(err, task, ch) diff --git a/store/tikv/client_helper.go b/store/tikv/client_helper.go index 8f1ee3cb1b331..aea3e7137e878 100644 --- a/store/tikv/client_helper.go +++ b/store/tikv/client_helper.go @@ -16,6 +16,7 @@ package tikv import ( "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/store/tikv/util" @@ -73,13 +74,20 @@ func (ch *ClientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks } // SendReqCtx wraps the SendReqCtx function and use the resolved lock result in the kvrpcpb.Context. -func (ch *ClientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration, sType kv.StoreType, directStoreAddr string) (*tikvrpc.Response, *RPCContext, string, error) { +func (ch *ClientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration, sType kv.StoreType, directStoreAddr string, opts ...StoreSelectorOption) (*tikvrpc.Response, *RPCContext, string, error) { sender := NewRegionRequestSender(ch.regionCache, ch.client) if len(directStoreAddr) > 0 { sender.SetStoreAddr(directStoreAddr) } sender.Stats = ch.Stats req.Context.ResolvedLocks = ch.resolvedLocks.GetAll() - resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType) + failpoint.Inject("assertStaleReadFlag", func(val failpoint.Value) { + if val.(bool) { + if len(opts) > 0 && !req.StaleRead { + panic("req.StaleRead shouldn't be false when opts is not empty") + } + } + }) + resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType, opts...) return resp, ctx, sender.GetStoreAddr(), err } diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 8ac86a9b0af2d..ba99e18d3d882 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/config" "github.com/pingcap/tidb/store/tikv/logutil" @@ -380,6 +381,21 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe for _, op := range opts { op(options) } + failpoint.Inject("assertStoreLabels", func(val failpoint.Value) { + if len(opts) > 0 { + value := val.(string) + v := "" + for _, label := range options.labels { + if label.Key == placement.DCLabelKey { + v = label.Value + break + } + } + if v != value { + panic(fmt.Sprintf("StoreSelectorOption's label %v is not %v", v, value)) + } + } + }) switch replicaRead { case kv.ReplicaReadFollower: store, peer, accessIdx, storeIdx = cachedRegion.FollowerStorePeer(regionStore, followerStoreSeed, options) diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index 74290ad3efbc9..a3ae52c38b82c 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -193,6 +193,7 @@ func (s *RegionRequestSender) getRPCContext( req *tikvrpc.Request, regionID RegionVerID, sType kv.StoreType, + opts ...StoreSelectorOption, ) (*RPCContext, error) { switch sType { case kv.TiKV: @@ -200,7 +201,7 @@ func (s *RegionRequestSender) getRPCContext( if req.ReplicaReadSeed != nil { seed = *req.ReplicaReadSeed } - return s.regionCache.GetTiKVRPCContext(bo, regionID, req.ReplicaReadType, seed) + return s.regionCache.GetTiKVRPCContext(bo, regionID, req.ReplicaReadType, seed, opts...) case kv.TiFlash: return s.regionCache.GetTiFlashRPCContext(bo, regionID) case kv.TiDB: @@ -217,6 +218,7 @@ func (s *RegionRequestSender) SendReqCtx( regionID RegionVerID, timeout time.Duration, sType kv.StoreType, + opts ...StoreSelectorOption, ) ( resp *tikvrpc.Response, rpcCtx *RPCContext, @@ -262,7 +264,7 @@ func (s *RegionRequestSender) SendReqCtx( logutil.Logger(bo.ctx).Warn("retry get ", zap.Uint64("region = ", regionID.GetID()), zap.Int("times = ", tryTimes)) } - rpcCtx, err = s.getRPCContext(bo, req, regionID, sType) + rpcCtx, err = s.getRPCContext(bo, req, regionID, sType, opts...) if err != nil { return nil, nil, err } diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 21c3000a459b9..2585835d8d88c 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" @@ -60,7 +61,10 @@ type tikvSnapshot struct { keyOnly bool vars *kv.Variables replicaReadSeed uint32 - resolvedLocks *util.TSSet + isStaleness bool + // MatchStoreLabels indicates the labels the store should be matched + matchStoreLabels []*metapb.StoreLabel + resolvedLocks *util.TSSet // Cache the result of BatchGet. // The invariance is that calling BatchGet multiple times using the same start ts, @@ -282,8 +286,14 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll TaskId: s.mu.taskID, }) s.mu.RUnlock() - - resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, ReadTimeoutMedium, kv.TiKV, "") + var ops []StoreSelectorOption + if s.isStaleness { + req.EnableStaleRead() + } + if len(s.matchStoreLabels) > 0 { + ops = append(ops, WithMatchLabels(s.matchStoreLabels)) + } + resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, ReadTimeoutMedium, kv.TiKV, "", ops...) if err != nil { return errors.Trace(err) @@ -430,13 +440,19 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte TaskId: s.mu.taskID, }) s.mu.RUnlock() - + var ops []StoreSelectorOption + if s.isStaleness { + req.EnableStaleRead() + } + if len(s.matchStoreLabels) > 0 { + ops = append(ops, WithMatchLabels(s.matchStoreLabels)) + } for { loc, err := s.store.regionCache.LocateKey(bo, k) if err != nil { return nil, errors.Trace(err) } - resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, readTimeoutShort, kv.TiKV, "") + resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, readTimeoutShort, kv.TiKV, "", ops...) if err != nil { return nil, errors.Trace(err) } @@ -552,6 +568,12 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) { s.mu.Unlock() case kv.SampleStep: s.sampleStep = val.(uint32) + case kv.IsStalenessReadOnly: + s.mu.Lock() + s.isStaleness = val.(bool) + s.mu.Unlock() + case kv.MatchStoreLabels: + s.matchStoreLabels = val.([]*metapb.StoreLabel) case kv.TxnScope: s.txnScope = val.(string) } diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index 3d19d7d64bb44..c695f8ecb3827 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -389,6 +389,13 @@ func (req *Request) TxnHeartBeat() *kvrpcpb.TxnHeartBeatRequest { return req.Req.(*kvrpcpb.TxnHeartBeatRequest) } +// EnableStaleRead enables stale read +func (req *Request) EnableStaleRead() { + req.StaleRead = true + req.ReplicaReadType = kv.ReplicaReadMixed + req.ReplicaRead = false +} + // ToBatchCommandsRequest converts the request to an entry in BatchCommands request. func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Request { switch req.Type { From 280d07c4a0bab259ed21466f1262e2409805daa5 Mon Sep 17 00:00:00 2001 From: Zijie Lu Date: Fri, 5 Mar 2021 18:58:55 +0800 Subject: [PATCH 3/8] sessionctx, executor: make the upper limit of tidb_ddl_reorg_worker_cnt work (#23064) --- executor/ddl_test.go | 3 +++ sessionctx/variable/sysvar.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/executor/ddl_test.go b/executor/ddl_test.go index f1978e5a33196..2a9f459a89d84 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -1167,6 +1167,9 @@ func (s *testSuite6) TestSetDDLReorgWorkerCnt(c *C) { tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 100") res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt") res.Check(testkit.Rows("100")) + + _, err = tk.Exec("set @@global.tidb_ddl_reorg_worker_cnt = 129") + c.Assert(terror.ErrorEqual(err, variable.ErrWrongValueForVar), IsTrue, Commentf("err %v", err)) } func (s *testSuite6) TestSetDDLReorgBatchSize(c *C) { diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 1121ab9ab3833..efced6ec1bd62 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -676,7 +676,7 @@ var defaultSysVars = []*SysVar{ {Scope: ScopeSession, Name: TiDBPProfSQLCPU, Value: strconv.Itoa(DefTiDBPProfSQLCPU), Type: TypeInt, MinValue: 0, MaxValue: 1}, {Scope: ScopeSession, Name: TiDBDDLSlowOprThreshold, Value: strconv.Itoa(DefTiDBDDLSlowOprThreshold)}, {Scope: ScopeSession, Name: TiDBConfig, Value: "", ReadOnly: true}, - {Scope: ScopeGlobal, Name: TiDBDDLReorgWorkerCount, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxUint64}, + {Scope: ScopeGlobal, Name: TiDBDDLReorgWorkerCount, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: uint64(maxDDLReorgWorkerCount)}, {Scope: ScopeGlobal, Name: TiDBDDLReorgBatchSize, Value: strconv.Itoa(DefTiDBDDLReorgBatchSize), Type: TypeUnsigned, MinValue: int64(MinDDLReorgBatchSize), MaxValue: uint64(MaxDDLReorgBatchSize), AutoConvertOutOfRange: true}, {Scope: ScopeGlobal, Name: TiDBDDLErrorCountLimit, Value: strconv.Itoa(DefTiDBDDLErrorCountLimit), Type: TypeUnsigned, MinValue: 0, MaxValue: uint64(math.MaxInt64), AutoConvertOutOfRange: true}, {Scope: ScopeSession, Name: TiDBDDLReorgPriority, Value: "PRIORITY_LOW"}, From 4184f279b32b3860631f0d8a7aac3e4294e4c433 Mon Sep 17 00:00:00 2001 From: crazycs Date: Sun, 7 Mar 2021 15:30:54 +0800 Subject: [PATCH 4/8] test: make test TestUpdateWithTableReadLockWillFail stable (#23122) --- planner/core/point_get_plan_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/planner/core/point_get_plan_test.go b/planner/core/point_get_plan_test.go index 2478b2f54b4e3..ed079646f5d8e 100644 --- a/planner/core/point_get_plan_test.go +++ b/planner/core/point_get_plan_test.go @@ -478,12 +478,10 @@ func (s *testPointGetSuite) TestSelectInMultiColumns(c *C) { } func (s *testPointGetSuite) TestUpdateWithTableReadLockWillFail(c *C) { - gcfg := config.GetGlobalConfig() - etl := gcfg.EnableTableLock - gcfg.EnableTableLock = true - defer func() { - gcfg.EnableTableLock = etl - }() + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.EnableTableLock = true + }) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("create table tbllock(id int, c int);") From 038264bbec4dc80318509605986cd1be7d73d1eb Mon Sep 17 00:00:00 2001 From: Nayuta Yanagisawa Date: Sun, 7 Mar 2021 17:33:46 +0900 Subject: [PATCH 5/8] docs/design: fix broken link in 2018-10-08-online-DDL.md (#23124) --- docs/design/2018-10-08-online-DDL.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/design/2018-10-08-online-DDL.md b/docs/design/2018-10-08-online-DDL.md index 732ced186bfc8..87f746a0d0796 100644 --- a/docs/design/2018-10-08-online-DDL.md +++ b/docs/design/2018-10-08-online-DDL.md @@ -73,4 +73,4 @@ This operation requires writing the corresponding column data in the table to th To reduce the impact on other read and write operations, its default priority is low. ## Compatibility -About specific compatibility, you can view [MySQL’s compatibility](https://github.com/pingcap/docs/blob/master/sql/mysql-compatibility.md#ddl). \ No newline at end of file +About specific compatibility, you can view [MySQL’s compatibility](https://github.com/pingcap/docs/blob/master/mysql-compatibility.md#ddl). From e4d2499d1f5d3ac01bb3aa6c78671c23b91959a5 Mon Sep 17 00:00:00 2001 From: dongjunduo Date: Sun, 7 Mar 2021 16:48:54 +0800 Subject: [PATCH 6/8] expression: fix linter --enable=deadcode check error (#23118) --- expression/bench_test.go | 4 ---- expression/builtin_time_vec_test.go | 4 ---- expression/constant_test.go | 17 ----------------- 3 files changed, 25 deletions(-) diff --git a/expression/bench_test.go b/expression/bench_test.go index 9f7c9fab154f1..f3b71abce94cd 100644 --- a/expression/bench_test.go +++ b/expression/bench_test.go @@ -706,10 +706,6 @@ type dateTimeGener struct { randGen *defaultRandGen } -func newDateTimeGener(fsp, year, month, day int) *dateTimeGener { - return &dateTimeGener{fsp, year, month, day, newDefaultRandGen()} -} - func (g *dateTimeGener) gen() interface{} { if g.Year == 0 { g.Year = 1970 + g.randGen.Intn(100) diff --git a/expression/builtin_time_vec_test.go b/expression/builtin_time_vec_test.go index 2e993bf8e2a42..5cb99af440060 100644 --- a/expression/builtin_time_vec_test.go +++ b/expression/builtin_time_vec_test.go @@ -68,10 +68,6 @@ type dateTimeUnitStrGener struct { randGen *defaultRandGen } -func newDateTimeUnitStrGener() *dateTimeUnitStrGener { - return &dateTimeUnitStrGener{newDefaultRandGen()} -} - // tzStrGener is used to generate strings which are timezones type tzStrGener struct{} diff --git a/expression/constant_test.go b/expression/constant_test.go index 50b58c93b6635..b0d6abbb5f3c6 100644 --- a/expression/constant_test.go +++ b/expression/constant_test.go @@ -50,23 +50,6 @@ func newLonglong(value int64) *Constant { } } -func newDate(year, month, day int) *Constant { - return newTimeConst(year, month, day, 0, 0, 0, mysql.TypeDate) -} - -func newTimestamp(yy, mm, dd, hh, min, ss int) *Constant { - return newTimeConst(yy, mm, dd, hh, min, ss, mysql.TypeTimestamp) -} - -func newTimeConst(yy, mm, dd, hh, min, ss int, tp uint8) *Constant { - var tmp types.Datum - tmp.SetMysqlTime(types.NewTime(types.FromDate(yy, mm, dd, 0, 0, 0, 0), tp, types.DefaultFsp)) - return &Constant{ - Value: tmp, - RetType: types.NewFieldType(tp), - } -} - func newFunction(funcName string, args ...Expression) Expression { typeLong := types.NewFieldType(mysql.TypeLonglong) return NewFunctionInternal(mock.NewContext(), funcName, typeLong, args...) From 37503208992ac088ec2b262a8d606cb5174d2745 Mon Sep 17 00:00:00 2001 From: Tjianke <34013484+Tjianke@users.noreply.github.com> Date: Sun, 7 Mar 2021 17:54:55 +0800 Subject: [PATCH 7/8] executor: add close recordSet in executor (#22714) --- executor/ddl_test.go | 5 +-- executor/distsql_test.go | 6 ++-- executor/executor_test.go | 52 +++++++++++++++++++++------ executor/inspection_common_test.go | 1 + executor/join_test.go | 6 +++- executor/memtable_reader_test.go | 3 ++ executor/point_get_test.go | 2 +- executor/seqtest/seq_executor_test.go | 2 +- 8 files changed, 58 insertions(+), 19 deletions(-) diff --git a/executor/ddl_test.go b/executor/ddl_test.go index 2a9f459a89d84..bc848091998a9 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -135,6 +135,7 @@ func (s *testSuite6) TestCreateTable(c *C) { c.Assert(req.GetRow(0).GetString(1), Equals, "double") } } + c.Assert(rs.Close(), IsNil) // test multiple collate specified in column when create. tk.MustExec("drop table if exists test_multiple_column_collate;") @@ -470,7 +471,7 @@ func (s *testSuite6) TestAlterTableAddColumn(c *C) { row := req.GetRow(0) c.Assert(row.Len(), Equals, 1) c.Assert(now, GreaterEqual, row.GetTime(0).String()) - r.Close() + c.Assert(r.Close(), IsNil) tk.MustExec("alter table alter_test add column c3 varchar(50) default 'CURRENT_TIMESTAMP'") tk.MustQuery("select c3 from alter_test").Check(testkit.Rows("CURRENT_TIMESTAMP")) tk.MustExec("create or replace view alter_view as select c1,c2 from alter_test") @@ -497,7 +498,7 @@ func (s *testSuite6) TestAlterTableAddColumns(c *C) { c.Assert(err, IsNil) row := req.GetRow(0) c.Assert(row.Len(), Equals, 1) - r.Close() + c.Assert(r.Close(), IsNil) tk.MustQuery("select c3 from alter_test").Check(testkit.Rows("CURRENT_TIMESTAMP")) tk.MustExec("create or replace view alter_view as select c1,c2 from alter_test") _, err = tk.Exec("alter table alter_view add column (c4 varchar(50), c5 varchar(50))") diff --git a/executor/distsql_test.go b/executor/distsql_test.go index d027534021e73..c98f7f9e2f70c 100644 --- a/executor/distsql_test.go +++ b/executor/distsql_test.go @@ -80,7 +80,7 @@ func (s *testSuite3) TestCopClientSend(c *C) { err = rs.Next(ctx, req) c.Assert(err, IsNil) c.Assert(req.GetRow(0).GetMyDecimal(0).String(), Equals, "499500") - rs.Close() + c.Assert(rs.Close(), IsNil) // Split one region. key := tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(500)) @@ -95,7 +95,7 @@ func (s *testSuite3) TestCopClientSend(c *C) { err = rs.Next(ctx, req) c.Assert(err, IsNil) c.Assert(req.GetRow(0).GetMyDecimal(0).String(), Equals, "499500") - rs.Close() + c.Assert(rs.Close(), IsNil) // Check there is no goroutine leak. rs, err = tk.Exec("select * from copclient order by id") @@ -103,7 +103,7 @@ func (s *testSuite3) TestCopClientSend(c *C) { req = rs.NewChunk() err = rs.Next(ctx, req) c.Assert(err, IsNil) - rs.Close() + c.Assert(rs.Close(), IsNil) keyword := "(*copIterator).work" c.Check(checkGoroutineExists(keyword), IsFalse) } diff --git a/executor/executor_test.go b/executor/executor_test.go index b3c917323e5d8..62c44c3f0bc3b 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -650,6 +650,7 @@ func (s *testSuiteP1) TestSelectBackslashN(c *C) { fields := rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, "NULL") + c.Assert(rs.Close(), IsNil) sql = `select "\N";` r = tk.MustQuery(sql) @@ -659,6 +660,7 @@ func (s *testSuiteP1) TestSelectBackslashN(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, `N`) + c.Assert(rs.Close(), IsNil) tk.MustExec("use test;") tk.MustExec("create table test (`\\N` int);") @@ -672,6 +674,7 @@ func (s *testSuiteP1) TestSelectBackslashN(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, `\N`) + c.Assert(rs.Close(), IsNil) sql = `select \N from test;` r = tk.MustQuery(sql) @@ -682,6 +685,7 @@ func (s *testSuiteP1) TestSelectBackslashN(c *C) { c.Check(err, IsNil) c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, `NULL`) + c.Assert(rs.Close(), IsNil) sql = `select (\N) from test;` r = tk.MustQuery(sql) @@ -691,6 +695,7 @@ func (s *testSuiteP1) TestSelectBackslashN(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, `NULL`) + c.Assert(rs.Close(), IsNil) sql = "select `\\N` from test;" r = tk.MustQuery(sql) @@ -700,6 +705,7 @@ func (s *testSuiteP1) TestSelectBackslashN(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, `\N`) + c.Assert(rs.Close(), IsNil) sql = "select (`\\N`) from test;" r = tk.MustQuery(sql) @@ -709,6 +715,7 @@ func (s *testSuiteP1) TestSelectBackslashN(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, `\N`) + c.Assert(rs.Close(), IsNil) sql = `select '\N' from test;` r = tk.MustQuery(sql) @@ -718,6 +725,7 @@ func (s *testSuiteP1) TestSelectBackslashN(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, `N`) + c.Assert(rs.Close(), IsNil) sql = `select ('\N') from test;` r = tk.MustQuery(sql) @@ -727,6 +735,7 @@ func (s *testSuiteP1) TestSelectBackslashN(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, `N`) + c.Assert(rs.Close(), IsNil) } // TestSelectNull Issue #4053. @@ -741,6 +750,7 @@ func (s *testSuiteP1) TestSelectNull(c *C) { fields := rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, `NULL`) + c.Assert(rs.Close(), IsNil) sql = `select (null);` r = tk.MustQuery(sql) @@ -750,6 +760,7 @@ func (s *testSuiteP1) TestSelectNull(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, `NULL`) + c.Assert(rs.Close(), IsNil) sql = `select null+NULL;` r = tk.MustQuery(sql) @@ -760,6 +771,7 @@ func (s *testSuiteP1) TestSelectNull(c *C) { c.Check(err, IsNil) c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, `null+NULL`) + c.Assert(rs.Close(), IsNil) } // TestSelectStringLiteral Issue #3686. @@ -774,6 +786,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields := rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, `abc`) + c.Assert(rs.Close(), IsNil) sql = `select (('abc'));` r = tk.MustQuery(sql) @@ -783,6 +796,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, `abc`) + c.Assert(rs.Close(), IsNil) sql = `select 'abc'+'def';` r = tk.MustQuery(sql) @@ -792,6 +806,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, `'abc'+'def'`) + c.Assert(rs.Close(), IsNil) // Below checks whether leading invalid chars are trimmed. sql = "select '\n';" @@ -802,6 +817,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, "") + c.Assert(rs.Close(), IsNil) sql = "select '\t col';" // Lowercased letter is a valid char. rs, err = tk.Exec(sql) @@ -809,6 +825,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, "col") + c.Assert(rs.Close(), IsNil) sql = "select '\t Col';" // Uppercased letter is a valid char. rs, err = tk.Exec(sql) @@ -816,6 +833,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, "Col") + c.Assert(rs.Close(), IsNil) sql = "select '\n\t 中文 col';" // Chinese char is a valid char. rs, err = tk.Exec(sql) @@ -823,6 +841,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, "中文 col") + c.Assert(rs.Close(), IsNil) sql = "select ' \r\n .col';" // Punctuation is a valid char. rs, err = tk.Exec(sql) @@ -830,6 +849,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, ".col") + c.Assert(rs.Close(), IsNil) sql = "select ' 😆col';" // Emoji is a valid char. rs, err = tk.Exec(sql) @@ -837,6 +857,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, "😆col") + c.Assert(rs.Close(), IsNil) // Below checks whether trailing invalid chars are preserved. sql = `select 'abc ';` @@ -845,6 +866,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, "abc ") + c.Assert(rs.Close(), IsNil) sql = `select ' abc 123 ';` rs, err = tk.Exec(sql) @@ -852,6 +874,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, "abc 123 ") + c.Assert(rs.Close(), IsNil) // Issue #4239. sql = `select 'a' ' ' 'string';` @@ -862,6 +885,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, "a") + c.Assert(rs.Close(), IsNil) sql = `select 'a' " " "string";` r = tk.MustQuery(sql) @@ -871,6 +895,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, "a") + c.Assert(rs.Close(), IsNil) sql = `select 'string' 'string';` r = tk.MustQuery(sql) @@ -880,6 +905,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, "string") + c.Assert(rs.Close(), IsNil) sql = `select "ss" "a";` r = tk.MustQuery(sql) @@ -889,6 +915,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, "ss") + c.Assert(rs.Close(), IsNil) sql = `select "ss" "a" "b";` r = tk.MustQuery(sql) @@ -898,6 +925,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, "ss") + c.Assert(rs.Close(), IsNil) sql = `select "ss" "a" ' ' "b";` r = tk.MustQuery(sql) @@ -907,6 +935,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, "ss") + c.Assert(rs.Close(), IsNil) sql = `select "ss" "a" ' ' "b" ' ' "d";` r = tk.MustQuery(sql) @@ -916,6 +945,7 @@ func (s *testSuiteP1) TestSelectStringLiteral(c *C) { fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.O, Equals, "ss") + c.Assert(rs.Close(), IsNil) } func (s *testSuiteP1) TestSelectLimit(c *C) { @@ -1111,7 +1141,7 @@ func (s *testSuiteP1) TestIssue2612(c *C) { err = rs.Next(context.Background(), req) c.Assert(err, IsNil) c.Assert(req.GetRow(0).GetDuration(0, 0).String(), Equals, "-46:09:02") - rs.Close() + c.Assert(rs.Close(), IsNil) } // TestIssue345 is related with https://github.com/pingcap/tidb/issues/345 @@ -2536,14 +2566,14 @@ func (s *testSuiteP2) TestColumnName(c *C) { c.Check(fields[0].ColumnAsName.L, Equals, "1 + c") c.Check(fields[1].Column.Name.L, Equals, "count(*)") c.Check(fields[1].ColumnAsName.L, Equals, "count(*)") - rs.Close() + c.Assert(rs.Close(), IsNil) rs, err = tk.Exec("select (c) > all (select c from t) from t") c.Check(err, IsNil) fields = rs.Fields() c.Check(len(fields), Equals, 1) c.Check(fields[0].Column.Name.L, Equals, "(c) > all (select c from t)") c.Check(fields[0].ColumnAsName.L, Equals, "(c) > all (select c from t)") - rs.Close() + c.Assert(rs.Close(), IsNil) tk.MustExec("begin") tk.MustExec("insert t values(1,1)") rs, err = tk.Exec("select c d, d c from t") @@ -2554,7 +2584,7 @@ func (s *testSuiteP2) TestColumnName(c *C) { c.Check(fields[0].ColumnAsName.L, Equals, "d") c.Check(fields[1].Column.Name.L, Equals, "d") c.Check(fields[1].ColumnAsName.L, Equals, "c") - rs.Close() + c.Assert(rs.Close(), IsNil) // Test case for query a column of a table. // In this case, all attributes have values. rs, err = tk.Exec("select c as a from t as t2") @@ -2565,7 +2595,7 @@ func (s *testSuiteP2) TestColumnName(c *C) { c.Check(fields[0].Table.Name.L, Equals, "t") c.Check(fields[0].TableAsName.L, Equals, "t2") c.Check(fields[0].DBName.L, Equals, "test") - rs.Close() + c.Assert(rs.Close(), IsNil) // Test case for query a expression which only using constant inputs. // In this case, the table, org_table and database attributes will all be empty. rs, err = tk.Exec("select hour(1) as a from t as t2") @@ -2576,7 +2606,7 @@ func (s *testSuiteP2) TestColumnName(c *C) { c.Check(fields[0].Table.Name.L, Equals, "") c.Check(fields[0].TableAsName.L, Equals, "") c.Check(fields[0].DBName.L, Equals, "") - rs.Close() + c.Assert(rs.Close(), IsNil) // Test case for query a column wrapped with parentheses and unary plus. // In this case, the column name should be its original name. rs, err = tk.Exec("select (c), (+c), +(c), +(+(c)), ++c from t") @@ -2586,7 +2616,7 @@ func (s *testSuiteP2) TestColumnName(c *C) { c.Check(fields[i].Column.Name.L, Equals, "c") c.Check(fields[i].ColumnAsName.L, Equals, "c") } - rs.Close() + c.Assert(rs.Close(), IsNil) // Test issue https://github.com/pingcap/tidb/issues/9639 . // Both window function and expression appear in final result field. @@ -2599,7 +2629,7 @@ func (s *testSuiteP2) TestColumnName(c *C) { c.Assert(fields[1].Column.Name.L, Equals, "num") c.Assert(fields[1].ColumnAsName.L, Equals, "num") tk.MustExec("set @@tidb_enable_window_function = 0") - rs.Close() + c.Assert(rs.Close(), IsNil) rs, err = tk.Exec("select if(1,c,c) from t;") c.Check(err, IsNil) @@ -2607,6 +2637,7 @@ func (s *testSuiteP2) TestColumnName(c *C) { c.Assert(fields[0].Column.Name.L, Equals, "if(1,c,c)") // It's a compatibility issue. Should be empty instead. c.Assert(fields[0].ColumnAsName.L, Equals, "if(1,c,c)") + c.Assert(rs.Close(), IsNil) } func (s *testSuiteP2) TestSelectVar(c *C) { @@ -4025,7 +4056,7 @@ func (s *testSuite3) TestUnsignedDecimalOverflow(c *C) { c.Assert(err, IsNil) } if res != nil { - res.Close() + c.Assert(res.Close(), IsNil) } } @@ -4187,8 +4218,7 @@ func (s *testSuite3) TestMaxOneRow(c *C) { err = rs.Next(context.TODO(), rs.NewChunk()) c.Assert(err.Error(), Equals, "[executor:1242]Subquery returns more than 1 row") - err = rs.Close() - c.Assert(err, IsNil) + c.Assert(rs.Close(), IsNil) } func (s *testSuiteP2) TestCurrentTimestampValueSelection(c *C) { diff --git a/executor/inspection_common_test.go b/executor/inspection_common_test.go index ab6c382e38001..eb4f8db8ceca6 100644 --- a/executor/inspection_common_test.go +++ b/executor/inspection_common_test.go @@ -55,5 +55,6 @@ func (s *inspectionSummarySuite) TestInspectionRules(c *C) { rules, err := session.ResultSetToStringSlice(context.Background(), tk.Se, rs) c.Assert(err, IsNil) c.Assert(len(rules), Equals, ca.ruleCount) + c.Assert(rs.Close(), IsNil) } } diff --git a/executor/join_test.go b/executor/join_test.go index 3f59a8bdae6b0..daa93ef680c7b 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -1109,7 +1109,7 @@ func (s *testSuiteJoin1) TestJoinLeak(c *C) { err = result.Next(context.Background(), req) c.Assert(err, IsNil) time.Sleep(time.Millisecond) - result.Close() + c.Assert(result.Close(), IsNil) tk.MustExec("set @@tidb_hash_join_concurrency=5") } @@ -2293,6 +2293,7 @@ func (s *testSuite9) TestIssue18572_1(c *C) { c.Assert(err, IsNil) _, err = session.GetRows4Test(context.Background(), nil, rs) c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinInnerWorkerErr"), IsTrue) + c.Assert(rs.Close(), IsNil) } func (s *testSuite9) TestIssue18572_2(c *C) { @@ -2311,6 +2312,7 @@ func (s *testSuite9) TestIssue18572_2(c *C) { c.Assert(err, IsNil) _, err = session.GetRows4Test(context.Background(), nil, rs) c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinOuterWorkerErr"), IsTrue) + c.Assert(rs.Close(), IsNil) } func (s *testSuite9) TestIssue18572_3(c *C) { @@ -2329,6 +2331,7 @@ func (s *testSuite9) TestIssue18572_3(c *C) { c.Assert(err, IsNil) _, err = session.GetRows4Test(context.Background(), nil, rs) c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinBuildErr"), IsTrue) + c.Assert(rs.Close(), IsNil) } func (s *testSuite9) TestApplyOuterAggEmptyInput(c *C) { @@ -2558,6 +2561,7 @@ func (s *testSuiteJoinSerial) TestIssue20779(c *C) { c.Assert(err, IsNil) _, err = session.GetRows4Test(context.Background(), nil, rs) c.Assert(err.Error(), Matches, "testIssue20779") + c.Assert(rs.Close(), IsNil) } func (s *testSuiteJoinSerial) TestIssue20219(c *C) { diff --git a/executor/memtable_reader_test.go b/executor/memtable_reader_test.go index 4c5f70224dba4..cbcd8e98fd9b9 100644 --- a/executor/memtable_reader_test.go +++ b/executor/memtable_reader_test.go @@ -909,6 +909,7 @@ func (s *testMemTableReaderSuite) TestTiDBClusterLogError(c *C) { _, err = session.ResultSetToStringSlice(context.Background(), tk.Se, rs) c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "denied to scan logs, please specified the start time, such as `time > '2020-01-01 00:00:00'`") + c.Assert(rs.Close(), IsNil) // Test without end time error. rs, err = tk.Exec("select * from information_schema.cluster_log where time>='2019/08/26 06:18:13.011'") @@ -916,6 +917,7 @@ func (s *testMemTableReaderSuite) TestTiDBClusterLogError(c *C) { _, err = session.ResultSetToStringSlice(context.Background(), tk.Se, rs) c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "denied to scan logs, please specified the end time, such as `time < '2020-01-01 00:00:00'`") + c.Assert(rs.Close(), IsNil) // Test without specified message error. rs, err = tk.Exec("select * from information_schema.cluster_log where time>='2019/08/26 06:18:13.011' and time<'2019/08/26 16:18:13.011'") @@ -923,4 +925,5 @@ func (s *testMemTableReaderSuite) TestTiDBClusterLogError(c *C) { _, err = session.ResultSetToStringSlice(context.Background(), tk.Se, rs) c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "denied to scan full logs (use `SELECT * FROM cluster_log WHERE message LIKE '%'` explicitly if intentionally)") + c.Assert(rs.Close(), IsNil) } diff --git a/executor/point_get_test.go b/executor/point_get_test.go index 10f53b71f8816..a30c0f511e645 100644 --- a/executor/point_get_test.go +++ b/executor/point_get_test.go @@ -91,7 +91,7 @@ func (s *testPointGetSuite) TestPointGet(c *C) { c.Assert(err, IsNil) fields := result.Fields() c.Assert(fields[0].ColumnAsName.O, Equals, "ident") - result.Close() + c.Assert(result.Close(), IsNil) tk.MustExec("CREATE TABLE tab3(pk INTEGER PRIMARY KEY, col0 INTEGER, col1 FLOAT, col2 TEXT, col3 INTEGER, col4 FLOAT, col5 TEXT);") tk.MustExec("CREATE UNIQUE INDEX idx_tab3_0 ON tab3 (col4);") diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index a6b1d9a3d31c8..ef1909bfad487 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -712,7 +712,7 @@ func (s *seqTestSuite) TestIndexDoubleReadClose(c *C) { c.Assert(err, IsNil) c.Assert(err, IsNil) keyword := "pickAndExecTask" - rs.Close() + c.Assert(rs.Close(), IsNil) time.Sleep(time.Millisecond * 10) c.Check(checkGoroutineExists(keyword), IsFalse) atomic.StoreInt32(&executor.LookupTableTaskChannelSize, originSize) From 4218f2836bb38ec79fd080fa88d09d3fe3766c3a Mon Sep 17 00:00:00 2001 From: Tjianke <34013484+Tjianke@users.noreply.github.com> Date: Sun, 7 Mar 2021 19:40:34 +0800 Subject: [PATCH 8/8] expression: fix wrong error info (#22760) --- types/overflow.go | 2 +- types/overflow_test.go | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/types/overflow.go b/types/overflow.go index 6bb9d658acae8..c1533d4a4451c 100644 --- a/types/overflow.go +++ b/types/overflow.go @@ -200,7 +200,7 @@ func DivUintWithInt(a uint64, b int64) (uint64, error) { func DivIntWithUint(a int64, b uint64) (uint64, error) { if a < 0 { if uint64(-a) >= b { - return 0, ErrOverflow.GenWithStackByArgs("BIGINT", fmt.Sprintf("(%d, %d)", a, b)) + return 0, ErrOverflow.GenWithStackByArgs("BIGINT UNSIGNED", fmt.Sprintf("(%d, %d)", a, b)) } return 0, nil diff --git a/types/overflow_test.go b/types/overflow_test.go index 27b7c2f80dd70..78d214f39bf3b 100644 --- a/types/overflow_test.go +++ b/types/overflow_test.go @@ -329,16 +329,18 @@ func (s *testOverflowSuite) TestDiv(c *C) { rsh uint64 ret uint64 overflow bool + err string }{ - {math.MinInt64, math.MaxInt64, 0, true}, - {0, 1, 0, false}, - {-1, math.MaxInt64, 0, false}, + {math.MinInt64, math.MaxInt64, 0, true, "*BIGINT UNSIGNED value is out of range in '\\(-9223372036854775808, 9223372036854775807\\)'"}, + {0, 1, 0, false, ""}, + {-1, math.MaxInt64, 0, false, ""}, } for _, t := range tblInt2 { ret, err := DivIntWithUint(t.lsh, t.rsh) if t.overflow { c.Assert(err, NotNil) + c.Assert(err, ErrorMatches, t.err) } else { c.Assert(ret, Equals, t.ret) }