From f9fa867af3105560752460261d0b13d176bc3252 Mon Sep 17 00:00:00 2001 From: Kai Cao Date: Wed, 18 Dec 2024 15:34:31 +0800 Subject: [PATCH 1/2] [Cherry-pick] cdc task auto detect db/tbl (#20802) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 支持account,db级别同步,具体格式参考文档:https://doc.weixin.qq.com/doc/w3_AVQAQwZ_AEgrnqKicSrR0CJ1Dqxe8?scode=AJsA6gc3AA8P3rA66vAVQAQwZ_AEg 2. 支持exclude选项,排除指定范围内某些db/table,格式同上 3. 定期扫描所有库表,若检测到同步范围的新db/table,会自动拉起同步任务,下游若不存在该db/table会尝试进行自动创建 4. 其他微调:新建任务时不再会check db/table是否存在;同一任务内不允许同一张表同时作为多个同步源 Approved by: @zhangxu19830126, @XuPeng-SH, @daviszhen --- pkg/catalog/types.go | 10 + pkg/cdc/reader.go | 18 +- pkg/cdc/reader_test.go | 62 +- pkg/cdc/sinker.go | 47 +- pkg/cdc/sinker_test.go | 50 +- pkg/cdc/table_scanner.go | 173 +++++ pkg/cdc/table_scanner_test.go | 73 ++ pkg/cdc/types.go | 46 +- pkg/cdc/types_test.go | 89 +-- pkg/cdc/util.go | 23 +- pkg/cdc/watermark_updater.go | 83 +-- pkg/cdc/watermark_updater_test.go | 147 ++-- pkg/common/moerr/error.go | 7 +- pkg/common/moerr/error_no_ctx.go | 4 + pkg/frontend/cdc.go | 921 +++++++---------------- pkg/frontend/cdc_test.go | 1155 +++++++++++------------------ pkg/frontend/predefined.go | 3 +- pkg/frontend/util.go | 12 +- 18 files changed, 1261 insertions(+), 1662 deletions(-) create mode 100644 pkg/cdc/table_scanner.go create mode 100644 pkg/cdc/table_scanner_test.go diff --git a/pkg/catalog/types.go b/pkg/catalog/types.go index 4b15301ef7442..acb70c45fe21d 100644 --- a/pkg/catalog/types.go +++ b/pkg/catalog/types.go @@ -833,3 +833,13 @@ const ( SAVED_ROW_COUNT_IDX = 14 QUERY_ROW_COUNT_IDX = 15 ) + +var SystemDatabases = []string{ + "information_schema", + "mo_catalog", + "mo_debug", + "mo_task", + "mysql", + "system", + "system_metrics", +} diff --git a/pkg/cdc/reader.go b/pkg/cdc/reader.go index e6b9d5fc5ee8c..b6a8bbcf4bdd7 100644 --- a/pkg/cdc/reader.go +++ b/pkg/cdc/reader.go @@ -16,6 +16,7 @@ package cdc import ( "context" + "sync" "time" 
"github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -39,27 +40,29 @@ type tableReader struct { packerPool *fileservice.Pool[*types.Packer] info *DbTableInfo sinker Sinker - wMarkUpdater *WatermarkUpdater + wMarkUpdater IWatermarkUpdater tick *time.Ticker resetWatermarkFunc func(*DbTableInfo) error initSnapshotSplitTxn bool + runningReaders *sync.Map tableDef *plan.TableDef insTsColIdx, insCompositedPkColIdx int delTsColIdx, delCompositedPkColIdx int } -func NewTableReader( +var NewTableReader = func( cnTxnClient client.TxnClient, cnEngine engine.Engine, mp *mpool.MPool, packerPool *fileservice.Pool[*types.Packer], info *DbTableInfo, sinker Sinker, - wMarkUpdater *WatermarkUpdater, + wMarkUpdater IWatermarkUpdater, tableDef *plan.TableDef, resetWatermarkFunc func(*DbTableInfo) error, initSnapshotSplitTxn bool, + runningReaders *sync.Map, ) Reader { reader := &tableReader{ cnTxnClient: cnTxnClient, @@ -72,6 +75,7 @@ func NewTableReader( tick: time.NewTicker(200 * time.Millisecond), resetWatermarkFunc: resetWatermarkFunc, initSnapshotSplitTxn: initSnapshotSplitTxn, + runningReaders: runningReaders, tableDef: tableDef, } @@ -98,14 +102,16 @@ func (reader *tableReader) Run( logutil.Infof("cdc tableReader(%v).Run: start", reader.info) defer func() { if err != nil { - if err = reader.wMarkUpdater.SaveErrMsg(reader.info.SourceTblIdStr, err.Error()); err != nil { + if err = reader.wMarkUpdater.SaveErrMsg(reader.info.SourceDbName, reader.info.SourceTblName, err.Error()); err != nil { logutil.Infof("cdc tableReader(%v).Run: save err msg failed, err: %v", reader.info, err) } } reader.Close() + reader.runningReaders.Delete(GenDbTblKey(reader.info.SourceDbName, reader.info.SourceTblName)) logutil.Infof("cdc tableReader(%v).Run: end", reader.info) }() + reader.runningReaders.Store(GenDbTblKey(reader.info.SourceDbName, reader.info.SourceTblName), reader) for { select { case <-ctx.Done(): @@ -192,7 +198,7 @@ func (reader *tableReader) readTableWithTxn( //step2 : define time 
range // from = last wmark // to = txn operator snapshot ts - fromTs := reader.wMarkUpdater.GetFromMem(reader.info.SourceTblIdStr) + fromTs := reader.wMarkUpdater.GetFromMem(reader.info.SourceDbName, reader.info.SourceTblName) toTs := types.TimestampToTS(GetSnapshotTS(txnOp)) start := time.Now() changes, err = CollectChanges(ctx, rel, fromTs, toTs, reader.mp) @@ -264,7 +270,7 @@ func (reader *tableReader) readTableWithTxn( } if err == nil { - reader.wMarkUpdater.UpdateMem(reader.info.SourceTblIdStr, toTs) + reader.wMarkUpdater.UpdateMem(reader.info.SourceDbName, reader.info.SourceTblName, toTs) } } else { // has error already if hasBegin { diff --git a/pkg/cdc/reader_test.go b/pkg/cdc/reader_test.go index 773f6b5a07751..2df4c9864c60a 100644 --- a/pkg/cdc/reader_test.go +++ b/pkg/cdc/reader_test.go @@ -38,15 +38,16 @@ import ( func TestNewTableReader(t *testing.T) { type args struct { - cnTxnClient client.TxnClient - cnEngine engine.Engine - mp *mpool.MPool - packerPool *fileservice.Pool[*types.Packer] - info *DbTableInfo - sinker Sinker - wMarkUpdater *WatermarkUpdater - tableDef *plan.TableDef - restartFunc func(*DbTableInfo) error + cnTxnClient client.TxnClient + cnEngine engine.Engine + mp *mpool.MPool + packerPool *fileservice.Pool[*types.Packer] + info *DbTableInfo + sinker Sinker + wMarkUpdater *WatermarkUpdater + tableDef *plan.TableDef + restartFunc func(*DbTableInfo) error + runningReaders *sync.Map } tableDef := &plan.TableDef{ @@ -76,7 +77,29 @@ func TestNewTableReader(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.NotNilf(t, NewTableReader(tt.args.cnTxnClient, tt.args.cnEngine, tt.args.mp, tt.args.packerPool, tt.args.info, tt.args.sinker, tt.args.wMarkUpdater, tt.args.tableDef, tt.args.restartFunc, true), "NewTableReader(%v, %v, %v, %v, %v, %v, %v, %v, %v)", tt.args.cnTxnClient, tt.args.cnEngine, tt.args.mp, tt.args.packerPool, tt.args.info, tt.args.sinker, tt.args.wMarkUpdater, tt.args.tableDef, 
tt.args.restartFunc) + assert.NotNilf(t, NewTableReader( + tt.args.cnTxnClient, + tt.args.cnEngine, + tt.args.mp, + tt.args.packerPool, + tt.args.info, + tt.args.sinker, + tt.args.wMarkUpdater, + tt.args.tableDef, + tt.args.restartFunc, + true, + tt.args.runningReaders, + ), + "NewTableReader(%v,%v,%v,%v,%v,%v,%v,%v,%v)", + tt.args.cnTxnClient, + tt.args.cnEngine, + tt.args.mp, + tt.args.packerPool, + tt.args.info, + tt.args.sinker, + tt.args.wMarkUpdater, + tt.args.tableDef, + tt.args.restartFunc) }) } } @@ -218,6 +241,7 @@ func Test_tableReader_Run(t *testing.T) { insCompositedPkColIdx: tt.fields.insCompositedPkColIdx, delTsColIdx: tt.fields.delTsColIdx, delCompositedPkColIdx: tt.fields.delCompositedPkColIdx, + runningReaders: &sync.Map{}, } reader.Run(tt.args.ctx, tt.args.ar) }) @@ -238,6 +262,11 @@ func Test_tableReader_Run_StaleRead(t *testing.T) { tick: time.NewTicker(time.Millisecond * 300), sinker: NewConsoleSinker(nil, nil), resetWatermarkFunc: func(*DbTableInfo) error { return nil }, + runningReaders: &sync.Map{}, + info: &DbTableInfo{ + SourceDbName: "db1", + SourceTblName: "t1", + }, } reader.Run(ctx, NewCdcActiveRoutine()) cancel() @@ -254,10 +283,12 @@ func Test_tableReader_Run_StaleRead(t *testing.T) { tick: time.NewTicker(time.Millisecond * 300), sinker: NewConsoleSinker(nil, nil), info: &DbTableInfo{ - SourceTblIdStr: "1_0", + SourceDbName: "db1", + SourceTblName: "t1", }, wMarkUpdater: u, resetWatermarkFunc: func(*DbTableInfo) error { return moerr.NewInternalErrorNoCtx("") }, + runningReaders: &sync.Map{}, } reader.Run(ctx, NewCdcActiveRoutine()) cancel() @@ -283,10 +314,12 @@ func Test_tableReader_Run_NonStaleReadErr(t *testing.T) { tick: time.NewTicker(time.Millisecond * 300), sinker: NewConsoleSinker(nil, nil), info: &DbTableInfo{ - SourceTblIdStr: "1_0", + SourceDbName: "db1", + SourceTblName: "t1", }, wMarkUpdater: u, resetWatermarkFunc: func(*DbTableInfo) error { return nil }, + runningReaders: &sync.Map{}, } reader.Run(ctx, 
NewCdcActiveRoutine()) } @@ -319,8 +352,8 @@ func Test_tableReader_readTableWithTxn(t *testing.T) { reader := &tableReader{ info: &DbTableInfo{ - SourceTblName: "t1", - SourceTblIdStr: "123", + SourceDbName: "db1", + SourceTblName: "t1", }, packerPool: pool, wMarkUpdater: watermarkUpdater, @@ -328,6 +361,7 @@ func Test_tableReader_readTableWithTxn(t *testing.T) { insTsColIdx: 0, insCompositedPkColIdx: 3, sinker: NewConsoleSinker(nil, nil), + runningReaders: &sync.Map{}, } getRelationByIdStub := gostub.Stub(&GetRelationById, func(_ context.Context, _ engine.Engine, _ client.TxnOperator, _ uint64) (string, string, engine.Relation, error) { diff --git a/pkg/cdc/sinker.go b/pkg/cdc/sinker.go index 0ef171aa5109a..d55a1dc5d8f85 100644 --- a/pkg/cdc/sinker.go +++ b/pkg/cdc/sinker.go @@ -35,9 +35,11 @@ import ( const ( // sqlBufReserved leave 5 bytes for mysql driver - sqlBufReserved = 5 - sqlPrintLen = 200 - fakeSql = "fakeSql" + sqlBufReserved = 5 + sqlPrintLen = 200 + fakeSql = "fakeSql" + createTable = "create table" + createTableIfNotExists = "create table if not exists" ) var ( @@ -47,10 +49,10 @@ var ( dummy = []byte("") ) -func NewSinker( +var NewSinker = func( sinkUri UriInfo, dbTblInfo *DbTableInfo, - watermarkUpdater *WatermarkUpdater, + watermarkUpdater IWatermarkUpdater, tableDef *plan.TableDef, retryTimes int, retryDuration time.Duration, @@ -68,6 +70,21 @@ func NewSinker( return nil, err } + ctx := context.Background() + padding := strings.Repeat(" ", sqlBufReserved) + // create db + _ = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", dbTblInfo.SinkDbName))) + // use db + _ = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("use `%s`", dbTblInfo.SinkDbName))) + // create table + createSql := strings.TrimSpace(dbTblInfo.SourceCreateSql) + if len(createSql) < len(createTableIfNotExists) || !strings.EqualFold(createSql[:len(createTableIfNotExists)], createTableIfNotExists) { + createSql = createTableIfNotExists + 
createSql[len(createTable):] + } + createSql = strings.ReplaceAll(createSql, dbTblInfo.SourceDbName, dbTblInfo.SinkDbName) + createSql = strings.ReplaceAll(createSql, dbTblInfo.SourceTblName, dbTblInfo.SinkTblName) + _ = sink.Send(ctx, ar, []byte(padding+createSql)) + return NewMysqlSinker(sink, dbTblInfo, watermarkUpdater, tableDef, ar, maxSqlLength), nil } @@ -75,12 +92,12 @@ var _ Sinker = new(consoleSinker) type consoleSinker struct { dbTblInfo *DbTableInfo - watermarkUpdater *WatermarkUpdater + watermarkUpdater IWatermarkUpdater } func NewConsoleSinker( dbTblInfo *DbTableInfo, - watermarkUpdater *WatermarkUpdater, + watermarkUpdater IWatermarkUpdater, ) Sinker { return &consoleSinker{ dbTblInfo: dbTblInfo, @@ -142,7 +159,7 @@ var _ Sinker = new(mysqlSinker) type mysqlSinker struct { mysql Sink dbTblInfo *DbTableInfo - watermarkUpdater *WatermarkUpdater + watermarkUpdater IWatermarkUpdater ar *ActiveRoutine // buf of sql statement @@ -179,7 +196,7 @@ type mysqlSinker struct { var NewMysqlSinker = func( mysql Sink, dbTblInfo *DbTableInfo, - watermarkUpdater *WatermarkUpdater, + watermarkUpdater IWatermarkUpdater, tableDef *plan.TableDef, ar *ActiveRoutine, maxSqlLength uint64, @@ -288,7 +305,7 @@ func (s *mysqlSinker) Run(ctx context.Context, ar *ActiveRoutine) { } func (s *mysqlSinker) Sink(ctx context.Context, data *DecoderOutput) { - watermark := s.watermarkUpdater.GetFromMem(s.dbTblInfo.SourceTblIdStr) + watermark := s.watermarkUpdater.GetFromMem(s.dbTblInfo.SourceDbName, s.dbTblInfo.SourceTblName) if data.toTs.LE(&watermark) { logutil.Errorf("cdc mysqlSinker(%v): unexpected watermark: %s, current watermark: %s", s.dbTblInfo, data.toTs.ToString(), watermark.ToString()) @@ -417,7 +434,7 @@ func (s *mysqlSinker) sinkSnapshot(ctx context.Context, bat *batch.Batch) { } // step3: append to sqlBuf, send sql if sqlBuf is full - if err = s.appendSqlBuf(ctx, InsertRow); err != nil { + if err = s.appendSqlBuf(InsertRow); err != nil { s.err.Store(err) return } @@ 
-501,7 +518,7 @@ func (s *mysqlSinker) sinkInsert(ctx context.Context, insertIter *atomicBatchRow } // step3: append to sqlBuf - if err = s.appendSqlBuf(ctx, InsertRow); err != nil { + if err = s.appendSqlBuf(InsertRow); err != nil { return } @@ -530,7 +547,7 @@ func (s *mysqlSinker) sinkDelete(ctx context.Context, deleteIter *atomicBatchRow } // step3: append to sqlBuf - if err = s.appendSqlBuf(ctx, DeleteRow); err != nil { + if err = s.appendSqlBuf(DeleteRow); err != nil { return } @@ -539,7 +556,7 @@ func (s *mysqlSinker) sinkDelete(ctx context.Context, deleteIter *atomicBatchRow // appendSqlBuf appends rowBuf to sqlBuf if not exceed its cap // otherwise, send sql to downstream first, then reset sqlBuf and append -func (s *mysqlSinker) appendSqlBuf(ctx context.Context, rowType RowType) (err error) { +func (s *mysqlSinker) appendSqlBuf(rowType RowType) (err error) { // insert suffix: `;`, delete suffix: `);` suffixLen := 1 if rowType == DeleteRow { @@ -668,8 +685,8 @@ var NewMysqlSink = func( return ret, err } +// Send must leave 5 bytes at the head of sqlBuf func (s *mysqlSink) Send(ctx context.Context, ar *ActiveRoutine, sqlBuf []byte) error { - // must leave 5 bytes at the head of sqlBuf reuseQueryArg := sql.NamedArg{ Name: mysql.ReuseQueryBuf, Value: sqlBuf, diff --git a/pkg/cdc/sinker_test.go b/pkg/cdc/sinker_test.go index 3a0b40eb2677f..682d7964f06e3 100644 --- a/pkg/cdc/sinker_test.go +++ b/pkg/cdc/sinker_test.go @@ -40,7 +40,7 @@ func TestNewSinker(t *testing.T) { type args struct { sinkUri UriInfo dbTblInfo *DbTableInfo - watermarkUpdater *WatermarkUpdater + watermarkUpdater IWatermarkUpdater tableDef *plan.TableDef retryTimes int retryDuration time.Duration @@ -75,23 +75,42 @@ func TestNewSinker(t *testing.T) { sinkUri: UriInfo{ SinkTyp: MysqlSink, }, - dbTblInfo: &DbTableInfo{}, + dbTblInfo: &DbTableInfo{ + SourceCreateSql: "create table t1 (a int, b int, c int)", + }, watermarkUpdater: nil, tableDef: nil, retryTimes: 0, retryDuration: 0, + ar: 
NewCdcActiveRoutine(), }, want: nil, wantErr: assert.NoError, }, } + db, mock, err := sqlmock.New() + assert.NoError(t, err) + mock.ExpectExec(fakeSql).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(fakeSql).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(fakeSql).WillReturnResult(sqlmock.NewResult(1, 1)) + + sink := &mysqlSink{ + user: "root", + password: "123456", + ip: "127.0.0.1", + port: 3306, + retryTimes: DefaultRetryTimes, + retryDuration: DefaultRetryDuration, + conn: db, + } + sinkStub := gostub.Stub(&NewMysqlSink, func(_, _, _ string, _, _ int, _ time.Duration, _ string) (Sink, error) { - return nil, nil + return sink, nil }) defer sinkStub.Reset() - sinkerStub := gostub.Stub(&NewMysqlSinker, func(_ Sink, _ *DbTableInfo, _ *WatermarkUpdater, _ *plan.TableDef, _ *ActiveRoutine, _ uint64) Sinker { + sinkerStub := gostub.Stub(&NewMysqlSinker, func(Sink, *DbTableInfo, IWatermarkUpdater, *plan.TableDef, *ActiveRoutine, uint64) Sinker { return nil }) defer sinkerStub.Reset() @@ -110,7 +129,7 @@ func TestNewSinker(t *testing.T) { func TestNewConsoleSinker(t *testing.T) { type args struct { dbTblInfo *DbTableInfo - watermarkUpdater *WatermarkUpdater + watermarkUpdater IWatermarkUpdater } tests := []struct { name string @@ -370,11 +389,11 @@ func Test_mysqlSinker_appendSqlBuf(t *testing.T) { s.sqlBuf = append(s.sqlBuf[:sqlBufReserved], s.tsInsertPrefix...) s.rowBuf = []byte("insert") // not exceed cap - err = s.appendSqlBuf(ctx, InsertRow) + err = s.appendSqlBuf(InsertRow) assert.NoError(t, err) assert.Equal(t, []byte(prefix+tsInsertPrefix+"insert"), s.sqlBuf) // exceed cap - err = s.appendSqlBuf(ctx, InsertRow) + err = s.appendSqlBuf(InsertRow) assert.NoError(t, err) assert.Equal(t, []byte(prefix+tsInsertPrefix+"insert"), s.sqlBuf) @@ -382,11 +401,11 @@ func Test_mysqlSinker_appendSqlBuf(t *testing.T) { s.sqlBuf = append(s.sqlBuf[:sqlBufReserved], s.tsDeletePrefix...) 
s.rowBuf = []byte("delete") // not exceed cap - err = s.appendSqlBuf(ctx, DeleteRow) + err = s.appendSqlBuf(DeleteRow) assert.NoError(t, err) assert.Equal(t, []byte(prefix+tsDeletePrefix+"delete"), s.sqlBuf) // exceed cap - err = s.appendSqlBuf(ctx, DeleteRow) + err = s.appendSqlBuf(DeleteRow) assert.NoError(t, err) assert.Equal(t, []byte(prefix+tsDeletePrefix+"delete"), s.sqlBuf) } @@ -473,7 +492,7 @@ func Test_mysqlSinker_Sink(t *testing.T) { watermarkUpdater := &WatermarkUpdater{ watermarkMap: &sync.Map{}, } - watermarkUpdater.UpdateMem("1_0", t0) + watermarkUpdater.UpdateMem("db1", "t1", t0) tableDef := &plan.TableDef{ Cols: []*plan.ColDef{ @@ -592,13 +611,14 @@ func Test_mysqlSinker_Sink_NoMoreData(t *testing.T) { mock.ExpectExec(".*").WillReturnError(moerr.NewInternalErrorNoCtx("")) dbTblInfo := &DbTableInfo{ - SourceTblIdStr: "1_0", + SourceDbName: "db1", + SourceTblName: "t1", } watermarkUpdater := &WatermarkUpdater{ watermarkMap: &sync.Map{}, } - watermarkUpdater.UpdateMem("1_0", types.BuildTS(0, 1)) + watermarkUpdater.UpdateMem("db1", "t1", types.BuildTS(0, 1)) ar := NewCdcActiveRoutine() @@ -773,14 +793,16 @@ func Test_mysqlsink(t *testing.T) { sink := &mysqlSinker{ watermarkUpdater: wmark, dbTblInfo: &DbTableInfo{ - SourceTblId: 0, + SourceTblId: 0, + SourceTblName: "t1", + SourceDbName: "db1", }, } tts := timestamp.Timestamp{ PhysicalTime: 100, LogicalTime: 100, } - sink.watermarkUpdater.watermarkMap.Store(uint64(0), types.TimestampToTS(tts)) + sink.watermarkUpdater.UpdateMem("db1", "t1", types.TimestampToTS(tts)) sink.Sink(context.Background(), &DecoderOutput{}) } diff --git a/pkg/cdc/table_scanner.go b/pkg/cdc/table_scanner.go new file mode 100644 index 0000000000000..af65bcb0a88c4 --- /dev/null +++ b/pkg/cdc/table_scanner.go @@ -0,0 +1,173 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cdc + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/matrixorigin/matrixone/pkg/catalog" + "github.com/matrixorigin/matrixone/pkg/common/runtime" + "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/defines" + "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/util/executor" +) + +var ( + scanner *TableScanner + once sync.Once + + scanSql = fmt.Sprintf("select "+ + " rel_id, "+ + " relname, "+ + " reldatabase_id, "+ + " reldatabase, "+ + " rel_createsql, "+ + " account_id "+ + "from "+ + " mo_catalog.mo_tables "+ + "where "+ + " relkind = '%s'"+ + " and reldatabase not in (%s)", + catalog.SystemOrdinaryRel, // only scan ordinary tables + AddSingleQuotesJoin(catalog.SystemDatabases), // skip system databases + ) +) + +var getSqlExecutor = func(cnUUID string) executor.SQLExecutor { + v, _ := runtime.ServiceRuntime(cnUUID).GetGlobalVariables(runtime.InternalSQLExecutor) + return v.(executor.SQLExecutor) +} + +var GetTableScanner = func(cnUUID string) *TableScanner { + once.Do(func() { + scanner = &TableScanner{ + Mutex: sync.Mutex{}, + Mp: make(map[uint32]TblMap), + Callbacks: make(map[string]func(map[uint32]TblMap)), + } + scanner.exec = getSqlExecutor(cnUUID) + }) + return scanner +} + +// TblMap key is dbName.tableName, e.g. 
db1.t1 +type TblMap map[string]*DbTableInfo + +type TableScanner struct { + sync.Mutex + + Mp map[uint32]TblMap + Callbacks map[string]func(map[uint32]TblMap) + exec executor.SQLExecutor + cancel context.CancelFunc +} + +func (s *TableScanner) Register(id string, cb func(map[uint32]TblMap)) { + s.Lock() + defer s.Unlock() + + if len(s.Callbacks) == 0 { + ctx, cancel := context.WithCancel(defines.AttachAccountId(context.Background(), catalog.System_Account)) + s.cancel = cancel + go s.scanTableLoop(ctx) + } + s.Callbacks[id] = cb +} + +func (s *TableScanner) UnRegister(id string) { + s.Lock() + defer s.Unlock() + + delete(s.Callbacks, id) + if len(s.Callbacks) == 0 && s.cancel != nil { + s.cancel() + s.cancel = nil + } +} + +func (s *TableScanner) scanTableLoop(ctx context.Context) { + logutil.Infof("cdc TableScanner.scanTableLoop: start") + defer func() { + logutil.Infof("cdc TableScanner.scanTableLoop: end") + }() + + timeTick := time.Tick(10 * time.Second) + for { + select { + case <-ctx.Done(): + return + case <-timeTick: + s.scanTable() + // do callbacks + s.Lock() + for _, cb := range s.Callbacks { + go cb(s.Mp) + } + s.Unlock() + } + } +} + +func (s *TableScanner) scanTable() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + result, err := s.exec.Exec(ctx, scanSql, executor.Options{}) + if err != nil { + return + } + defer result.Close() + + mp := make(map[uint32]TblMap) + result.ReadRows(func(rows int, cols []*vector.Vector) bool { + for i := 0; i < rows; i++ { + tblId := vector.MustFixedColWithTypeCheck[uint64](cols[0])[i] + tblName := cols[1].UnsafeGetStringAt(i) + dbId := vector.MustFixedColWithTypeCheck[uint64](cols[2])[i] + dbName := cols[3].UnsafeGetStringAt(i) + createSql := cols[4].UnsafeGetStringAt(i) + accountId := vector.MustFixedColWithTypeCheck[uint32](cols[5])[i] + + // skip table with foreign key + if strings.Contains(strings.ToLower(createSql), "foreign key") { + continue + } + + if _, ok := 
mp[accountId]; !ok { + mp[accountId] = make(TblMap) + } + + key := GenDbTblKey(dbName, tblName) + mp[accountId][key] = &DbTableInfo{ + SourceDbId: dbId, + SourceDbName: dbName, + SourceTblId: tblId, + SourceTblName: tblName, + SourceCreateSql: createSql, + } + } + return true + }) + + // replace the old table map + s.Lock() + s.Mp = mp + s.Unlock() +} diff --git a/pkg/cdc/table_scanner_test.go b/pkg/cdc/table_scanner_test.go new file mode 100644 index 0000000000000..0eb7b50beb900 --- /dev/null +++ b/pkg/cdc/table_scanner_test.go @@ -0,0 +1,73 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package cdc + +import ( + "sync" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/testutil" + "github.com/matrixorigin/matrixone/pkg/util/executor" + mock_executor "github.com/matrixorigin/matrixone/pkg/util/executor/test" + "github.com/prashantv/gostub" + "github.com/stretchr/testify/assert" +) + +func TestGetTableScanner(t *testing.T) { + gostub.Stub(&getSqlExecutor, func(cnUUID string) executor.SQLExecutor { + return &mock_executor.MockSQLExecutor{} + }) + assert.NotNil(t, GetTableScanner("cnUUID")) +} + +func TestTableScanner(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + bat := batch.New([]string{"tblId", "tblName", "dbId", "dbName", "createSql", "accountId"}) + bat.Vecs[0] = testutil.MakeUint64Vector([]uint64{1}, nil) + bat.Vecs[1] = testutil.MakeVarcharVector([]string{"tblName"}, nil) + bat.Vecs[2] = testutil.MakeUint64Vector([]uint64{1}, nil) + bat.Vecs[3] = testutil.MakeVarcharVector([]string{"dbName"}, nil) + bat.Vecs[4] = testutil.MakeVarcharVector([]string{"createSql"}, nil) + bat.Vecs[5] = testutil.MakeUint32Vector([]uint32{1}, nil) + bat.SetRowCount(1) + res := executor.Result{ + Mp: testutil.TestUtilMp, + Batches: []*batch.Batch{bat}, + } + + mockSqlExecutor := mock_executor.NewMockSQLExecutor(ctrl) + mockSqlExecutor.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any()).Return(res, nil).AnyTimes() + + scanner = &TableScanner{ + Mutex: sync.Mutex{}, + Mp: make(map[uint32]TblMap), + Callbacks: make(map[string]func(map[uint32]TblMap)), + exec: mockSqlExecutor, + } + + scanner.Register("id", func(mp map[uint32]TblMap) {}) + assert.Equal(t, 1, len(scanner.Callbacks)) + + // one round of scanTable + time.Sleep(11 * time.Second) + + scanner.UnRegister("id") + assert.Equal(t, 0, len(scanner.Callbacks)) +} diff --git a/pkg/cdc/types.go b/pkg/cdc/types.go index 643b4672e2262..2b0663bfe08f8 100644 --- a/pkg/cdc/types.go 
+++ b/pkg/cdc/types.go @@ -35,8 +35,11 @@ import ( ) const ( - AccountLevel = "account" - ClusterLevel = "cluster" + TableLevel = "table" + DbLevel = "database" + AccountLevel = "account" + MatchAll = "*" + MysqlSink = "mysql" MatrixoneSink = "matrixone" ConsoleSink = "console" @@ -92,6 +95,18 @@ type Sink interface { Close() } +type IWatermarkUpdater interface { + Run(ctx context.Context, ar *ActiveRoutine) + InsertIntoDb(dbTableInfo *DbTableInfo, watermark types.TS) error + GetFromMem(dbName, tblName string) types.TS + GetFromDb(dbName, tblName string) (watermark types.TS, err error) + UpdateMem(dbName, tblName string, watermark types.TS) + DeleteFromMem(dbName, tblName string) + DeleteFromDb(dbName, tblName string) error + DeleteAllFromDb() error + SaveErrMsg(dbName, tblName string, errMsg string) error +} + type ActiveRoutine struct { sync.Mutex Pause chan struct{} @@ -161,17 +176,14 @@ type RowIterator interface { } type DbTableInfo struct { - SourceAccountName string - SourceDbName string - SourceTblName string - SourceAccountId uint64 - SourceDbId uint64 - SourceTblId uint64 - SourceTblIdStr string + SourceDbId uint64 + SourceDbName string + SourceTblId uint64 + SourceTblName string + SourceCreateSql string - SinkAccountName string - SinkDbName string - SinkTblName string + SinkDbName string + SinkTblName string } func (info DbTableInfo) String() string { @@ -365,16 +377,12 @@ func (info *UriInfo) String() string { } type PatternTable struct { - AccountId uint64 `json:"account_id"` - Account string `json:"account"` - Database string `json:"database"` - Table string `json:"table"` - TableIsRegexp bool `json:"table_is_regexp"` - Reserved bool `json:"reserved"` + Database string `json:"database"` + Table string `json:"table"` } func (table PatternTable) String() string { - return fmt.Sprintf("(%s,%d,%s,%s)", table.Account, table.AccountId, table.Database, table.Table) + return fmt.Sprintf("%s.%s", table.Database, table.Table) } type PatternTuple struct { diff 
--git a/pkg/cdc/types_test.go b/pkg/cdc/types_test.go index 6a08ce82052ce..b4780896ae15d 100644 --- a/pkg/cdc/types_test.go +++ b/pkg/cdc/types_test.go @@ -205,15 +205,12 @@ func Test_atomicBatchRowIter(t *testing.T) { func TestDbTableInfo_String(t *testing.T) { type fields struct { - SourceAccountName string - SourceDbName string - SourceTblName string - SourceAccountId uint64 - SourceDbId uint64 - SourceTblId uint64 - SinkAccountName string - SinkDbName string - SinkTblName string + SourceDbName string + SourceTblName string + SourceDbId uint64 + SourceTblId uint64 + SinkDbName string + SinkTblName string } tests := []struct { name string @@ -235,15 +232,12 @@ func TestDbTableInfo_String(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { info := DbTableInfo{ - SourceAccountName: tt.fields.SourceAccountName, - SourceDbName: tt.fields.SourceDbName, - SourceTblName: tt.fields.SourceTblName, - SourceAccountId: tt.fields.SourceAccountId, - SourceDbId: tt.fields.SourceDbId, - SourceTblId: tt.fields.SourceTblId, - SinkAccountName: tt.fields.SinkAccountName, - SinkDbName: tt.fields.SinkDbName, - SinkTblName: tt.fields.SinkTblName, + SourceDbName: tt.fields.SourceDbName, + SourceTblName: tt.fields.SourceTblName, + SourceDbId: tt.fields.SourceDbId, + SourceTblId: tt.fields.SourceTblId, + SinkDbName: tt.fields.SinkDbName, + SinkTblName: tt.fields.SinkTblName, } assert.Equalf(t, tt.want, info.String(), "String()") }) @@ -331,12 +325,9 @@ func TestOutputType_String(t *testing.T) { func TestPatternTable_String(t *testing.T) { type fields struct { - AccountId uint64 - Account string - Database string - Table string - TableIsRegexp bool - Reserved bool + Database string + Table string + Reserved bool } tests := []struct { name string @@ -345,23 +336,17 @@ func TestPatternTable_String(t *testing.T) { }{ { fields: fields{ - AccountId: 123, - Account: "account", - Database: "database", - Table: "table", + Database: "database", + Table: "table", }, - 
want: "(account,123,database,table)", + want: "database.table", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { table := PatternTable{ - AccountId: tt.fields.AccountId, - Account: tt.fields.Account, - Database: tt.fields.Database, - Table: tt.fields.Table, - TableIsRegexp: tt.fields.TableIsRegexp, - Reserved: tt.fields.Reserved, + Database: tt.fields.Database, + Table: tt.fields.Table, } assert.Equalf(t, tt.want, table.String(), "String()") }) @@ -383,19 +368,15 @@ func TestPatternTuple_String(t *testing.T) { { fields: fields{ Source: PatternTable{ - AccountId: 123, - Account: "account1", - Database: "database1", - Table: "table1", + Database: "database1", + Table: "table1", }, Sink: PatternTable{ - AccountId: 456, - Account: "account2", - Database: "database2", - Table: "table2", + Database: "database2", + Table: "table2", }, }, - want: "(account1,123,database1,table1),(account2,456,database2,table2)", + want: "database1.table1,database2.table2", }, } for _, tt := range tests { @@ -434,10 +415,8 @@ func TestPatternTuples_Append(t *testing.T) { args: args{ pt: &PatternTuple{ Source: PatternTable{ - AccountId: 123, - Account: "account1", - Database: "database1", - Table: "table1", + Database: "database1", + Table: "table1", }, }, }, @@ -475,21 +454,17 @@ func TestPatternTuples_String(t *testing.T) { Pts: []*PatternTuple{ { Source: PatternTable{ - AccountId: 123, - Account: "account1", - Database: "database1", - Table: "table1", + Database: "database1", + Table: "table1", }, Sink: PatternTable{ - AccountId: 456, - Account: "account2", - Database: "database2", - Table: "table2", + Database: "database2", + Table: "table2", }, }, }, }, - want: "(account1,123,database1,table1),(account2,456,database2,table2)", + want: "database1.table1,database2.table2", }, } for _, tt := range tests { diff --git a/pkg/cdc/util.go b/pkg/cdc/util.go index a67411db18a24..72dd13d9ce0d2 100644 --- a/pkg/cdc/util.go +++ b/pkg/cdc/util.go @@ -26,6 +26,7 @@ import ( 
"math/rand" "slices" "strconv" + "strings" "time" "go.uber.org/zap" @@ -623,7 +624,7 @@ var ExitRunSql = func(txnOp client.TxnOperator) { txnOp.ExitRunSql() } -func GetTableDef( +var GetTableDef = func( ctx context.Context, txnOp client.TxnOperator, cnEngine engine.Engine, @@ -732,3 +733,23 @@ func batchRowCount(bat *batch.Batch) int { } return bat.Vecs[0].Length() } + +// AddSingleQuotesJoin [a, b, c] -> 'a','b','c' +func AddSingleQuotesJoin(s []string) string { + if len(s) == 0 { + return "" + } + return "'" + strings.Join(s, "','") + "'" +} + +func GenDbTblKey(dbName, tblName string) string { + return dbName + "." + tblName +} + +func SplitDbTblKey(dbTblKey string) (dbName, tblName string) { + s := strings.Split(dbTblKey, ".") + if len(s) != 2 { + return + } + return s[0], s[1] +} diff --git a/pkg/cdc/watermark_updater.go b/pkg/cdc/watermark_updater.go index 90d2f8fd36ce8..f04308bc51822 100644 --- a/pkg/cdc/watermark_updater.go +++ b/pkg/cdc/watermark_updater.go @@ -35,21 +35,21 @@ const ( maxErrMsgLen = 256 - insertWatermarkFormat = "insert into mo_catalog.mo_cdc_watermark values (%d, '%s', '%s', '%s', '%s', '%s', '%s')" + insertWatermarkFormat = "insert into mo_catalog.mo_cdc_watermark values (%d, '%s', '%s', '%s', '%s', '%s')" - getWatermarkFormat = "select watermark from mo_catalog.mo_cdc_watermark where account_id = %d and task_id = '%s' and table_id = '%s'" + getWatermarkFormat = "select watermark from mo_catalog.mo_cdc_watermark where account_id = %d and task_id = '%s' and db_name = '%s' and table_name = '%s'" - getAllWatermarkFormat = "select table_id, watermark from mo_catalog.mo_cdc_watermark where account_id = %d and task_id = '%s'" - - updateWatermarkFormat = "update mo_catalog.mo_cdc_watermark set watermark='%s' where account_id = %d and task_id = '%s' and table_id = '%s'" + updateWatermarkFormat = "update mo_catalog.mo_cdc_watermark set watermark='%s' where account_id = %d and task_id = '%s' and db_name = '%s' and table_name = '%s'" 
deleteWatermarkFormat = "delete from mo_catalog.mo_cdc_watermark where account_id = %d and task_id = '%s'" - deleteWatermarkByTableFormat = "delete from mo_catalog.mo_cdc_watermark where account_id = %d and task_id = '%s' and table_id = '%s'" + deleteWatermarkByTableFormat = "delete from mo_catalog.mo_cdc_watermark where account_id = %d and task_id = '%s' and db_name = '%s' and table_name = '%s'" - updateErrMsgFormat = "update mo_catalog.mo_cdc_watermark set err_msg='%s' where account_id = %d and task_id = '%s' and table_id = '%s'" + updateErrMsgFormat = "update mo_catalog.mo_cdc_watermark set err_msg='%s' where account_id = %d and task_id = '%s' and db_name = '%s' and table_name = '%s'" ) +var _ IWatermarkUpdater = new(WatermarkUpdater) + type WatermarkUpdater struct { accountId uint32 taskId uuid.UUID @@ -59,9 +59,9 @@ type WatermarkUpdater struct { watermarkMap *sync.Map } -func NewWatermarkUpdater(accountId uint64, taskId string, ie ie.InternalExecutor) *WatermarkUpdater { +func NewWatermarkUpdater(accountId uint32, taskId string, ie ie.InternalExecutor) *WatermarkUpdater { u := &WatermarkUpdater{ - accountId: uint32(accountId), + accountId: accountId, ie: ie, watermarkMap: &sync.Map{}, } @@ -93,29 +93,29 @@ func (u *WatermarkUpdater) Run(ctx context.Context, ar *ActiveRoutine) { func (u *WatermarkUpdater) InsertIntoDb(dbTableInfo *DbTableInfo, watermark types.TS) error { sql := fmt.Sprintf(insertWatermarkFormat, u.accountId, u.taskId, - dbTableInfo.SourceTblIdStr, dbTableInfo.SourceDbName, dbTableInfo.SourceTblName, + dbTableInfo.SourceDbName, dbTableInfo.SourceTblName, watermark.ToString(), "") ctx := defines.AttachAccountId(context.Background(), catalog.System_Account) return u.ie.Exec(ctx, sql, ie.SessionOverrideOptions{}) } -func (u *WatermarkUpdater) GetFromMem(tableIdStr string) types.TS { - if value, ok := u.watermarkMap.Load(tableIdStr); ok { +func (u *WatermarkUpdater) GetFromMem(dbName, tblName string) types.TS { + if value, ok := 
u.watermarkMap.Load(GenDbTblKey(dbName, tblName)); ok { return value.(types.TS) } return types.TS{} } -func (u *WatermarkUpdater) GetFromDb(tableIdStr string) (watermark types.TS, err error) { - sql := fmt.Sprintf(getWatermarkFormat, u.accountId, u.taskId, tableIdStr) +func (u *WatermarkUpdater) GetFromDb(dbName, tblName string) (watermark types.TS, err error) { + sql := fmt.Sprintf(getWatermarkFormat, u.accountId, u.taskId, dbName, tblName) ctx := defines.AttachAccountId(context.Background(), catalog.System_Account) res := u.ie.Query(ctx, sql, ie.SessionOverrideOptions{}) if res.Error() != nil { err = res.Error() } else if res.RowCount() < 1 { - err = moerr.NewInternalErrorf(ctx, "no watermark found for task: %s, tableIdStr: %v\n", u.taskId, tableIdStr) + err = moerr.NewErrNoWatermarkFoundNoCtx(dbName, tblName) } else if res.RowCount() > 1 { - err = moerr.NewInternalErrorf(ctx, "duplicate watermark found for task: %s, tableIdStr: %v\n", u.taskId, tableIdStr) + err = moerr.NewInternalErrorf(ctx, "duplicate watermark found for task: %s, table: %s.%s", u.taskId, dbName, tblName) } if err != nil { return @@ -128,40 +128,16 @@ func (u *WatermarkUpdater) GetFromDb(tableIdStr string) (watermark types.TS, err return types.StringToTS(watermarkStr), nil } -func (u *WatermarkUpdater) GetAllFromDb() (mp map[string]types.TS, err error) { - sql := fmt.Sprintf(getAllWatermarkFormat, u.accountId, u.taskId) - ctx := defines.AttachAccountId(context.Background(), catalog.System_Account) - res := u.ie.Query(ctx, sql, ie.SessionOverrideOptions{}) - if res.Error() != nil { - err = res.Error() - return - } - - var tableIdStr string - var watermarkStr string - mp = make(map[string]types.TS) - for i := uint64(0); i < res.RowCount(); i++ { - if tableIdStr, err = res.GetString(ctx, i, 0); err != nil { - return - } - if watermarkStr, err = res.GetString(ctx, i, 1); err != nil { - return - } - mp[tableIdStr] = types.StringToTS(watermarkStr) - } - return -} - -func (u *WatermarkUpdater) 
UpdateMem(tableIdStr string, watermark types.TS) { - u.watermarkMap.Store(tableIdStr, watermark) +func (u *WatermarkUpdater) UpdateMem(dbName, tblName string, watermark types.TS) { + u.watermarkMap.Store(GenDbTblKey(dbName, tblName), watermark) } -func (u *WatermarkUpdater) DeleteFromMem(tableIdStr string) { - u.watermarkMap.Delete(tableIdStr) +func (u *WatermarkUpdater) DeleteFromMem(dbName, tblName string) { + u.watermarkMap.Delete(GenDbTblKey(dbName, tblName)) } -func (u *WatermarkUpdater) DeleteFromDb(tableIdStr string) error { - sql := fmt.Sprintf(deleteWatermarkByTableFormat, u.accountId, u.taskId, tableIdStr) +func (u *WatermarkUpdater) DeleteFromDb(dbName, tblName string) error { + sql := fmt.Sprintf(deleteWatermarkByTableFormat, u.accountId, u.taskId, dbName, tblName) ctx := defines.AttachAccountId(context.Background(), catalog.System_Account) return u.ie.Exec(ctx, sql, ie.SessionOverrideOptions{}) } @@ -172,28 +148,29 @@ func (u *WatermarkUpdater) DeleteAllFromDb() error { return u.ie.Exec(ctx, sql, ie.SessionOverrideOptions{}) } -func (u *WatermarkUpdater) SaveErrMsg(tableIdStr string, errMsg string) error { +func (u *WatermarkUpdater) SaveErrMsg(dbName, tblName string, errMsg string) error { if len(errMsg) > maxErrMsgLen { errMsg = errMsg[:maxErrMsgLen] } - sql := fmt.Sprintf(updateErrMsgFormat, errMsg, u.accountId, u.taskId, tableIdStr) + sql := fmt.Sprintf(updateErrMsgFormat, errMsg, u.accountId, u.taskId, dbName, tblName) ctx := defines.AttachAccountId(context.Background(), catalog.System_Account) return u.ie.Exec(ctx, sql, ie.SessionOverrideOptions{}) } func (u *WatermarkUpdater) flushAll() { u.watermarkMap.Range(func(k, v any) bool { - tableIdStr := k.(string) + key := k.(string) ts := v.(types.TS) - if err := u.flush(tableIdStr, ts); err != nil { - logutil.Errorf("flush table %s failed, current watermark: %s err: %v\n", tableIdStr, ts.ToString(), err) + if err := u.flush(key, ts); err != nil { + logutil.Errorf("flush table %s failed, current 
watermark: %s err: %v", key, ts.ToString(), err) } return true }) } -func (u *WatermarkUpdater) flush(tableIdStr string, watermark types.TS) error { - sql := fmt.Sprintf(updateWatermarkFormat, watermark.ToString(), u.accountId, u.taskId, tableIdStr) +func (u *WatermarkUpdater) flush(key string, watermark types.TS) error { + dbName, tblName := SplitDbTblKey(key) + sql := fmt.Sprintf(updateWatermarkFormat, watermark.ToString(), u.accountId, u.taskId, dbName, tblName) ctx := defines.AttachAccountId(context.Background(), catalog.System_Account) return u.ie.Exec(ctx, sql, ie.SessionOverrideOptions{}) } diff --git a/pkg/cdc/watermark_updater_test.go b/pkg/cdc/watermark_updater_test.go index 55917c0c0ccb1..842c3c4195c90 100644 --- a/pkg/cdc/watermark_updater_test.go +++ b/pkg/cdc/watermark_updater_test.go @@ -17,14 +17,12 @@ package cdc import ( "context" "regexp" - "strconv" "strings" "sync" "testing" "time" "github.com/google/uuid" - "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/types" ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor" "github.com/stretchr/testify/assert" @@ -32,35 +30,34 @@ import ( ) type wmMockSQLExecutor struct { - mp map[string]string - insertRe *regexp.Regexp - updateRe *regexp.Regexp - selectRe *regexp.Regexp - selectAllRe *regexp.Regexp + mp map[string]string + insertRe *regexp.Regexp + updateRe *regexp.Regexp + selectRe *regexp.Regexp } func newWmMockSQLExecutor() *wmMockSQLExecutor { return &wmMockSQLExecutor{ - mp: make(map[string]string), - insertRe: regexp.MustCompile(`^insert .* values \(.*\, .*\, \'(.*)\'\, .*\, .*\, \'(.*)\'\, \'\'\)$`), - updateRe: regexp.MustCompile(`^update .* set watermark\=\'(.*)\' where .* and table_id \= '(.*)'$`), - selectRe: regexp.MustCompile(`^select .* and table_id \= '(.*)'$`), - selectAllRe: regexp.MustCompile(`^select .* where account_id \= (.*) and task_id .*`), + mp: make(map[string]string), + // matches[1] = db_name, matches[2] = 
table_name, matches[3] = watermark + insertRe: regexp.MustCompile(`^insert .* values \(.*\, .*\, \'(.*)\'\, \'(.*)\'\, \'(.*)\'\, \'\'\)$`), + updateRe: regexp.MustCompile(`^update .* set watermark\=\'(.*)\' where .* and db_name \= '(.*)' and table_name \= '(.*)'$`), + selectRe: regexp.MustCompile(`^select .* and db_name \= '(.*)' and table_name \= '(.*)'$`), } } func (m *wmMockSQLExecutor) Exec(_ context.Context, sql string, _ ie.SessionOverrideOptions) error { if strings.HasPrefix(sql, "insert") { matches := m.insertRe.FindStringSubmatch(sql) - m.mp[matches[1]] = matches[2] + m.mp[GenDbTblKey(matches[1], matches[2])] = matches[3] } else if strings.HasPrefix(sql, "update mo_catalog.mo_cdc_watermark set err_msg") { // do nothing } else if strings.HasPrefix(sql, "update") { matches := m.updateRe.FindStringSubmatch(sql) - m.mp[matches[2]] = matches[1] + m.mp[GenDbTblKey(matches[2], matches[3])] = matches[1] } else if strings.HasPrefix(sql, "delete") { if strings.Contains(sql, "table_id") { - delete(m.mp, "1_0") + delete(m.mp, "db1.t1") } else { m.mp = make(map[string]string) } @@ -120,27 +117,7 @@ func (res *internalExecResult) GetString(ctx context.Context, i uint64, j uint64 } func (m *wmMockSQLExecutor) Query(ctx context.Context, sql string, pts ie.SessionOverrideOptions) ie.InternalExecResult { - if strings.HasPrefix(sql, "select table_id") { - matches := m.selectAllRe.FindStringSubmatch(sql) - accountId, _ := strconv.Atoi(matches[1]) - if accountId == 1 { // normal path - var data [][]interface{} - for k, v := range m.mp { - data = append(data, []interface{}{k, v}) - } - return &internalExecResult{ - affectedRows: 1, - resultSet: &MysqlResultSet{ - Columns: nil, - Name2Index: nil, - Data: data, - }, - err: nil, - } - } else { // error path - return &internalExecResult{err: moerr.NewInternalErrorNoCtx("error")} - } - } else if strings.HasPrefix(sql, "select") { + if strings.HasPrefix(sql, "select") { matches := m.selectRe.FindStringSubmatch(sql) return 
&internalExecResult{ affectedRows: 1, @@ -148,13 +125,12 @@ func (m *wmMockSQLExecutor) Query(ctx context.Context, sql string, pts ie.Sessio Columns: nil, Name2Index: nil, Data: [][]interface{}{ - {m.mp[matches[1]]}, + {m.mp[GenDbTblKey(matches[1], matches[2])]}, }, }, err: nil, } } - return nil } @@ -165,7 +141,7 @@ func TestNewWatermarkUpdater(t *testing.T) { require.NoError(t, err) type args struct { - accountId uint64 + accountId uint32 taskId string ie ie.InternalExecutor } @@ -205,12 +181,12 @@ func TestWatermarkUpdater_MemOps(t *testing.T) { } t1 := types.BuildTS(1, 1) - u.UpdateMem("1_0", t1) - actual := u.GetFromMem("1_0") + u.UpdateMem("db1", "t1", t1) + actual := u.GetFromMem("db1", "t1") assert.Equal(t, t1, actual) - u.DeleteFromMem("1_0") - actual = u.GetFromMem("1_0") + u.DeleteFromMem("db1", "t1") + actual = u.GetFromMem("db1", "t1") assert.Equal(t, types.TS{}, actual) } @@ -222,69 +198,49 @@ func TestWatermarkUpdater_DbOps(t *testing.T) { watermarkMap: &sync.Map{}, } - // ---------- init count is 0 - mp, err := u.GetAllFromDb() - assert.NoError(t, err) - assert.Equal(t, 0, len(mp)) - // ---------- insert into a record t1 := types.BuildTS(1, 1) info1 := &DbTableInfo{ - SourceTblIdStr: "1_0", - SourceDbName: "dbName", - SourceTblName: "tblName", + SourceDbName: "db1", + SourceTblName: "t1", } - err = u.InsertIntoDb(info1, t1) - assert.NoError(t, err) - // count is 1 - mp, err = u.GetAllFromDb() + err := u.InsertIntoDb(info1, t1) assert.NoError(t, err) - assert.Equal(t, 1, len(mp)) // get value of tableId 1 - actual, err := u.GetFromDb("1_0") + actual, err := u.GetFromDb("db1", "t1") assert.NoError(t, err) assert.Equal(t, t1, actual) // ---------- update t1 -> t2 t2 := types.BuildTS(2, 1) - err = u.flush("1_0", t2) + err = u.flush("db1.t1", t2) assert.NoError(t, err) // value is t2 - actual, err = u.GetFromDb("1_0") + actual, err = u.GetFromDb("db1", "t1") assert.NoError(t, err) assert.Equal(t, t2, actual) + // ---------- delete tableId 1 + err = 
u.DeleteFromDb("db1", "t1") + assert.NoError(t, err) + // ---------- insert more records info2 := &DbTableInfo{ - SourceTblIdStr: "2_0", - SourceDbName: "dbName", - SourceTblName: "tblName", + SourceDbName: "db2", + SourceTblName: "t2", } err = u.InsertIntoDb(info2, t1) assert.NoError(t, err) info3 := &DbTableInfo{ - SourceTblIdStr: "3_0", - SourceDbName: "dbName", - SourceTblName: "tblName", + SourceDbName: "db3", + SourceTblName: "t3", } err = u.InsertIntoDb(info3, t1) assert.NoError(t, err) - // ---------- delete tableId 1 - err = u.DeleteFromDb("1_0") - assert.NoError(t, err) - // count is 2 - mp, err = u.GetAllFromDb() - assert.NoError(t, err) - assert.Equal(t, 2, len(mp)) - // ---------- delete all err = u.DeleteAllFromDb() assert.NoError(t, err) - // count is 0 - mp, err = u.GetAllFromDb() - assert.NoError(t, err) - assert.Equal(t, 0, len(mp)) } func TestWatermarkUpdater_Run(t *testing.T) { @@ -311,50 +267,37 @@ func TestWatermarkUpdater_flushAll(t *testing.T) { t1 := types.BuildTS(1, 1) info1 := &DbTableInfo{ - SourceTblIdStr: "1_0", - SourceDbName: "dbName", - SourceTblName: "tblName", + SourceDbName: "db1", + SourceTblName: "t1", } err := u.InsertIntoDb(info1, t1) assert.NoError(t, err) info2 := &DbTableInfo{ - SourceTblIdStr: "1_0", - SourceDbName: "dbName", - SourceTblName: "tblName", + SourceDbName: "db2", + SourceTblName: "t2", } err = u.InsertIntoDb(info2, t1) assert.NoError(t, err) info3 := &DbTableInfo{ - SourceTblIdStr: "1_0", - SourceDbName: "dbName", - SourceTblName: "tblName", + SourceDbName: "db3", + SourceTblName: "t3", } err = u.InsertIntoDb(info3, t1) assert.NoError(t, err) t2 := types.BuildTS(2, 1) - u.UpdateMem("1_0", t2) - u.UpdateMem("2_0", t2) - u.UpdateMem("3_0", t2) + u.UpdateMem("db1", "t1", t2) + u.UpdateMem("db2", "t2", t2) + u.UpdateMem("db3", "t3", t2) u.flushAll() - actual, err := u.GetFromDb("1_0") + actual, err := u.GetFromDb("db1", "t1") assert.NoError(t, err) assert.Equal(t, t2, actual) - actual, err = u.GetFromDb("2_0") + 
actual, err = u.GetFromDb("db2", "t2") assert.NoError(t, err) assert.Equal(t, t2, actual) - actual, err = u.GetFromDb("3_0") + actual, err = u.GetFromDb("db3", "t3") assert.NoError(t, err) assert.Equal(t, t2, actual) } - -func TestWatermarkUpdater_GetAllFromDb(t *testing.T) { - u := &WatermarkUpdater{ - accountId: 2, - taskId: uuid.New(), - ie: newWmMockSQLExecutor(), - } - _, err := u.GetAllFromDb() - assert.Error(t, err) -} diff --git a/pkg/common/moerr/error.go b/pkg/common/moerr/error.go index 7d945d68057d5..77f3b02131d4c 100644 --- a/pkg/common/moerr/error.go +++ b/pkg/common/moerr/error.go @@ -291,7 +291,8 @@ const ( ErrTooLargeObjectSize uint16 = 22001 // Group 13: CDC - ErrStaleRead uint16 = 22101 + ErrStaleRead uint16 = 22101 + ErrNoWatermarkFound uint16 = 22102 // ErrEnd, the max value of MOErrorCode ErrEnd uint16 = 65535 @@ -527,7 +528,9 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{ ErrTooLargeObjectSize: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "objectio: too large object size %d"}, // Group 13: CDC - ErrStaleRead: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "CDC handle: stale read, min TS is %v, receive %v"}, + ErrStaleRead: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "CDC handle: stale read, min TS is %v, receive %v"}, + ErrNoWatermarkFound: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "CDC task: no watermark found of table %s.%s"}, + // Group End: max value of MOErrorCode ErrEnd: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "internal error: end of errcode code"}, } diff --git a/pkg/common/moerr/error_no_ctx.go b/pkg/common/moerr/error_no_ctx.go index 42685213b5061..036bd0fe03668 100644 --- a/pkg/common/moerr/error_no_ctx.go +++ b/pkg/common/moerr/error_no_ctx.go @@ -442,6 +442,10 @@ func NewErrStaleReadNoCtx(minTS, start string) *Error { return newError(Context(), ErrStaleRead, minTS, start) } +func NewErrNoWatermarkFoundNoCtx(dbName, tblName string) *Error { + return newError(Context(), ErrNoWatermarkFound, 
dbName, tblName) +} + func NewArenaFullNoCtx() *Error { return newError(Context(), ErrArenaFull) } diff --git a/pkg/frontend/cdc.go b/pkg/frontend/cdc.go index 02085c7e3a448..0247877300450 100644 --- a/pkg/frontend/cdc.go +++ b/pkg/frontend/cdc.go @@ -19,9 +19,10 @@ import ( "database/sql" "encoding/json" "fmt" - "math" + "regexp" "strconv" "strings" + "sync" "time" "github.com/google/uuid" @@ -287,67 +288,33 @@ func doCreateCdc(ctx context.Context, ses *Session, create *tree.CreateCDC) (err cdcTaskOptionsMap[create.Option[i]] = create.Option[i+1] } - cdcLevel := cdcTaskOptionsMap["Level"] - cdcAccount := cdcTaskOptionsMap["Account"] - - if cdcLevel != cdc2.AccountLevel { - return moerr.NewInternalError(ctx, "invalid level. only support account level in 1.3") - } - - if cdcAccount != ses.GetTenantInfo().GetTenant() { - return moerr.NewInternalErrorf(ctx, "invalid account. account must be %s", ses.GetTenantInfo().GetTenant()) - } - - //////////// - //!!!NOTE!!! - //1.3 - // level: account level - // account: must be designated. - /////////// - - //step 1 : handle tables - tablePts, err := preprocessTables( - ctx, - ses, - cdcLevel, - cdcAccount, - create.Tables, - ) - if err != nil { - return err + // step 1: handle tables + level := cdcTaskOptionsMap["Level"] + if level != cdc2.AccountLevel && level != cdc2.DbLevel && level != cdc2.TableLevel { + return moerr.NewInternalErrorf(ctx, "invalid level: %s", level) } - - //step 2: handle filters - //There must be no special characters (',' '.' ':' '`') in the single rule. 
- filters := cdcTaskOptionsMap["Rules"] - - jsonFilters, filterPts, err := preprocessRules(ctx, filters) + tablePts, err := getPatternTuples(ctx, level, create.Tables) if err != nil { return err } - - err = attachAccountToFilters(ctx, ses, cdcLevel, cdcAccount, filterPts) + jsonTables, err := cdc2.JsonEncode(tablePts) if err != nil { - return err + return } - //TODO: refine it after 1.3 - //check table be filtered or not - if filterTable(tablePts, filterPts) == 0 { - return moerr.NewInternalError(ctx, "all tables has been excluded by filters. create cdc failed.") + // step 2: handle exclude (regular expression) + exclude := cdcTaskOptionsMap["Exclude"] + if _, err = regexp.Compile(exclude); err != nil { + return moerr.NewInternalErrorf(ctx, "invalid exclude expression: %s, err: %v", exclude, err) } - dat := time.Now().UTC() - - creatorAccInfo := ses.GetTenantInfo() - cdcId, _ := uuid.NewV7() - - //step 4: check uri format and strip password + //step 4: check source uri format and strip password jsonSrcUri, _, err := extractUriInfo(ctx, create.SourceUri, cdc2.SourceUriPrefix) if err != nil { return err } + //step 5: check sink uri format, strip password and check connection sinkType := strings.ToLower(create.SinkType) useConsole := false if cdc2.EnableConsoleSink && sinkType == cdc2.ConsoleSink { @@ -358,7 +325,6 @@ func doCreateCdc(ctx context.Context, ses *Session, create *tree.CreateCDC) (err return moerr.NewInternalErrorf(ctx, "unsupported sink type: %s", create.SinkType) } - //step 5: check downstream connectivity jsonSinkUri, sinkUriInfo, err := extractUriInfo(ctx, create.SinkUri, cdc2.SinkUriPrefix) if err != nil { return @@ -368,22 +334,27 @@ func doCreateCdc(ctx context.Context, ses *Session, create *tree.CreateCDC) (err return } - noFull := false - if cdcTaskOptionsMap["NoFull"] == "true" { - noFull = true - } + //step 6: no full + noFull, _ := strconv.ParseBool(cdcTaskOptionsMap["NoFull"]) + //step 7: additionalConfig additionalConfig := 
make(map[string]any) + + // InitSnapshotSplitTxn additionalConfig[cdc2.InitSnapshotSplitTxn] = cdc2.DefaultInitSnapshotSplitTxn if cdcTaskOptionsMap[cdc2.InitSnapshotSplitTxn] == "false" { additionalConfig[cdc2.InitSnapshotSplitTxn] = false } + + // MaxSqlLength additionalConfig[cdc2.MaxSqlLength] = cdc2.DefaultMaxSqlLength if val, ok := cdcTaskOptionsMap[cdc2.MaxSqlLength]; ok { if additionalConfig[cdc2.MaxSqlLength], err = strconv.ParseUint(val, 10, 64); err != nil { return } } + + // SendSqlTimeout additionalConfig[cdc2.SendSqlTimeout] = cdc2.DefaultSendSqlTimeout if val, ok := cdcTaskOptionsMap[cdc2.SendSqlTimeout]; ok { // check duration format @@ -393,24 +364,30 @@ func doCreateCdc(ctx context.Context, ses *Session, create *tree.CreateCDC) (err additionalConfig[cdc2.SendSqlTimeout] = val } + // marshal additionalConfigBytes, err := json.Marshal(additionalConfig) if err != nil { return err } + //step 8: details + accountInfo := ses.GetTenantInfo() + accountId := accountInfo.GetTenantID() + accountName := accountInfo.GetTenant() + cdcId, _ := uuid.NewV7() details := &task.Details{ //account info that create cdc - AccountID: creatorAccInfo.GetTenantID(), - Account: creatorAccInfo.GetTenant(), - Username: creatorAccInfo.GetUser(), + AccountID: accountId, + Account: accountName, + Username: accountInfo.GetUser(), Details: &task.Details_CreateCdc{ CreateCdc: &task.CreateCdcDetails{ TaskName: create.TaskName.String(), TaskId: cdcId.String(), Accounts: []*task.Account{ { - Id: uint64(creatorAccInfo.GetTenantID()), - Name: cdcAccount, + Id: uint64(accountId), + Name: accountName, }, }, }, @@ -418,21 +395,6 @@ func doCreateCdc(ctx context.Context, ses *Session, create *tree.CreateCDC) (err } addCdcTaskCallback := func(ctx context.Context, tx taskservice.SqlExecutor) (ret int, err error) { - err = checkAccounts(ctx, tx, tablePts, filterPts) - if err != nil { - return 0, err - } - - ret, err = checkTables(ctx, tx, tablePts, filterPts) - if err != nil { - return - } - - 
jsonTables, err := cdc2.JsonEncode(tablePts) - if err != nil { - return 0, err - } - var encodedSinkPwd string if !useConsole { // TODO replace with creatorAccountId @@ -445,9 +407,11 @@ func doCreateCdc(ctx context.Context, ses *Session, create *tree.CreateCDC) (err } } + exclude = strings.ReplaceAll(exclude, "\\", "\\\\") + //step 5: create daemon task insertSql := getSqlForNewCdcTask( - uint64(creatorAccInfo.GetTenantID()), //the account_id of cdc creator + uint64(accountId), //the account_id of cdc creator cdcId, create.TaskName.String(), jsonSrcUri, //json bytes @@ -459,14 +423,14 @@ func doCreateCdc(ctx context.Context, ses *Session, create *tree.CreateCDC) (err "", "", jsonTables, - jsonFilters, + exclude, "", cdc2.SASCommon, cdc2.SASCommon, "", //1.3 does not support startTs "", //1.3 does not support endTs cdcTaskOptionsMap["ConfigFile"], - dat, + time.Now().UTC(), CdcRunning, 0, noFull, @@ -506,114 +470,6 @@ func cdcTaskMetadata(cdcId string) task.TaskMetadata { } } -// checkAccounts checks the accounts exists or not -func checkAccounts(ctx context.Context, tx taskservice.SqlExecutor, tablePts, filterPts *cdc2.PatternTuples) error { - //step1 : collect accounts - accounts := make(map[string]uint64) - for _, pt := range tablePts.Pts { - if pt == nil || pt.Source.Account == "" { - continue - } - accounts[pt.Source.Account] = math.MaxUint64 - } - - for _, pt := range filterPts.Pts { - if pt == nil || pt.Source.Account == "" { - continue - } - accounts[pt.Source.Account] = math.MaxUint64 - } - - //step2 : collect account id - //after this step, all account has accountid - res := make(map[string]uint64) - for acc := range accounts { - exists, accId, err := checkAccountExists(ctx, tx, acc) - if err != nil { - return err - } - if !exists { - return moerr.NewInternalErrorf(ctx, "account %s does not exist", acc) - } - res[acc] = accId - } - - //step3: attach accountId - for _, pt := range tablePts.Pts { - if pt == nil || pt.Source.Account == "" { - continue - } - 
pt.Source.AccountId = res[pt.Source.Account] - } - - for _, pt := range filterPts.Pts { - if pt == nil || pt.Source.Account == "" { - continue - } - pt.Source.AccountId = res[pt.Source.Account] - } - - return nil -} - -// checkTables -// checks the table existed or not -// checks the table having the primary key -// filters the table -func checkTables(ctx context.Context, tx taskservice.SqlExecutor, tablePts, filterPts *cdc2.PatternTuples) (int, error) { - var err error - var found bool - var hasPrimaryKey bool - for _, pt := range tablePts.Pts { - if pt == nil { - continue - } - - //skip tables that is filtered - if needSkipThisTable(pt.Source.Account, pt.Source.Database, pt.Source.Table, filterPts) { - continue - } - - //check tables exists or not and filter the table - found, err = checkTableExists(ctx, tx, pt.Source.AccountId, pt.Source.Database, pt.Source.Table) - if err != nil { - return 0, err - } - if !found { - return 0, moerr.NewInternalErrorf(ctx, "no table %s:%s", pt.Source.Database, pt.Source.Table) - } - - //check table has primary key - hasPrimaryKey, err = checkPrimaryKey(ctx, tx, pt.Source.AccountId, pt.Source.Database, pt.Source.Table) - if err != nil { - return 0, err - } - if !hasPrimaryKey { - return 0, moerr.NewInternalErrorf(ctx, "table %s:%s does not have primary key", pt.Source.Database, pt.Source.Table) - } - } - return 0, err -} - -// filterTable checks the table filtered or not -// returns the count of tables that not be filtered -func filterTable(tablePts, filterPts *cdc2.PatternTuples) int { - //check table be filtered or not - leftCount := 0 - for _, pt := range tablePts.Pts { - if pt == nil { - continue - } - - //skip tables that is filtered - if needSkipThisTable(pt.Source.Account, pt.Source.Database, pt.Source.Table, filterPts) { - continue - } - leftCount++ - } - return leftCount -} - func queryTable( ctx context.Context, tx taskservice.SqlExecutor, @@ -645,299 +501,105 @@ func queryTable( return false, nil } -func 
checkAccountExists(ctx context.Context, tx taskservice.SqlExecutor, account string) (bool, uint64, error) { - checkSql := getSqlForCheckAccount(account) - var err error - var ret bool - var accountId uint64 - ret, err = queryTable(ctx, tx, checkSql, func(ctx context.Context, rows *sql.Rows) (bool, error) { - accountId = 0 - if err = rows.Scan(&accountId); err != nil { - return false, err - } - return true, nil - }) - - return ret, accountId, err -} - -func checkTableExists(ctx context.Context, tx taskservice.SqlExecutor, accountId uint64, db, table string) (bool, error) { - //select from mo_tables - checkSql := getSqlForGetTable(accountId, db, table) - var err error - var ret bool - var tableId uint64 - ret, err = queryTable(ctx, tx, checkSql, func(ctx context.Context, rows *sql.Rows) (bool, error) { - tableId = 0 - if err = rows.Scan(&tableId); err != nil { - return false, err - } - return true, nil - }) - - return ret, err -} - -func checkPrimaryKey(ctx context.Context, tx taskservice.SqlExecutor, accountId uint64, db, table string) (bool, error) { - checkSql := getSqlForGetPkCount(accountId, db, table) - var ret bool - var err error - var pkCount uint64 - - ret, err = queryTable(ctx, tx, checkSql, func(ctx context.Context, rows *sql.Rows) (bool, error) { - pkCount = 0 - - if err = rows.Scan( - &pkCount, - ); err != nil { - return false, err - } - if pkCount > 0 { - return true, nil - } - return false, nil - }) - - return ret, err -} - -// extractTablePair -// extract source:sink pair from the pattern +// getPatternTuple pattern example: // -// There must be no special characters (',' '.' ':' '`') in account name & database name & table name. -func extractTablePair(ctx context.Context, pattern string, defaultAcc string) (*cdc2.PatternTuple, error) { - var err error - pattern = strings.TrimSpace(pattern) - //step1 : split table pair by ':' => table0 table1 - //step2 : split table0/1 by '.' 
=> account database table - //step3 : check table accord with regular expression - pt := &cdc2.PatternTuple{OriginString: pattern} - if strings.Contains(pattern, ":") { - //Format: account.db.table:db.table - splitRes := strings.Split(pattern, ":") - if len(splitRes) != 2 { - return nil, moerr.NewInternalErrorf(ctx, "invalid table format: %s, must be `source:sink`.", pattern) - } - - //handle source part - pt.Source.Account, pt.Source.Database, pt.Source.Table, pt.Source.TableIsRegexp, err = extractTableInfo(ctx, splitRes[0], false) - if err != nil { - return nil, err - } - if pt.Source.Account == "" { - pt.Source.Account = defaultAcc - } - - //handle sink part - pt.Sink.Account, pt.Sink.Database, pt.Sink.Table, pt.Sink.TableIsRegexp, err = extractTableInfo(ctx, splitRes[1], false) - if err != nil { - return nil, err - } - if pt.Sink.Account == "" { - pt.Sink.Account = defaultAcc - } - return pt, nil +// db1 +// db1:db2 +// db1.t1 +// db1.t1:db2.t2 +// +// There must be no special characters (',' '.' ':' '`') in database name & table name. 
+func getPatternTuple(ctx context.Context, level string, pattern string, dup map[string]struct{}) (pt *cdc2.PatternTuple, err error) { + splitRes := strings.Split(strings.TrimSpace(pattern), ":") + if len(splitRes) > 2 { + err = moerr.NewInternalErrorf(ctx, "invalid pattern format: %s, must be `source` or `source:sink`.", pattern) + return } - //Format: account.db.table - //handle source part only - pt.Source.Account, pt.Source.Database, pt.Source.Table, pt.Source.TableIsRegexp, err = extractTableInfo(ctx, pattern, false) - if err != nil { - return nil, err + pt = &cdc2.PatternTuple{OriginString: pattern} + + // handle source part + if pt.Source.Database, pt.Source.Table, err = extractTableInfo(ctx, splitRes[0], level); err != nil { + return } - if pt.Source.Account == "" { - pt.Source.Account = defaultAcc + key := cdc2.GenDbTblKey(pt.Source.Database, pt.Source.Table) + if _, ok := dup[key]; ok { + err = moerr.NewInternalErrorf(ctx, "one db/table: %s can't be used as multi sources in a cdc task", key) + return } + dup[key] = struct{}{} - pt.Sink.AccountId = pt.Source.AccountId - pt.Sink.Account = pt.Source.Account - pt.Sink.Database = pt.Source.Database - pt.Sink.Table = pt.Source.Table - pt.Sink.TableIsRegexp = pt.Source.TableIsRegexp - return pt, nil + // handle sink part + if len(splitRes) > 1 { + if pt.Sink.Database, pt.Sink.Table, err = extractTableInfo(ctx, splitRes[1], level); err != nil { + return + } + } else { + // if not specify sink, then sink = source + pt.Sink.Database = pt.Source.Database + pt.Sink.Table = pt.Source.Table + } + return } -// extractTableInfo -// get account,database,table info from string +// extractTableInfo get account,database,table info from string +// +// input format: // -// account: may be empty -// database: must be concrete name instead of pattern. -// table: must be concrete name or pattern in the source part. must be concrete name in the destination part -// isRegexpTable: table name is regular expression -// !!!NOTE!!! 
+// DbLevel: database +// TableLevel: database.table // -// There must be no special characters (',' '.' ':' '`') in account name & database name & table name. -func extractTableInfo(ctx context.Context, input string, mustBeConcreteTable bool) (account string, db string, table string, isRegexpTable bool, err error) { +// There must be no special characters (',' '.' ':' '`') in database name & table name. +func extractTableInfo(ctx context.Context, input string, level string) (db string, table string, err error) { parts := strings.Split(strings.TrimSpace(input), ".") - if len(parts) != 2 && len(parts) != 3 { - err = moerr.NewInternalErrorf(ctx, "invalid table format: %s, needs account.database.table or database.table.", input) + if level == cdc2.DbLevel && len(parts) != 1 { + err = moerr.NewInternalErrorf(ctx, "invalid databases format: %s", input) + return + } else if level == cdc2.TableLevel && len(parts) != 2 { + err = moerr.NewInternalErrorf(ctx, "invalid tables format: %s", input) return } - if len(parts) == 2 { - db, table = strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]) - } else { - account, db, table = strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]), strings.TrimSpace(parts[2]) - - if !accountNameIsLegal(account) { - err = moerr.NewInternalErrorf(ctx, "invalid account name: %s", account) - return - } - } - + db = strings.TrimSpace(parts[0]) if !dbNameIsLegal(db) { err = moerr.NewInternalErrorf(ctx, "invalid database name: %s", db) return } - if !tableNameIsLegal(table) { - err = moerr.NewInternalErrorf(ctx, "invalid table name: %s", table) - return - } - - return -} - -// preprocessTables extract tables and serialize them -func preprocessTables( - ctx context.Context, - ses *Session, - level string, - account string, - tables string, -) (*cdc2.PatternTuples, error) { - tablesPts, err := extractTablePairs(ctx, tables, account) - if err != nil { - return nil, err - } - - //step 2: check privilege - if err = canCreateCdcTask(ctx, ses, level, 
account, tablesPts); err != nil { - return nil, err - } - - return tablesPts, nil -} - -/* -extractTablePairs extracts all source:sink pairs from the pattern -There must be no special characters (',' '.' ':' '`') in account name & database name & table name. -*/ -func extractTablePairs(ctx context.Context, pattern string, defaultAcc string) (*cdc2.PatternTuples, error) { - pattern = strings.TrimSpace(pattern) - pts := &cdc2.PatternTuples{} - - tablePairs := strings.Split(pattern, ",") - - //step1 : split pattern by ',' => table pair - for _, pair := range tablePairs { - pt, err := extractTablePair(ctx, pair, defaultAcc) - if err != nil { - return nil, err + if level == cdc2.TableLevel { + table = strings.TrimSpace(parts[1]) + if !tableNameIsLegal(table) { + err = moerr.NewInternalErrorf(ctx, "invalid table name: %s", table) + return } - pts.Append(pt) + } else { + table = cdc2.MatchAll } - - return pts, nil + return } -func preprocessRules(ctx context.Context, rules string) (string, *cdc2.PatternTuples, error) { - pts, err := extractRules(ctx, rules) - if err != nil { - return "", nil, err - } - - jsonPts, err := cdc2.JsonEncode(pts) - if err != nil { - return "", nil, err - } - return jsonPts, pts, nil -} +func getPatternTuples(ctx context.Context, level string, tables string) (pts *cdc2.PatternTuples, err error) { + pts = &cdc2.PatternTuples{} -/* -extractRules extracts filters -pattern maybe empty string. then, it returns empty PatternTuples -There must be no special characters (',' '.' ':' '`') in account name & database name & table name. 
-*/ -func extractRules(ctx context.Context, pattern string) (*cdc2.PatternTuples, error) { - pattern = strings.TrimSpace(pattern) - pts := &cdc2.PatternTuples{} - if len(pattern) == 0 { - return pts, nil + if level == cdc2.AccountLevel { + pts.Append(&cdc2.PatternTuple{ + Source: cdc2.PatternTable{Database: cdc2.MatchAll, Table: cdc2.MatchAll}, + Sink: cdc2.PatternTable{Database: cdc2.MatchAll, Table: cdc2.MatchAll}, + }) + return } - tablePairs := strings.Split(pattern, ",") - if len(tablePairs) == 0 { - return nil, moerr.NewInternalErrorf(ctx, "invalid pattern format: %s", pattern) - } - var err error - //step1 : split pattern by ',' => table pair + // split tables by ',' => table pair + var pt *cdc2.PatternTuple + tablePairs := strings.Split(strings.TrimSpace(tables), ",") + dup := make(map[string]struct{}) for _, pair := range tablePairs { - pt := &cdc2.PatternTuple{} - pt.Source.Account, pt.Source.Database, pt.Source.Table, pt.Source.TableIsRegexp, err = extractTableInfo(ctx, pair, false) - if err != nil { - return nil, err + if pt, err = getPatternTuple(ctx, level, pair, dup); err != nil { + return } pts.Append(pt) } - - return pts, nil -} - -func canCreateCdcTask(ctx context.Context, ses *Session, level string, account string, pts *cdc2.PatternTuples) error { - if strings.EqualFold(level, cdc2.ClusterLevel) { - if !ses.tenant.IsMoAdminRole() { - return moerr.NewInternalError(ctx, "Only sys account administrator are allowed to create cluster level task") - } - for _, pt := range pts.Pts { - if isBannedDatabase(pt.Source.Database) { - return moerr.NewInternalError(ctx, "The system database cannot be subscribed to") - } - } - } else if strings.EqualFold(level, cdc2.AccountLevel) { - if !ses.tenant.IsMoAdminRole() && ses.GetTenantName() != account { - return moerr.NewInternalErrorf(ctx, "No privilege to create task on %s", account) - } - for _, pt := range pts.Pts { - if account != pt.Source.Account { - return moerr.NewInternalErrorf(ctx, "No privilege to create 
task on table %s", pt.OriginString) - } - if isBannedDatabase(pt.Source.Database) { - return moerr.NewInternalError(ctx, "The system database cannot be subscribed to") - } - } - } else { - return moerr.NewInternalErrorf(ctx, "Incorrect level %s", level) - } - return nil -} - -func attachAccountToFilters(ctx context.Context, ses *Session, level string, account string, pts *cdc2.PatternTuples) error { - if strings.EqualFold(level, cdc2.ClusterLevel) { - if !ses.tenant.IsMoAdminRole() { - return moerr.NewInternalError(ctx, "Only sys account administrator are allowed to create cluster level task") - } - for _, pt := range pts.Pts { - if pt.Source.Account == "" { - pt.Source.Account = ses.GetTenantName() - } - } - } else if strings.EqualFold(level, cdc2.AccountLevel) { - if !ses.tenant.IsMoAdminRole() && ses.GetTenantName() != account { - return moerr.NewInternalErrorf(ctx, "No privilege to create task on %s", account) - } - for _, pt := range pts.Pts { - if pt.Source.Account == "" { - pt.Source.Account = account - } - if account != pt.Source.Account { - return moerr.NewInternalErrorf(ctx, "No privilege to create task on table %s", pt.OriginString) - } - } - } else { - return moerr.NewInternalErrorf(ctx, "Incorrect level %s", level) - } - return nil + return } func RegisterCdcExecutor( @@ -1001,16 +663,19 @@ type CdcTask struct { sinkUri cdc2.UriInfo tables cdc2.PatternTuples - filters cdc2.PatternTuples + exclude *regexp.Regexp startTs types.TS - noFull string + noFull bool additionalConfig map[string]interface{} activeRoutine *cdc2.ActiveRoutine - // sunkWatermarkUpdater update the watermark of the items that has been sunk to downstream - sunkWatermarkUpdater *cdc2.WatermarkUpdater - isRunning bool - holdCh chan int + // watermarkUpdater update the watermark of the items that has been sunk to downstream + watermarkUpdater cdc2.IWatermarkUpdater + // runningReaders store the running execute pipelines, map key pattern: db.table + runningReaders *sync.Map + + isRunning 
bool + holdCh chan int // start wrapper, for ut startFunc func(ctx context.Context) error @@ -1053,7 +718,11 @@ func NewCdcTask( } func (cdc *CdcTask) Start(rootCtx context.Context) (err error) { - logutil.Infof("cdc task %s start on cn %s", cdc.cdcTask.TaskName, cdc.cnUUID) + taskId := cdc.cdcTask.TaskId + taskName := cdc.cdcTask.TaskName + cnUUID := cdc.cnUUID + accountId := uint32(cdc.cdcTask.Accounts[0].GetId()) + logutil.Infof("cdc task %s start on cn %s", taskName, cnUUID) defer func() { if err != nil { @@ -1063,56 +732,35 @@ func (cdc *CdcTask) Start(rootCtx context.Context) (err error) { cdc.activeRoutine.CloseCancel() if updateErrMsgErr := cdc.updateErrMsg(rootCtx, err.Error()); updateErrMsgErr != nil { - logutil.Errorf("cdc task %s update err msg failed, err: %v", cdc.cdcTask.TaskName, updateErrMsgErr) + logutil.Errorf("cdc task %s update err msg failed, err: %v", taskName, updateErrMsgErr) } } - // if Resume/Restart successfully will reach here, do nothing + + cdc2.GetTableScanner(cnUUID).UnRegister(taskId) }() - ctx := defines.AttachAccountId(rootCtx, uint32(cdc.cdcTask.Accounts[0].GetId())) + ctx := defines.AttachAccountId(rootCtx, accountId) // get cdc task definition if err = cdc.retrieveCdcTask(ctx); err != nil { return err } - // check table be filtered or not - if filterTable(&cdc.tables, &cdc.filters) == 0 { - return moerr.NewInternalError(ctx, "all tables has been excluded by filters. 
start cdc failed.") - } - - // get source table id - var info *cdc2.DbTableInfo - dbTableInfos := make([]*cdc2.DbTableInfo, 0, len(cdc.tables.Pts)) - tblIdStrMap := make(map[string]bool) - for _, tuple := range cdc.tables.Pts { - accId, accName, dbName, tblName := tuple.Source.AccountId, tuple.Source.Account, tuple.Source.Database, tuple.Source.Table - if needSkipThisTable(accName, dbName, tblName, &cdc.filters) { - logutil.Infof("cdc skip table %s:%s by filter", dbName, tblName) - continue - } - // get db id, table id for the source table - info, err = cdc.retrieveTable(ctx, accId, accName, dbName, tblName, tblIdStrMap) - if err != nil { - return err - } + // reset runningReaders + cdc.runningReaders = &sync.Map{} - info.SinkAccountName = tuple.Sink.Account - info.SinkDbName = tuple.Sink.Database - info.SinkTblName = tuple.Sink.Table + // start watermarkUpdater + cdc.watermarkUpdater = cdc2.NewWatermarkUpdater(accountId, taskId, cdc.ie) + go cdc.watermarkUpdater.Run(ctx, cdc.activeRoutine) - dbTableInfos = append(dbTableInfos, info) - } - - if err = cdc.startWatermarkAndPipeline(ctx, dbTableInfos); err != nil { - return err - } + // register to table scanner + cdc2.GetTableScanner(cnUUID).Register(taskId, cdc.handleNewTables) cdc.isRunning = true - logutil.Infof("cdc task %s start on cn %s success", cdc.cdcTask.TaskName, cdc.cnUUID) + logutil.Infof("cdc task %s start on cn %s success", taskName, cnUUID) // start success, clear err msg if err = cdc.updateErrMsg(ctx, ""); err != nil { - logutil.Errorf("cdc task %s update err msg failed, err: %v", cdc.cdcTask.TaskName, err) + logutil.Errorf("cdc task %s update err msg failed, err: %v", taskName, err) } // hold @@ -1157,7 +805,7 @@ func (cdc *CdcTask) Restart() (err error) { }() // delete previous records - if err = cdc.sunkWatermarkUpdater.DeleteAllFromDb(); err != nil { + if err = cdc.watermarkUpdater.DeleteAllFromDb(); err != nil { return } go func() { @@ -1199,7 +847,7 @@ func (cdc *CdcTask) Cancel() (err error) { 
cdc.activeRoutine.CloseCancel() cdc.isRunning = false } - if err = cdc.sunkWatermarkUpdater.DeleteAllFromDb(); err != nil { + if err = cdc.watermarkUpdater.DeleteAllFromDb(); err != nil { return err } // let Start() go @@ -1207,70 +855,20 @@ func (cdc *CdcTask) Cancel() (err error) { return } -func (cdc *CdcTask) addExecPipelineForTable(info *cdc2.DbTableInfo, txnOp client.TxnOperator) error { - // reader --call--> sinker ----> remote db - ctx := defines.AttachAccountId(context.Background(), uint32(cdc.cdcTask.Accounts[0].GetId())) - - // add watermark to updater - watermark, err := cdc.sunkWatermarkUpdater.GetFromDb(info.SourceTblIdStr) - if err != nil { - return err - } - cdc.sunkWatermarkUpdater.UpdateMem(info.SourceTblIdStr, watermark) - - tableDef, err := cdc2.GetTableDef(ctx, txnOp, cdc.cnEngine, info.SourceTblId) - if err != nil { - return err - } - - // make sinker for table - sinker, err := cdc2.NewSinker( - cdc.sinkUri, - info, - cdc.sunkWatermarkUpdater, - tableDef, - cdc2.DefaultRetryTimes, - cdc2.DefaultRetryDuration, - cdc.activeRoutine, - uint64(cdc.additionalConfig[cdc2.MaxSqlLength].(float64)), - cdc.additionalConfig[cdc2.SendSqlTimeout].(string), - ) - if err != nil { - return err - } - go sinker.Run(ctx, cdc.activeRoutine) - - // make reader - reader := cdc2.NewTableReader( - cdc.cnTxnClient, - cdc.cnEngine, - cdc.mp, - cdc.packerPool, - info, - sinker, - cdc.sunkWatermarkUpdater, - tableDef, - cdc.resetWatermarkForTable, - cdc.additionalConfig[cdc2.InitSnapshotSplitTxn].(bool), - ) - go reader.Run(ctx, cdc.activeRoutine) - - return nil -} - func (cdc *CdcTask) resetWatermarkForTable(info *cdc2.DbTableInfo) (err error) { - tblIdStr := info.SourceTblIdStr + dbName, tblName := info.SourceDbName, info.SourceTblName // delete old watermark of table - cdc.sunkWatermarkUpdater.DeleteFromMem(tblIdStr) - if err = cdc.sunkWatermarkUpdater.DeleteFromDb(tblIdStr); err != nil { + cdc.watermarkUpdater.DeleteFromMem(dbName, tblName) + if err = 
cdc.watermarkUpdater.DeleteFromDb(dbName, tblName); err != nil { return } // use start_ts as init watermark - if err = cdc.sunkWatermarkUpdater.InsertIntoDb(info, cdc.startTs); err != nil { + // TODO handle no_full + if err = cdc.watermarkUpdater.InsertIntoDb(info, cdc.startTs); err != nil { return } - cdc.sunkWatermarkUpdater.UpdateMem(tblIdStr, cdc.startTs) + cdc.watermarkUpdater.UpdateMem(dbName, tblName, cdc.startTs) return } @@ -1311,54 +909,133 @@ func (cdc *CdcTask) updateErrMsg(ctx context.Context, errMsg string) (err error) return cdc.ie.Exec(defines.AttachAccountId(ctx, catalog.System_Account), sql, ie.SessionOverrideOptions{}) } -func (cdc *CdcTask) startWatermarkAndPipeline(ctx context.Context, dbTableInfos []*cdc2.DbTableInfo) (err error) { - var info *cdc2.DbTableInfo - txnOp, err := cdc2.GetTxnOp(ctx, cdc.cnEngine, cdc.cnTxnClient, "cdc-startWatermarkAndPipeline") +func (cdc *CdcTask) handleNewTables(allAccountTbls map[uint32]cdc2.TblMap) { + accountId := uint32(cdc.cdcTask.Accounts[0].GetId()) + ctx := defines.AttachAccountId(context.Background(), accountId) + + txnOp, err := cdc2.GetTxnOp(ctx, cdc.cnEngine, cdc.cnTxnClient, "cdc-handleNewTables") if err != nil { - return err + logutil.Errorf("cdc task %s get txn op failed, err: %v", cdc.cdcTask.TaskName, err) } defer func() { cdc2.FinishTxnOp(ctx, err, txnOp, cdc.cnEngine) }() if err = cdc.cnEngine.New(ctx, txnOp); err != nil { - return err + logutil.Errorf("cdc task %s new engine failed, err: %v", cdc.cdcTask.TaskName, err) } - if cdc.noFull == "true" { - cdc.startTs = types.TimestampToTS(txnOp.SnapshotTS()) - } + for key, info := range allAccountTbls[accountId] { + // already running + if _, ok := cdc.runningReaders.Load(key); ok { + continue + } - // start watermark updater - cdc.sunkWatermarkUpdater = cdc2.NewWatermarkUpdater(cdc.cdcTask.Accounts[0].GetId(), cdc.cdcTask.TaskId, cdc.ie) + if !cdc.matchAnyPattern(key, info) { + continue + } - mp, err := cdc.sunkWatermarkUpdater.GetAllFromDb() - 
if err != nil { - return err + if cdc.exclude != nil && cdc.exclude.MatchString(key) { + continue + } + + logutil.Infof("cdc task find new table: %s", info) + if err = cdc.addExecPipelineForTable(ctx, info, txnOp); err != nil { + logutil.Errorf("cdc task %s add exec pipeline for table %s failed, err: %v", cdc.cdcTask.TaskName, key, err) + } else { + logutil.Infof("cdc task %s add exec pipeline for table %s successfully", cdc.cdcTask.TaskName, key) + } } - for _, info = range dbTableInfos { - // insert if not exists - if _, ok := mp[info.SourceTblIdStr]; !ok { - // use startTs as watermark - if err = cdc.sunkWatermarkUpdater.InsertIntoDb(info, cdc.startTs); err != nil { - return - } +} + +func (cdc *CdcTask) matchAnyPattern(key string, info *cdc2.DbTableInfo) bool { + match := func(s, p string) bool { + if p == cdc2.MatchAll { + return true } - delete(mp, info.SourceTblIdStr) + return s == p } - // delete outdated watermark - for tableIdStr := range mp { - if err = cdc.sunkWatermarkUpdater.DeleteFromDb(tableIdStr); err != nil { - return err + + db, table := cdc2.SplitDbTblKey(key) + for _, pt := range cdc.tables.Pts { + if match(db, pt.Source.Database) && match(table, pt.Source.Table) { + // complete sink info + info.SinkDbName = pt.Sink.Database + if info.SinkDbName == cdc2.MatchAll { + info.SinkDbName = db + } + info.SinkTblName = pt.Sink.Table + if info.SinkTblName == cdc2.MatchAll { + info.SinkTblName = table + } + return true } } - go cdc.sunkWatermarkUpdater.Run(ctx, cdc.activeRoutine) + return false +} - // create exec pipelines - for _, info = range dbTableInfos { - if err = cdc.addExecPipelineForTable(info, txnOp); err != nil { +// reader ----> sinker ----> remote db +func (cdc *CdcTask) addExecPipelineForTable(ctx context.Context, info *cdc2.DbTableInfo, txnOp client.TxnOperator) (err error) { + // step 1. 
init watermarkUpdater + // get watermark from db + watermark, err := cdc.watermarkUpdater.GetFromDb(info.SourceDbName, info.SourceTblName) + if moerr.IsMoErrCode(err, moerr.ErrNoWatermarkFound) { + // add watermark into db if not exists + watermark = cdc.startTs + if cdc.noFull { + watermark = types.TimestampToTS(txnOp.SnapshotTS()) + } + + if err = cdc.watermarkUpdater.InsertIntoDb(info, watermark); err != nil { return } + } else if err != nil { + return + } + // clear err msg + if err = cdc.watermarkUpdater.SaveErrMsg(info.SourceDbName, info.SourceTblName, ""); err != nil { + return + } + // add watermark into memory + cdc.watermarkUpdater.UpdateMem(info.SourceDbName, info.SourceTblName, watermark) + + tableDef, err := cdc2.GetTableDef(ctx, txnOp, cdc.cnEngine, info.SourceTblId) + if err != nil { + return } + + // step 2. new sinker + sinker, err := cdc2.NewSinker( + cdc.sinkUri, + info, + cdc.watermarkUpdater, + tableDef, + cdc2.DefaultRetryTimes, + cdc2.DefaultRetryDuration, + cdc.activeRoutine, + uint64(cdc.additionalConfig[cdc2.MaxSqlLength].(float64)), + cdc.additionalConfig[cdc2.SendSqlTimeout].(string), + ) + if err != nil { + return err + } + go sinker.Run(ctx, cdc.activeRoutine) + + // step 3. 
new reader + reader := cdc2.NewTableReader( + cdc.cnTxnClient, + cdc.cnEngine, + cdc.mp, + cdc.packerPool, + info, + sinker, + cdc.watermarkUpdater, + tableDef, + cdc.resetWatermarkForTable, + cdc.additionalConfig[cdc2.InitSnapshotSplitTxn].(bool), + cdc.runningReaders, + ) + go reader.Run(ctx, cdc.activeRoutine) + return } @@ -1385,7 +1062,6 @@ func (cdc *CdcTask) retrieveCdcTask(ctx context.Context) error { return err } - var sinkPwd string if sinkTyp != cdc2.ConsoleSink { //sink uri jsonSinkUri, err := res.GetString(ctx, 0, 0) @@ -1393,13 +1069,12 @@ func (cdc *CdcTask) retrieveCdcTask(ctx context.Context) error { return err } - err = cdc2.JsonDecode(jsonSinkUri, &cdc.sinkUri) - if err != nil { + if err = cdc2.JsonDecode(jsonSinkUri, &cdc.sinkUri); err != nil { return err } //sink_password - sinkPwd, err = res.GetString(ctx, 0, 2) + sinkPwd, err := res.GetString(ctx, 0, 2) if err != nil { return err } @@ -1409,8 +1084,7 @@ func (cdc *CdcTask) retrieveCdcTask(ctx context.Context) error { return err } - cdc.sinkUri.Password, err = cdc2.AesCFBDecode(ctx, sinkPwd) - if err != nil { + if cdc.sinkUri.Password, err = cdc2.AesCFBDecode(ctx, sinkPwd); err != nil { return err } } @@ -1418,26 +1092,25 @@ func (cdc *CdcTask) retrieveCdcTask(ctx context.Context) error { //update sink type after deserialize cdc.sinkUri.SinkTyp = sinkTyp - //tables + // tables jsonTables, err := res.GetString(ctx, 0, 3) if err != nil { return err } - err = cdc2.JsonDecode(jsonTables, &cdc.tables) - if err != nil { + if err = cdc2.JsonDecode(jsonTables, &cdc.tables); err != nil { return err } - //filters - jsonFilters, err := res.GetString(ctx, 0, 4) + // exclude + exclude, err := res.GetString(ctx, 0, 4) if err != nil { return err } - - err = cdc2.JsonDecode(jsonFilters, &cdc.filters) - if err != nil { - return err + if exclude != "" { + if cdc.exclude, err = regexp.Compile(exclude); err != nil { + return err + } } // noFull @@ -1445,9 +1118,10 @@ func (cdc *CdcTask) retrieveCdcTask(ctx 
context.Context) error { if err != nil { return err } + cdc.noFull, _ = strconv.ParseBool(noFull) + // startTs cdc.startTs = types.TS{} - cdc.noFull = noFull // additionalConfig additionalConfigStr, err := res.GetString(ctx, 0, 6) @@ -1457,54 +1131,6 @@ func (cdc *CdcTask) retrieveCdcTask(ctx context.Context) error { return json.Unmarshal([]byte(additionalConfigStr), &cdc.additionalConfig) } -func (cdc *CdcTask) retrieveTable(ctx context.Context, accId uint64, accName, dbName, tblName string, tblIdStrMap map[string]bool) (*cdc2.DbTableInfo, error) { - var dbId, tblId uint64 - var err error - ctx = defines.AttachAccountId(ctx, catalog.System_Account) - sql := getSqlForDbIdAndTableId(accId, dbName, tblName) - res := cdc.ie.Query(ctx, sql, ie.SessionOverrideOptions{}) - if res.Error() != nil { - return nil, res.Error() - } - - getTblIdStr := func(tblId uint64) string { - for i := 0; ; i++ { - tblIdStr := fmt.Sprintf("%d_%d", tblId, i) - if _, ok := tblIdStrMap[tblIdStr]; !ok { - tblIdStrMap[tblIdStr] = true - return tblIdStr - } - } - } - - /* - missing table will be handled in the future. 
- */ - if res.RowCount() < 1 { - return nil, moerr.NewInternalErrorf(ctx, "no table %s:%s", dbName, tblName) - } else if res.RowCount() > 1 { - return nil, moerr.NewInternalErrorf(ctx, "duplicate table %s:%s", dbName, tblName) - } - - if dbId, err = res.GetUint64(ctx, 0, 0); err != nil { - return nil, err - } - - if tblId, err = res.GetUint64(ctx, 0, 1); err != nil { - return nil, err - } - - return &cdc2.DbTableInfo{ - SourceAccountName: accName, - SourceDbName: dbName, - SourceTblName: tblName, - SourceAccountId: accId, - SourceDbId: dbId, - SourceTblId: tblId, - SourceTblIdStr: getTblIdStr(tblId), - }, err -} - var initAesKeyByInternalExecutor = func(ctx context.Context, cdc *CdcTask, accountId uint32) error { return cdc.initAesKeyByInternalExecutor(ctx, accountId) } @@ -1958,20 +1584,3 @@ func initAesKeyBySqlExecutor(ctx context.Context, executor taskservice.SqlExecut cdc2.AesKey, err = decrypt(ctx, encryptedKey, []byte(getGlobalPuWrapper(service).SV.KeyEncryptionKey)) return } - -func needSkipThisTable(accountName, dbName, tblName string, filters *cdc2.PatternTuples) bool { - if len(filters.Pts) == 0 { - return false - } - for _, filter := range filters.Pts { - if filter == nil { - continue - } - if filter.Source.Account == accountName && - filter.Source.Database == dbName && - filter.Source.Table == tblName { - return true - } - } - return false -} diff --git a/pkg/frontend/cdc_test.go b/pkg/frontend/cdc_test.go index ab9c934f35dda..ea0d35d64323b 100644 --- a/pkg/frontend/cdc_test.go +++ b/pkg/frontend/cdc_test.go @@ -19,6 +19,7 @@ import ( "database/sql" "fmt" "regexp" + "sync" "testing" "time" @@ -106,527 +107,254 @@ func Test_newCdcSqlFormat(t *testing.T) { assert.Equal(t, wantsql7, sql7) } -func Test_parseTables(t *testing.T) { +func Test_getPatternTuples(t *testing.T) { //tables := []string{ - // "acc1.users.t1:acc1.users.t1", - // "acc1.users.t*:acc1.users.t*", - // "acc*.users.t?:acc*.users.t?", - // "acc*.users|items.*[12]/:acc*.users|items.*[12]/", - 
// "acc*.*./table./", - // //"acc*.*.table*", - // //"/sys|acc.*/.*.t*", - // //"/sys|acc.*/.*./t.$/", - // //"/sys|acc.*/.test*./t1{1,3}$/,/acc[23]/.items./.*/", + // - table level + // "db1.t1:db2.t2,db3.t3:db4.t4", + // "db1.t1,db3.t3:db4.t4", + // "db1.t1,db3.t3", + // "db1.t1:db2.t2,db1.t1:db4.t4", // error + // - db level + // "db1:db2,db3:db4", + // "db1,db3:db4", + // "db1,db3", + // - account level //} - type tableInfo struct { - account, db, table string - tableIsRegexp bool - } - - isSame := func(info tableInfo, account, db, table string, isRegexp bool, tip string) { - assert.Equalf(t, info.account, account, tip) - assert.Equalf(t, info.db, db, tip) - assert.Equalf(t, info.table, table, tip) - assert.Equalf(t, info.tableIsRegexp, isRegexp, tip) - } - type kase struct { - input string + tables string + level string wantErr bool - src tableInfo - dst tableInfo + expect *cdc2.PatternTuples } kases := []kase{ + // table level { - input: "acc1.users.t1:acc1.users.t1", + tables: "db1.t1:db2.t2,db3.t3:db4.t4", + level: cdc2.TableLevel, wantErr: false, - src: tableInfo{ - account: "acc1", - db: "users", - table: "t1", - }, - dst: tableInfo{ - account: "acc1", - db: "users", - table: "t1", + expect: &cdc2.PatternTuples{ + Pts: []*cdc2.PatternTuple{ + { + Source: cdc2.PatternTable{ + Database: "db1", + Table: "t1", + }, + Sink: cdc2.PatternTable{ + Database: "db2", + Table: "t2", + }, + }, + { + Source: cdc2.PatternTable{ + Database: "db3", + Table: "t3", + }, + Sink: cdc2.PatternTable{ + Database: "db4", + Table: "t4", + }, + }, + }, }, }, { - input: "acc1.users.t*:acc1.users.t*", + tables: "db1.t1,db3.t3:db4.t4", + level: cdc2.TableLevel, wantErr: false, - src: tableInfo{ - account: "acc1", - db: "users", - table: "t*", - }, - dst: tableInfo{ - account: "acc1", - db: "users", - table: "t*", + expect: &cdc2.PatternTuples{ + Pts: []*cdc2.PatternTuple{ + { + Source: cdc2.PatternTable{ + Database: "db1", + Table: "t1", + }, + Sink: cdc2.PatternTable{ + Database: 
"db1", + Table: "t1", + }, + }, + { + Source: cdc2.PatternTable{ + Database: "db3", + Table: "t3", + }, + Sink: cdc2.PatternTable{ + Database: "db4", + Table: "t4", + }, + }, + }, }, }, { - input: "acc*.users.t?:acc*.users.t?", + tables: "db1.t1,db3.t3", + level: cdc2.TableLevel, wantErr: false, - src: tableInfo{ - account: "acc*", - db: "users", - table: "t?", - }, - dst: tableInfo{ - account: "acc*", - db: "users", - table: "t?", + expect: &cdc2.PatternTuples{ + Pts: []*cdc2.PatternTuple{ + { + Source: cdc2.PatternTable{ + Database: "db1", + Table: "t1", + }, + Sink: cdc2.PatternTable{ + Database: "db1", + Table: "t1", + }, + }, + { + Source: cdc2.PatternTable{ + Database: "db3", + Table: "t3", + }, + Sink: cdc2.PatternTable{ + Database: "db3", + Table: "t3", + }, + }, + }, }, }, { - input: "acc*.users|items.*[12]/:acc*.users|items.*[12]/", + tables: "db1.t1:db2.t2,db1.t1:db4.t4", + level: cdc2.TableLevel, + wantErr: true, + }, + + // db level + { + tables: "db1:db2,db3:db4", + level: cdc2.DbLevel, wantErr: false, - src: tableInfo{ - account: "acc*", - db: "users|items", - table: "*[12]/", - }, - dst: tableInfo{ - account: "acc*", - db: "users|items", - table: "*[12]/", + expect: &cdc2.PatternTuples{ + Pts: []*cdc2.PatternTuple{ + { + Source: cdc2.PatternTable{ + Database: "db1", + Table: cdc2.MatchAll, + }, + Sink: cdc2.PatternTable{ + Database: "db2", + Table: cdc2.MatchAll, + }, + }, + { + Source: cdc2.PatternTable{ + Database: "db3", + Table: cdc2.MatchAll, + }, + Sink: cdc2.PatternTable{ + Database: "db4", + Table: cdc2.MatchAll, + }, + }, + }, }, }, { - input: "acc*.*./table./", - wantErr: true, - src: tableInfo{}, - dst: tableInfo{}, + tables: "db1,db3:db4", + level: cdc2.DbLevel, + wantErr: false, + expect: &cdc2.PatternTuples{ + Pts: []*cdc2.PatternTuple{ + { + Source: cdc2.PatternTable{ + Database: "db1", + Table: cdc2.MatchAll, + }, + Sink: cdc2.PatternTable{ + Database: "db1", + Table: cdc2.MatchAll, + }, + }, + { + Source: cdc2.PatternTable{ + 
Database: "db3", + Table: cdc2.MatchAll, + }, + Sink: cdc2.PatternTable{ + Database: "db4", + Table: cdc2.MatchAll, + }, + }, + }, + }, }, { - input: "acc*.*.table*:acc*.*.table*", + tables: "db1,db3", + level: cdc2.DbLevel, wantErr: false, - src: tableInfo{ - account: "acc*", - db: "*", - table: "table*", - }, - dst: tableInfo{ - account: "acc*", - db: "*", - table: "table*", + expect: &cdc2.PatternTuples{ + Pts: []*cdc2.PatternTuple{ + { + Source: cdc2.PatternTable{ + Database: "db1", + Table: cdc2.MatchAll, + }, + Sink: cdc2.PatternTable{ + Database: "db1", + Table: cdc2.MatchAll, + }, + }, + { + Source: cdc2.PatternTable{ + Database: "db3", + Table: cdc2.MatchAll, + }, + Sink: cdc2.PatternTable{ + Database: "db3", + Table: cdc2.MatchAll, + }, + }, + }, }, }, + + // account level { - input: "", - wantErr: true, + tables: "", + level: cdc2.AccountLevel, + wantErr: false, + expect: &cdc2.PatternTuples{ + Pts: []*cdc2.PatternTuple{ + { + Source: cdc2.PatternTable{ + Database: cdc2.MatchAll, + Table: cdc2.MatchAll, + }, + Sink: cdc2.PatternTable{ + Database: cdc2.MatchAll, + Table: cdc2.MatchAll, + }, + }, + }, + }, }, } + isSame := func(pt0, pt1 *cdc2.PatternTuples) { + assert.Equal(t, len(pt0.Pts), len(pt1.Pts)) + for i := 0; i < len(pt0.Pts); i++ { + assert.Equal(t, pt0.Pts[i].Source.Database, pt1.Pts[i].Source.Database) + assert.Equal(t, pt0.Pts[i].Source.Table, pt1.Pts[i].Source.Table) + assert.Equal(t, pt0.Pts[i].Sink.Database, pt1.Pts[i].Sink.Database) + assert.Equal(t, pt0.Pts[i].Sink.Table, pt1.Pts[i].Sink.Table) + } + } + for _, tkase := range kases { - pirs, err := extractTablePairs(context.Background(), tkase.input, "") + pts, err := getPatternTuples(context.Background(), tkase.level, tkase.tables) if tkase.wantErr { - assert.Errorf(t, err, tkase.input) + assert.Errorf(t, err, tkase.tables) } else { - assert.NoErrorf(t, err, tkase.input) - assert.Equal(t, len(pirs.Pts), 1, tkase.input) - pir := pirs.Pts[0] - isSame(tkase.src, pir.Source.Account, 
pir.Source.Database, pir.Source.Table, pir.Source.TableIsRegexp, tkase.input) - isSame(tkase.dst, pir.Sink.Account, pir.Sink.Database, pir.Sink.Table, pir.Sink.TableIsRegexp, tkase.input) + assert.NoErrorf(t, err, tkase.tables) + isSame(pts, tkase.expect) } } } -func Test_privilegeCheck(t *testing.T) { - var tenantInfo *TenantInfo - var err error - var pts []*cdc2.PatternTuple - ctx := context.Background() - ses := &Session{} - - gen := func(pts []*cdc2.PatternTuple) *cdc2.PatternTuples { - return &cdc2.PatternTuples{Pts: pts} - } - - tenantInfo = &TenantInfo{ - Tenant: sysAccountName, - DefaultRole: moAdminRoleName, - } - ses.tenant = tenantInfo - pts = []*cdc2.PatternTuple{ - {Source: cdc2.PatternTable{Account: "acc1"}}, - {Source: cdc2.PatternTable{Account: sysAccountName}}, - } - err = canCreateCdcTask(ctx, ses, "Cluster", "", gen(pts)) - assert.Nil(t, err) - - pts = []*cdc2.PatternTuple{ - {Source: cdc2.PatternTable{Account: sysAccountName, Database: moCatalog}}, - } - err = canCreateCdcTask(ctx, ses, "Cluster", "", gen(pts)) - assert.NotNil(t, err) - - pts = []*cdc2.PatternTuple{ - {Source: cdc2.PatternTable{Account: sysAccountName}}, - } - err = canCreateCdcTask(ctx, ses, "Account", "acc1", gen(pts)) - assert.NotNil(t, err) - - pts = []*cdc2.PatternTuple{ - {Source: cdc2.PatternTable{Account: "acc2"}}, - } - err = canCreateCdcTask(ctx, ses, "Account", "acc1", gen(pts)) - assert.NotNil(t, err) - - pts = []*cdc2.PatternTuple{ - {}, - } - err = canCreateCdcTask(ctx, ses, "Account", "acc1", gen(pts)) - assert.Error(t, err) - - pts = []*cdc2.PatternTuple{ - {Source: cdc2.PatternTable{Account: "acc1"}}, - } - err = canCreateCdcTask(ctx, ses, "Account", "acc1", gen(pts)) - assert.Nil(t, err) - - tenantInfo = &TenantInfo{ - Tenant: "acc1", - DefaultRole: accountAdminRoleName, - } - ses.tenant = tenantInfo - - pts = []*cdc2.PatternTuple{ - {Source: cdc2.PatternTable{Account: "acc1"}}, - {}, - } - err = canCreateCdcTask(ctx, ses, "Cluster", "", gen(pts)) - 
assert.NotNil(t, err) - - err = canCreateCdcTask(ctx, ses, "Account", "acc2", gen(pts)) - assert.NotNil(t, err) - - err = canCreateCdcTask(ctx, ses, "Account", "acc1", gen(pts)) - assert.Error(t, err) - - pts = []*cdc2.PatternTuple{ - {Source: cdc2.PatternTable{Account: "acc2"}}, - {Source: cdc2.PatternTable{Account: sysAccountName}}, - } - err = canCreateCdcTask(ctx, ses, "Account", "acc1", gen(pts)) - assert.NotNil(t, err) - -} - -func Test_attachAccoutForFilters(t *testing.T) { - var tenantInfo *TenantInfo - var err error - var pts []*cdc2.PatternTuple - ctx := context.Background() - ses := &Session{} - - gen := func(pts []*cdc2.PatternTuple) *cdc2.PatternTuples { - return &cdc2.PatternTuples{Pts: pts} - } - - tenantInfo = &TenantInfo{ - Tenant: sysAccountName, - DefaultRole: moAdminRoleName, - } - ses.tenant = tenantInfo - pts = []*cdc2.PatternTuple{ - {Source: cdc2.PatternTable{Account: "acc1"}}, - {Source: cdc2.PatternTable{Account: sysAccountName}}, - } - err = attachAccountToFilters(ctx, ses, "Cluster", "", gen(pts)) - assert.Nil(t, err) - - pts = []*cdc2.PatternTuple{ - {Source: cdc2.PatternTable{Account: sysAccountName, Database: moCatalog}}, - } - err = attachAccountToFilters(ctx, ses, "Cluster", "", gen(pts)) - assert.Nil(t, err) - - pts = []*cdc2.PatternTuple{ - {Source: cdc2.PatternTable{Account: sysAccountName}}, - } - err = attachAccountToFilters(ctx, ses, "Account", "acc1", gen(pts)) - assert.NotNil(t, err) - - pts = []*cdc2.PatternTuple{ - {Source: cdc2.PatternTable{Account: "acc2"}}, - } - err = attachAccountToFilters(ctx, ses, "Account", "acc1", gen(pts)) - assert.NotNil(t, err) - - pts = []*cdc2.PatternTuple{ - {}, - } - err = attachAccountToFilters(ctx, ses, "Account", "acc1", gen(pts)) - assert.Nil(t, err) - assert.Equalf(t, "acc1", pts[0].Source.Account, "different account") - - pts = []*cdc2.PatternTuple{ - {Source: cdc2.PatternTable{Account: "acc1"}}, - } - err = attachAccountToFilters(ctx, ses, "Account", "acc1", gen(pts)) - assert.Nil(t, 
err) - - tenantInfo = &TenantInfo{ - Tenant: "acc1", - DefaultRole: accountAdminRoleName, - } - ses.tenant = tenantInfo - - pts = []*cdc2.PatternTuple{ - {Source: cdc2.PatternTable{Account: "acc1"}}, - {}, - } - err = attachAccountToFilters(ctx, ses, "Cluster", "", gen(pts)) - assert.NotNil(t, err) - - err = attachAccountToFilters(ctx, ses, "Account", "acc2", gen(pts)) - assert.NotNil(t, err) - - err = attachAccountToFilters(ctx, ses, "Account", "acc1", gen(pts)) - assert.Nil(t, err) - - pts = []*cdc2.PatternTuple{ - {Source: cdc2.PatternTable{Account: "acc2"}}, - {Source: cdc2.PatternTable{Account: sysAccountName}}, - } - err = attachAccountToFilters(ctx, ses, "Account", "acc1", gen(pts)) - assert.NotNil(t, err) - -} - -func Test_extractTableInfo(t *testing.T) { - type args struct { - ctx context.Context - input string - mustBeConcreteTable bool - } - tests := []struct { - name string - args args - wantAccount string - wantDb string - wantTable string - wantErr assert.ErrorAssertionFunc - }{ - { - name: "t1-1", - args: args{ - input: "acc1.db.t1", - mustBeConcreteTable: true, - }, - wantAccount: "acc1", - wantDb: "db", - wantTable: "t1", - wantErr: nil, - }, - { - name: "t1-2", - args: args{ - input: "acc1.db.t*", - mustBeConcreteTable: true, - }, - wantAccount: "acc1", - wantDb: "db", - wantTable: "t*", - wantErr: nil, - }, - { - name: "t1-3-table pattern needs //", - args: args{ - input: "acc1.db.t*", - mustBeConcreteTable: false, - }, - wantAccount: "acc1", - wantDb: "db", - wantTable: "t*", - wantErr: nil, - }, - { - name: "t1-4", - args: args{ - input: "acc1.db./t*/", - mustBeConcreteTable: false, - }, - wantAccount: "acc1", - wantDb: "db", - wantTable: "/t*/", - wantErr: nil, - }, - { - name: "t2-1", - args: args{ - input: "db.t1", - mustBeConcreteTable: true, - }, - wantAccount: "", - wantDb: "db", - wantTable: "t1", - wantErr: nil, - }, - { - name: "t2-2-table pattern needs //", - args: args{ - input: "db.t*", - mustBeConcreteTable: true, - }, - 
wantAccount: "", - wantDb: "db", - wantTable: "t*", - wantErr: nil, - }, - { - name: "t2-3-table name can be 't*'", - args: args{ - input: "db.t*", - mustBeConcreteTable: false, - }, - wantAccount: "", - wantDb: "db", - wantTable: "t*", - wantErr: nil, - }, - { - name: "t2-4", - args: args{ - input: "db./t*/", - mustBeConcreteTable: false, - }, - wantAccount: "", - wantDb: "db", - wantTable: "/t*/", - wantErr: nil, - }, - { - name: "t2-5", - args: args{ - input: "db./t*/", - mustBeConcreteTable: true, - }, - wantAccount: "", - wantDb: "db", - wantTable: "/t*/", - wantErr: nil, - }, - { - name: "t3--invalid format", - args: args{ - input: "nodot", - mustBeConcreteTable: false, - }, - wantAccount: "", - wantDb: "", - wantTable: "", - wantErr: assert.Error, - }, - { - name: "t3--invalid account name", - args: args{ - input: "1234*90.db.t1", - mustBeConcreteTable: false, - }, - wantAccount: "1234*90", - wantDb: "db", - wantTable: "t1", - wantErr: nil, - }, - { - name: "t3--invalid database name", - args: args{ - input: "acc.12ddg.t1", - mustBeConcreteTable: false, - }, - wantAccount: "acc", - wantDb: "12ddg", - wantTable: "t1", - wantErr: nil, - }, - { - name: "t4--invalid database name", - args: args{ - input: "acc*./users|items/./t.*[12]/", - mustBeConcreteTable: false, - }, - wantAccount: "", - wantDb: "", - wantTable: "", - wantErr: assert.Error, - }, - { - name: "t4-- X ", - args: args{ - input: "/sys|acc.*/.*.t*", - mustBeConcreteTable: false, - }, - wantAccount: "", - wantDb: "", - wantTable: "", - wantErr: assert.Error, - }, - { - name: "t4-- XX", - args: args{ - input: "/sys|acc.*/.*./t.$/", - mustBeConcreteTable: false, - }, - wantAccount: "", - wantDb: "", - wantTable: "", - wantErr: assert.Error, - }, - { - name: "t4-- XXX", - args: args{ - input: "/sys|acc.*/.test*./t1{1,3}$/,/acc[23]/.items./.*/", - mustBeConcreteTable: false, - }, - wantAccount: "", - wantDb: "", - wantTable: "", - wantErr: assert.Error, - }, - { - name: "accountNameIsLegal", - args: 
args{ - input: "s:s.db.table", - mustBeConcreteTable: false, - }, - wantErr: assert.Error, - }, - { - name: "dbNameIsLegal", - args: args{ - input: "sys.d:b.table", - mustBeConcreteTable: false, - }, - wantErr: assert.Error, - }, - { - name: "tableNameIsLegal", - args: args{ - input: "sys.db.ta:le", - mustBeConcreteTable: false, - }, - wantErr: assert.Error, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - gotAccount, gotDb, gotTable, _, err := extractTableInfo(tt.args.ctx, tt.args.input, tt.args.mustBeConcreteTable) - if tt.wantErr != nil && tt.wantErr(t, err, fmt.Sprintf("extractTableInfo(%v, %v, %v)", tt.args.ctx, tt.args.input, tt.args.mustBeConcreteTable)) { - return - } else { - assert.Equalf(t, tt.wantAccount, gotAccount, "extractTableInfo(%v, %v, %v)", tt.args.ctx, tt.args.input, tt.args.mustBeConcreteTable) - assert.Equalf(t, tt.wantDb, gotDb, "extractTableInfo(%v, %v, %v)", tt.args.ctx, tt.args.input, tt.args.mustBeConcreteTable) - assert.Equalf(t, tt.wantTable, gotTable, "extractTableInfo(%v, %v, %v)", tt.args.ctx, tt.args.input, tt.args.mustBeConcreteTable) - } - }) - } -} - func Test_handleCreateCdc(t *testing.T) { type args struct { ses *Session @@ -647,25 +375,8 @@ func Test_handleCreateCdc(t *testing.T) { db, mock, err := sqlmock.New() assert.NoError(t, err) - ///////mock result - sql1 := "select account_id from `mo_catalog`.`mo_account` where account_name='sys'" - mock.ExpectQuery(sql1).WillReturnRows(sqlmock.NewRows([]string{"account_id"}).AddRow(uint64(sysAccountID))) - - sql2 := "select rel_id from `mo_catalog`.`mo_tables` where account_id = 0 and reldatabase ='db1' and relname = 't1'" - mock.ExpectQuery(sql2).WillReturnRows(sqlmock.NewRows([]string{"table_id"}).AddRow(uint64(11))) - - sql3 := "select count.*att_constraint_type.* from `mo_catalog`.`mo_columns` where account_id = 0 and att_database = 'db1' and att_relname = 't1' and att_constraint_type = 'p'" - 
mock.ExpectQuery(sql3).WillReturnRows(sqlmock.NewRows([]string{"count(att_constraint_type)"}).AddRow(uint64(1))) - - sql2_1 := "select rel_id from `mo_catalog`.`mo_tables` where account_id = 0 and reldatabase ='db1' and relname = 't2'" - mock.ExpectQuery(sql2_1).WillReturnRows(sqlmock.NewRows([]string{"table_id"}).AddRow(uint64(12))) - - sql3_1 := "select count.*att_constraint_type.* from `mo_catalog`.`mo_columns` where account_id = 0 and att_database = 'db1' and att_relname = 't2' and att_constraint_type = 'p'" - mock.ExpectQuery(sql3_1).WillReturnRows(sqlmock.NewRows([]string{"count(att_constraint_type)"}).AddRow(uint64(1))) - - sql4 := "insert into mo_catalog.mo_cdc_task values.*0,\".*\",\"task1\",\".*\",\"\",\".*\",\"mysql\",\".*\",\"\",\"\",\"\",\".*\",\".*\",\"\",\"common\",\"common\",\"\",\"\",\"\",\".*\",\"running\",0,\"0\",\"false\",\"\",'.*',\"\",\"\",\"\",\"\".*" + sql4 := "insert into mo_catalog.mo_cdc_task .*" mock.ExpectExec(sql4).WillReturnResult(sqlmock.NewResult(1, 1)) - //// pu := config.ParameterUnit{} pu.TaskService = &testTaskService{ @@ -682,10 +393,10 @@ func Test_handleCreateCdc(t *testing.T) { Tables: "db1.t1:db1.t1,db1.t2", Option: []string{ "Level", - cdc2.AccountLevel, + cdc2.TableLevel, "Account", sysAccountName, - "Rules", + "Exclude", "db2.t3,db2.t4", cdc2.InitSnapshotSplitTxn, "false", @@ -1081,9 +792,6 @@ const ( ) func TestRegisterCdcExecutor(t *testing.T) { - cdc2.AesKey = "test-aes-key-not-use-it-in-cloud" - defer func() { cdc2.AesKey = "" }() - type args struct { logger *zap.Logger ts *testTaskService @@ -1098,6 +806,9 @@ func TestRegisterCdcExecutor(t *testing.T) { curDTaskId int } + cdc2.AesKey = "test-aes-key-not-use-it-in-cloud" + defer func() { cdc2.AesKey = "" }() + cdc2.EnableConsoleSink = true defer func() { cdc2.EnableConsoleSink = false @@ -1120,15 +831,12 @@ func TestRegisterCdcExecutor(t *testing.T) { Pts: []*cdc2.PatternTuple{ { Source: cdc2.PatternTable{ - AccountId: uint64(sysAccountID), - Account: sysAccountName, 
- Database: "db1", - Table: "t1", + Database: "db1", + Table: "t1", }, }, }, - }, - ) + }) assert.NoError(t, err) filters, err := cdc2.JsonEncode(cdc2.PatternTuples{}) assert.NoError(t, err) @@ -1159,82 +867,21 @@ func TestRegisterCdcExecutor(t *testing.T) { cdc2.SendSqlTimeout, cdc2.DefaultSendSqlTimeout, cdc2.MaxSqlLength, cdc2.DefaultMaxSqlLength, ), - ), - ) - - sql2 := "select reldatabase_id,rel_id from mo_catalog.mo_tables where account_id = 0 and reldatabase = 'db1' and relname = 't1'" - mock.ExpectQuery(sql2).WillReturnRows(sqlmock.NewRows( - []string{ - "reldatabase_id", - "rel_id", - }).AddRow( - uint64(10), - uint64(1001), - ), - ) - - sql3 := "select table_id, watermark from mo_catalog.mo_cdc_watermark where account_id = 0 and task_id = '00000000-0000-0000-0000-000000000000'" - mock.ExpectQuery(sql3).WillReturnRows( - sqlmock.NewRows([]string{"table_id", "watermark"}). - AddRow("1001_1", "0-0")) - - sql4 := "insert into mo_catalog.mo_cdc_watermark values .*0, '00000000-0000-0000-0000-000000000000', '1001_0', 'db1', 't1', '0-0'.*" - mock.ExpectExec(sql4).WillReturnResult(sqlmock.NewResult(1, 1)) - - sql41 := "delete from mo_catalog.mo_cdc_watermark where account_id = 0 and task_id = '00000000-0000-0000-0000-000000000000' and table_id = '1001_1'" - mock.ExpectExec(sql41).WillReturnResult(sqlmock.NewResult(1, 1)) - - sql5 := "select watermark from mo_catalog.mo_cdc_watermark where account_id = 0 and task_id = '00000000-0000-0000-0000-000000000000' and table_id = '1001_0'" - mock.ExpectQuery(sql5).WillReturnRows(sqlmock.NewRows( - []string{ - "watermark", - }, - ).AddRow( - "0-0", )) sql7 := "update `mo_catalog`.`mo_cdc_task` set state = 'running', err_msg = '' where account_id = 0 and task_id = '00000000-0000-0000-0000-000000000000'" mock.ExpectExec(sql7).WillReturnResult(sqlmock.NewResult(1, 1)) - sql6 := "update mo_catalog.mo_cdc_watermark set watermark='0-0' where account_id = 0 and task_id = '00000000-0000-0000-0000-000000000000' and table_id = 
'1001_0'" - mock.ExpectExec(sql6).WillReturnResult(sqlmock.NewResult(1, 1)) - genSqlIdx := func(sql string) int { mSql1, err := regexp.MatchString(sql1, sql) assert.NoError(t, err) - mSql2, err := regexp.MatchString(sql2, sql) - assert.NoError(t, err) - mSql3, err := regexp.MatchString(sql3, sql) - assert.NoError(t, err) - mSql4, err := regexp.MatchString(sql4, sql) - assert.NoError(t, err) - mSql41, err := regexp.MatchString(sql41, sql) - assert.NoError(t, err) - mSql5, err := regexp.MatchString(sql5, sql) - assert.NoError(t, err) - mSql6, err := regexp.MatchString(sql6, sql) - assert.NoError(t, err) if mSql1 { return mSqlIdx1 - } else if mSql2 { - return mSqlIdx2 - } else if mSql3 { - return mSqlIdx3 - } else if mSql4 { - return mSqlIdx4 - } else if mSql41 { - return mSqlIdx41 - } else if mSql5 { - return mSqlIdx5 - } else if mSql6 { - return mSqlIdx6 } return -1 } - //////// - ///////////mock engine eng := mock_frontend.NewMockEngine(ctrl) eng.EXPECT().New(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() @@ -1243,21 +890,6 @@ func TestRegisterCdcExecutor(t *testing.T) { }).AnyTimes() eng.EXPECT().LatestLogtailAppliedTime().Return(timestamp.Timestamp{}).AnyTimes() - table := mock_frontend.NewMockRelation(ctrl) - - tableDef := &plan.TableDef{ - Pkey: &plan.PrimaryKeyDef{ - Names: []string{ - "a", - }, - }, - } - - table.EXPECT().CopyTableDef(gomock.Any()).Return(tableDef).AnyTimes() - table.EXPECT().CollectChanges(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, moerr.NewInternalErrorNoCtx("invalid handle")) - - eng.EXPECT().GetRelationById(gomock.Any(), gomock.Any(), gomock.Any()).Return("", "", table, nil).AnyTimes() - txnOperator := mock_frontend.NewMockTxnOperator(ctrl) txnOperator.EXPECT().Txn().Return(txn.TxnMeta{}).AnyTimes() eng.EXPECT().Database(ctx, gomock.Any(), txnOperator).Return(nil, nil).AnyTimes() @@ -1272,8 +904,6 @@ func TestRegisterCdcExecutor(t *testing.T) { txnClient := mock_frontend.NewMockTxnClient(ctrl) 
txnClient.EXPECT().New(gomock.Any(), gomock.Any(), gomock.Any()).Return(txnOperator, nil).AnyTimes() - ////////// - tIEFactory1 := func() ie.InternalExecutor { return &testIE{ db: db, @@ -1311,6 +941,14 @@ func TestRegisterCdcExecutor(t *testing.T) { assert.NoError(t, err) defer mpool.DeleteMPool(mp) + gostub.Stub(&cdc2.GetTableScanner, func(cnUUID string) *cdc2.TableScanner { + return &cdc2.TableScanner{ + Mutex: sync.Mutex{}, + Mp: make(map[uint32]cdc2.TblMap), + Callbacks: map[string]func(map[uint32]cdc2.TblMap){"id": func(mp map[uint32]cdc2.TblMap) {}}, + } + }) + tests := []struct { name string args args @@ -2497,85 +2135,21 @@ func Test_handleShowCdc(t *testing.T) { } func TestCdcTask_ResetWatermarkForTable(t *testing.T) { - type fields struct { - logger *zap.Logger - ie ie.InternalExecutor - cnUUID string - cnTxnClient client.TxnClient - cnEngine engine.Engine - fileService fileservice.FileService - cdcTask *task.CreateCdcDetails - mp *mpool.MPool - packerPool *fileservice.Pool[*types.Packer] - sinkUri cdc2.UriInfo - tables cdc2.PatternTuples - filters cdc2.PatternTuples - startTs types.TS - noFull string - activeRoutine *cdc2.ActiveRoutine - sunkWatermarkUpdater *cdc2.WatermarkUpdater - } - type args struct { - info *cdc2.DbTableInfo + cdc := &CdcTask{ + watermarkUpdater: &mockWatermarkUpdater{}, } - db, mock, err := sqlmock.New() - assert.NoError(t, err) - tie := &testIE{ - db: db, + info := &cdc2.DbTableInfo{ + SourceDbId: 0, + SourceDbName: "", + SourceTblId: 0, + SourceTblName: "", + SourceCreateSql: "", + SinkDbName: "", + SinkTblName: "", } - sqlx := "delete from mo_catalog.mo_cdc_watermark where account_id = .* and task_id = .* and table_id = .*" - mock.ExpectExec(sqlx).WillReturnResult(sqlmock.NewResult(1, 1)) - - sqlx1 := "insert into mo_catalog.mo_cdc_watermark values .*" - mock.ExpectExec(sqlx1).WillReturnResult(sqlmock.NewResult(1, 1)) - - tests := []struct { - name string - fields fields - args args - wantErr assert.ErrorAssertionFunc - }{ - { 
- name: "t1", - fields: fields{ - sunkWatermarkUpdater: cdc2.NewWatermarkUpdater( - sysAccountID, "taskID-1", tie, - ), - ie: tie, - }, - args: args{ - info: &cdc2.DbTableInfo{ - SourceTblId: 10, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cdc := &CdcTask{ - logger: tt.fields.logger, - ie: tt.fields.ie, - cnUUID: tt.fields.cnUUID, - cnTxnClient: tt.fields.cnTxnClient, - cnEngine: tt.fields.cnEngine, - fileService: tt.fields.fileService, - cdcTask: tt.fields.cdcTask, - mp: tt.fields.mp, - packerPool: tt.fields.packerPool, - sinkUri: tt.fields.sinkUri, - tables: tt.fields.tables, - filters: tt.fields.filters, - startTs: tt.fields.startTs, - noFull: tt.fields.noFull, - activeRoutine: tt.fields.activeRoutine, - sunkWatermarkUpdater: tt.fields.sunkWatermarkUpdater, - } - err := cdc.resetWatermarkForTable(tt.args.info) - assert.NoErrorf(t, err, fmt.Sprintf("resetWatermarkForTable(%v)", tt.args.info)) - }) - } + assert.NoError(t, cdc.resetWatermarkForTable(info)) } func TestCdcTask_Resume(t *testing.T) { @@ -2606,7 +2180,7 @@ func TestCdcTask_Restart(t *testing.T) { cdc := &CdcTask{ activeRoutine: cdc2.NewCdcActiveRoutine(), - sunkWatermarkUpdater: cdc2.NewWatermarkUpdater( + watermarkUpdater: cdc2.NewWatermarkUpdater( sysAccountID, "taskID-0", tie, @@ -2658,7 +2232,7 @@ func TestCdcTask_Cancel(t *testing.T) { } cdc := &CdcTask{ activeRoutine: cdc2.NewCdcActiveRoutine(), - sunkWatermarkUpdater: cdc2.NewWatermarkUpdater( + watermarkUpdater: cdc2.NewWatermarkUpdater( sysAccountID, "taskID-1", tie, @@ -2686,9 +2260,9 @@ func TestCdcTask_retrieveCdcTask(t *testing.T) { packerPool *fileservice.Pool[*types.Packer] sinkUri cdc2.UriInfo tables cdc2.PatternTuples - filters cdc2.PatternTuples + exclude *regexp.Regexp startTs types.TS - noFull string + noFull bool activeRoutine *cdc2.ActiveRoutine sunkWatermarkUpdater *cdc2.WatermarkUpdater } @@ -2715,10 +2289,8 @@ func TestCdcTask_retrieveCdcTask(t *testing.T) { Pts: []*cdc2.PatternTuple{ 
{ Source: cdc2.PatternTable{ - AccountId: uint64(sysAccountID), - Account: sysAccountName, - Database: "db1", - Table: "t1", + Database: "db1", + Table: "t1", }, }, }, @@ -2792,22 +2364,22 @@ func TestCdcTask_retrieveCdcTask(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { cdc := &CdcTask{ - logger: tt.fields.logger, - ie: tt.fields.ie, - cnUUID: tt.fields.cnUUID, - cnTxnClient: tt.fields.cnTxnClient, - cnEngine: tt.fields.cnEngine, - fileService: tt.fields.fileService, - cdcTask: tt.fields.cdcTask, - mp: tt.fields.mp, - packerPool: tt.fields.packerPool, - sinkUri: tt.fields.sinkUri, - tables: tt.fields.tables, - filters: tt.fields.filters, - startTs: tt.fields.startTs, - noFull: tt.fields.noFull, - activeRoutine: tt.fields.activeRoutine, - sunkWatermarkUpdater: tt.fields.sunkWatermarkUpdater, + logger: tt.fields.logger, + ie: tt.fields.ie, + cnUUID: tt.fields.cnUUID, + cnTxnClient: tt.fields.cnTxnClient, + cnEngine: tt.fields.cnEngine, + fileService: tt.fields.fileService, + cdcTask: tt.fields.cdcTask, + mp: tt.fields.mp, + packerPool: tt.fields.packerPool, + sinkUri: tt.fields.sinkUri, + tables: tt.fields.tables, + exclude: tt.fields.exclude, + startTs: tt.fields.startTs, + noFull: tt.fields.noFull, + activeRoutine: tt.fields.activeRoutine, + watermarkUpdater: tt.fields.sunkWatermarkUpdater, } err := cdc.retrieveCdcTask(tt.args.ctx) assert.NoError(t, err, fmt.Sprintf("retrieveCdcTask(%v)", tt.args.ctx)) @@ -2946,39 +2518,6 @@ func Test_initAesKey(t *testing.T) { } } -func Test_extractTablePair(t *testing.T) { - type args struct { - ctx context.Context - pattern string - defaultAcc string - } - tests := []struct { - name string - args args - want *cdc2.PatternTuple - wantErr assert.ErrorAssertionFunc - }{ - { - name: "t1", - args: args{ - ctx: context.Background(), - pattern: "source:sink:other", - defaultAcc: "sys", - }, - wantErr: assert.Error, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := 
extractTablePair(tt.args.ctx, tt.args.pattern, tt.args.defaultAcc) - if !tt.wantErr(t, err, fmt.Sprintf("extractTablePair(%v, %v, %v)", tt.args.ctx, tt.args.pattern, tt.args.defaultAcc)) { - return - } - assert.Equalf(t, tt.want, got, "extractTablePair(%v, %v, %v)", tt.args.ctx, tt.args.pattern, tt.args.defaultAcc) - }) - } -} - var _ ie.InternalExecutor = &mockIe{} type mockIe struct { @@ -3106,3 +2645,181 @@ func TestCdcTask_initAesKeyByInternalExecutor(t *testing.T) { err = initAesKeyByInternalExecutor(context.Background(), cdcTask, 0) assert.Error(t, err) } + +func TestCdcTask_handleNewTables(t *testing.T) { + stub1 := gostub.Stub(&cdc2.GetTxnOp, func(context.Context, engine.Engine, client.TxnClient, string) (client.TxnOperator, error) { + return nil, nil + }) + defer stub1.Reset() + + stub2 := gostub.Stub(&cdc2.FinishTxnOp, func(context.Context, error, client.TxnOperator, engine.Engine) {}) + defer stub2.Reset() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + eng := mock_frontend.NewMockEngine(ctrl) + eng.EXPECT().New(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + + cdc := &CdcTask{ + cdcTask: &task.CreateCdcDetails{ + Accounts: []*task.Account{{Id: 0}}, + }, + tables: cdc2.PatternTuples{ + Pts: []*cdc2.PatternTuple{ + { + Source: cdc2.PatternTable{ + Database: "db1", + Table: cdc2.MatchAll, + }, + }, + }, + }, + exclude: regexp.MustCompile("db1.tb1"), + cnEngine: eng, + runningReaders: &sync.Map{}, + } + + mp := map[uint32]cdc2.TblMap{ + 0: { + "db1.tb1": &cdc2.DbTableInfo{}, + "db2.tb1": &cdc2.DbTableInfo{}, + }, + } + cdc.handleNewTables(mp) +} + +type mockWatermarkUpdater struct{} + +func (m mockWatermarkUpdater) Run(context.Context, *cdc2.ActiveRoutine) { + //TODO implement me + panic("implement me") +} + +func (m mockWatermarkUpdater) InsertIntoDb(*cdc2.DbTableInfo, types.TS) error { + return nil +} + +func (m mockWatermarkUpdater) GetFromMem(string, string) types.TS { + //TODO implement me + panic("implement me") +} + +func (m 
mockWatermarkUpdater) GetFromDb(dbName, tblName string) (watermark types.TS, err error) { + err = moerr.NewErrNoWatermarkFoundNoCtx(dbName, tblName) + return +} + +func (m mockWatermarkUpdater) UpdateMem(string, string, types.TS) {} + +func (m mockWatermarkUpdater) DeleteFromMem(string, string) {} + +func (m mockWatermarkUpdater) DeleteFromDb(string, string) error { + return nil +} + +func (m mockWatermarkUpdater) DeleteAllFromDb() error { + //TODO implement me + panic("implement me") +} + +func (m mockWatermarkUpdater) SaveErrMsg(string, string, string) error { + return nil +} + +type mockReader struct{} + +func (m mockReader) Run(ctx context.Context, ar *cdc2.ActiveRoutine) {} + +func (m mockReader) Close() {} + +type mockSinker struct{} + +func (m mockSinker) Run(ctx context.Context, ar *cdc2.ActiveRoutine) {} + +func (m mockSinker) Sink(ctx context.Context, data *cdc2.DecoderOutput) { + //TODO implement me + panic("implement me") +} + +func (m mockSinker) SendBegin() { + //TODO implement me + panic("implement me") +} + +func (m mockSinker) SendCommit() { + //TODO implement me + panic("implement me") +} + +func (m mockSinker) SendRollback() { + //TODO implement me + panic("implement me") +} + +func (m mockSinker) SendDummy() { + //TODO implement me + panic("implement me") +} + +func (m mockSinker) Error() error { + //TODO implement me + panic("implement me") +} + +func (m mockSinker) Reset() { + //TODO implement me + panic("implement me") +} + +func (m mockSinker) Close() { + //TODO implement me + panic("implement me") +} + +func TestCdcTask_addExecPipelineForTable(t *testing.T) { + cdc := &CdcTask{ + watermarkUpdater: &mockWatermarkUpdater{}, + runningReaders: &sync.Map{}, + noFull: true, + additionalConfig: map[string]interface{}{ + cdc2.MaxSqlLength: float64(cdc2.DefaultMaxSqlLength), + cdc2.SendSqlTimeout: cdc2.DefaultSendSqlTimeout, + cdc2.InitSnapshotSplitTxn: cdc2.DefaultInitSnapshotSplitTxn, + }, + } + + info := &cdc2.DbTableInfo{ + SourceDbId: 0, + 
SourceDbName: "", + SourceTblId: 0, + SourceTblName: "", + SourceCreateSql: "", + SinkDbName: "", + SinkTblName: "", + } + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + txnOperator := mock_frontend.NewMockTxnOperator(ctrl) + txnOperator.EXPECT().SnapshotTS().Return(timestamp.Timestamp{}).AnyTimes() + + stubGetTableDef := gostub.Stub(&cdc2.GetTableDef, func(context.Context, client.TxnOperator, engine.Engine, uint64) (*plan.TableDef, error) { + return nil, nil + }) + defer stubGetTableDef.Reset() + + stubSinker := gostub.Stub(&cdc2.NewSinker, func(cdc2.UriInfo, *cdc2.DbTableInfo, cdc2.IWatermarkUpdater, + *plan.TableDef, int, time.Duration, *cdc2.ActiveRoutine, uint64, string) (cdc2.Sinker, error) { + return &mockSinker{}, nil + }) + defer stubSinker.Reset() + + stubReader := gostub.Stub(&cdc2.NewTableReader, func(client.TxnClient, engine.Engine, *mpool.MPool, *fileservice.Pool[*types.Packer], + *cdc2.DbTableInfo, cdc2.Sinker, cdc2.IWatermarkUpdater, *plan.TableDef, func(*cdc2.DbTableInfo) error, bool, *sync.Map) cdc2.Reader { + return &mockReader{} + }) + defer stubReader.Reset() + + assert.NoError(t, cdc.addExecPipelineForTable(context.Background(), info, txnOperator)) +} diff --git a/pkg/frontend/predefined.go b/pkg/frontend/predefined.go index 7f9f538598db2..b82cc16f32dea 100644 --- a/pkg/frontend/predefined.go +++ b/pkg/frontend/predefined.go @@ -260,12 +260,11 @@ var ( MoCatalogMoCdcWatermarkDDL = `create table mo_catalog.mo_cdc_watermark ( account_id bigint unsigned, task_id uuid, - table_id varchar(64), db_name varchar(256), table_name varchar(256), watermark varchar(128), err_msg varchar(256), - primary key(account_id,task_id,table_id) + primary key(account_id,task_id,db_name,table_name) )` MoCatalogMoSessionsDDL = `CREATE VIEW mo_catalog.mo_sessions AS SELECT node_id, conn_id, session_id, account, user, host, db, session_start, command, info, txn_id, statement_id, statement_type, query_type, sql_source_type, query_start, client_host, role, 
proxy_host FROM mo_sessions() AS mo_sessions_tmp` diff --git a/pkg/frontend/util.go b/pkg/frontend/util.go index 0e2d51d381cb4..2302152fe0d96 100644 --- a/pkg/frontend/util.go +++ b/pkg/frontend/util.go @@ -1559,10 +1559,14 @@ rule: it means all most all string can be legal. */ func dbNameIsLegal(name string) bool { + name = strings.TrimSpace(name) if hasSpecialChars(name) { return false } - name = strings.TrimSpace(name) + if name == cdc.MatchAll { + return true + } + createDBSqls := []string{ "create database " + name, "create database `" + name + "`", @@ -1580,10 +1584,14 @@ rule: it means all most all string can be legal. */ func tableNameIsLegal(name string) bool { + name = strings.TrimSpace(name) if hasSpecialChars(name) { return false } - name = strings.TrimSpace(name) + if name == cdc.MatchAll { + return true + } + createTableSqls := []string{ "create table " + name + "(a int)", "create table `" + name + "`(a int)", From 9fe036163a0077f83da013733297f9f46b0d9016 Mon Sep 17 00:00:00 2001 From: LiuBo Date: Wed, 18 Dec 2024 16:47:31 +0800 Subject: [PATCH 2/2] [improvement] clusterservice: at least one tn in the service. (#20810) make sure there is at least one TN in the service. Approved by: @zhangxu19830126 --- pkg/clusterservice/cluster.go | 16 +++++++++++++--- pkg/clusterservice/cluster_test.go | 22 ++++++++++++---------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/pkg/clusterservice/cluster.go b/pkg/clusterservice/cluster.go index b51367c033203..a70a6973704c5 100644 --- a/pkg/clusterservice/cluster.go +++ b/pkg/clusterservice/cluster.go @@ -16,6 +16,7 @@ package clusterservice import ( "context" + "sort" "sync" "sync/atomic" "time" @@ -343,13 +344,22 @@ func (c *cluster) refresh() { c.logger.Debug("cn service added", zap.String("cn", v.DebugString())) } } + // sort as the tick, with the bigger one at the front. 
+ sort.Slice(details.TNStores, func(i, j int) bool { + return details.TNStores[i].Tick > details.TNStores[j].Tick + }) for _, tn := range details.TNStores { - if tn.State == logpb.NormalState { - v := newTNService(tn) - new.addTN([]metadata.TNService{v}) + v := newTNService(tn) + new.addTN([]metadata.TNService{v}) + if c.logger.Enabled(zap.DebugLevel) { c.logger.Debug("dn service added", zap.String("dn", v.DebugString())) } } + // if there are multiple tn services, only take the first one, which has + // the biggest tick. + if len(new.tn) > 1 { + new.tn = new.tn[:1] + } c.services.Store(new) c.readyOnce.Do(func() { close(c.readyC) diff --git a/pkg/clusterservice/cluster_test.go b/pkg/clusterservice/cluster_test.go index c8e140953aee5..fc9f5439ff18e 100644 --- a/pkg/clusterservice/cluster_test.go +++ b/pkg/clusterservice/cluster_test.go @@ -78,7 +78,7 @@ func TestClusterRefresh(t *testing.T) { c.GetTNService(NewServiceIDSelector("dn0"), apply) assert.Equal(t, 0, cnt) - hc.addTN(logpb.NormalState, "dn0") + hc.addTN(0, "dn0") time.Sleep(time.Millisecond * 100) c.GetTNService(NewServiceIDSelector("dn0"), apply) assert.Equal(t, 1, cnt) @@ -96,7 +96,7 @@ func BenchmarkGetService(b *testing.B) { } c.GetTNService(NewServiceIDSelector("dn0"), apply) - hc.addTN(logpb.NormalState, "dn0") + hc.addTN(0, "dn0") c.ForceRefresh(true) b.ResetTimer() @@ -150,18 +150,20 @@ func TestCluster_GetTNService(t *testing.T) { runClusterTest( time.Hour, func(hc *testHAKeeperClient, c *cluster) { - hc.addTN(logpb.NormalState, "dn0") - hc.addTN(logpb.TimeoutState, "dn1") + hc.addTN(100, "dn0") + hc.addTN(200, "dn1") + hc.addTN(50, "dn2") c.ForceRefresh(true) - var count int + var tns []metadata.TNService c.GetTNService( NewSelector(), func(service metadata.TNService) bool { - count++ + tns = append(tns, service) return true }, ) - require.Equal(t, 1, count) + require.Equal(t, 1, len(tns)) + require.Equal(t, "dn1", tns[0].ServiceID) }, ) } @@ -199,13 +201,13 @@ func (c *testHAKeeperClient) 
addCN(serviceIDs ...string) { } } -func (c *testHAKeeperClient) addTN(state logpb.NodeState, serviceIDs ...string) { +func (c *testHAKeeperClient) addTN(tick uint64, serviceIDs ...string) { c.Lock() defer c.Unlock() for _, id := range serviceIDs { c.value.TNStores = append(c.value.TNStores, logpb.TNStore{ - UUID: id, - State: state, + UUID: id, + Tick: tick, }) } }