diff --git a/ddl/table.go b/ddl/table.go
index 2fdf0fea1edde..7fd2972063dd9 100644
--- a/ddl/table.go
+++ b/ddl/table.go
@@ -20,6 +20,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/parser/model"
+	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/ddl/util"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/kv"
@@ -51,6 +52,13 @@ func onCreateStream(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error
 		return ver, errors.Trace(err)
 	}
+	// Fill in TableInfo.StreamWinCol: the last timestamp column of the
+	// stream table is used as its window column.
+	for _, c := range tbInfo.Columns {
+		if c.Tp == mysql.TypeTimestamp {
+			tbInfo.StreamWinCol = c.Name.L
+		}
+	}
+
 	switch tbInfo.State {
 	case model.StateNone:
 		// none -> public
diff --git a/executor/aggregate.go b/executor/aggregate.go
index 5f7b3f09aa73b..7058110df037c 100644
--- a/executor/aggregate.go
+++ b/executor/aggregate.go
@@ -940,6 +940,17 @@ type StreamWindowHashAggExec struct {
 	defaultVal  *chunk.Chunk
 	childResult *chunk.Chunk
+
+	// lastIter remembers where the previous window stopped in the child
+	// chunk, so the next window resumes from that row.
+	lastIter *chunk.Iterator4Chunk
+
+	// winCol is the stream window column's name, winColIdx its offset in
+	// the child schema, and winSize the window length in seconds.
+	winCol    string
+	winColIdx int
+	winSize   uint64
+
+	// windowStart and windowEnd bound the window currently being
+	// aggregated; needSetWindow marks that the next row opens a new window.
+	windowStart   types.Time
+	windowEnd     types.Time
+	needSetWindow bool
+	init          bool
 }

// Open implements the Executor Open interface.
@@ -947,13 +958,41 @@ func (e *StreamWindowHashAggExec) Open(ctx context.Context) error {
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return errors.Trace(err)
 	}
+	// Locate the window column in the child schema.
+	found := false
+	for i, c := range e.children[0].Schema().Columns {
+		if c.ColName.L == e.winCol {
+			e.winColIdx = i
+			found = true
+			break
+		}
+	}
+	if !found {
+		return errors.Errorf("failed to find window column %s", e.winCol)
+	}
+	e.reset()
+	e.childResult = e.children[0].newFirstChunk()
+	e.init = false
+	e.needSetWindow = true
+	e.lastIter = nil
+	return nil
+}
+
+// reset clears the per-window aggregation state so that the next window
+// starts from scratch.
+func (e *StreamWindowHashAggExec) reset() {
 	e.prepared = false
+	e.cursor4GroupKey = 0
 	e.groupSet = set.NewStringSet()
 	e.partialResultMap = make(aggPartialResultMapper, 0)
 	e.groupKeyBuffer = make([]byte, 0, 8)
 	e.groupValDatums = make([]types.Datum, 0, len(e.groupKeyBuffer))
-	e.childResult = e.children[0].newFirstChunk()
-	return nil
+	e.groupKeys = nil
+}
+
+// shouldReset reports whether the previous window's state must be discarded
+// before processing more rows.
+func (e *StreamWindowHashAggExec) shouldReset() bool {
+	return e.needSetWindow
+}
+
+// shouldStop reports whether the child executor has been exhausted.
+func (e *StreamWindowHashAggExec) shouldStop() bool {
+	return e.childResult.NumRows() == 0
 }

// Next implements the Executor Next interface.
@@ -963,6 +1002,14 @@ func (e *StreamWindowHashAggExec) Next(ctx context.Context, chk *chunk.Chunk) er
 		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
 	}
 	chk.Reset()
+	// TODO: deal with error handling here.
+	if e.shouldStop() && e.init {
+		return nil
+	}
+	if e.shouldReset() {
+		e.reset()
+	}
+	e.init = true
 	return errors.Trace(e.next(ctx, chk))
 }
@@ -974,11 +1021,12 @@ func (e *StreamWindowHashAggExec) Close() error {
 	return nil
 }

-// unparallelExec executes hash aggregation algorithm in single thread.
+// next executes the hash aggregation for the current stream window in a
+// single thread.
 func (e *StreamWindowHashAggExec) next(ctx context.Context, chk *chunk.Chunk) error {
 	// In this stage we consider all data from src as a single group.
 	if !e.prepared {
 		err := e.execute(ctx)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -993,10 +1041,13 @@ func (e *StreamWindowHashAggExec) next(ctx context.Context, chk *chunk.Chunk) er
 		e.prepared = true
 	}
 	chk.Reset()

 	// Since we return e.maxChunkSize rows every time, so we should not traverse
 	// `groupSet` because of its randomness.
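+	// Emit one result row per group key; the window boundaries computed in
+	// execute() are appended as the trailing window_start / window_end
+	// columns of the output chunk.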
 	for ; e.cursor4GroupKey < len(e.groupKeys); e.cursor4GroupKey++ {
 		partialResults := e.getPartialResults(e.groupKeys[e.cursor4GroupKey])
 		if len(e.PartialAggFuncs) == 0 {
 			chk.SetNumVirtualRows(chk.NumRows() + 1)
 		}
 		for i, af := range e.PartialAggFuncs {
 			af.AppendFinalResult2Chunk(e.ctx, partialResults[i], chk)
 		}
-		if chk.NumRows() == e.maxChunkSize {
+		chk.AppendTime(len(e.schema.Columns)-2, e.windowStart)
+		chk.AppendTime(len(e.schema.Columns)-1, e.windowEnd)
+		if chk.NumRows() == e.maxChunkSize {
 			e.cursor4GroupKey++
 			return nil
 		}
@@ -1014,26 +1067,63 @@
 // execute fetches Chunks from src and update each aggregate function for each row in Chunk.
 func (e *StreamWindowHashAggExec) execute(ctx context.Context) (err error) {
-	inputIter := chunk.NewIterator4Chunk(e.childResult)
+	var inputIter *chunk.Iterator4Chunk
+	var row chunk.Row
 	for {
-		err := e.children[0].Next(ctx, e.childResult)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		// no more data.
-		if e.childResult.NumRows() == 0 {
-			return nil
-		}
-		for row := inputIter.Begin(); row != inputIter.End(); row = inputIter.Next() {
+		// Resume from the row where the previous window stopped, or fetch a
+		// new chunk from the child executor.
+		if e.lastIter == nil {
+			inputIter = chunk.NewIterator4Chunk(e.childResult)
+			err := e.children[0].Next(ctx, e.childResult)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			// No more data.
+			if e.childResult.NumRows() == 0 {
+				return nil
+			}
+			row = inputIter.Begin()
+		} else {
+			inputIter = e.lastIter
+			row = inputIter.Current()
+		}
+		for ; row != inputIter.End(); row = inputIter.Next() {
+			tm := row.GetTime(e.winColIdx)
+			if e.needSetWindow {
+				// Open a new window starting at this row's timestamp.
+				e.windowStart = tm
+				e.windowEnd, err = e.windowStart.Add(e.ctx.GetSessionVars().StmtCtx, types.Duration{Duration: time.Duration(e.winSize) * time.Second})
+				if err != nil {
+					return errors.Trace(err)
+				}
+				e.needSetWindow = false
+			}
+			// The row lies beyond the current window: close the window, keep
+			// the iterator position so this row seeds the next window, and
+			// return so the parent can emit the finished window.
+			if tm.Compare(e.windowEnd) == 1 {
+				e.needSetWindow = true
+				e.lastIter = inputIter
+				return nil
+			}
 			groupKey, err := e.getGroupKey(row)
 			if err != nil {
 				return errors.Trace(err)
 			}
 			if !e.groupSet.Exist(groupKey) {
 				e.groupSet.Insert(groupKey)
 				e.groupKeys = append(e.groupKeys, groupKey)
 			}
 			partialResults := e.getPartialResults(groupKey)
 			for i, af := range e.PartialAggFuncs {
 				err = af.UpdatePartialResult(e.ctx, []chunk.Row{row}, partialResults[i])
 				if err != nil {
@@ -1041,6 +1131,7 @@ func (e *StreamWindowHashAggExec) execute(ctx context.Context) (err error) {
 			}
 		}
+		// The chunk is fully consumed; fetch a fresh one on the next loop.
+		e.lastIter = nil
 	}
 }
diff --git a/executor/builder.go b/executor/builder.go
index f92637dd2dd18..25f80cb03d012 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -1013,6 +1013,11 @@ func (b *executorBuilder) buildProjBelowAgg(aggFuncs []*aggregation.AggFuncDesc,
 		groupByItems[i] = newArg
 		cursor++
 	}
+	// Pass the child's columns through the projection so that the stream
+	// window column remains visible to the aggregation above.
+	for i, r := range src.Schema().Columns {
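+		// Clone each column so the projection does not share
+		// *expression.Column objects with the child's schema.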
+		projSchemaCols = append(projSchemaCols, r.Clone().(*expression.Column))
+		projExprs = append(projExprs, expression.Column2Exprs([]*expression.Column{r})...)
+		projExprs[len(projExprs)-1].(*expression.Column).Index = i
+	}

 	return &ProjectionExec{
 		baseExecutor: newBaseExecutor(b.ctx, expression.NewSchema(projSchemaCols...), projFromID, src),
@@ -1120,6 +1125,8 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor
 		defaultVal:  e.defaultVal,
 		childResult: e.childResult,
+		winCol:      v.StreamWindow.WinColName,
+		winSize:     v.StreamWindow.Size,
 	}
 }
diff --git a/executor/show.go b/executor/show.go
index 2c99a220d246d..8ff23cc51659e 100644
--- a/executor/show.go
+++ b/executor/show.go
@@ -103,6 +103,8 @@ func (e *ShowExec) fetchAll() error {
 		return e.fetchShowColumns()
 	case ast.ShowCreateTable:
 		return e.fetchShowCreateTable()
+	case ast.ShowCreateStream:
+		return e.fetchShowCreateStream()
 	case ast.ShowCreateDatabase:
 		return e.fetchShowCreateDatabase()
 	case ast.ShowDatabases:
@@ -536,7 +538,9 @@ func (e *ShowExec) fetchShowCreateTable() error {
 	if err != nil {
 		return errors.Trace(err)
 	}
-
+	if tb.Meta().IsStream {
+		return errors.Errorf("table %s is a stream table, use 'show create stream %s' instead", tb.Meta().Name.L, tb.Meta().Name.L)
+	}
 	sqlMode := e.ctx.GetSessionVars().SQLMode

 	// TODO: let the result more like MySQL.
@@ -722,6 +726,80 @@ func (e *ShowExec) fetchShowCreateTable() error {
 	return nil
 }

+func (e *ShowExec) fetchShowCreateStream() error {
+	tb, err := e.getTable()
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if !tb.Meta().IsStream {
+		return errors.Errorf("table %s is not a stream table, use 'show create table %s' instead", tb.Meta().Name.L, tb.Meta().Name.L)
+	}
+
+	sqlMode := e.ctx.GetSessionVars().SQLMode
+
+	// TODO: make the result more like MySQL.
+	var buf bytes.Buffer
+	buf.WriteString(fmt.Sprintf("CREATE STREAM %s (\n", escape(tb.Meta().Name, sqlMode)))
+	for i, col := range tb.Cols() {
+		buf.WriteString(fmt.Sprintf(" %s %s", escape(col.Name, sqlMode), col.GetTypeDesc()))
+		if col.IsGenerated() {
+			// It's a generated column.
+			buf.WriteString(fmt.Sprintf(" GENERATED ALWAYS AS (%s)", col.GeneratedExprString))
+			if col.GeneratedStored {
+				buf.WriteString(" STORED")
+			} else {
+				buf.WriteString(" VIRTUAL")
+			}
+		}
+		if mysql.HasAutoIncrementFlag(col.Flag) {
+			buf.WriteString(" NOT NULL AUTO_INCREMENT")
+		} else {
+			if mysql.HasNotNullFlag(col.Flag) {
+				buf.WriteString(" NOT NULL")
+			}
+			if !mysql.HasNoDefaultValueFlag(col.Flag) {
+				defaultValue := col.GetDefaultValue()
+				switch defaultValue {
+				case nil:
+					if !mysql.HasNotNullFlag(col.Flag) {
+						if col.Tp == mysql.TypeTimestamp {
+							buf.WriteString(" NULL")
+						}
+						buf.WriteString(" DEFAULT NULL")
+					}
+				case "CURRENT_TIMESTAMP":
+					buf.WriteString(" DEFAULT CURRENT_TIMESTAMP")
+				default:
+					defaultValStr := fmt.Sprintf("%v", defaultValue)
+					if col.Tp == mysql.TypeBit {
+						defaultValBinaryLiteral := types.BinaryLiteral(defaultValStr)
+						buf.WriteString(fmt.Sprintf(" DEFAULT %s", defaultValBinaryLiteral.ToBitLiteralString(true)))
+					} else {
+						buf.WriteString(fmt.Sprintf(" DEFAULT '%s'", format.OutputFormat(defaultValStr)))
+					}
+				}
+			}
+			if mysql.HasOnUpdateNowFlag(col.Flag) {
+				buf.WriteString(" ON UPDATE CURRENT_TIMESTAMP")
+			}
+		}
+		if len(col.Comment) > 0 {
+			buf.WriteString(fmt.Sprintf(" COMMENT '%s'", format.OutputFormat(col.Comment)))
+		}
+		if i != len(tb.Cols())-1 {
+			buf.WriteString(",\n")
+		}
+	}
+	buf.WriteString(") WITH (\n")
+	// Sort the property keys so that the SHOW CREATE STREAM output is
+	// deterministic; ranging over the map directly yields a random order.
+	props := make([]string, 0, len(tb.Meta().StreamProperties))
+	for k := range tb.Meta().StreamProperties {
+		props = append(props, k)
+	}
+	sort.Strings(props)
+	for _, k := range props {
+		buf.WriteString(fmt.Sprintf("\t\t'%s'='%s'\n", k, tb.Meta().StreamProperties[k]))
+	}
+	buf.WriteString("\t\t);")
+
+	e.appendRow([]interface{}{tb.Meta().Name.O, buf.String()})
+	return nil
+}
+
 // fetchShowCreateDatabase composes show create database result.
 func (e *ShowExec) fetchShowCreateDatabase() error {
 	db, ok := e.is.SchemaByName(e.DBName)
diff --git a/expression/aggregation/descriptor.go b/expression/aggregation/descriptor.go
index 5cb58e4aaad49..00885a32c3ec3 100644
--- a/expression/aggregation/descriptor.go
+++ b/expression/aggregation/descriptor.go
@@ -33,6 +33,7 @@ import (

 // TODO: Complete here
 type AggWindowDesc struct {
+	// WinColName is the name of the stream table's window column.
+	WinColName string
 	Size uint64
 }
diff --git a/go.mod b/go.mod
index 5b2dda11bded3..49c430a7e0ea2 100644
--- a/go.mod
+++ b/go.mod
@@ -49,4 +49,4 @@ require (
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0
 )

-replace github.com/pingcap/parser => github.com/spongedu/parser v0.0.0-20181102150703-36c270493d6d
+replace github.com/pingcap/parser => github.com/spongedu/parser v0.0.0-20181102150703-28da3dcd8827
diff --git a/go.sum b/go.sum
index e5acf05871824..1bc40d85ad4bf 100644
--- a/go.sum
+++ b/go.sum
@@ -279,6 +279,10 @@ github.com/spongedu/parser v0.0.0-20181102150703-0294367261e2 h1:FfNM0blvy5go/ey
 github.com/spongedu/parser v0.0.0-20181102150703-0294367261e2/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
 github.com/spongedu/parser v0.0.0-20181102150703-05d67ec39b67 h1:Yb88YfrMzIZWbXiuCvjCNc7lcV2HcWd+1KuNPspaOpQ=
 github.com/spongedu/parser v0.0.0-20181102150703-05d67ec39b67/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
+github.com/spongedu/parser v0.0.0-20181102150703-263a8d3a093d h1:ympimjDjKqSoc4Y1XUYmc1ZbyYfC99jNdOJIeySLLHI=
+github.com/spongedu/parser v0.0.0-20181102150703-263a8d3a093d/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
+github.com/spongedu/parser v0.0.0-20181102150703-28da3dcd8827 h1:0PQpmdl+tqYB+TlfTSgMjXHaLlQ6PrLEifBuPykWJys=
+github.com/spongedu/parser v0.0.0-20181102150703-28da3dcd8827/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
 github.com/spongedu/parser v0.0.0-20181102150703-2ae733bc2c81 h1:rs+sXF/OicOiX//QfVIVDfBvsB5MzcE0coyHDWTHvmw=
 github.com/spongedu/parser v0.0.0-20181102150703-2ae733bc2c81/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
 github.com/spongedu/parser v0.0.0-20181102150703-36c270493d6d h1:qEeHZ/BVNH8zriuiZKsbq9GzaXdx6Vyw7EWIgLKTcuk=
@@ -289,6 +293,8 @@ github.com/spongedu/parser v0.0.0-20181102150703-4acd198f5092 h1:k7LQeI7t82s95X2
 github.com/spongedu/parser v0.0.0-20181102150703-4acd198f5092/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
 github.com/spongedu/parser v0.0.0-20181102150703-58bff2a4ea0b h1:UXFKQHwVjopn2VNpUTzMRgq2Fm3QtLLnHZ3HliGW2b0=
 github.com/spongedu/parser v0.0.0-20181102150703-58bff2a4ea0b/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
+github.com/spongedu/parser v0.0.0-20181102150703-69c772388fc2 h1:FTtmJ9E6kPBsGRFElKiW9mN80/9uHWOnE1YcP9bJyXo=
+github.com/spongedu/parser v0.0.0-20181102150703-69c772388fc2/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
 github.com/spongedu/parser v0.0.0-20181102150703-6b6427a2156a h1:nUp9zaAAEq8Q+Jqw/Y3cVaxfwOz6zTipjBZprYOBuNk=
 github.com/spongedu/parser v0.0.0-20181102150703-6b6427a2156a/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
 github.com/spongedu/parser v0.0.0-20181102150703-7bb1bc7c942b h1:w7FL9Lo4EvZinolU2rKdzdWIcY4H8WrnaNBfogafzoQ=
@@ -425,9 +431,11 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/time v0.0.0-20170420181420-c06e80d9300e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM=
 golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52 h1:JG/0uqcGdTNgq7FdU+61l5Pdmb8putNZlXb65bJBROs=
 golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20181026183834-f60e5f99f081/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20181201035826-d0ca3933b724 h1:eV9myT/I6o1p8salzgZ0f1pz54PEgUf2NkCxEf6t+xs=
 google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
 google.golang.org/appengine v1.2.0 h1:S0iUepdCWODXRvtE+gcRDd15L+k+k1AiHlMiMjefH24=
 google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go
index 3f6b2ca87f394..5ab220651d9b7 100644
--- a/planner/core/logical_plan_builder.go
+++ b/planner/core/logical_plan_builder.go
@@ -78,6 +78,7 @@ func (b *PlanBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.Aggrega
 	b.optFlag = b.optFlag | flagPredicatePushDown
 	b.optFlag = b.optFlag | flagEliminateAgg
 	b.optFlag = b.optFlag | flagEliminateProjection
+	b.optFlag = b.optFlag | flagCompleteWindowInfo

 	plan4Agg := LogicalAggregation{AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(aggFuncList))}.Init(b.ctx)
 	schema4Agg := expression.NewSchema(make([]*expression.Column, 0, len(aggFuncList)+p.Schema().Len())...)
@@ -122,14 +123,23 @@ func (b *PlanBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.Aggrega
 		newCol.RetType = newFunc.RetTp
 		schema4Agg.Append(newCol)
 	}
+	// Record the stream window spec on the aggregation. The window_start /
+	// window_end columns themselves are appended later by the
+	// streamWindowCompleter rule (see rule_complete_window_info.go).
+	if sw != nil {
+		plan4Agg.AggWindow = &aggregation.AggWindowDesc{Size: sw.Size, WinColName: sw.WinCol}
+	}
 	plan4Agg.SetChildren(p)
 	plan4Agg.GroupByItems = gbyItems
 	plan4Agg.SetSchema(schema4Agg)
 	plan4Agg.collectGroupByColumns()
-	//TODO: Complete here
-	if sw != nil {
-		plan4Agg.AggWindow = &aggregation.AggWindowDesc{Size: sw.Size}
-	}
 	return plan4Agg, aggIndexMap, nil
 }
diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go
index 3065ebb1891d7..91368583aed3d 100644
--- a/planner/core/optimizer.go
+++ b/planner/core/optimizer.go
@@ -43,6 +43,7 @@ const (
 	flagPartitionProcessor
 	flagPushDownAgg
 	flagPushDownTopN
+	flagCompleteWindowInfo
 )

 var optRuleList = []logicalOptRule{
@@ -57,6 +58,7 @@ var optRuleList = []logicalOptRule{
 	&partitionProcessor{},
 	&aggregationPushDownSolver{},
 	&pushDownTopNOptimizer{},
+	&streamWindowCompleter{},
 }

 // logicalOptRule means a logical optimizing rule, which contains decorrelate, ppd, column pruning, etc.
diff --git a/planner/core/plan.go b/planner/core/plan.go
index ca17f0ed7a7d4..870dd18d02fe4 100644
--- a/planner/core/plan.go
+++ b/planner/core/plan.go
@@ -70,6 +70,8 @@ type LogicalPlan interface {
 	// PruneColumns prunes the unused columns.
 	PruneColumns([]*expression.Column)

+	// CompleteStreamWindow appends the stream window columns (window_start,
+	// window_end) to the schema and returns them, or returns nil when there
+	// is no stream window below this plan.
+	CompleteStreamWindow() []*expression.Column
+
 	// findBestTask converts the logical plan to the physical plan. It's a new interface.
 	// It is called recursively from the parent to the children to create the result physical plan.
 	// Some logical plans will convert the children to the physical plans in different ways, and return the one
@@ -233,6 +235,10 @@ func (p *baseLogicalPlan) PruneColumns(parentUsedCols []*expression.Column) {
 	p.children[0].PruneColumns(parentUsedCols)
 }

+// CompleteStreamWindow implements the LogicalPlan interface.
+func (p *baseLogicalPlan) CompleteStreamWindow() []*expression.Column {
+	// Leaf plans have no stream window below them.
+	if len(p.children) == 0 {
+		return nil
+	}
+	return p.children[0].CompleteStreamWindow()
+}
+
 // basePlan implements base Plan interface.
 // Should be used as embedded struct in Plan implementations.
 type basePlan struct {
diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go
index 68c5cfd05f1a3..02800cf359cca 100644
--- a/planner/core/planbuilder.go
+++ b/planner/core/planbuilder.go
@@ -919,7 +919,7 @@ func (b *PlanBuilder) buildShow(show *ast.ShowStmt) (Plan, error) {
 		if p.DBName == "" {
 			return nil, ErrNoDB
 		}
-	case ast.ShowCreateTable:
+	case ast.ShowCreateTable, ast.ShowCreateStream:
 		b.visitInfo = appendVisitInfo(b.visitInfo, mysql.AllPrivMask, show.Table.Schema.L, show.Table.Name.L, "")
 	}
 	p.SetSchema(buildShowSchema(show))
@@ -1631,6 +1631,8 @@ func buildShowSchema(s *ast.ShowStmt) (schema *expression.Schema) {
 			mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong}
 	case ast.ShowCreateTable:
 		names = []string{"Table", "Create Table"}
+	case ast.ShowCreateStream:
+		names = []string{"Table", "Create Stream"}
 	case ast.ShowCreateDatabase:
 		names = []string{"Database", "Create Database"}
 	case ast.ShowGrants:
diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go
index 93bd45865d9ac..c92bbda8b07dd 100644
--- a/planner/core/preprocess.go
+++ b/planner/core/preprocess.go
@@ -51,6 +51,10 @@ type preprocessor struct {

 	hasStreamTable bool

+	// strWinColName records the window column of the stream table seen in
+	// the current statement; Leave copies it into the window spec.
+	strWinColName string
+
 	// tableAliasInJoin is a stack that keeps the table alias names for joins.
 	// len(tableAliasInJoin) may bigger than 1 because the left/right child of join may be subquery that contains `JOIN`
 	tableAliasInJoin []map[string]interface{}
@@ -131,6 +139,9 @@ func (p *preprocessor) Leave(in ast.Node) (out ast.Node, ok bool) {
 		if p.hasStreamTable && x.GroupBy != nil && x.StreamWindowSpec == nil {
 			p.err = errors.New("Can not execute aggregation on stream table without time window")
 		}
+		if x.StreamWindowSpec != nil {
+			x.StreamWindowSpec.WinCol = p.strWinColName
+		}
 	case *ast.Join:
 		if len(p.tableAliasInJoin) > 0 {
 			p.tableAliasInJoin = p.tableAliasInJoin[:len(p.tableAliasInJoin)-1]
@@ -640,6 +651,7 @@ func (p *preprocessor) handleTableName(tn *ast.TableName) {
 	}
 	if table != nil && table.Meta().IsStream == true {
 		p.hasStreamTable = true
+		p.strWinColName = table.Meta().StreamWinCol
 	}
 	tn.TableInfo = table.Meta()
 	dbInfo, _ := p.is.SchemaByName(tn.Schema)
diff --git a/planner/core/rule_column_pruning.go b/planner/core/rule_column_pruning.go
index b96ed9e76a314..abcd0f52c6de2 100644
--- a/planner/core/rule_column_pruning.go
+++ b/planner/core/rule_column_pruning.go
@@ -85,12 +88,17 @@ func (p *LogicalSelection) PruneColumns(parentUsedCols []*expression.Column) {
 func (la *LogicalAggregation) PruneColumns(parentUsedCols []*expression.Column) {
 	child := la.children[0]
 	used := getUsedList(parentUsedCols, la.Schema())
 	for i := len(used) - 1; i >= 0; i-- {
 		if !used[i] {
 			la.schema.Columns = append(la.schema.Columns[:i], la.schema.Columns[i+1:]...)
 			la.AggFuncs = append(la.AggFuncs[:i], la.AggFuncs[i+1:]...)
 		}
 	}
+	// Keep the remaining columns when this aggregation carries a stream
+	// window: the streamWindowCompleter rule appends window_start and
+	// window_end to this schema, and they must not be pruned away.
+	if la.AggWindow != nil {
+		return
+	}
 	var selfUsedCols []*expression.Column
 	for _, aggrFunc := range la.AggFuncs {
 		selfUsedCols = expression.ExtractColumnsFromExpressions(selfUsedCols, aggrFunc.Args, nil)
diff --git a/planner/core/rule_complete_window_info.go b/planner/core/rule_complete_window_info.go
new file mode 100644
index 0000000000000..386fb0bfb1599
--- /dev/null
+++ b/planner/core/rule_complete_window_info.go
@@ -0,0 +1,60 @@
+// Copyright 2018 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package core
+
+import (
+	"github.com/pingcap/parser/model"
+	"github.com/pingcap/parser/mysql"
+	"github.com/pingcap/tidb/expression"
+	"github.com/pingcap/tidb/types"
+)
+
+// streamWindowCompleter appends the implicit window_start / window_end
+// columns to every aggregation that carries a stream window, and propagates
+// them through the projections above it.
+type streamWindowCompleter struct {
+}
+
+func (s *streamWindowCompleter) optimize(lp LogicalPlan) (LogicalPlan, error) {
+	lp.CompleteStreamWindow()
+	return lp, nil
+}
+
+// CompleteStreamWindow propagates the window columns produced by the child
+// through this projection.
+func (p *LogicalProjection) CompleteStreamWindow() []*expression.Column {
+	child := p.children[0]
+	c := child.CompleteStreamWindow()
+	if c != nil {
+		p.schema.Columns = append(p.schema.Columns, c...)
+		p.Exprs = append(p.Exprs, expression.Column2Exprs(c)...)
+	}
+	return c
+}
+
+// CompleteStreamWindow appends the window_start and window_end columns to
+// the schema of an aggregation that carries a stream window.
+func (la *LogicalAggregation) CompleteStreamWindow() []*expression.Column {
+	if la.AggWindow == nil {
+		return nil
+	}
+	winStartCol := &expression.Column{
+		ColName:  model.NewCIStr("window_start"),
+		UniqueID: la.ctx.GetSessionVars().AllocPlanColumnID(),
+		RetType:  types.NewFieldType(mysql.TypeTimestamp),
+	}
+	winEndCol := &expression.Column{
+		ColName:  model.NewCIStr("window_end"),
+		UniqueID: la.ctx.GetSessionVars().AllocPlanColumnID(),
+		RetType:  types.NewFieldType(mysql.TypeTimestamp),
+	}
+	x := []*expression.Column{winStartCol, winEndCol}
+	la.schema.Columns = append(la.schema.Columns, x...)
+	return x
+}
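Reviewer note (not part of the patch): the loop in StreamWindowHashAggExec.execute implements tumbling windows over the stream's timestamp column. A window opens at the first row's timestamp and spans winSize seconds; a row strictly past windowEnd closes the window and seeds the next one, and each call to Next drains exactly one window. The standalone sketch below illustrates only that assignment rule; it uses the standard library's time.Time in place of TiDB's types.Time, and the names window and assignWindows are invented for the example.

package main

import (
	"fmt"
	"time"
)

// window mirrors the executor's windowStart/windowEnd pair.
type window struct {
	start, end time.Time
}

// assignWindows buckets timestamps the way execute() does: a window opens at
// the first row's timestamp and spans winSize; a row strictly past the end
// closes the window and opens the next one at that row.
func assignWindows(rows []time.Time, winSize time.Duration) []window {
	var out []window
	var cur window
	needSetWindow := true
	for _, tm := range rows {
		if needSetWindow {
			cur = window{start: tm, end: tm.Add(winSize)}
			out = append(out, cur)
			needSetWindow = false
			continue
		}
		if tm.After(cur.end) { // tm.Compare(e.windowEnd) == 1 in the patch
			cur = window{start: tm, end: tm.Add(winSize)}
			out = append(out, cur)
		}
	}
	return out
}

func main() {
	base := time.Date(2018, 11, 2, 15, 0, 0, 0, time.UTC)
	rows := []time.Time{base, base.Add(2 * time.Second), base.Add(7 * time.Second)}
	for _, w := range assignWindows(rows, 5*time.Second) {
		fmt.Printf("window [%s, %s]\n", w.start.Format("15:04:05"), w.end.Format("15:04:05"))
	}
}

Running it prints two windows, [15:00:00, 15:00:05] and [15:00:07, 15:00:12]: the second window starts at the first out-of-window row rather than at a fixed offset from the first, matching the executor's behavior.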