Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
schemaTracker: lazy init table info in schema tracker (#1271) (#1274)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>

Co-authored-by: GMHDBJD <[email protected]>
  • Loading branch information
ti-srebot and GMHDBJD authored Nov 4, 2020
1 parent 86e7e4c commit c139e91
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 22 deletions.
2 changes: 1 addition & 1 deletion dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig)
}
defer checkpoint.Close()

err = checkpoint.Load(tctx, nil)
err = checkpoint.Load(tctx)
if err != nil {
return nil, errors.Annotate(err, "get min position from checkpoint")
}
Expand Down
33 changes: 20 additions & 13 deletions syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ type CheckPoint interface {
Clear(tctx *tcontext.Context) error

// Load loads all checkpoints saved by CheckPoint
Load(tctx *tcontext.Context, schemaTracker *schema.Tracker) error
Load(tctx *tcontext.Context) error

// LoadMeta loads checkpoints from meta config item or file
LoadMeta() error
Expand Down Expand Up @@ -233,6 +233,10 @@ type CheckPoint interface {
// corresponding to Meta.Check
CheckGlobalPoint() bool

// GetFlushedTableInfo gets flushed table info
// use for lazy create table in schemaTracker
GetFlushedTableInfo(schema string, table string) *model.TableInfo

// Rollback rolls global checkpoint and all table checkpoints back to flushed checkpoints
Rollback(schemaTracker *schema.Tracker)

Expand Down Expand Up @@ -682,7 +686,7 @@ func (cp *RemoteCheckPoint) createTable(tctx *tcontext.Context) error {
}

// Load implements CheckPoint.Load
func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context, schemaTracker *schema.Tracker) error {
func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {
cp.Lock()
defer cp.Unlock()

Expand Down Expand Up @@ -771,29 +775,20 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context, schemaTracker *schema.T
continue // skip global checkpoint
}

var ti model.TableInfo
var ti *model.TableInfo
if !bytes.Equal(tiBytes, []byte("null")) {
// only create table if `table_info` is not `null`.
if err = json.Unmarshal(tiBytes, &ti); err != nil {
return terror.ErrSchemaTrackerInvalidJSON.Delegate(err, cpSchema, cpTable)
}

if schemaTracker != nil {
if err = schemaTracker.CreateSchemaIfNotExists(cpSchema); err != nil {
return terror.ErrSchemaTrackerCannotCreateSchema.Delegate(err, cpSchema)
}
if err = schemaTracker.CreateTableIfNotExists(cpSchema, cpTable, &ti); err != nil {
return terror.ErrSchemaTrackerCannotCreateTable.Delegate(err, cpSchema, cpTable)
}
}
}

mSchema, ok := cp.points[cpSchema]
if !ok {
mSchema = make(map[string]*binlogPoint)
cp.points[cpSchema] = mSchema
}
mSchema[cpTable] = newBinlogPoint(location, location, &ti, &ti, cp.cfg.EnableGTID)
mSchema[cpTable] = newBinlogPoint(location, location, ti, ti, cp.cfg.EnableGTID)
}

return terror.WithScope(terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError), terror.ScopeDownstream)
Expand Down Expand Up @@ -914,3 +909,15 @@ func (cp *RemoteCheckPoint) parseMetaData() (*binlog.Location, *binlog.Location,

return loc, loc2, err
}

// GetFlushedTableInfo implements CheckPoint.GetFlushedTableInfo
func (cp *RemoteCheckPoint) GetFlushedTableInfo(schema string, table string) *model.TableInfo {
cp.Lock()
defer cp.Unlock()
if tables, ok := cp.points[schema]; ok {
if point, ok2 := tables[table]; ok2 {
return point.flushedTI
}
}
return nil
}
14 changes: 7 additions & 7 deletions syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {

// try load, but should load nothing
s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil))
err := cp.Load(tctx, s.tracker)
err := cp.Load(tctx)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint().Position, Equals, binlog.MinPosition)
c.Assert(cp.FlushedGlobalPoint().Position, Equals, binlog.MinPosition)
Expand All @@ -165,7 +165,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
}

s.mock.ExpectQuery(loadCheckPointSQL).WithArgs(cpid).WillReturnRows(sqlmock.NewRows(nil))
err = cp.Load(tctx, s.tracker)
err = cp.Load(tctx)
c.Assert(err, IsNil)
cp.SaveGlobalPoint(binlog.Location{Position: pos1})

Expand Down Expand Up @@ -225,7 +225,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
cp.SaveGlobalPoint(binlog.Location{Position: pos3})
columns := []string{"cp_schema", "cp_table", "binlog_name", "binlog_pos", "binlog_gtid", "exit_safe_binlog_name", "exit_safe_binlog_pos", "exit_safe_binlog_gtid", "table_info", "is_global"}
s.mock.ExpectQuery(loadCheckPointSQL).WithArgs(cpid).WillReturnRows(sqlmock.NewRows(columns).AddRow("", "", pos2.Name, pos2.Pos, "", "", 0, "", "null", true))
err = cp.Load(tctx, s.tracker)
err = cp.Load(tctx)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint().Position, Equals, pos2)
c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos2)
Expand All @@ -251,7 +251,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
c.Assert(cp.FlushedGlobalPoint().Position, Equals, binlog.MinPosition)

