Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
remove the behaviour "purge relay when stop source"; add test
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Feb 1, 2021
1 parent 88656cd commit 379151d
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 22 deletions.
11 changes: 3 additions & 8 deletions dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (h *realRelayHolder) Operate(ctx context.Context, op pb.RelayOp) error {
return terror.ErrWorkerRelayOperNotSupport.Generate(op.String())
}

func (h *realRelayHolder) pauseRelay(ctx context.Context, op pb.RelayOp) error {
func (h *realRelayHolder) pauseRelay(_ context.Context, op pb.RelayOp) error {
h.Lock()
if h.stage != pb.Stage_Running {
h.Unlock()
Expand All @@ -210,7 +210,7 @@ func (h *realRelayHolder) pauseRelay(ctx context.Context, op pb.RelayOp) error {
return nil
}

func (h *realRelayHolder) resumeRelay(ctx context.Context, op pb.RelayOp) error {
func (h *realRelayHolder) resumeRelay(_ context.Context, op pb.RelayOp) error {
h.Lock()
defer h.Unlock()
if h.stage != pb.Stage_Paused {
Expand All @@ -225,7 +225,7 @@ func (h *realRelayHolder) resumeRelay(ctx context.Context, op pb.RelayOp) error
return nil
}

func (h *realRelayHolder) stopRelay(ctx context.Context, op pb.RelayOp) error {
func (h *realRelayHolder) stopRelay(_ context.Context, op pb.RelayOp) error {
h.Lock()
if h.stage == pb.Stage_Stopped {
h.Unlock()
Expand All @@ -234,11 +234,6 @@ func (h *realRelayHolder) stopRelay(ctx context.Context, op pb.RelayOp) error {
h.stage = pb.Stage_Stopped
h.Unlock() // unlock to make `run` can return

// purge relay dir when delete source
if err := h.relay.PurgeRelayDir(); err != nil {
return err
}

// now, when try to stop relay unit, we close relay holder
h.Close()
return nil
Expand Down
1 change: 0 additions & 1 deletion dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,6 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
return err
}
startRelay = !relayStage.IsDeleted && relayStage.Expect == pb.Stage_Running
// TODO: try PurgeRelayDir if not found relay, but if not found, that's not bound so we didn't know the path of dir
}
go func() {
w.Start(startRelay)
Expand Down
24 changes: 13 additions & 11 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,32 +221,34 @@ func (r *Relay) process(ctx context.Context) error {

// resuming will take the risk that upstream has purge the binlog relay is needed.
// when this worker is down, HA may schedule the source to other workers and forward the sync progress,
// when the source is scheduled back to this worker, we could start relay from sync checkpoint's location which
// is newer, and purge the outdated relay logs.
checkpointIsNewer := false
checkpointBinlogName := r.cfg.BinLogName
checkpointBinlogGset, err2 := gtid.ParserGTID(r.cfg.Flavor, r.cfg.BinlogGTID)
// and then when the source is scheduled back to this worker, we could start relay from sync checkpoint's
// location which is newer, and now could purge the outdated relay logs.
//
// locations in `r.cfg` is set to min needed location of subtasks (higher priority) or source config specified
isRelayMetaOutdated := false
neededBinlogName := r.cfg.BinLogName
neededBinlogGset, err2 := gtid.ParserGTID(r.cfg.Flavor, r.cfg.BinlogGTID)
if err2 != nil {
return err2
}
if r.cfg.EnableGTID {
_, metaGset := r.meta.GTID()
if checkpointBinlogGset.Contain(metaGset) && !checkpointBinlogGset.Equal(metaGset) {
checkpointIsNewer = true
if neededBinlogGset.Contain(metaGset) && !neededBinlogGset.Equal(metaGset) {
isRelayMetaOutdated = true
}
} else {
_, metaPos := r.meta.Pos()
if checkpointBinlogName > metaPos.Name {
checkpointIsNewer = true
if neededBinlogName > metaPos.Name {
isRelayMetaOutdated = true
}
}

if checkpointIsNewer {
if isRelayMetaOutdated {
err2 = r.PurgeRelayDir()
if err2 != nil {
return err2
}
err2 = r.SaveMeta(mysql.Position{Name: checkpointBinlogName, Pos: binlog.MinPosition.Pos}, checkpointBinlogGset)
err2 = r.SaveMeta(mysql.Position{Name: neededBinlogName, Pos: binlog.MinPosition.Pos}, neededBinlogGset)
if err2 != nil {
return err2
}
Expand Down
12 changes: 10 additions & 2 deletions tests/ha_cases/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -736,15 +736,23 @@ function test_last_bound() {
# kill 12, start 34, kill 34
kill_2_worker_ensure_unbound 1 2
start_2_worker_ensure_bound 3 4
# let other workers rather then 1 2 forward the syncer's progress
run_sql_file_withdb $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 $ha_test
run_sql "flush logs;" $MYSQL_PORT2 $MYSQL_PASSWORD2
run_sql_file_withdb $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 $ha_test
sleep 1
kill_2_worker_ensure_unbound 3 4

# start 1 then 2
start_2_worker_ensure_bound 1 2

# check
check_bound
check_log_contains $WORK_DIR/worker1/log/dm-worker.log "will try purge whole relay dir for new relay log" 1
check_log_contains $WORK_DIR/worker2/log/dm-worker.log "will try purge whole relay dir for new relay log" 1
# other workers has forwarded the sync progress, if moved to a new binlog file, original relay log could be removed
num1=`grep "will try purge whole relay dir for new relay log" $WORK_DIR/worker1/log/dm-worker.log | wc -l`
num2=`grep "will try purge whole relay dir for new relay log" $WORK_DIR/worker2/log/dm-worker.log | wc -l`
echo "num1$num1 num2$num2"
[[ $num1+$num2 -eq 3 ]]

echo "[$(date)] <<<<<< finish test_last_bound >>>>>>"
}
Expand Down

0 comments on commit 379151d

Please sign in to comment.