Merge pull request #3 from qiuyesuifeng/qiueysuifeng/hackathon

*: add global variable for stream table pos

qiuyesuifeng authored Dec 1, 2018
2 parents fb5ff81 + c95bbfc commit edabda9
Showing 6 changed files with 271 additions and 37 deletions.
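The heart of the change: the old process-local `streamCursor` counter becomes a read position persisted in a global system variable, so the offset survives session boundaries between scans. Below is a minimal, self-contained sketch of that load/advance/persist cycle. `varsAccessor` and `advance` are simplified stand-ins invented for illustration (the real code goes through `e.ctx.GetSessionVars().GlobalVarsAccessor`), not TiDB APIs.

```go
package main

import (
	"fmt"
	"strconv"
)

// varsAccessor is a toy stand-in for TiDB's GlobalVarsAccessor.
type varsAccessor map[string]string

func (v varsAccessor) GetGlobalSysVar(name string) (string, error) { return v[name], nil }

func (v varsAccessor) SetGlobalSysVar(name, value string) error { v[name] = value; return nil }

// advance mirrors what StreamReaderExecutor.Next does below: load the
// position, consume a batch of rows, then write the new position back.
func advance(vars varsAccessor, name string, batch int) (int, error) {
	value, err := vars.GetGlobalSysVar(name)
	if err != nil {
		return 0, err
	}
	pos := 0
	if value != "" {
		if pos, err = strconv.Atoi(value); err != nil {
			return 0, err
		}
	}
	pos += batch // rows appended to the result chunk in this call
	return pos, vars.SetGlobalSysVar(name, strconv.Itoa(pos))
}

func main() {
	vars := varsAccessor{"tidb_kafka_stream_table_pos": "0"}
	for i := 0; i < 3; i++ {
		pos, _ := advance(vars, "tidb_kafka_stream_table_pos", 10)
		fmt.Println(pos) // prints 10, 20, 30: each scan resumes where the last stopped
	}
}
```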
ddl/ddl_api.go (2 changes: 1 addition & 1 deletion)

@@ -125,7 +125,7 @@ func checkStreamType(args map[string]string) error {
 	}
 
 	tp := strings.ToLower(key)
-	if tp != "kafka" && tp != "pulsar" && tp != "binlog" && tp != "log" {
+	if tp != "kafka" && tp != "pulsar" && tp != "binlog" && tp != "log" && tp != "demo" {
 		return errors.New("Invalid stream table type")
 	}

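`demo` joins the whitelist of stream source types. As a freestanding illustration of the predicate, here is an equivalent set-membership rewrite of the chained comparisons; `checkType` is a hypothetical helper, not the committed function:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// validStreamTypes mirrors the whitelist in checkStreamType after this
// commit; a map lookup is an equivalent form of the chained comparisons.
var validStreamTypes = map[string]bool{
	"kafka":  true,
	"pulsar": true,
	"binlog": true,
	"log":    true,
	"demo":   true,
}

func checkType(tp string) error {
	if !validStreamTypes[strings.ToLower(tp)] {
		return errors.New("Invalid stream table type")
	}
	return nil
}

func main() {
	fmt.Println(checkType("Demo"))  // <nil>: newly accepted by this commit
	fmt.Println(checkType("redis")) // Invalid stream table type
}
```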
executor/stream_reader.go (147 changes: 122 additions & 25 deletions)

@@ -14,14 +14,19 @@
 package executor
 
 import (
+	gojson "encoding/json"
+	"strconv"
+	"strings"
+
 	"github.com/cznic/mathutil"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/parser/model"
+	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/types/json"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/mock"
-	log "github.com/sirupsen/logrus"
+	// log "github.com/sirupsen/logrus"
 	"golang.org/x/net/context"
 )

@@ -30,49 +35,76 @@ var _ Executor = &StreamReaderExecutor{}

 var batchFetchCnt = 10
 var maxFetchCnt = 10000
-var streamCursor = 0
 
+// streamTableMaps maps "type|topic|table" to a read position.
+var streamTableMaps = make(map[string]int64)

 // StreamReaderExecutor reads data from a stream.
 type StreamReaderExecutor struct {
 	baseExecutor
 	Table   *model.TableInfo
 	Columns []*model.ColumnInfo
 
-	isInit bool
 	result *chunk.Chunk
 	cursor int
+	pos    int
+
+	variableName string
 }

-func (e *StreamReaderExecutor) init() error {
-	if e.isInit {
-		return nil
+func (e *StreamReaderExecutor) setVariableName(tp string) {
+	if tp == "kafka" {
+		e.variableName = variable.TiDBKafkaStreamTablePos
+	} else if tp == "pulsar" {
+		e.variableName = variable.TiDBPulsarStreamTablePos
+	} else if tp == "log" {
+		e.variableName = variable.TiDBLogStreamTablePos
+	} else if tp == "demo" {
+		e.variableName = variable.TiDBStreamTableDemoPos
 	}
-
-	e.isInit = true
-	return nil
 }

 // Open initializes necessary variables for using this executor.
 func (e *StreamReaderExecutor) Open(ctx context.Context) error {
-	err := e.init()
+	tp, ok := e.Table.StreamProperties["type"]
+	if !ok {
+		return errors.New("Cannot find stream table type")
+	}
+
+	e.setVariableName(strings.ToLower(tp))
+
+	var err error
+	value, err := e.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(e.variableName)
 	if err != nil {
 		return errors.Trace(err)
 	}
+	if value != "" {
+		e.pos, err = strconv.Atoi(value)
+		if err != nil {
+			return errors.Trace(err)
+		}
+	} else {
+		err = e.ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(e.variableName, "0")
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
 
 	return nil
 }

 // Next fills data into the chunk passed by its caller.
 func (e *StreamReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
-	log.Warnf("[qiuyesuifeng]%v:%v", e.Table, e.cursor)
+	value, err := e.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(e.variableName)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	e.pos, err = strconv.Atoi(value)
+	if err != nil {
+		return errors.Trace(err)
+	}
 
 	chk.GrowAndReset(e.maxChunkSize)
 	if e.result == nil {
 		e.result = e.newFirstChunk()
-		err := e.fetchAll()
+		err = e.fetchAll(e.pos)
 		if err != nil {
 			return errors.Trace(err)
 		}

@@ -95,36 +127,101 @@ func (e *StreamReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error
 	numCurBatch := mathutil.Min(chk.Capacity(), e.result.NumRows()-e.cursor)
 	chk.Append(e.result, e.cursor, e.cursor+numCurBatch)
 	e.cursor += numCurBatch
-	streamCursor += numCurBatch
+
+	e.pos += numCurBatch
+	err = e.ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(e.variableName, strconv.Itoa(e.pos))
+	if err != nil {
+		return errors.Trace(err)
+	}
 
 	return nil
 }

 // Close implements the Executor Close interface.
 func (e *StreamReaderExecutor) Close() error {
-	e.isInit = false
 	return nil
 }

-func (e *StreamReaderExecutor) fetchAll() error {
-	err := e.fetchMockData()
-	if err != nil {
-		return errors.Trace(err)
+func (e *StreamReaderExecutor) fetchAll(cursor int) error {
+	tableName := e.Table.Name.L
+	if tableName == "tidb_kafka_stream_table_demo" {
+		err := e.fetchMockKafkaData(cursor)
+		if err != nil {
+			return errors.Trace(err)
+		}
+	} else if tableName == "tidb_pulsar_stream_table_demo" {
+		err := e.fetchMockPulsarData(cursor)
+		if err != nil {
+			return errors.Trace(err)
+		}
+	} else if tableName == "tidb_stream_table_demo" {
+		err := e.fetchMockData(cursor)
+		if err != nil {
+			return errors.Trace(err)
+		}
 	}
 
 	return nil
 }

+func (e *StreamReaderExecutor) fetchMockData(cursor int) error {
+	for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
+		data, err := e.getData(mock.MockStreamJsonData[i])
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		row := chunk.MutRowFromDatums(data).ToRow()
+		e.result.AppendRow(row)
+	}
+
+	return nil
+}

-func (e *StreamReaderExecutor) fetchMockData() error {
-	log.Warnf("[qiuyesuifeng][fetch mock data]%v", e.cursor)
+func (e *StreamReaderExecutor) fetchMockKafkaData(cursor int) error {
+	for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
+		row := []interface{}{mock.MockKafkaStreamData[i].ID, mock.MockKafkaStreamData[i].Content, mock.MockKafkaStreamData[i].CreateTime}
+		e.appendRow(e.result, row)
+	}
+
+	return nil
+}
 
-	for i := streamCursor; i < maxFetchCnt && i < streamCursor+batchFetchCnt; i++ {
-		row := []interface{}{mock.MockStreamData[i].ID, mock.MockStreamData[i].Content, mock.MockStreamData[i].CreateTime}
+func (e *StreamReaderExecutor) fetchMockPulsarData(cursor int) error {
+	for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
+		row := []interface{}{mock.MockPulsarStreamData[i].ID, mock.MockPulsarStreamData[i].Content, mock.MockPulsarStreamData[i].CreateTime}
 		e.appendRow(e.result, row)
 	}
 
 	return nil
 }

+func (e *StreamReaderExecutor) getData(data string) ([]types.Datum, error) {
+	m := make(map[string]interface{})
+	err := gojson.Unmarshal([]byte(data), &m)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	row := make([]types.Datum, 0, len(e.Columns))
+	for _, col := range e.Columns {
+		name := col.Name.L
+		if value, ok := m[name]; ok {
+			data := types.NewDatum(value)
+			val, err := data.ConvertTo(e.ctx.GetSessionVars().StmtCtx, &col.FieldType)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			row = append(row, val)
+		} else {
+			data := types.NewDatum(nil)
+			row = append(row, data)
+		}
+	}
+
+	return row, nil
+}
 
 func (e *StreamReaderExecutor) appendRow(chk *chunk.Chunk, row []interface{}) {
 	for i, col := range row {
 		if col == nil {
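To make `getData`'s behavior concrete: fields are matched by lower-cased column name, present fields are converted to the column's type, and absent fields become NULL. The sketch below reproduces that shape with plain Go values instead of `types.Datum`; the column names and the sample record are assumptions modeled on the mock tables, not taken from the (unshown) mock data file.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// column is a simplified stand-in for *model.ColumnInfo; only the
// lower-cased name matters for field lookup in this sketch.
type column struct{ name string }

// getData mirrors StreamReaderExecutor.getData without TiDB's Datum
// conversion: present fields are kept, absent fields become nil (NULL).
func getData(data string, cols []column) ([]interface{}, error) {
	m := make(map[string]interface{})
	if err := json.Unmarshal([]byte(data), &m); err != nil {
		return nil, err
	}
	row := make([]interface{}, 0, len(cols))
	for _, col := range cols {
		if v, ok := m[col.name]; ok {
			row = append(row, v) // the real code runs Datum.ConvertTo(col.FieldType) here
		} else {
			row = append(row, nil) // missing field becomes NULL
		}
	}
	return row, nil
}

func main() {
	cols := []column{{"id"}, {"content"}, {"create_time"}}
	row, err := getData(`{"id": 1, "content": "hello"}`, cols)
	fmt.Println(row, err) // [1 hello <nil>] <nil>; create_time is NULL
}
```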
sessionctx/variable/sysvar.go (6 changes: 6 additions & 0 deletions)

@@ -670,6 +670,12 @@ var defaultSysVars = []*SysVar{
 	{ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"},
 	{ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]},
 	{ScopeSession, TiDBEnableRadixJoin, boolToIntStr(DefTiDBUseRadixJoin)},
+
+	// Stream table position variables.
+	{ScopeGlobal | ScopeSession, TiDBKafkaStreamTablePos, "0"},
+	{ScopeGlobal | ScopeSession, TiDBPulsarStreamTablePos, "0"},
+	{ScopeGlobal | ScopeSession, TiDBLogStreamTablePos, "0"},
+	{ScopeGlobal | ScopeSession, TiDBStreamTableDemoPos, "0"},
 }
 
 // SynonymsSysVariables is synonyms of system variables.
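Because the four position variables are registered with `ScopeGlobal | ScopeSession`, any client can inspect or rewind a stream table with ordinary variable statements. A hypothetical session over `database/sql` (the DSN is a placeholder for a TiDB endpoint built from this branch):

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// Placeholder DSN: any TiDB endpoint built from this branch.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Read the current Kafka stream position.
	var pos string
	if err := db.QueryRow("SELECT @@GLOBAL.tidb_kafka_stream_table_pos").Scan(&pos); err != nil {
		log.Fatal(err)
	}
	fmt.Println("current pos:", pos)

	// Rewind, so the next scan of the stream table starts from the beginning.
	if _, err := db.Exec("SET GLOBAL tidb_kafka_stream_table_pos = '0'"); err != nil {
		log.Fatal(err)
	}
}
```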
sessionctx/variable/tidb_vars.go (13 changes: 13 additions & 0 deletions)

@@ -215,6 +215,19 @@
 	// tidb_constraint_check_in_place indicates to check the constraint when the SQL executing.
 	// It could hurt the performance of bulking insert when it is ON.
 	TiDBConstraintCheckInPlace = "tidb_constraint_check_in_place"
+
+	// Hack for the stream table demo.
+	// TiDBKafkaStreamTablePos is the read position of the Kafka stream table.
+	TiDBKafkaStreamTablePos = "tidb_kafka_stream_table_pos"
+
+	// TiDBPulsarStreamTablePos is the read position of the Pulsar stream table.
+	TiDBPulsarStreamTablePos = "tidb_pulsar_stream_table_pos"
+
+	// TiDBLogStreamTablePos is the read position of the log stream table.
+	TiDBLogStreamTablePos = "tidb_log_stream_table_pos"
+
+	// TiDBStreamTableDemoPos is the read position of the demo stream table.
+	TiDBStreamTableDemoPos = "tidb_stream_table_demo_pos"
 )
 
 // Default TiDB system variable values.
types/time.go (23 changes: 23 additions & 0 deletions)

@@ -186,6 +186,29 @@ type Time struct {
 	Fsp int
 }
 
+// UnmarshalJSON implements the json.Unmarshaler interface for Time.
+// Only the six date/time fields are restored; microsecond, Type and
+// Fsp keep their zero values.
+func (t *Time) UnmarshalJSON(data []byte) (err error) {
+	tt, err := gotime.ParseInLocation(`"`+TimeFormat+`"`, string(data), gotime.Local)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	t.Time.year = uint16(tt.Year())
+	t.Time.month = uint8(tt.Month())
+	t.Time.day = uint8(tt.Day())
+	t.Time.hour = int(tt.Hour())
+	t.Time.minute = uint8(tt.Minute())
+	t.Time.second = uint8(tt.Second())
+	return nil
+}
+
+// MarshalJSON implements the json.Marshaler interface for Time,
+// rendering the quoted "YYYY-MM-DD hh:mm:ss" form.
+func (t Time) MarshalJSON() ([]byte, error) {
+	d := fmt.Sprintf(`"%04d-%02d-%02d %02d:%02d:%02d"`, t.Time.Year(), t.Time.Month(), t.Time.Day(),
+		t.Time.Hour(), t.Time.Minute(), t.Time.Second())
+	return []byte(d), nil
+}
 
 // MaxMySQLTime returns Time with maximum mysql time type.
 func MaxMySQLTime(fsp int) Time {
 	return Time{Time: FromDate(0, 0, 0, TimeMaxHour, TimeMaxMinute, TimeMaxSecond, 0), Type: mysql.TypeDuration, Fsp: fsp}
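These two methods let `types.Time` round-trip through `encoding/json`, which is what `getData` relies on when decoding mock JSON records. A rough usage sketch, assuming the import paths of this tree (parser already split out) and the existing `types.CurrentTime` helper; note that fractional seconds, `Type`, and `Fsp` are not restored on the way back:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/pingcap/parser/mysql"
	"github.com/pingcap/tidb/types"
)

func main() {
	t := types.CurrentTime(mysql.TypeDatetime)

	// MarshalJSON renders the quoted "YYYY-MM-DD hh:mm:ss" form.
	b, err := json.Marshal(t)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))

	// UnmarshalJSON parses that form back, but restores only the six
	// date/time fields; microsecond, Type and Fsp stay at zero values.
	var t2 types.Time
	if err := json.Unmarshal(b, &t2); err != nil {
		panic(err)
	}
	fmt.Println(t2)
}
```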