diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 1f30cb1ce66ff..3245692c93f77 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -125,7 +125,7 @@ func checkStreamType(args map[string]string) error { } tp := strings.ToLower(key) - if tp != "kafka" && tp != "pulsar" && tp != "binlog" && tp != "log" { + if tp != "kafka" && tp != "pulsar" && tp != "binlog" && tp != "log" && tp != "demo" { return errors.New("Invalid stream table type") } diff --git a/executor/stream_reader.go b/executor/stream_reader.go index c35e62f64c467..ff3d81a147350 100644 --- a/executor/stream_reader.go +++ b/executor/stream_reader.go @@ -14,14 +14,19 @@ package executor import ( + gojson "encoding/json" + "strconv" + "strings" + "github.com/cznic/mathutil" "github.com/pingcap/errors" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/types/json" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/mock" - log "github.com/sirupsen/logrus" + // log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) @@ -30,10 +35,6 @@ var _ Executor = &StreamReaderExecutor{} var batchFetchCnt = 10 var maxFetchCnt = 10000 -var streamCursor = 0 - -// streamTableMap : {type|topic|table =>pos} -var streamTableMaps = make(map[string]int64) // StreamReaderExecutor reads data from a stream. type StreamReaderExecutor struct { @@ -41,38 +42,69 @@ type StreamReaderExecutor struct { Table *model.TableInfo Columns []*model.ColumnInfo - isInit bool result *chunk.Chunk cursor int + pos int + + variableName string } -func (e *StreamReaderExecutor) init() error { - if e.isInit { - return nil +func (e *StreamReaderExecutor) setVariableName(tp string) { + if tp == "kafka" { + e.variableName = variable.TiDBKafkaStreamTablePos + } else if tp == "pulsar" { + e.variableName = variable.TiDBPulsarStreamTablePos + } else if tp == "log" { + e.variableName = variable.TiDBLogStreamTablePos + } else if tp == "demo" { + e.variableName = variable.TiDBStreamTableDemoPos } - - e.isInit = true - return nil } // Open initialzes necessary variables for using this executor. func (e *StreamReaderExecutor) Open(ctx context.Context) error { - err := e.init() + tp, ok := e.Table.StreamProperties["type"] + if !ok { + return errors.New("Cannot find stream table type") + } + + e.setVariableName(strings.ToLower(tp)) + + var err error + value, err := e.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(e.variableName) if err != nil { return errors.Trace(err) } + if value != "" { + e.pos, err = strconv.Atoi(value) + if err != nil { + return errors.Trace(err) + } + } else { + err = e.ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(e.variableName, "0") + if err != nil { + return errors.Trace(err) + } + } return nil } // Next fills data into the chunk passed by its caller. 
func (e *StreamReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error { - log.Warnf("[qiuyesuifeng]%v:%v", e.Table, e.cursor) + value, err := e.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(e.variableName) + if err != nil { + return errors.Trace(err) + } + e.pos, err = strconv.Atoi(value) + if err != nil { + return errors.Trace(err) + } chk.GrowAndReset(e.maxChunkSize) if e.result == nil { e.result = e.newFirstChunk() - err := e.fetchAll() + err = e.fetchAll(e.pos) if err != nil { return errors.Trace(err) } @@ -95,36 +127,101 @@ func (e *StreamReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error numCurBatch := mathutil.Min(chk.Capacity(), e.result.NumRows()-e.cursor) chk.Append(e.result, e.cursor, e.cursor+numCurBatch) e.cursor += numCurBatch - streamCursor += numCurBatch + + e.pos += numCurBatch + err = e.ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(e.variableName, strconv.Itoa(e.pos)) + if err != nil { + return errors.Trace(err) + } + return nil } // Close implements the Executor Close interface. func (e *StreamReaderExecutor) Close() error { - e.isInit = false return nil } -func (e *StreamReaderExecutor) fetchAll() error { - err := e.fetchMockData() - if err != nil { - return errors.Trace(err) +func (e *StreamReaderExecutor) fetchAll(cursor int) error { + tableName := e.Table.Name.L + if tableName == "tidb_kafka_stream_table_demo" { + err := e.fetchMockKafkaData(cursor) + if err != nil { + return errors.Trace(err) + } + } else if tableName == "tidb_pulsar_stream_table_demo" { + err := e.fetchMockPulsarData(cursor) + if err != nil { + return errors.Trace(err) + } + } else if tableName == "tidb_stream_table_demo" { + err := e.fetchMockData(cursor) + if err != nil { + return errors.Trace(err) + } + } + + return nil +} + +func (e *StreamReaderExecutor) fetchMockData(cursor int) error { + for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ { + data, err := e.getData(mock.MockStreamJsonData[i]) + if err != nil { + return errors.Trace(err) + } + + row := chunk.MutRowFromDatums(data).ToRow() + e.result.AppendRow(row) } return nil } -func (e *StreamReaderExecutor) fetchMockData() error { - log.Warnf("[qiuyesuifeng][fetch mock data]%v", e.cursor) +func (e *StreamReaderExecutor) fetchMockKafkaData(cursor int) error { + for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ { + row := []interface{}{mock.MockKafkaStreamData[i].ID, mock.MockKafkaStreamData[i].Content, mock.MockKafkaStreamData[i].CreateTime} + e.appendRow(e.result, row) + } + + return nil +} - for i := streamCursor; i < maxFetchCnt && i < streamCursor+batchFetchCnt; i++ { - row := []interface{}{mock.MockStreamData[i].ID, mock.MockStreamData[i].Content, mock.MockStreamData[i].CreateTime} +func (e *StreamReaderExecutor) fetchMockPulsarData(cursor int) error { + for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ { + row := []interface{}{mock.MockPulsarStreamData[i].ID, mock.MockPulsarStreamData[i].Content, mock.MockPulsarStreamData[i].CreateTime} e.appendRow(e.result, row) } return nil } +func (e *StreamReaderExecutor) getData(data string) ([]types.Datum, error) { + m := make(map[string]interface{}) + err := gojson.Unmarshal([]byte(data), &m) + if err != nil { + return nil, errors.Trace(err) + } + + row := make([]types.Datum, 0, len(e.Columns)) + for _, col := range e.Columns { + name := col.Name.L + if value, ok := m[name]; ok { + data := types.NewDatum(value) + val, err := data.ConvertTo(e.ctx.GetSessionVars().StmtCtx, &col.FieldType) + if err != nil { 
+ return nil, errors.Trace(err) + } + row = append(row, val) + } else { + data := types.NewDatum(nil) + row = append(row, data) + } + } + + return row, nil +} + func (e *StreamReaderExecutor) appendRow(chk *chunk.Chunk, row []interface{}) { for i, col := range row { if col == nil { diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index a4947a79fc01c..6327fc48d27c8 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -670,6 +670,12 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"}, {ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]}, {ScopeSession, TiDBEnableRadixJoin, boolToIntStr(DefTiDBUseRadixJoin)}, + + // Stream Table Variable + {ScopeGlobal | ScopeSession, TiDBKafkaStreamTablePos, "0"}, + {ScopeGlobal | ScopeSession, TiDBPulsarStreamTablePos, "0"}, + {ScopeGlobal | ScopeSession, TiDBLogStreamTablePos, "0"}, + {ScopeGlobal | ScopeSession, TiDBStreamTableDemoPos, "0"}, } // SynonymsSysVariables is synonyms of system variables. diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 248ab3e5cd8b6..977e7cae0df96 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -215,6 +215,19 @@ const ( // tidb_constraint_check_in_place indicates to check the constraint when the SQL executing. // It could hurt the performance of bulking insert when it is ON. TiDBConstraintCheckInPlace = "tidb_constraint_check_in_place" + + // Hack for stream table demo + // tidb kafka stream table pos + TiDBKafkaStreamTablePos = "tidb_kafka_stream_table_pos" + + // tidb pulsar stream table pos + TiDBPulsarStreamTablePos = "tidb_pulsar_stream_table_pos" + + // tidb log stream table pos + TiDBLogStreamTablePos = "tidb_log_stream_table_pos" + + // tidb stream table demo pos + TiDBStreamTableDemoPos = "tidb_stream_table_demo_pos" ) // Default TiDB system variable values. diff --git a/types/time.go b/types/time.go index 5a924c97014d2..9bbedbe2df6d4 100644 --- a/types/time.go +++ b/types/time.go @@ -186,6 +186,29 @@ type Time struct { Fsp int } +// Add user-defined time json unmarshal method +func (t *Time) UnmarshalJSON(data []byte) (err error) { + tt, err := gotime.ParseInLocation(`"`+TimeFormat+`"`, string(data), gotime.Local) + if err != nil { + return errors.Trace(err) + } + + t.Time.year = uint16(tt.Year()) + t.Time.month = uint8(tt.Month()) + t.Time.day = uint8(tt.Day()) + t.Time.hour = int(tt.Hour()) + t.Time.minute = uint8(tt.Minute()) + t.Time.second = uint8(tt.Second()) + return nil +} + +// Add user-defined time json marshal method +func (t Time) MarshalJSON() ([]byte, error) { + d := fmt.Sprintf(`"%04d-%02d-%02d %02d:%02d:%02d"`, t.Time.Year(), t.Time.Month(), t.Time.Day(), + t.Time.Hour(), t.Time.Minute(), t.Time.Second()) + return []byte(d), nil +} + // MaxMySQLTime returns Time with maximum mysql time type. func MaxMySQLTime(fsp int) Time { return Time{Time: FromDate(0, 0, 0, TimeMaxHour, TimeMaxMinute, TimeMaxSecond, 0), Type: mysql.TypeDuration, Fsp: fsp} diff --git a/util/mock/stream_data.go b/util/mock/stream_data.go index 0db70a92da5e0..ce7bc941ee3f3 100644 --- a/util/mock/stream_data.go +++ b/util/mock/stream_data.go @@ -14,31 +14,126 @@ package mock import ( - "strconv" + "encoding/json" + "fmt" + "time" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/types" + log "github.com/sirupsen/logrus" ) -// Mock data for stream reader. 
-var MockStreamData []Event +// Mock data for stream reader demo. +var ( + MockStreamData []Event + MockKafkaStreamData []KafkaEvent + MockPulsarStreamData []PulsarEvent + MockLogStreamData []LogEvent + MockStreamJsonData []string +) type Event struct { ID int64 `json:"id"` - Content string `json:"name"` + Content string `json:"content"` + CreateTime types.Time `json:"create_time"` +} + +type KafkaEvent struct { + ID int64 `json:"id"` + Content string `json:"content"` CreateTime types.Time `json:"create_time"` } +type PulsarEvent struct { + ID int64 `json:"id"` + Content string `json:"content"` + CreateTime types.Time `json:"create_time"` +} + +type LogEvent struct { + ID int64 `json:"id"` + Content string `json:"content"` + CreateTime types.Time `json:"create_time"` +} + +// {"id":49,"content":"TiDB Stream Demo Data 49","create_time":"2018-12-01 14:39:22"} + +func genData(i int, t types.Time) Event { + tt := types.Time{ + Time: types.FromDate(t.Time.Year(), t.Time.Month(), t.Time.Day(), t.Time.Hour(), t.Time.Minute(), t.Time.Second(), t.Time.Microsecond()), + Type: mysql.TypeTimestamp, + Fsp: types.DefaultFsp, + } + content := fmt.Sprintf("TiDB Stream Demo Data %d", i) + evt := Event{int64(i), content, tt} + + return evt +} + +func genKafkaData(i int, t types.Time) KafkaEvent { + tt := types.Time{ + Time: types.FromDate(t.Time.Year(), t.Time.Month(), t.Time.Day(), t.Time.Hour(), t.Time.Minute(), t.Time.Second(), t.Time.Microsecond()), + Type: mysql.TypeTimestamp, + Fsp: types.DefaultFsp, + } + content := fmt.Sprintf("TiDB Stream Demo Data %d", i) + evt := KafkaEvent{int64(i), content, tt} + + return evt +} + +func genPulsarData(i int, t types.Time) PulsarEvent { + evt := PulsarEvent{} + return evt +} + +func genLogData(i int, t types.Time) LogEvent { + evt := LogEvent{} + return evt +} + func init() { + sc := &stmtctx.StatementContext{ + TimeZone: time.UTC, + } + secondDur, err := types.ParseDuration(sc, "00:00:01", types.MaxFsp) + if err != nil { + log.Fatalf("[parse duration failed]%v", err) + } + t := types.CurrentTime(mysql.TypeTimestamp) - for i := 0; i < 10000; i++ { - tt := types.Time{ - Time: types.FromDate(t.Time.Year(), t.Time.Month(), t.Time.Day(), t.Time.Hour(), t.Time.Minute(), t.Time.Second()+i, t.Time.Microsecond()), - Type: mysql.TypeTimestamp, - Fsp: types.DefaultFsp, + for i := 0; i < 1000; i++ { + t, err = t.Add(sc, secondDur) + if err != nil { + log.Fatalf("[add time duration failed]%v", err) + } + + MockStreamData = append(MockStreamData, genData(i, t)) + MockKafkaStreamData = append(MockKafkaStreamData, genKafkaData(i, t)) + MockPulsarStreamData = append(MockPulsarStreamData, genPulsarData(i, t)) + MockLogStreamData = append(MockLogStreamData, genLogData(i, t)) + } + + for i := 0; i < 1000; i++ { + data, err := json.Marshal(MockStreamData[i]) + if err != nil { + log.Fatalf("[mock stream data marshal failed]%v", err) } - evt := Event{int64(i), strconv.Itoa(i), tt} - MockStreamData = append(MockStreamData, evt) + + evt := Event{} + err = json.Unmarshal([]byte(data), &evt) + if err != nil { + log.Fatalf("[mock stream data unmarshal failed]%v", err) + } + + MockStreamData[i] = evt + MockStreamJsonData = append(MockStreamJsonData, string(data)) + + // log.Errorf("[mock stream data]%v-%s", MockStreamData[i], string(data)) + // log.Errorf("[mock stream kafka data]%v", MockKafkaStreamData[i]) + // log.Errorf("[mock stream pulsar data]%v", MockPulsarStreamData[i]) + // log.Errorf("[mock stream log data]%v", MockLogStreamData[i]) } }
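
The heart of the executor change is that the read position is no longer a package-level counter (the removed streamCursor) but is persisted in a global system variable per stream type (tidb_kafka_stream_table_pos and friends): Open seeds the variable to "0" when it has never been set, Next reads it, fetches at most batchFetchCnt rows starting at that offset, and writes the advanced offset back so the next scan resumes where this one stopped. A minimal stand-alone sketch of that read-fetch-advance cycle is below; the globalVars map and fetchBatch helper are stand-ins for TiDB's GlobalVarsAccessor and the mock fetchers, not real TiDB APIs.

```go
package main

import (
	"fmt"
	"strconv"
)

// globalVars is a stand-in for TiDB's GlobalVarsAccessor; in the patch the
// position lives in a global sysvar such as tidb_kafka_stream_table_pos.
var globalVars = map[string]string{}

const (
	batchFetchCnt = 10
	maxFetchCnt   = 10000
)

// readPos mirrors Open: default the variable to "0" when it has never been set.
func readPos(name string) (int, error) {
	val, ok := globalVars[name]
	if !ok || val == "" {
		globalVars[name] = "0"
		return 0, nil
	}
	return strconv.Atoi(val)
}

// fetchBatch is a stand-in for fetchMockKafkaData and friends: it returns up to
// batchFetchCnt row ids starting at cursor, bounded by maxFetchCnt.
func fetchBatch(cursor int) []int {
	var rows []int
	for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
		rows = append(rows, i)
	}
	return rows
}

// nextBatch mirrors Next: read the persisted position, fetch one batch, then
// write the advanced position back for the following scan.
func nextBatch(name string) ([]int, error) {
	pos, err := readPos(name)
	if err != nil {
		return nil, err
	}
	rows := fetchBatch(pos)
	globalVars[name] = strconv.Itoa(pos + len(rows))
	return rows, nil
}

func main() {
	for i := 0; i < 3; i++ {
		rows, _ := nextBatch("tidb_kafka_stream_table_pos")
		fmt.Println(rows) // each call resumes after the previous batch
	}
}
```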
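
The MarshalJSON/UnmarshalJSON methods added to types.Time exist so the mock generator in util/mock/stream_data.go can round-trip Event values through encoding/json using the MySQL-style "2006-01-02 15:04:05" layout rather than RFC 3339. Below is a stand-alone sketch of the same pattern, assuming a plain wrapper around time.Time instead of TiDB's internal mysqlTime representation; Event mirrors the mock struct's JSON tags.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// TimeFormat matches the layout used by types.TimeFormat in TiDB.
const TimeFormat = "2006-01-02 15:04:05"

// SQLTime is a hypothetical wrapper standing in for types.Time.
type SQLTime struct {
	time.Time
}

// UnmarshalJSON parses a quoted "YYYY-MM-DD HH:MM:SS" string, like the
// method the patch adds to types.Time.
func (t *SQLTime) UnmarshalJSON(data []byte) error {
	tt, err := time.ParseInLocation(`"`+TimeFormat+`"`, string(data), time.Local)
	if err != nil {
		return err
	}
	t.Time = tt
	return nil
}

// MarshalJSON emits the same quoted layout.
func (t SQLTime) MarshalJSON() ([]byte, error) {
	return []byte(`"` + t.Format(TimeFormat) + `"`), nil
}

// Event mirrors the mock Event shape from util/mock/stream_data.go.
type Event struct {
	ID         int64   `json:"id"`
	Content    string  `json:"content"`
	CreateTime SQLTime `json:"create_time"`
}

func main() {
	evt := Event{ID: 49, Content: "TiDB Stream Demo Data 49",
		CreateTime: SQLTime{time.Date(2018, 12, 1, 14, 39, 22, 0, time.Local)}}

	data, _ := json.Marshal(evt)
	fmt.Println(string(data)) // {"id":49,"content":"TiDB Stream Demo Data 49","create_time":"2018-12-01 14:39:22"}

	var back Event
	_ = json.Unmarshal(data, &back)
	fmt.Println(back.CreateTime.Format(TimeFormat)) // 2018-12-01 14:39:22
}
```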
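
fetchMockData takes the JSON path: each mock row is kept as a JSON string in MockStreamJsonData, and getData unmarshals it into a map, then walks the table's column list in order, converting matching keys through Datum.ConvertTo and filling NULL for keys the document omits. A simplified sketch of that by-name mapping, assuming a plain string slice and nil placeholder in place of model.ColumnInfo and types.NewDatum(nil):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rowFromJSON mirrors getData: decode one JSON document, then emit values in
// the table's column order, using nil (NULL) for columns the document omits.
func rowFromJSON(doc string, columns []string) ([]interface{}, error) {
	m := make(map[string]interface{})
	if err := json.Unmarshal([]byte(doc), &m); err != nil {
		return nil, err
	}
	row := make([]interface{}, 0, len(columns))
	for _, col := range columns {
		if v, ok := m[col]; ok {
			row = append(row, v) // the executor additionally runs ConvertTo against the column's FieldType
		} else {
			row = append(row, nil) // missing key -> NULL, as in getData
		}
	}
	return row, nil
}

func main() {
	doc := `{"id":49,"content":"TiDB Stream Demo Data 49","create_time":"2018-12-01 14:39:22"}`
	row, err := rowFromJSON(doc, []string{"id", "content", "create_time", "extra_col"})
	if err != nil {
		panic(err)
	}
	fmt.Println(row) // [49 TiDB Stream Demo Data 49 2018-12-01 14:39:22 <nil>]
}
```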