Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/pingcap/tidb into not_pus…
Browse files Browse the repository at this point in the history
…hdown_to_tiflash_in_some_cases
  • Loading branch information
windtalker committed Mar 12, 2021
2 parents 7999359 + b578af4 commit 3f15bcc
Show file tree
Hide file tree
Showing 35 changed files with 657 additions and 351 deletions.
2 changes: 1 addition & 1 deletion bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func normalizeWithDefaultDB(c *C, sql, db string) (string, string) {
testParser := parser.New()
stmt, err := testParser.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil)
return parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(stmt, "test"))
return parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(stmt, "test", ""))
}

func (s *testSuite) TestBindParse(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ func (h *BindHandle) CaptureBaselines() {
continue
}
dbName := utilparser.GetDefaultDB(stmt, bindableStmt.Schema)
normalizedSQL, digest := parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(stmt, dbName))
normalizedSQL, digest := parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(stmt, dbName, bindableStmt.Query))
if r := h.GetBindRecord(digest, normalizedSQL, dbName); r != nil && r.HasUsingBinding() {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/explaintest/r/explain.result
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ drop view if exists v;
create view v as select cast(replace(substring_index(substring_index("",',',1),':',-1),'"','') as CHAR(32)) as event_id;
desc v;
Field Type Null Key Default Extra
event_id varchar(32) YES NULL
event_id varchar(32) NO NULL
13 changes: 7 additions & 6 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -613,14 +613,15 @@ HashJoin 8002.00 root right outer join, equal:[eq(test.t.nb, test.t.nb)]
explain format = 'brief' select ifnull(t.a, 1) in (select count(*) from t s , t t1 where s.a = t.a and s.a = t1.a) from t;
id estRows task access object operator info
Projection 10000.00 root Column#14
└─Apply 10000.00 root CARTESIAN left outer semi join, other cond:eq(ifnull(test.t.a, 1), Column#13)
├─TableReader(Build) 10000.00 root data:TableFullScan
│ └─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo
└─HashAgg(Probe) 1.00 root funcs:count(Column#15)->Column#13
└─Apply 10000.00 root left outer semi join, equal:[eq(Column#15, Column#13)]
├─Projection(Build) 10000.00 root test.t.a, ifnull(test.t.a, 1)->Column#15
│ └─TableReader 10000.00 root data:TableFullScan
│ └─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo
└─HashAgg(Probe) 1.00 root funcs:count(Column#17)->Column#13
└─HashJoin 9.99 root inner join, equal:[eq(test.t.a, test.t.a)]
├─HashAgg(Build) 7.99 root group by:test.t.a, funcs:count(Column#16)->Column#15, funcs:firstrow(test.t.a)->test.t.a
├─HashAgg(Build) 7.99 root group by:test.t.a, funcs:count(Column#18)->Column#17, funcs:firstrow(test.t.a)->test.t.a
│ └─TableReader 7.99 root data:HashAgg
│ └─HashAgg 7.99 cop[tikv] group by:test.t.a, funcs:count(1)->Column#16
│ └─HashAgg 7.99 cop[tikv] group by:test.t.a, funcs:count(1)->Column#18
│ └─Selection 9.99 cop[tikv] eq(test.t.a, test.t.a), not(isnull(test.t.a))
│ └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
└─TableReader(Probe) 9.99 root data:Selection
Expand Down
2 changes: 1 addition & 1 deletion distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// DispatchMPPTasks dispathes all tasks and returns an iterator.
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int) (SelectResult, error) {
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, tasks)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks)
if resp == nil {
err := errors.New("client returns nil response")
return nil, err
Expand Down
8 changes: 7 additions & 1 deletion executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,14 +506,20 @@ func (s *testFastAnalyze) TestFastAnalyze(c *C) {
c.Assert(result.Rows()[1][5], Equals, "2")
c.Assert(result.Rows()[2][5], Equals, "3")
*/
}

func (s *testSerialSuite2) TestFastAnalyze4GlobalStats(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set @@session.tidb_enable_fast_analyze=1")
tk.MustExec("set @@session.tidb_build_stats_concurrency=1")
// test fast analyze in dynamic mode
tk.MustExec("set @@session.tidb_analyze_version = 2;")
tk.MustExec("set @@session.tidb_partition_prune_mode = 'dynamic';")
tk.MustExec("drop table if exists t4;")
tk.MustExec("create table t4(a int, b int) PARTITION BY HASH(a) PARTITIONS 2;")
tk.MustExec("insert into t4 values(1,1),(3,3),(4,4),(2,2),(5,5);")
err = tk.ExecToErr("analyze table t4;")
err := tk.ExecToErr("analyze table t4;")
c.Assert(err.Error(), Equals, "Fast analyze hasn't reached General Availability and only support analyze version 1 currently.")
}

Expand Down
43 changes: 4 additions & 39 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/cznic/mathutil"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -1442,12 +1441,6 @@ type UnionExec struct {
results []*chunk.Chunk
wg sync.WaitGroup
initialized bool
mu struct {
*sync.Mutex
maxOpenedChildID int
}

childInFlightForTest int32
}

// unionWorkerResult stores the result for a union worker.
Expand All @@ -1467,11 +1460,12 @@ func (e *UnionExec) waitAllFinished() {

// Open implements the Executor Open interface.
func (e *UnionExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
e.stopFetchData.Store(false)
e.initialized = false
e.finished = make(chan struct{})
e.mu.Mutex = &sync.Mutex{}
e.mu.maxOpenedChildID = -1
return nil
}

Expand Down Expand Up @@ -1517,19 +1511,6 @@ func (e *UnionExec) resultPuller(ctx context.Context, workerID int) {
e.wg.Done()
}()
for childID := range e.childIDChan {
e.mu.Lock()
if childID > e.mu.maxOpenedChildID {
e.mu.maxOpenedChildID = childID
}
e.mu.Unlock()
if err := e.children[childID].Open(ctx); err != nil {
result.err = err
e.stopFetchData.Store(true)
e.resultPool <- result
}
failpoint.Inject("issue21441", func() {
atomic.AddInt32(&e.childInFlightForTest, 1)
})
for {
if e.stopFetchData.Load().(bool) {
return
Expand All @@ -1544,20 +1525,12 @@ func (e *UnionExec) resultPuller(ctx context.Context, workerID int) {
e.resourcePools[workerID] <- result.chk
break
}
failpoint.Inject("issue21441", func() {
if int(atomic.LoadInt32(&e.childInFlightForTest)) > e.concurrency {
panic("the count of child in flight is larger than e.concurrency unexpectedly")
}
})
e.resultPool <- result
if result.err != nil {
e.stopFetchData.Store(true)
return
}
}
failpoint.Inject("issue21441", func() {
atomic.AddInt32(&e.childInFlightForTest, -1)
})
}
}

Expand Down Expand Up @@ -1596,15 +1569,7 @@ func (e *UnionExec) Close() error {
for range e.childIDChan {
}
}
// We do not need to acquire the e.mu.Lock since all the resultPuller can be
// promised to exit when reaching here (e.childIDChan been closed).
var firstErr error
for i := 0; i <= e.mu.maxOpenedChildID; i++ {
if err := e.children[i].Close(); err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
return e.baseExecutor.Close()
}

// ResetContextOfStmt resets the StmtContext and session variables.
Expand Down
34 changes: 0 additions & 34 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7399,40 +7399,6 @@ func (s *testSuite) TestOOMActionPriority(c *C) {
c.Assert(action.GetPriority(), Equals, int64(memory.DefLogPriority))
}

func (s *testSerialSuite) TestIssue21441(c *C) {
err := failpoint.Enable("github.com/pingcap/tidb/executor/issue21441", `return`)
c.Assert(err, IsNil)
defer func() {
err := failpoint.Disable("github.com/pingcap/tidb/executor/issue21441")
c.Assert(err, IsNil)
}()

tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
tk.MustExec(`insert into t values(1),(2),(3)`)
tk.Se.GetSessionVars().InitChunkSize = 1
tk.Se.GetSessionVars().MaxChunkSize = 1
sql := `
select a from t union all
select a from t union all
select a from t union all
select a from t union all
select a from t union all
select a from t union all
select a from t union all
select a from t`
tk.MustQuery(sql).Sort().Check(testkit.Rows(
"1", "1", "1", "1", "1", "1", "1", "1",
"2", "2", "2", "2", "2", "2", "2", "2",
"3", "3", "3", "3", "3", "3", "3", "3",
))

tk.MustQuery("select a from (" + sql + ") t order by a limit 4").Check(testkit.Rows("1", "1", "1", "1"))
tk.MustQuery("select a from (" + sql + ") t order by a limit 7, 4").Check(testkit.Rows("1", "2", "2", "2"))
}

func (s *testSuite) Test17780(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
1 change: 1 addition & 0 deletions executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.M
Timeout: 10,
SchemaVar: e.is.SchemaMetaVersion(),
StartTs: e.startTS,
State: kv.MppTaskReady,
}
e.mppReqs = append(e.mppReqs, req)
}
Expand Down
79 changes: 79 additions & 0 deletions executor/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,25 @@ package executor_test

import (
"fmt"
"sync"
"sync/atomic"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
)

type tiflashTestSuite struct {
Expand All @@ -36,6 +43,7 @@ type tiflashTestSuite struct {
}

func (s *tiflashTestSuite) SetUpSuite(c *C) {
testleak.BeforeTest()
var err error
s.store, err = mockstore.NewMockStore(
mockstore.WithClusterInspector(func(c cluster.Cluster) {
Expand Down Expand Up @@ -271,3 +279,74 @@ func (s *tiflashTestSuite) TestPartitionTable(c *C) {
failpoint.Disable("github.com/pingcap/tidb/executor/checkTotalMPPTasks")
failpoint.Disable("github.com/pingcap/tidb/executor/checkUseMPP")
}

func (s *tiflashTestSuite) TestCancelMppTasks(c *C) {
var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang"
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int not null primary key, b int not null)")
tk.MustExec("alter table t set tiflash replica 1")
tk.MustExec("insert into t values(1,0)")
tk.MustExec("insert into t values(2,0)")
tk.MustExec("insert into t values(3,0)")
tk.MustExec("insert into t values(4,0)")
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
tk.MustExec("set @@session.tidb_allow_mpp=ON")
atomic.StoreUint32(&tk.Se.GetSessionVars().Killed, 0)
c.Assert(failpoint.Enable(hang, `return(true)`), IsNil)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
err := tk.QueryToErr("select count(*) from t as t1 , t where t1.a = t.a")
c.Assert(err, NotNil)
c.Assert(int(terror.ToSQLError(errors.Cause(err).(*terror.Error)).Code), Equals, int(executor.ErrQueryInterrupted.Code()))
}()
time.Sleep(1 * time.Second)
atomic.StoreUint32(&tk.Se.GetSessionVars().Killed, 1)
wg.Wait()
c.Assert(failpoint.Disable(hang), IsNil)
}

// all goroutines exit if one goroutine hangs but another return errors
func (s *tiflashTestSuite) TestMppGoroutinesExitFromErrors(c *C) {
defer testleak.AfterTest(c)()
// mock non-root tasks return error
var mppNonRootTaskError = "github.com/pingcap/tidb/store/copr/mppNonRootTaskError"
// mock root tasks hang
var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang"
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int not null primary key, b int not null)")
tk.MustExec("alter table t set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("insert into t values(1,0)")
tk.MustExec("insert into t values(2,0)")
tk.MustExec("insert into t values(3,0)")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int not null primary key, b int not null)")
tk.MustExec("alter table t1 set tiflash replica 1")
tb = testGetTableByName(c, tk.Se, "test", "t1")
err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("insert into t1 values(1,0)")
tk.MustExec("insert into t1 values(2,0)")
tk.MustExec("insert into t1 values(3,0)")
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
tk.MustExec("set @@session.tidb_allow_mpp=ON")
c.Assert(failpoint.Enable(mppNonRootTaskError, `return(true)`), IsNil)
c.Assert(failpoint.Enable(hang, `return(true)`), IsNil)

// generate 2 root tasks, one will hang and another will return errors
err = tk.QueryToErr("select count(*) from t as t1 , t where t1.a = t.a")
c.Assert(err, NotNil)
c.Assert(failpoint.Disable(mppNonRootTaskError), IsNil)
c.Assert(failpoint.Disable(hang), IsNil)
}
15 changes: 13 additions & 2 deletions expression/constant_fold.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package expression
import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -207,14 +208,24 @@ func foldConstant(expr Expression) (Expression, bool) {
return expr, isDeferredConst
}
value, err := x.Eval(chunk.Row{})
retType := x.RetType.Clone()
if !hasNullArg {
// set right not null flag for constant value
switch value.Kind() {
case types.KindNull:
retType.Flag &= ^mysql.NotNullFlag
default:
retType.Flag |= mysql.NotNullFlag
}
}
if err != nil {
logutil.BgLogger().Debug("fold expression to constant", zap.String("expression", x.ExplainInfo()), zap.Error(err))
return expr, isDeferredConst
}
if isDeferredConst {
return &Constant{Value: value, RetType: x.RetType, DeferredExpr: x}, true
return &Constant{Value: value, RetType: retType, DeferredExpr: x}, true
}
return &Constant{Value: value, RetType: x.RetType}, false
return &Constant{Value: value, RetType: retType}, false
case *Constant:
if x.ParamMarker != nil {
return &Constant{
Expand Down
3 changes: 0 additions & 3 deletions expression/expr_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ func (pc PbConverter) ExprToPB(expr Expression) *tipb.Expr {
if pbExpr == nil {
return nil
}
if !x.Value.IsNull() {
pbExpr.FieldType.Flag |= uint32(mysql.NotNullFlag)
}
return pbExpr
case *CorrelatedColumn:
return pc.conOrCorColToPBExpr(expr)
Expand Down
1 change: 1 addition & 0 deletions expression/expr_to_pb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func (s *testEvaluatorSuite) TestLikeFunc2Pb(c *C) {
client := new(mock.Client)

retTp := types.NewFieldType(mysql.TypeString)
retTp.Flag |= mysql.NotNullFlag
retTp.Charset = charset.CharsetUTF8
retTp.Collate = charset.CollationUTF8
args := []Expression{
Expand Down
Loading

0 comments on commit 3f15bcc

Please sign in to comment.