Skip to content

Commit

Permalink
Merge branch 'master' into fix_race_error
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Mar 1, 2021
2 parents 5b00a48 + febac51 commit e8fe146
Show file tree
Hide file tree
Showing 35 changed files with 373 additions and 216 deletions.
7 changes: 4 additions & 3 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -261,7 +262,7 @@ func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) erro
return nil
}

func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv.CopRuntimeStats, respTime time.Duration) {
func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr.CopRuntimeStats, respTime time.Duration) {
callee := copStats.CalleeAddress
if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
return
Expand Down Expand Up @@ -334,7 +335,7 @@ func (r *selectResult) Close() error {
// CopRuntimeStats is a interface uses to check whether the result has cop runtime stats.
type CopRuntimeStats interface {
// GetCopRuntimeStats gets the cop runtime stats information.
GetCopRuntimeStats() *tikv.CopRuntimeStats
GetCopRuntimeStats() *copr.CopRuntimeStats
}

type selectResultRuntimeStats struct {
Expand All @@ -347,7 +348,7 @@ type selectResultRuntimeStats struct {
CoprCacheHitNum int64
}

func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntimeStats, respTime time.Duration) {
func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntimeStats, respTime time.Duration) {
s.copRespTime = append(s.copRespTime, respTime)
if copStats.ScanDetail != nil {
s.procKeys = append(s.procKeys, copStats.ScanDetail.ProcessedKeys)
Expand Down
8 changes: 4 additions & 4 deletions distsql/select_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tipb/go-tipb"
Expand All @@ -31,7 +31,7 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
sr := selectResult{ctx: ctx, storeType: kv.TiKV}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil)
sr.rootPlanID = 1234
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0)

ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
t := uint64(1)
Expand All @@ -41,12 +41,12 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
},
}
c.Assert(len(sr.selectResp.GetExecutionSummaries()) != len(sr.copPlanIDs), IsTrue)
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats(1234), IsFalse)

