Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor,server: re-implement the kill statement by checking the Next() function (10841) #10879

Merged
merged 3 commits into from
Jun 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func schema2ResultFields(schema *expression.Schema, defaultDB string) (rfs []*as
// next query.
// If stmt is not nil and chunk with some rows inside, we simply update last query found rows by the number of row in chunk.
func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {
err := a.executor.Next(ctx, chk)
err := Next(ctx, a.executor, chk)
if err != nil {
a.lastErr = err
return errors.Trace(err)
Expand Down Expand Up @@ -286,7 +286,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
a.logAudit()
}()

err = e.Next(ctx, e.newFirstChunk())
err = Next(ctx, e, e.newFirstChunk())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ func (e *HashAggExec) fetchChildData(ctx context.Context) {
}
chk = input.chk
}
err = e.children[0].Next(ctx, chk)
err = Next(ctx, e.children[0], chk)
if err != nil {
e.finalOutputCh <- &AfFinalResult{err: errors.Trace(err)}
return
Expand Down Expand Up @@ -669,7 +669,7 @@ func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) erro
func (e *HashAggExec) execute(ctx context.Context) (err error) {
inputIter := chunk.NewIterator4Chunk(e.childResult)
for {
err := e.children[0].Next(ctx, e.childResult)
err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -856,7 +856,7 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch
return errors.Trace(err)
}

err = e.children[0].Next(ctx, e.childResult)
err = Next(ctx, e.children[0], e.childResult)
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
for {
iter := chunk.NewIterator4Chunk(chk)

err := e.children[0].Next(ctx, chk)
err := Next(ctx, e.children[0], chk)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -177,7 +177,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
chk := e.children[0].newFirstChunk()
for {
iter := chunk.NewIterator4Chunk(chk)
err := e.children[0].Next(ctx, chk)
err := Next(ctx, e.children[0], chk)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
task.rows = make([]chunk.Row, 0, handleCnt)
for {
chk := tableReader.newFirstChunk()
err = tableReader.Next(ctx, chk)
err = Next(ctx, tableReader, chk)
if err != nil {
logutil.Logger(ctx).Error("table reader fetch next chunk failed", zap.Error(err))
return errors.Trace(err)
Expand Down
2 changes: 2 additions & 0 deletions executor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
ErrCantChangeTxCharacteristics = terror.ClassExecutor.New(mysql.ErrCantChangeTxCharacteristics, mysql.MySQLErrName[mysql.ErrCantChangeTxCharacteristics])
ErrPsManyParam = terror.ClassExecutor.New(mysql.ErrPsManyParam, mysql.MySQLErrName[mysql.ErrPsManyParam])
ErrAdminCheckTable = terror.ClassExecutor.New(mysql.ErrAdminCheckTable, mysql.MySQLErrName[mysql.ErrAdminCheckTable])
ErrQueryInterrupted = terror.ClassExecutor.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted])
)

func init() {
Expand All @@ -57,6 +58,7 @@ func init() {
mysql.ErrCantChangeTxCharacteristics: mysql.ErrCantChangeTxCharacteristics,
mysql.ErrPsManyParam: mysql.ErrPsManyParam,
mysql.ErrAdminCheckTable: mysql.ErrAdminCheckTable,
mysql.ErrQueryInterrupted: mysql.ErrQueryInterrupted,
}
terror.ErrClassToMySQLCodes[terror.ClassExecutor] = tableMySQLErrCodes
}
31 changes: 23 additions & 8 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ type baseExecutor struct {
runtimeStats *execdetails.RuntimeStats
}

func (e *baseExecutor) base() *baseExecutor {
return e
}

// Open initializes children recursively and "childrenResults" according to children's schemas.
func (e *baseExecutor) Open(ctx context.Context) error {
for _, child := range e.children {
Expand Down Expand Up @@ -161,6 +165,7 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin
// return a batch of rows, other than a single row in Volcano.
// NOTE: Executors must call "chk.Reset()" before appending their results to it.
type Executor interface {
base() *baseExecutor
Open(context.Context) error
Next(ctx context.Context, chk *chunk.Chunk) error
Close() error
Expand All @@ -170,6 +175,16 @@ type Executor interface {
newFirstChunk() *chunk.Chunk
}

// Next is a wrapper function on e.Next(), it handles some common codes.
func Next(ctx context.Context, e Executor, chk *chunk.Chunk) error {
sessVars := e.base().ctx.GetSessionVars()
if atomic.CompareAndSwapUint32(&sessVars.Killed, 1, 0) {
return ErrQueryInterrupted
}

return e.Next(ctx, chk)
}

// CancelDDLJobsExec represents a cancel DDL jobs executor.
type CancelDDLJobsExec struct {
baseExecutor
Expand Down Expand Up @@ -533,7 +548,7 @@ func (e *CheckIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
}
chk = e.src.newFirstChunk()
for {
err := e.src.Next(ctx, chk)
err := Next(ctx, e.src, chk)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -636,7 +651,7 @@ func (e *SelectLockExec) Open(ctx context.Context) error {
// Next implements the Executor Next interface.
func (e *SelectLockExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.GrowAndReset(e.maxChunkSize)
err := e.children[0].Next(ctx, chk)
err := Next(ctx, e.children[0], chk)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -693,7 +708,7 @@ func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
for !e.meetFirstBatch {
// transfer req's requiredRows to childResult and then adjust it in childResult
e.childResult = e.childResult.SetRequiredRows(chk.RequiredRows(), e.maxChunkSize)
err := e.children[0].Next(ctx, e.adjustRequiredRows(e.childResult))
err := Next(ctx, e.children[0], e.adjustRequiredRows(e.childResult))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -718,7 +733,7 @@ func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
e.cursor += batchSize
}
e.adjustRequiredRows(chk)
err := e.children[0].Next(ctx, chk)
err := Next(ctx, e.children[0], chk)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -788,7 +803,7 @@ func init() {
}
chk := exec.newFirstChunk()
for {
err = exec.Next(ctx, chk)
err = Next(ctx, exec, chk)
if err != nil {
return rows, errors.Trace(err)
}
Expand Down Expand Up @@ -897,7 +912,7 @@ func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
}
chk.AppendRow(e.inputRow)
}
err := e.children[0].Next(ctx, e.childResult)
err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -929,7 +944,7 @@ func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) err
return nil
}
}
err := e.children[0].Next(ctx, e.childResult)
err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1069,7 +1084,7 @@ func (e *MaxOneRowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
return nil
}
e.evaluated = true
err := e.children[0].Next(ctx, chk)
err := Next(ctx, e.children[0], chk)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, erro
if e.analyzeExec != nil {
chk := e.analyzeExec.newFirstChunk()
for {
err := e.analyzeExec.Next(ctx, chk)
err := Next(ctx, e.analyzeExec, chk)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {

task.memTracker.Consume(task.outerResult.MemoryUsage())
for task.outerResult.NumRows() < ow.batchSize {
err := ow.executor.Next(ctx, ow.executorChk)
err := Next(ctx, ow.executor, ow.executorChk)
if err != nil {
return task, errors.Trace(err)
}
Expand Down Expand Up @@ -555,7 +555,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
innerResult.GetMemTracker().SetLabel("inner result")
innerResult.GetMemTracker().AttachTo(task.memTracker)
for {
err := innerExec.Next(ctx, iw.executorChk)
err := Next(ctx, innerExec, iw.executorChk)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, exec func(rows
batchInsert := (sessVars.BatchInsert && !sessVars.InTxn()) || config.GetGlobalConfig().EnableBatchDML

for {
err := selectExec.Next(ctx, chk)
err := Next(ctx, selectExec, chk)
if err != nil {
return errors.Trace(err)
}
Expand Down
8 changes: 4 additions & 4 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) {
}
}
outerResult := outerResource.chk
err := e.outerExec.Next(ctx, outerResult)
err := Next(ctx, e.outerExec, outerResult)
if err != nil {
e.joinResultCh <- &hashjoinWorkerResult{
err: errors.Trace(err),
Expand Down Expand Up @@ -268,7 +268,7 @@ func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.C
return
}
chk := e.children[e.innerIdx].newFirstChunk()
err = e.innerExec.Next(ctx, chk)
err = Next(ctx, e.innerExec, chk)
if err != nil {
e.innerFinished <- errors.Trace(err)
return
Expand Down Expand Up @@ -648,7 +648,7 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
outerIter := chunk.NewIterator4Chunk(e.outerChunk)
for {
if e.outerChunkCursor >= e.outerChunk.NumRows() {
err := e.outerExec.Next(ctx, e.outerChunk)
err := Next(ctx, e.outerExec, e.outerChunk)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -685,7 +685,7 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {
e.innerList.Reset()
innerIter := chunk.NewIterator4Chunk(e.innerChunk)
for {
err := e.innerExec.Next(ctx, e.innerChunk)
err := Next(ctx, e.innerExec, e.innerChunk)
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (t *mergeJoinInnerTable) nextRow() (chunk.Row, error) {
if t.curRow == t.curIter.End() {
t.reallocReaderResult()
oldMemUsage := t.curResult.MemoryUsage()
err := t.reader.Next(t.ctx, t.curResult)
err := Next(t.ctx, t.reader, t.curResult)
// error happens or no more data.
if err != nil || t.curResult.NumRows() == 0 {
t.curRow = t.curIter.End()
Expand Down Expand Up @@ -378,7 +378,7 @@ func (e *MergeJoinExec) fetchNextInnerRows() (err error) {
// may not all belong to the same join key, but are guaranteed to be sorted
// according to the join key.
func (e *MergeJoinExec) fetchNextOuterRows(ctx context.Context) (err error) {
err = e.outerTable.reader.Next(ctx, e.outerTable.chk)
err = Next(ctx, e.outerTable.reader, e.outerTable.chk)
if err != nil {
return errors.Trace(err)
}
Expand Down
46 changes: 13 additions & 33 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand All @@ -38,22 +36,23 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
b.err = errors.Trace(err)
return nil
}
return &PointGetExecutor{
ctx: b.ctx,
schema: p.Schema(),
tblInfo: p.TblInfo,
idxInfo: p.IndexInfo,
idxVals: p.IndexValues,
handle: p.Handle,
startTS: startTS,
}
e := &PointGetExecutor{
baseExecutor: newBaseExecutor(b.ctx, p.Schema(), p.ExplainID()),
tblInfo: p.TblInfo,
idxInfo: p.IndexInfo,
idxVals: p.IndexValues,
handle: p.Handle,
startTS: startTS,
}
e.base().initCap = 1
e.base().maxChunkSize = 1
return e
}

// PointGetExecutor executes point select query.
type PointGetExecutor struct {
ctx sessionctx.Context
schema *expression.Schema
tps []*types.FieldType
baseExecutor

tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
Expand Down Expand Up @@ -232,22 +231,3 @@ func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
}
return nil
}

// Schema implements the Executor interface.
func (e *PointGetExecutor) Schema() *expression.Schema {
return e.schema
}

func (e *PointGetExecutor) retTypes() []*types.FieldType {
if e.tps == nil {
e.tps = make([]*types.FieldType, e.schema.Len())
for i := range e.schema.Columns {
e.tps[i] = e.schema.Columns[i].RetType
}
}
return e.tps
}

func (e *PointGetExecutor) newFirstChunk() *chunk.Chunk {
return chunk.New(e.retTypes(), 1, 1)
}
4 changes: 2 additions & 2 deletions executor/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (e *ProjectionExec) isUnparallelExec() bool {
func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk) error {
// push requiredRows down
e.childResult.SetRequiredRows(chk.RequiredRows(), e.maxChunkSize)
err := e.children[0].Next(ctx, e.childResult)
err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -306,7 +306,7 @@ func (f *projectionInputFetcher) run(ctx context.Context) {

requiredRows := atomic.LoadInt64(&f.proj.parentReqRows)
input.chk.SetRequiredRows(int(requiredRows), f.proj.maxChunkSize)
err := f.child.Next(ctx, input.chk)
err := Next(ctx, f.child, input.chk)
if err != nil || input.chk.NumRows() == 0 {
output.done <- errors.Trace(err)
return
Expand Down
Loading