Skip to content

Commit

Permalink
Merge branch '2.0-dev' into 1217-refine-bytes-2.0-dev
Browse files Browse the repository at this point in the history
  • Loading branch information
daviszhen committed Dec 18, 2024
2 parents 6748b5c + a7f27c7 commit 8debb2a
Show file tree
Hide file tree
Showing 21 changed files with 1,268 additions and 1,700 deletions.
10 changes: 10 additions & 0 deletions pkg/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,3 +833,13 @@ const (
SAVED_ROW_COUNT_IDX = 14
QUERY_ROW_COUNT_IDX = 15
)

var SystemDatabases = []string{
"information_schema",
"mo_catalog",
"mo_debug",
"mo_task",
"mysql",
"system",
"system_metrics",
}
18 changes: 12 additions & 6 deletions pkg/cdc/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package cdc

import (
"context"
"sync"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand All @@ -39,27 +40,29 @@ type tableReader struct {
packerPool *fileservice.Pool[*types.Packer]
info *DbTableInfo
sinker Sinker
wMarkUpdater *WatermarkUpdater
wMarkUpdater IWatermarkUpdater
tick *time.Ticker
resetWatermarkFunc func(*DbTableInfo) error
initSnapshotSplitTxn bool
runningReaders *sync.Map

tableDef *plan.TableDef
insTsColIdx, insCompositedPkColIdx int
delTsColIdx, delCompositedPkColIdx int
}

func NewTableReader(
var NewTableReader = func(
cnTxnClient client.TxnClient,
cnEngine engine.Engine,
mp *mpool.MPool,
packerPool *fileservice.Pool[*types.Packer],
info *DbTableInfo,
sinker Sinker,
wMarkUpdater *WatermarkUpdater,
wMarkUpdater IWatermarkUpdater,
tableDef *plan.TableDef,
resetWatermarkFunc func(*DbTableInfo) error,
initSnapshotSplitTxn bool,
runningReaders *sync.Map,
) Reader {
reader := &tableReader{
cnTxnClient: cnTxnClient,
Expand All @@ -72,6 +75,7 @@ func NewTableReader(
tick: time.NewTicker(200 * time.Millisecond),
resetWatermarkFunc: resetWatermarkFunc,
initSnapshotSplitTxn: initSnapshotSplitTxn,
runningReaders: runningReaders,
tableDef: tableDef,
}

Expand All @@ -98,14 +102,16 @@ func (reader *tableReader) Run(
logutil.Infof("cdc tableReader(%v).Run: start", reader.info)
defer func() {
if err != nil {
if err = reader.wMarkUpdater.SaveErrMsg(reader.info.SourceTblIdStr, err.Error()); err != nil {
if err = reader.wMarkUpdater.SaveErrMsg(reader.info.SourceDbName, reader.info.SourceTblName, err.Error()); err != nil {
logutil.Infof("cdc tableReader(%v).Run: save err msg failed, err: %v", reader.info, err)
}
}
reader.Close()
reader.runningReaders.Delete(GenDbTblKey(reader.info.SourceDbName, reader.info.SourceTblName))
logutil.Infof("cdc tableReader(%v).Run: end", reader.info)
}()

reader.runningReaders.Store(GenDbTblKey(reader.info.SourceDbName, reader.info.SourceTblName), reader)
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -192,7 +198,7 @@ func (reader *tableReader) readTableWithTxn(
//step2 : define time range
// from = last wmark
// to = txn operator snapshot ts
fromTs := reader.wMarkUpdater.GetFromMem(reader.info.SourceTblIdStr)
fromTs := reader.wMarkUpdater.GetFromMem(reader.info.SourceDbName, reader.info.SourceTblName)
toTs := types.TimestampToTS(GetSnapshotTS(txnOp))
start := time.Now()
changes, err = CollectChanges(ctx, rel, fromTs, toTs, reader.mp)
Expand Down Expand Up @@ -264,7 +270,7 @@ func (reader *tableReader) readTableWithTxn(
}

if err == nil {
reader.wMarkUpdater.UpdateMem(reader.info.SourceTblIdStr, toTs)
reader.wMarkUpdater.UpdateMem(reader.info.SourceDbName, reader.info.SourceTblName, toTs)
}
} else { // has error already
if hasBegin {
Expand Down
62 changes: 48 additions & 14 deletions pkg/cdc/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@ import (

func TestNewTableReader(t *testing.T) {
type args struct {
cnTxnClient client.TxnClient
cnEngine engine.Engine
mp *mpool.MPool
packerPool *fileservice.Pool[*types.Packer]
info *DbTableInfo
sinker Sinker
wMarkUpdater *WatermarkUpdater
tableDef *plan.TableDef
restartFunc func(*DbTableInfo) error
cnTxnClient client.TxnClient
cnEngine engine.Engine
mp *mpool.MPool
packerPool *fileservice.Pool[*types.Packer]
info *DbTableInfo
sinker Sinker
wMarkUpdater *WatermarkUpdater
tableDef *plan.TableDef
restartFunc func(*DbTableInfo) error
runningReaders *sync.Map
}

tableDef := &plan.TableDef{
Expand Down Expand Up @@ -76,7 +77,29 @@ func TestNewTableReader(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.NotNilf(t, NewTableReader(tt.args.cnTxnClient, tt.args.cnEngine, tt.args.mp, tt.args.packerPool, tt.args.info, tt.args.sinker, tt.args.wMarkUpdater, tt.args.tableDef, tt.args.restartFunc, true), "NewTableReader(%v, %v, %v, %v, %v, %v, %v, %v, %v)", tt.args.cnTxnClient, tt.args.cnEngine, tt.args.mp, tt.args.packerPool, tt.args.info, tt.args.sinker, tt.args.wMarkUpdater, tt.args.tableDef, tt.args.restartFunc)
assert.NotNilf(t, NewTableReader(
tt.args.cnTxnClient,
tt.args.cnEngine,
tt.args.mp,
tt.args.packerPool,
tt.args.info,
tt.args.sinker,
tt.args.wMarkUpdater,
tt.args.tableDef,
tt.args.restartFunc,
true,
tt.args.runningReaders,
),
"NewTableReader(%v,%v,%v,%v,%v,%v,%v,%v,%v)",
tt.args.cnTxnClient,
tt.args.cnEngine,
tt.args.mp,
tt.args.packerPool,
tt.args.info,
tt.args.sinker,
tt.args.wMarkUpdater,
tt.args.tableDef,
tt.args.restartFunc)
})
}
}
Expand Down Expand Up @@ -218,6 +241,7 @@ func Test_tableReader_Run(t *testing.T) {
insCompositedPkColIdx: tt.fields.insCompositedPkColIdx,
delTsColIdx: tt.fields.delTsColIdx,
delCompositedPkColIdx: tt.fields.delCompositedPkColIdx,
runningReaders: &sync.Map{},
}
reader.Run(tt.args.ctx, tt.args.ar)
})
Expand All @@ -238,6 +262,11 @@ func Test_tableReader_Run_StaleRead(t *testing.T) {
tick: time.NewTicker(time.Millisecond * 300),
sinker: NewConsoleSinker(nil, nil),
resetWatermarkFunc: func(*DbTableInfo) error { return nil },
runningReaders: &sync.Map{},
info: &DbTableInfo{
SourceDbName: "db1",
SourceTblName: "t1",
},
}
reader.Run(ctx, NewCdcActiveRoutine())
cancel()
Expand All @@ -254,10 +283,12 @@ func Test_tableReader_Run_StaleRead(t *testing.T) {
tick: time.NewTicker(time.Millisecond * 300),
sinker: NewConsoleSinker(nil, nil),
info: &DbTableInfo{
SourceTblIdStr: "1_0",
SourceDbName: "db1",
SourceTblName: "t1",
},
wMarkUpdater: u,
resetWatermarkFunc: func(*DbTableInfo) error { return moerr.NewInternalErrorNoCtx("") },
runningReaders: &sync.Map{},
}
reader.Run(ctx, NewCdcActiveRoutine())
cancel()
Expand All @@ -283,10 +314,12 @@ func Test_tableReader_Run_NonStaleReadErr(t *testing.T) {
tick: time.NewTicker(time.Millisecond * 300),
sinker: NewConsoleSinker(nil, nil),
info: &DbTableInfo{
SourceTblIdStr: "1_0",
SourceDbName: "db1",
SourceTblName: "t1",
},
wMarkUpdater: u,
resetWatermarkFunc: func(*DbTableInfo) error { return nil },
runningReaders: &sync.Map{},
}
reader.Run(ctx, NewCdcActiveRoutine())
}
Expand Down Expand Up @@ -319,15 +352,16 @@ func Test_tableReader_readTableWithTxn(t *testing.T) {

reader := &tableReader{
info: &DbTableInfo{
SourceTblName: "t1",
SourceTblIdStr: "123",
SourceDbName: "db1",
SourceTblName: "t1",
},
packerPool: pool,
wMarkUpdater: watermarkUpdater,
mp: mp,
insTsColIdx: 0,
insCompositedPkColIdx: 3,
sinker: NewConsoleSinker(nil, nil),
runningReaders: &sync.Map{},
}

getRelationByIdStub := gostub.Stub(&GetRelationById, func(_ context.Context, _ engine.Engine, _ client.TxnOperator, _ uint64) (string, string, engine.Relation, error) {
Expand Down
47 changes: 32 additions & 15 deletions pkg/cdc/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ import (

const (
// sqlBufReserved leave 5 bytes for mysql driver
sqlBufReserved = 5
sqlPrintLen = 200
fakeSql = "fakeSql"
sqlBufReserved = 5
sqlPrintLen = 200
fakeSql = "fakeSql"
createTable = "create table"
createTableIfNotExists = "create table if not exists"
)

var (
Expand All @@ -47,10 +49,10 @@ var (
dummy = []byte("")
)

func NewSinker(
var NewSinker = func(
sinkUri UriInfo,
dbTblInfo *DbTableInfo,
watermarkUpdater *WatermarkUpdater,
watermarkUpdater IWatermarkUpdater,
tableDef *plan.TableDef,
retryTimes int,
retryDuration time.Duration,
Expand All @@ -68,19 +70,34 @@ func NewSinker(
return nil, err
}

ctx := context.Background()
padding := strings.Repeat(" ", sqlBufReserved)
// create db
_ = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", dbTblInfo.SinkDbName)))
// use db
_ = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("use `%s`", dbTblInfo.SinkDbName)))
// create table
createSql := strings.TrimSpace(dbTblInfo.SourceCreateSql)
if len(createSql) < len(createTableIfNotExists) || !strings.EqualFold(createSql[:len(createTableIfNotExists)], createTableIfNotExists) {
createSql = createTableIfNotExists + createSql[len(createTable):]
}
createSql = strings.ReplaceAll(createSql, dbTblInfo.SourceDbName, dbTblInfo.SinkDbName)
createSql = strings.ReplaceAll(createSql, dbTblInfo.SourceTblName, dbTblInfo.SinkTblName)
_ = sink.Send(ctx, ar, []byte(padding+createSql))

return NewMysqlSinker(sink, dbTblInfo, watermarkUpdater, tableDef, ar, maxSqlLength), nil
}

var _ Sinker = new(consoleSinker)

type consoleSinker struct {
dbTblInfo *DbTableInfo
watermarkUpdater *WatermarkUpdater
watermarkUpdater IWatermarkUpdater
}

func NewConsoleSinker(
dbTblInfo *DbTableInfo,
watermarkUpdater *WatermarkUpdater,
watermarkUpdater IWatermarkUpdater,
) Sinker {
return &consoleSinker{
dbTblInfo: dbTblInfo,
Expand Down Expand Up @@ -142,7 +159,7 @@ var _ Sinker = new(mysqlSinker)
type mysqlSinker struct {
mysql Sink
dbTblInfo *DbTableInfo
watermarkUpdater *WatermarkUpdater
watermarkUpdater IWatermarkUpdater
ar *ActiveRoutine

// buf of sql statement
Expand Down Expand Up @@ -179,7 +196,7 @@ type mysqlSinker struct {
var NewMysqlSinker = func(
mysql Sink,
dbTblInfo *DbTableInfo,
watermarkUpdater *WatermarkUpdater,
watermarkUpdater IWatermarkUpdater,
tableDef *plan.TableDef,
ar *ActiveRoutine,
maxSqlLength uint64,
Expand Down Expand Up @@ -288,7 +305,7 @@ func (s *mysqlSinker) Run(ctx context.Context, ar *ActiveRoutine) {
}

func (s *mysqlSinker) Sink(ctx context.Context, data *DecoderOutput) {
watermark := s.watermarkUpdater.GetFromMem(s.dbTblInfo.SourceTblIdStr)
watermark := s.watermarkUpdater.GetFromMem(s.dbTblInfo.SourceDbName, s.dbTblInfo.SourceTblName)
if data.toTs.LE(&watermark) {
logutil.Errorf("cdc mysqlSinker(%v): unexpected watermark: %s, current watermark: %s",
s.dbTblInfo, data.toTs.ToString(), watermark.ToString())
Expand Down Expand Up @@ -417,7 +434,7 @@ func (s *mysqlSinker) sinkSnapshot(ctx context.Context, bat *batch.Batch) {
}

// step3: append to sqlBuf, send sql if sqlBuf is full
if err = s.appendSqlBuf(ctx, InsertRow); err != nil {
if err = s.appendSqlBuf(InsertRow); err != nil {
s.err.Store(err)
return
}
Expand Down Expand Up @@ -501,7 +518,7 @@ func (s *mysqlSinker) sinkInsert(ctx context.Context, insertIter *atomicBatchRow
}

// step3: append to sqlBuf
if err = s.appendSqlBuf(ctx, InsertRow); err != nil {
if err = s.appendSqlBuf(InsertRow); err != nil {
return
}

Expand Down Expand Up @@ -530,7 +547,7 @@ func (s *mysqlSinker) sinkDelete(ctx context.Context, deleteIter *atomicBatchRow
}

// step3: append to sqlBuf
if err = s.appendSqlBuf(ctx, DeleteRow); err != nil {
if err = s.appendSqlBuf(DeleteRow); err != nil {
return
}

Expand All @@ -539,7 +556,7 @@ func (s *mysqlSinker) sinkDelete(ctx context.Context, deleteIter *atomicBatchRow

// appendSqlBuf appends rowBuf to sqlBuf if not exceed its cap
// otherwise, send sql to downstream first, then reset sqlBuf and append
func (s *mysqlSinker) appendSqlBuf(ctx context.Context, rowType RowType) (err error) {
func (s *mysqlSinker) appendSqlBuf(rowType RowType) (err error) {
// insert suffix: `;`, delete suffix: `);`
suffixLen := 1
if rowType == DeleteRow {
Expand Down Expand Up @@ -668,8 +685,8 @@ var NewMysqlSink = func(
return ret, err
}

// Send must leave 5 bytes at the head of sqlBuf
func (s *mysqlSink) Send(ctx context.Context, ar *ActiveRoutine, sqlBuf []byte) error {
// must leave 5 bytes at the head of sqlBuf
reuseQueryArg := sql.NamedArg{
Name: mysql.ReuseQueryBuf,
Value: sqlBuf,
Expand Down
Loading

0 comments on commit 8debb2a

Please sign in to comment.