sr.copPlanIDs = []int{sr.rootPlanID}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil)
c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs))
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetOrCreateCopStats(1234, "tikv").String(), Equals, "tikv_task:{time:1ns, loops:1}, scan_detail: {total_process_keys: 0, total_keys: 0, rocksdb: {delete_skipped_count: 0, key_skipped_count: 0, block: {cache_hit_count: 0, read_count: 0, read_byte: 0 Bytes}}}")
}
15 changes: 11 additions & 4 deletions domain/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ func (ts *dbTestSuite) TestIntegration(c *C) {
lease := 50 * time.Millisecond
store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
defer store.Close()
defer func() {
err := store.Close()
c.Assert(err, IsNil)
}()
session.SetSchemaLease(lease)
domain, err := session.BootstrapSession(store)
c.Assert(err, IsNil)
Expand All @@ -48,9 +51,13 @@ func (ts *dbTestSuite) TestIntegration(c *C) {
c.Assert(err, IsNil)

// for BindHandle
se.Execute(context.Background(), "use test")
se.Execute(context.Background(), "drop table if exists t")
se.Execute(context.Background(), "create table t(i int, s varchar(20), index index_t(i, s))")
_, err = se.Execute(context.Background(), "use test")
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "drop table if exists t")
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "create table t(i int, s varchar(20), index index_t(i, s))")
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "create global binding for select * from t where i>100 using select * from t use index(index_t) where i>100")
c.Assert(err, IsNil)
c.Assert(err, IsNil, Commentf("err %v", err))
}
3 changes: 2 additions & 1 deletion domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,8 @@ func (*testSuite) TestT(c *C) {
dom.autoAnalyzeWorker(nil)
counter := metrics.PanicCounter.WithLabelValues(metrics.LabelDomain)
pb := &dto.Metric{}
counter.Write(pb)
err = counter.Write(pb)
c.Assert(err, IsNil)
c.Assert(pb.GetCounter().GetValue(), Equals, float64(2))

scope := dom.GetScope("status")
Expand Down
13 changes: 10 additions & 3 deletions domain/global_vars_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ func (gvcSuite *testGVCSuite) TestSimple(c *C) {

store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
defer store.Close()
defer func() {
err := store.Close()
c.Assert(err, IsNil)
}()
ddlLease := 50 * time.Millisecond
dom := NewDomain(store, ddlLease, 0, 0, mockFactory)
err = dom.Init(ddlLease, sysMockFactory)
Expand Down Expand Up @@ -175,7 +178,10 @@ func (gvcSuite *testGVCSuite) TestCheckEnableStmtSummary(c *C) {

store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
defer store.Close()
defer func() {
err := store.Close()
c.Assert(err, IsNil)
}()
ddlLease := 50 * time.Millisecond
dom := NewDomain(store, ddlLease, 0, 0, mockFactory)
err = dom.Init(ddlLease, sysMockFactory)
Expand All @@ -197,7 +203,8 @@ func (gvcSuite *testGVCSuite) TestCheckEnableStmtSummary(c *C) {
Collate: charset.CollationBin,
}

stmtsummary.StmtSummaryByDigestMap.SetEnabled("0", false)
err = stmtsummary.StmtSummaryByDigestMap.SetEnabled("0", false)
c.Assert(err, IsNil)
ck := chunk.NewChunkWithCapacity([]*types.FieldType{ft, ft1}, 1024)
ck.AppendString(0, variable.TiDBEnableStmtSummary)
ck.AppendString(1, "1")
Expand Down
12 changes: 10 additions & 2 deletions domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,16 @@ func TestTopology(t *testing.T) {

cli := clus.RandClient()

failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockServerInfo", "return(true)")
defer failpoint.Disable("github.com/pingcap/tidb/domain/infosync/mockServerInfo")
err := failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockServerInfo", "return(true)")
if err != nil {
t.Fatal(err)
}
defer func() {
err := failpoint.Disable("github.com/pingcap/tidb/domain/infosync/mockServerInfo")
if err != nil {
t.Fatal(err)
}
}()

info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, cli, false)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand All @@ -44,7 +44,7 @@ func checkGoroutineExists(keyword string) bool {

func (s *testSuite3) TestCopClientSend(c *C) {
c.Skip("not stable")
if _, ok := s.store.GetClient().(*tikv.CopClient); !ok {
if _, ok := s.store.GetClient().(*copr.CopClient); !ok {
// Make sure the store is tikv store.
return
}
Expand Down
15 changes: 8 additions & 7 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
Expand Down Expand Up @@ -7002,7 +7003,7 @@ func (s *testSerialSuite) TestCoprocessorOOMTicase(c *C) {
for _, testcase := range testcases {
c.Log(testcase.name)
// larger than one copResponse, smaller than 2 copResponse
quota := 2*tikv.MockResponseSizeForTest - 100
quota := 2*copr.MockResponseSizeForTest - 100
se, err := session.CreateSession4Test(s.store)
c.Check(err, IsNil)
tk.Se = se
Expand All @@ -7020,17 +7021,17 @@ func (s *testSerialSuite) TestCoprocessorOOMTicase(c *C) {
}

// ticase-4169, trigger oom action twice after workers consuming all the data
failpoint.Enable("github.com/pingcap/tidb/store/tikv/ticase-4169", `return(true)`)
failpoint.Enable("github.com/pingcap/tidb/store/copr/ticase-4169", `return(true)`)
f()
failpoint.Disable("github.com/pingcap/tidb/store/tikv/ticase-4169")
failpoint.Disable("github.com/pingcap/tidb/store/copr/ticase-4169")
// ticase-4170, trigger oom action twice after iterator receiving all the data.
failpoint.Enable("github.com/pingcap/tidb/store/tikv/ticase-4170", `return(true)`)
failpoint.Enable("github.com/pingcap/tidb/store/copr/ticase-4170", `return(true)`)
f()
failpoint.Disable("github.com/pingcap/tidb/store/tikv/ticase-4170")
failpoint.Disable("github.com/pingcap/tidb/store/copr/ticase-4170")
// ticase-4171, trigger oom before reading or consuming any data
failpoint.Enable("github.com/pingcap/tidb/store/tikv/ticase-4171", `return(true)`)
failpoint.Enable("github.com/pingcap/tidb/store/copr/ticase-4171", `return(true)`)
f()
failpoint.Disable("github.com/pingcap/tidb/store/tikv/ticase-4171")
failpoint.Disable("github.com/pingcap/tidb/store/copr/ticase-4171")
}

func (s *testSuite) TestIssue20237(c *C) {
Expand Down
7 changes: 4 additions & 3 deletions executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
Expand Down Expand Up @@ -148,9 +149,9 @@ func (s *seqTestSuite) TestEarlyClose(c *C) {
}

// Goroutine should not leak when error happen.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/handleTaskOnceError", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/copr/handleTaskOnceError", `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/handleTaskOnceError"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/copr/handleTaskOnceError"), IsNil)
}()
rss, err := tk.Se.Execute(ctx, "select * from earlyclose")
c.Assert(err, IsNil)
Expand Down Expand Up @@ -680,7 +681,7 @@ func (s *seqTestSuite) TestShowStatsHealthy(c *C) {
// TestIndexDoubleReadClose checks that when a index double read returns before reading all the rows, the goroutine doesn't
// leak. For testing distsql with multiple regions, we need to manually split a mock TiKV.
func (s *seqTestSuite) TestIndexDoubleReadClose(c *C) {
if _, ok := s.store.GetClient().(*tikv.CopClient); !ok {
if _, ok := s.store.GetClient().(*copr.CopClient); !ok {
// Make sure the store is tikv store.
return
}
Expand Down
27 changes: 20 additions & 7 deletions infoschema/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ func (*testSuite) TestT(c *C) {
defer testleak.AfterTest(c)()
store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
defer store.Close()
defer func() {
err := store.Close()
c.Assert(err, IsNil)
}()
// Make sure it calls perfschema.Init().
dom, err := session.BootstrapSession(store)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -107,7 +110,8 @@ func (*testSuite) TestT(c *C) {

dbInfos := []*model.DBInfo{dbInfo}
err = kv.RunInNewTxn(context.Background(), store, true, func(ctx context.Context, txn kv.Transaction) error {
meta.NewMeta(txn).CreateDatabase(dbInfo)
err := meta.NewMeta(txn).CreateDatabase(dbInfo)
c.Assert(err, IsNil)
return errors.Trace(err)
})
c.Assert(err, IsNil)
Expand All @@ -119,7 +123,8 @@ func (*testSuite) TestT(c *C) {
c.Assert(err, IsNil)
checkApplyCreateNonExistsSchemaDoesNotPanic(c, txn, builder)
checkApplyCreateNonExistsTableDoesNotPanic(c, txn, builder, dbID)
txn.Rollback()
err = txn.Rollback()
c.Assert(err, IsNil)

builder.Build()
is := handle.Get()
Expand Down Expand Up @@ -197,15 +202,17 @@ func (*testSuite) TestT(c *C) {
c.Assert(tb, NotNil)

err = kv.RunInNewTxn(context.Background(), store, true, func(ctx context.Context, txn kv.Transaction) error {
meta.NewMeta(txn).CreateTableOrView(dbID, tblInfo)
err := meta.NewMeta(txn).CreateTableOrView(dbID, tblInfo)
c.Assert(err, IsNil)
return errors.Trace(err)
})
c.Assert(err, IsNil)
txn, err = store.Begin()
c.Assert(err, IsNil)
_, err = builder.ApplyDiff(meta.NewMeta(txn), &model.SchemaDiff{Type: model.ActionRenameTable, SchemaID: dbID, TableID: tbID, OldSchemaID: dbID})
c.Assert(err, IsNil)
txn.Rollback()
err = txn.Rollback()
c.Assert(err, IsNil)
builder.Build()
is = handle.Get()
schema, ok = is.SchemaByID(dbID)
Expand Down Expand Up @@ -282,7 +289,10 @@ func (*testSuite) TestInfoTables(c *C) {
defer testleak.AfterTest(c)()
store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
defer store.Close()
defer func() {
err := store.Close()
c.Assert(err, IsNil)
}()
handle := infoschema.NewHandle(store)
builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(nil, nil, 0)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -344,7 +354,10 @@ func (*testSuite) TestGetBundle(c *C) {
defer testleak.AfterTest(c)()
store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
defer store.Close()
defer func() {
err := store.Close()
c.Assert(err, IsNil)
}()

handle := infoschema.NewHandle(store)
builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(nil, nil, 0)
Expand Down
10 changes: 10 additions & 0 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,11 @@ func (p *preprocessor) checkCreateTableGrammar(stmt *ast.CreateTableStmt) {
p.err = ddl.ErrWrongTableName.GenWithStackByArgs(tName)
return
}
enableNoopFuncs := p.ctx.GetSessionVars().EnableNoopFuncs
if stmt.IsTemporary && !enableNoopFuncs {
p.err = expression.ErrFunctionsNoopImpl.GenWithStackByArgs("CREATE TEMPORARY TABLE")
return
}
countPrimaryKey := 0
for _, colDef := range stmt.Cols {
if err := checkColumn(colDef); err != nil {
Expand Down Expand Up @@ -669,6 +674,11 @@ func (p *preprocessor) checkDropSequenceGrammar(stmt *ast.DropSequenceStmt) {

func (p *preprocessor) checkDropTableGrammar(stmt *ast.DropTableStmt) {
p.checkDropTableNames(stmt.Tables)
enableNoopFuncs := p.ctx.GetSessionVars().EnableNoopFuncs
if stmt.IsTemporary && !enableNoopFuncs {
p.err = expression.ErrFunctionsNoopImpl.GenWithStackByArgs("DROP TEMPORARY TABLE")
return
}
}

func (p *preprocessor) checkDropTableNames(tables []*ast.TableName) {
Expand Down
4 changes: 4 additions & 0 deletions planner/core/preprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ func (s *testValidatorSuite) TestValidator(c *C) {
{"select CONVERT( 2, DECIMAL(30,65) )", true, types.ErrMBiggerThanD.GenWithStackByArgs("2")},
{"select CONVERT( 2, DECIMAL(66,99) )", true, types.ErrMBiggerThanD.GenWithStackByArgs("2")},

// https://github.com/pingcap/parser/issues/609
{"CREATE TEMPORARY TABLE t (a INT);", false, expression.ErrFunctionsNoopImpl.GenWithStackByArgs("CREATE TEMPORARY TABLE")},
{"DROP TEMPORARY TABLE t;", false, expression.ErrFunctionsNoopImpl.GenWithStackByArgs("DROP TEMPORARY TABLE")},

// TABLESAMPLE
{"select * from t tablesample bernoulli();", false, expression.ErrInvalidTableSample},
{"select * from t tablesample bernoulli(10 rows);", false, expression.ErrInvalidTableSample},
Expand Down
4 changes: 2 additions & 2 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ func (ts *ConnTestSuite) TestPrefetchPointKeys(c *C) {
}

func (ts *ConnTestSuite) TestFallbackToTiKVWhenTiFlashIsDown(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/errorMockTiFlashServerTimeout", "return(true)"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/copr/errorMockTiFlashServerTimeout", "return(true)"), IsNil)
cc := &clientConn{
alloc: arena.NewAllocator(1024),
pkt: &packetIO{
Expand Down Expand Up @@ -755,5 +755,5 @@ func (ts *ConnTestSuite) TestFallbackToTiKVWhenTiFlashIsDown(c *C) {
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), IsNil)
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))

c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/errorMockTiFlashServerTimeout"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/copr/errorMockTiFlashServerTimeout"), IsNil)
}
Loading

0 comments on commit e8fe146

Please sign in to comment.