
Commit

Qiuyesuifeng/hackathon (#7)
* *: tiny clean up

* util: add http util package

* *: add click mock data
qiuyesuifeng authored Dec 2, 2018
1 parent 45ce04b · commit 619c426
Showing 5 changed files with 445 additions and 66 deletions.
40 changes: 13 additions & 27 deletions executor/aggregate.go
@@ -946,20 +946,20 @@ type StreamWindowHashAggExec struct {
groupKeyBuffer []byte
groupValDatums []types.Datum

defaultVal *chunk.Chunk

childResult *chunk.Chunk

lastIter *chunk.Iterator4Chunk

winCol string
winColIdx int
winSize uint64

windowStart types.Time
windowEnd types.Time
needSetWindow bool
init bool
}

// Open implements the Executor Open interface.
@@ -1032,10 +1032,8 @@ func (e *StreamWindowHashAggExec) Close() error {

func (e *StreamWindowHashAggExec) next(ctx context.Context, chk *chunk.Chunk) error {
// In this stage we consider all data from src as a single group.
//fmt.Println("XXXXXXXXXXXXXXXXX")
if !e.prepared {
err := e.execute(ctx)
//fmt.Println("55555")
if err != nil {
return errors.Trace(err)
}
@@ -1050,7 +1048,6 @@ func (e *StreamWindowHashAggExec) next(ctx context.Context, chk *chunk.Chunk) error {
e.prepared = true
}
chk.Reset()
//fmt.Println("666666")

// Since we return e.maxChunkSize rows every time, we should not traverse
// `groupSet` because of its randomness.
@@ -1064,8 +1061,8 @@ func (e *StreamWindowHashAggExec) next(ctx context.Context, chk *chunk.Chunk) error {
for i, af := range e.PartialAggFuncs {
af.AppendFinalResult2Chunk(e.ctx, partialResults[i], chk)
}
chk.AppendTime(len(e.schema.Columns)-2, e.windowStart)
chk.AppendTime(len(e.schema.Columns)-1, e.windowEnd)
if chk.NumRows() == e.maxChunkSize { //|| e.s {
e.cursor4GroupKey++
return nil
@@ -1076,11 +1073,9 @@ func (e *StreamWindowHashAggExec) next(ctx context.Context, chk *chunk.Chunk) error {

// execute fetches Chunks from src and updates each aggregate function for each row in the Chunk.
func (e *StreamWindowHashAggExec) execute(ctx context.Context) (err error) {
//fmt.Println("=== StreamWindowHashAggExec.execute === ")
var inputIter *chunk.Iterator4Chunk
var row chunk.Row
for {
//fmt.Println("e.lastIter == nil = %+v", e.lastIter == nil)
if e.lastIter == nil {
inputIter = chunk.NewIterator4Chunk(e.childResult)
err := e.children[0].Next(ctx, e.childResult)
@@ -1095,24 +1090,19 @@ func (e *StreamWindowHashAggExec) execute(ctx context.Context) (err error) {
inputIter = e.lastIter
row = inputIter.Current()
}
for ; row != inputIter.End(); row = inputIter.Next() {
tm := row.GetTime(e.winColIdx)
if e.needSetWindow {
e.windowStart = tm
e.windowEnd, err = e.windowStart.Add(e.ctx.GetSessionVars().StmtCtx, types.Duration{Duration: time.Duration(int(e.winSize)) * time.Second})
e.needSetWindow = false
}
//fmt.Printf("win_start=%s, win_end=%s\n",e.windowStart.String(), e.windowEnd.String())
//fmt.Printf("tm=%s\n",tm)
//fmt.Printf("FFFFFF\n")

if tm.Compare(e.windowEnd) == 1 {
e.needSetWindow = true
}
//fmt.Printf("e.needSetWindow=%v\n", e.needSetWindow)
//fmt.Printf("e.shouldStop() =%v\n", e.shouldStop())
//fmt.Printf("row != inputIter.End() =%v\n", row == inputIter.End())
if e.needSetWindow || e.shouldStop(){

if e.needSetWindow || e.shouldStop() {
if row == inputIter.End() {
e.lastIter = nil
} else {
@@ -1121,18 +1111,14 @@ func (e *StreamWindowHashAggExec) execute(ctx context.Context) (err error) {
return nil
}
groupKey, err := e.getGroupKey(row)
//fmt.Println("111")
if err != nil {
return errors.Trace(err)
}
//fmt.Println("222")
if !e.groupSet.Exist(groupKey) {
e.groupSet.Insert(groupKey)
e.groupKeys = append(e.groupKeys, groupKey)
}
//fmt.Println("333")
partialResults := e.getPartialResults(groupKey)
//fmt.Println("444")
for i, af := range e.PartialAggFuncs {
err = af.UpdatePartialResult(e.ctx, []chunk.Row{row}, partialResults[i])
if err != nil {
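For context on the change above: StreamWindowHashAggExec keeps a tumbling window over the stream. The first row's timestamp opens the window, the end is start plus winSize seconds, and once a row's timestamp passes the current end the executor emits the finished window and opens a new one at the next row. Below is a minimal standalone sketch of that bookkeeping; it is not taken from the commit, uses plain time.Time in place of types.Time, and all names are illustrative.

package main

import (
	"fmt"
	"time"
)

// window mirrors the executor's windowStart/windowEnd/needSetWindow fields.
type window struct {
	start, end time.Time
	size       time.Duration
	needSet    bool
}

// observe updates the window for a row at tm and reports whether tm fell
// past the current window end, the cue to emit this window and start anew.
func (w *window) observe(tm time.Time) bool {
	if w.needSet {
		w.start = tm
		w.end = w.start.Add(w.size)
		w.needSet = false
	}
	if tm.After(w.end) {
		w.needSet = true
		return true
	}
	return false
}

func main() {
	w := &window{size: 10 * time.Second, needSet: true}
	base := time.Date(2018, 12, 2, 0, 0, 0, 0, time.UTC)
	for _, off := range []time.Duration{0, 3 * time.Second, 12 * time.Second} {
		fmt.Printf("row at +%v closes window: %v\n", off, w.observe(base.Add(off)))
	}
}

With a 10-second window this prints false, false, true for rows at +0s, +3s, and +12s: the third row lands past the window end and forces a new window.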
101 changes: 84 additions & 17 deletions executor/stream_reader.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/mock"
// log "github.com/sirupsen/logrus"
@@ -36,6 +37,22 @@ var _ Executor = &StreamReaderExecutor{}
var batchFetchCnt = 10
var maxFetchCnt = 10000

type ErrorResponse struct {
ErrorCode int64 `json:"error_code"`
ErrorMsg string `json:"error_msg"`
}

type KafkaStreamData struct {
Data string `json:"data"`
Offset int64 `json:"offset"`
}

type KafkaStreamResponse struct {
ErrorResponse

Msgs []KafkaStreamData `json:"msgs"`
}

// StreamReaderExecutor reads data from a stream.
type StreamReaderExecutor struct {
baseExecutor
@@ -46,6 +63,8 @@ type StreamReaderExecutor struct {
cursor int
pos int

tp string
topic string
variableName string
}

@@ -68,8 +87,14 @@ func (e *StreamReaderExecutor) Open(ctx context.Context) error {
return errors.New("Cannot find stream table type")
}

e.tp = tp
e.setVariableName(strings.ToLower(tp))

e.topic, ok = e.Table.StreamProperties["topic"]
if !ok {
return errors.New("Cannot find stream table topic")
}

var err error
value, err := e.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(e.variableName)
if err != nil {
@@ -101,10 +126,11 @@ func (e *StreamReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
return errors.Trace(err)
}

pos := 0
chk.GrowAndReset(e.maxChunkSize)
if e.result == nil {
e.result = e.newFirstChunk()
pos, err = e.fetchAll(e.pos)
if err != nil {
return errors.Trace(err)
}
@@ -128,7 +154,7 @@ func (e *StreamReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Append(e.result, e.cursor, e.cursor+numCurBatch)
e.cursor += numCurBatch

e.pos = pos
err = e.ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(e.variableName, strconv.Itoa(e.pos))
if err != nil {
return errors.Trace(err)
@@ -142,58 +168,99 @@ func (e *StreamReaderExecutor) Close() error {
return nil
}

func (e *StreamReaderExecutor) fetchAll(cursor int) (int, error) {
var pos int
var err error
tableName := e.Table.Name.L
if tableName == "tidb_kafka_stream_table_demo" {
pos, err = e.fetchMockKafkaData(cursor)
if err != nil {
return 0, errors.Trace(err)
}
} else if tableName == "tidb_pulsar_stream_table_demo" {
pos, err = e.fetchMockPulsarData(cursor)
if err != nil {
return 0, errors.Trace(err)
}
} else if tableName == "tidb_stream_table_demo" {
pos, err = e.fetchMockData(cursor)
if err != nil {
return 0, errors.Trace(err)
}
} else {
pos, err = e.fetchKafkaData(e.pos)
if err != nil {
return 0, errors.Trace(err)
}
}

return pos, nil
}

func (e *StreamReaderExecutor) fetchMockData(cursor int) (int, error) {
var pos int
for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
data, err := e.getData(mock.MockStreamJsonData[i])
if err != nil {
return 0, errors.Trace(err)
}

row := chunk.MutRowFromDatums(data).ToRow()
e.result.AppendRow(row)
pos = i
}

return pos, nil
}

func (e *StreamReaderExecutor) fetchKafkaData(cursor int) (int, error) {
url := "http://127.0.0.1:9001/api/v1/data/kafka"
rq := util.Get(url).Param("topic", e.topic).
Param("batch", "10").
Param("offset", strconv.Itoa(cursor))

resp := &KafkaStreamResponse{}
err := rq.ToJson(resp)
if err != nil {
return 0, errors.Trace(err)
}

// log.Infof("[fetch kafka data]%v", resp)

var pos int
for _, msg := range resp.Msgs {
row, err := e.getData(msg.Data)
if err != nil {
return 0, errors.Trace(err)
}

e.result.AppendRow(chunk.MutRowFromDatums(row).ToRow())
pos = int(msg.Offset)
}

return pos, nil
}

func (e *StreamReaderExecutor) fetchMockKafkaData(cursor int) (int, error) {
var pos int
for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
row := []interface{}{mock.MockKafkaStreamData[i].ID, mock.MockKafkaStreamData[i].Content, mock.MockKafkaStreamData[i].CreateTime}
e.appendRow(e.result, row)
pos = i
}

return pos, nil
}

func (e *StreamReaderExecutor) fetchMockPulsarData(cursor int) (int, error) {
var pos int
for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
row := []interface{}{mock.MockPulsarStreamData[i].ID, mock.MockPulsarStreamData[i].Content, mock.MockPulsarStreamData[i].CreateTime}
e.appendRow(e.result, row)
pos = i
}

return pos, nil
}

func (e *StreamReaderExecutor) getData(data string) ([]types.Datum, error) {
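fetchKafkaData above pulls messages from a local mock service with util.Get(url).Param(...).ToJson(resp), calling the http util package this commit adds but whose source is not shown on this page. The sketch below is therefore only a plausible minimal shape for that fluent API, restricted to the three calls fetchKafkaData actually makes, combined with the response types defined in this diff; it is an assumption, not the package's real implementation, and the topic name is made up.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// Response types as defined in this commit.
type ErrorResponse struct {
	ErrorCode int64  `json:"error_code"`
	ErrorMsg  string `json:"error_msg"`
}

type KafkaStreamData struct {
	Data   string `json:"data"`
	Offset int64  `json:"offset"`
}

type KafkaStreamResponse struct {
	ErrorResponse
	Msgs []KafkaStreamData `json:"msgs"`
}

// request is a guessed minimal GET builder covering only the calls used in
// fetchKafkaData: Get(url).Param(k, v).ToJson(&out).
type request struct {
	base   string
	params url.Values
}

func Get(base string) *request {
	return &request{base: base, params: url.Values{}}
}

func (r *request) Param(key, value string) *request {
	r.params.Set(key, value)
	return r
}

func (r *request) ToJson(out interface{}) error {
	resp, err := http.Get(r.base + "?" + r.params.Encode())
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	return json.NewDecoder(resp.Body).Decode(out)
}

func main() {
	resp := &KafkaStreamResponse{}
	err := Get("http://127.0.0.1:9001/api/v1/data/kafka").
		Param("topic", "demo-topic").
		Param("batch", "10").
		Param("offset", "0").
		ToJson(resp)
	if err != nil {
		fmt.Println("fetch failed:", err)
		return
	}
	fmt.Printf("got %d messages, error_code=%d\n", len(resp.Msgs), resp.ErrorCode)
}

In the executor itself, each msg's Data field is then decoded into datums via getData, and the Offset of the last message becomes the new read position, which Next persists through a global system variable.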
7 changes: 1 addition & 6 deletions planner/core/rule_column_pruning.go
@@ -14,15 +14,11 @@
package core

import (
"fmt"

"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
//"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
//"github.com/pingcap/tidb/types"
//log "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)

type columnPruner struct {
@@ -89,7 +85,6 @@ func (p *LogicalSelection) PruneColumns(parentUsedCols []*expression.Column) {
func (la *LogicalAggregation) PruneColumns(parentUsedCols []*expression.Column) {
child := la.children[0]
used := getUsedList(parentUsedCols, la.Schema())
for i := len(used) - 1; i >= 0; i-- {
if !used[i] {
la.schema.Columns = append(la.schema.Columns[:i], la.schema.Columns[i+1:]...)
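The only substantive change in this file is the import cleanup, but the pruning loop it touches relies on a common Go pattern: when deleting slice elements by index, iterate from the end so the indices of elements not yet visited stay valid. A small illustrative sketch, not part of the commit:

package main

import "fmt"

func main() {
	cols := []string{"a", "b", "c", "d"}
	used := []bool{true, false, true, false}

	// Walk backwards so removing index i does not shift the
	// indices of elements we have not visited yet.
	for i := len(used) - 1; i >= 0; i-- {
		if !used[i] {
			cols = append(cols[:i], cols[i+1:]...)
		}
	}
	fmt.Println(cols) // [a c]
}

Walking forward instead would skip elements after each deletion or require manual index adjustment.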
(Diffs for the remaining two changed files are not shown here.)