s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil))
err = cp.Load(tctx, s.tracker)
err = cp.Load(tctx)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint().Position, Equals, binlog.MinPosition)
c.Assert(cp.FlushedGlobalPoint().Position, Equals, binlog.MinPosition)
Expand All @@ -277,7 +277,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
err = cp.FlushPointsExcept(tctx, nil, nil, nil)
c.Assert(err, IsNil)
s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil))
err = cp.Load(tctx, s.tracker)
err = cp.Load(tctx)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint().Position, Equals, pos1)
c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos1)
Expand Down Expand Up @@ -316,7 +316,7 @@ SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */
err = cp.FlushPointsExcept(tctx, nil, nil, nil)
c.Assert(err, IsNil)
s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil))
err = cp.Load(tctx, s.tracker)
err = cp.Load(tctx)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint().Position, Equals, pos1)
c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos1)
Expand Down Expand Up @@ -468,7 +468,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
s.mock.ExpectQuery(loadCheckPointSQL).WithArgs(cpid).WillReturnRows(
sqlmock.NewRows(columns).AddRow("", "", pos2.Name, pos2.Pos, gs.String(), pos2.Name, pos2.Pos, gs.String(), "null", true).
AddRow(schema, table, pos2.Name, pos2.Pos, gs.String(), "", 0, "", tiBytes, false))
err = cp.Load(tctx, s.tracker)
err = cp.Load(tctx)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint(), DeepEquals, binlog.InitLocation(pos2, gs))
rcp = cp.(*RemoteCheckPoint)
Expand Down
11 changes: 10 additions & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) {
}
rollbackHolder.Add(fr.FuncRollback{Name: "close-checkpoint", Fn: s.checkpoint.Close})

err = s.checkpoint.Load(tctx, s.schemaTracker)
err = s.checkpoint.Load(tctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -588,6 +588,15 @@ func (s *Syncer) getTable(origSchema, origTable, renamedSchema, renamedTable str
return nil, terror.ErrSchemaTrackerCannotCreateSchema.Delegate(err, origSchema)
}

// if table already exists in checkpoint, create it in schema tracker
if ti = s.checkpoint.GetFlushedTableInfo(origSchema, origTable); ti != nil {
if err = s.schemaTracker.CreateTableIfNotExists(origSchema, origTable, ti); err != nil {
return nil, terror.ErrSchemaTrackerCannotCreateTable.Delegate(err, origSchema, origTable)
}
s.tctx.L().Debug("lazy init table info in schema tracker", zap.String("schema", origSchema), zap.String("table", origTable))
return ti, nil
}

// in optimistic shard mode, we should try to get the init schema (the one before modified by other tables) first.
if s.cfg.ShardMode == config.ShardOptimistic {
ti, err = s.trackInitTableInfoOptimistic(origSchema, origTable, renamedSchema, renamedTable)
Expand Down
47 changes: 47 additions & 0 deletions tests/start_task/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,54 @@ function prepare_data() {
done
}

function lazy_init_tracker() {
run_sql 'DROP DATABASE if exists start_task;' $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql 'CREATE DATABASE start_task;' $MYSQL_PORT1 $MYSQL_PASSWORD1
for j in $(seq 100); do
run_sql "CREATE TABLE start_task.t$j(i TINYINT, j INT UNIQUE KEY);" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "INSERT INTO start_task.t$j VALUES (1,10001),(1,10011);" $MYSQL_PORT1 $MYSQL_PASSWORD1
done

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

dmctl_start_task_standalone "$cur/conf/dm-task.yaml" "--remove-meta"

check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

# only table 1-50 flush checkpoint
for j in $(seq 50); do
run_sql "INSERT INTO start_task.t$j VALUES (2,20002),(2,20022);" $MYSQL_PORT1 $MYSQL_PASSWORD1
done

check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task test" \
"\"result\": true" 2
dmctl_start_task_standalone "$cur/conf/dm-task.yaml"

for j in $(seq 100); do
run_sql "INSERT INTO start_task.t$j VALUES (3,30003),(3,30033);" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "INSERT INTO start_task.t$j VALUES (4,40004),(4,40044);" $MYSQL_PORT1 $MYSQL_PASSWORD1
done
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 20

check_log_contains $WORK_DIR/worker1/log/dm-worker.log 'lazy init table info.*t50' 1
check_log_not_contains $WORK_DIR/worker1/log/dm-worker.log 'lazy init table info.*t51'

cleanup_data start_task
cleanup_process $*
}

function run() {
lazy_init_tracker
failpoints=(
# 1152 is ErrAbortingConnection
"github.com/pingcap/dm/pkg/utils/FetchTargetDoTablesFailed=return(1152)"
Expand Down

0 comments on commit c139e91

Please sign in to comment.