diff --git a/dm/worker/relay.go b/dm/worker/relay.go index 6dcbe0d859..ebd9ea37ae 100644 --- a/dm/worker/relay.go +++ b/dm/worker/relay.go @@ -191,7 +191,7 @@ func (h *realRelayHolder) Operate(ctx context.Context, op pb.RelayOp) error { return terror.ErrWorkerRelayOperNotSupport.Generate(op.String()) } -func (h *realRelayHolder) pauseRelay(ctx context.Context, op pb.RelayOp) error { +func (h *realRelayHolder) pauseRelay(_ context.Context, op pb.RelayOp) error { h.Lock() if h.stage != pb.Stage_Running { h.Unlock() @@ -210,7 +210,7 @@ func (h *realRelayHolder) pauseRelay(ctx context.Context, op pb.RelayOp) error { return nil } -func (h *realRelayHolder) resumeRelay(ctx context.Context, op pb.RelayOp) error { +func (h *realRelayHolder) resumeRelay(_ context.Context, op pb.RelayOp) error { h.Lock() defer h.Unlock() if h.stage != pb.Stage_Paused { @@ -225,7 +225,7 @@ func (h *realRelayHolder) resumeRelay(ctx context.Context, op pb.RelayOp) error return nil } -func (h *realRelayHolder) stopRelay(ctx context.Context, op pb.RelayOp) error { +func (h *realRelayHolder) stopRelay(_ context.Context, op pb.RelayOp) error { h.Lock() if h.stage == pb.Stage_Stopped { h.Unlock() diff --git a/pkg/gtid/gtid.go b/pkg/gtid/gtid.go index f43cb67fe1..cf4a2b91ae 100644 --- a/pkg/gtid/gtid.go +++ b/pkg/gtid/gtid.go @@ -42,6 +42,9 @@ type Set interface { // should become `00c04543-f584-11e9-a765-0242ac120002:1-60`. Truncate(end Set) error + // ResetStart resets the start of interval to 1 (only meaningful for MySQLGTIDSet), returns true if Set is changed + ResetStart() bool + String() string } @@ -260,7 +263,6 @@ func (g *MySQLGTIDSet) Truncate(end Set) error { // ResetStart resets the start part of GTID sets, // like `00c04543-f584-11e9-a765-0242ac120002:40-60` will be reset to `00c04543-f584-11e9-a765-0242ac120002:1-60`. // return `true` if reset real happen. -// NOTE: for MariaDB GTID, no this function exists because its format is `domainID-serverID-SeqNum`. 
func (g *MySQLGTIDSet) ResetStart() bool { if g == nil || g.set == nil { return false @@ -442,6 +444,11 @@ func (m *MariadbGTIDSet) Truncate(end Set) error { return nil } +// ResetStart does nothing because for MariaDB its GTID set format is `domainID-serverID-SeqNum`. +func (m *MariadbGTIDSet) ResetStart() bool { + return false +} + func (m *MariadbGTIDSet) String() string { if m.set == nil { return "" diff --git a/pkg/gtid/gtid_test.go b/pkg/gtid/gtid_test.go index b51fe146f3..1f0084d0e2 100644 --- a/pkg/gtid/gtid_test.go +++ b/pkg/gtid/gtid_test.go @@ -402,8 +402,9 @@ func (s *testGTIDSuite) TestMariaDBGTIDTruncate(c *C) { } } -func (s *testGTIDSuite) TestMySQLGTIDResetStart(c *C) { +func (s *testGTIDSuite) TestGTIDSetResetStart(c *C) { var ( + gMaria, _ = ParserGTID("", "1-2-3") flavor = "mysql" gNil *MySQLGTIDSet gEmpty, _ = ParserGTID(flavor, "") @@ -413,24 +414,29 @@ func (s *testGTIDSuite) TestMySQLGTIDResetStart(c *C) { g4, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100") g5, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100") g6, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:40-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100") + g7, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:10-20:30-100") ) + c.Assert(gMaria.ResetStart(), IsFalse) c.Assert(gNil.ResetStart(), IsFalse) - c.Assert(gEmpty.(*MySQLGTIDSet).ResetStart(), IsFalse) + c.Assert(gEmpty.ResetStart(), IsFalse) - c.Assert(g1.(*MySQLGTIDSet).ResetStart(), IsFalse) + c.Assert(g1.ResetStart(), IsFalse) - c.Assert(g2.(*MySQLGTIDSet).ResetStart(), IsTrue) + c.Assert(g2.ResetStart(), IsTrue) c.Assert(g2.Equal(g1), IsTrue) - c.Assert(g3.(*MySQLGTIDSet).ResetStart(), IsTrue) + c.Assert(g3.ResetStart(), IsTrue) c.Assert(g3.Equal(g1), IsTrue) - c.Assert(g4.(*MySQLGTIDSet).ResetStart(), IsFalse) + c.Assert(g4.ResetStart(), IsFalse) - 
c.Assert(g5.(*MySQLGTIDSet).ResetStart(), IsTrue) + c.Assert(g5.ResetStart(), IsTrue) c.Assert(g5.Equal(g4), IsTrue) - c.Assert(g6.(*MySQLGTIDSet).ResetStart(), IsTrue) + c.Assert(g6.ResetStart(), IsTrue) c.Assert(g6.Equal(g4), IsTrue) + + c.Assert(g7.ResetStart(), IsTrue) + // TODO: currently g7 will become "00c04543-f584-11e9-a765-0242ac120002:1-20:1-100", will fix soon } diff --git a/pkg/v1dbschema/schema.go b/pkg/v1dbschema/schema.go index 7fa5cb9485..a6d362680a 100644 --- a/pkg/v1dbschema/schema.go +++ b/pkg/v1dbschema/schema.go @@ -185,10 +185,8 @@ func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reade defer func() { if err == nil && gs != nil { oldGs := gs.Clone() - if mysqlGs, ok := gs.(*gtid.MySQLGTIDSet); ok { - if mysqlGs.ResetStart() { - tctx.L().Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", mysqlGs)) - } + if gs.ResetStart() { + tctx.L().Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", gs)) } } }() diff --git a/relay/meta.go b/relay/meta.go index 9fff82ab8c..e525ec13db 100644 --- a/relay/meta.go +++ b/relay/meta.go @@ -194,7 +194,7 @@ func (lm *LocalMeta) AdjustWithStartPos(binlogName string, binlogGTID string, en lm.BinlogGTID = binlogGTID lm.gset = gset - return true, nil + return true, lm.doFlush() } // Save implements Meta.Save @@ -222,8 +222,8 @@ func (lm *LocalMeta) Save(pos mysql.Position, gset gtid.Set) error { // Flush implements Meta.Flush func (lm *LocalMeta) Flush() error { - lm.RLock() - defer lm.RUnlock() + lm.Lock() + defer lm.Unlock() return lm.doFlush() } diff --git a/relay/meta_test.go b/relay/meta_test.go index 9e5ca98073..d12de3d3b2 100644 --- a/relay/meta_test.go +++ b/relay/meta_test.go @@ -16,6 +16,7 @@ package relay import ( "io/ioutil" "os" + "path" "strings" . 
"github.com/pingcap/check" @@ -96,7 +97,16 @@ func (r *testMetaSuite) TestLocalMeta(c *C) { dirty := lm.Dirty() c.Assert(dirty, IsFalse) + // set currentUUID because lm.doFlush need it + currentUUID := "uuid.000001" + c.Assert(os.MkdirAll(path.Join(dir, currentUUID), 0777), IsNil) + setLocalMetaWithCurrentUUID := func() { + lm = NewLocalMeta("mysql", dir) + lm.(*LocalMeta).currentUUID = currentUUID + } + // adjust to start pos + setLocalMetaWithCurrentUUID() latestBinlogName := "mysql-bin.000009" latestGTIDStr := "85ab69d1-b21f-11e6-9c5e-64006a8978d2:45-57" cs0 := cases[0] @@ -104,44 +114,50 @@ func (r *testMetaSuite) TestLocalMeta(c *C) { c.Assert(err, IsNil) c.Assert(adjusted, IsTrue) uuid, pos = lm.Pos() - c.Assert(uuid, Equals, "") + c.Assert(uuid, Equals, currentUUID) c.Assert(pos.Name, Equals, cs0.pos.Name) uuid, gset = lm.GTID() - c.Assert(uuid, Equals, "") + c.Assert(uuid, Equals, currentUUID) c.Assert(gset.String(), Equals, "") // adjust to start pos with enableGTID + setLocalMetaWithCurrentUUID() adjusted, err = lm.AdjustWithStartPos(cs0.pos.Name, cs0.gset.String(), true, latestBinlogName, latestGTIDStr) c.Assert(err, IsNil) c.Assert(adjusted, IsTrue) uuid, pos = lm.Pos() - c.Assert(uuid, Equals, "") + c.Assert(uuid, Equals, currentUUID) c.Assert(pos.Name, Equals, cs0.pos.Name) uuid, gset = lm.GTID() - c.Assert(uuid, Equals, "") + c.Assert(uuid, Equals, currentUUID) c.Assert(gset, DeepEquals, cs0.gset) // adjust to the last binlog if start pos is empty + setLocalMetaWithCurrentUUID() adjusted, err = lm.AdjustWithStartPos("", cs0.gset.String(), false, latestBinlogName, latestGTIDStr) c.Assert(err, IsNil) c.Assert(adjusted, IsTrue) uuid, pos = lm.Pos() - c.Assert(uuid, Equals, "") + c.Assert(uuid, Equals, currentUUID) c.Assert(pos.Name, Equals, latestBinlogName) uuid, gset = lm.GTID() - c.Assert(uuid, Equals, "") + c.Assert(uuid, Equals, currentUUID) c.Assert(gset.String(), Equals, "") + setLocalMetaWithCurrentUUID() adjusted, err = 
lm.AdjustWithStartPos("", "", true, latestBinlogName, latestGTIDStr) c.Assert(err, IsNil) c.Assert(adjusted, IsTrue) uuid, pos = lm.Pos() - c.Assert(uuid, Equals, "") + c.Assert(uuid, Equals, currentUUID) c.Assert(pos.Name, Equals, latestBinlogName) uuid, gset = lm.GTID() - c.Assert(uuid, Equals, "") + c.Assert(uuid, Equals, currentUUID) c.Assert(gset.String(), Equals, latestGTIDStr) + + // reset + lm.(*LocalMeta).currentUUID = "" + for _, cs := range cases { err = lm.AddDir(cs.uuid, nil, nil, 0) c.Assert(err, IsNil) @@ -209,7 +225,7 @@ func (r *testMetaSuite) TestLocalMeta(c *C) { dirty = lm.Dirty() c.Assert(dirty, IsFalse) - currentUUID, pos := lm.Pos() + currentUUID, pos = lm.Pos() c.Assert(currentUUID, Equals, cs.uuidWithSuffix) c.Assert(pos, DeepEquals, cs.pos) diff --git a/relay/relay.go b/relay/relay.go index 7608398694..7c7675af14 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -204,19 +204,55 @@ func (r *Relay) process(ctx context.Context) error { return err } - if isNew || r.cfg.UUIDSuffix > 0 { + if isNew { // re-setup meta for new server or new source err = r.reSetupMeta(ctx) if err != nil { return err } } else { + // connected to last source r.updateMetricsRelaySubDirIndex() // if not a new server, try to recover the latest relay log file. err = r.tryRecoverLatestFile(ctx, parser2) if err != nil { return err } + + // resuming will take the risk that upstream has purged the binlog that relay needs. + // when this worker is down, HA may schedule the source to other workers and forward the sync progress, + // and then when the source is scheduled back to this worker, we could start relay from sync checkpoint's + // location which is newer, and now could purge the outdated relay logs. 
+ // + // locations in `r.cfg` is set to min needed location of subtasks (higher priority) or source config specified + isRelayMetaOutdated := false + neededBinlogName := r.cfg.BinLogName + neededBinlogGset, err2 := gtid.ParserGTID(r.cfg.Flavor, r.cfg.BinlogGTID) + if err2 != nil { + return err2 + } + if r.cfg.EnableGTID { + _, metaGset := r.meta.GTID() + if neededBinlogGset.Contain(metaGset) && !neededBinlogGset.Equal(metaGset) { + isRelayMetaOutdated = true + } + } else { + _, metaPos := r.meta.Pos() + if neededBinlogName > metaPos.Name { + isRelayMetaOutdated = true + } + } + + if isRelayMetaOutdated { + err2 = r.PurgeRelayDir() + if err2 != nil { + return err2 + } + err2 = r.SaveMeta(mysql.Position{Name: neededBinlogName, Pos: binlog.MinPosition.Pos}, neededBinlogGset) + if err2 != nil { + return err2 + } + } } reader2, err := r.setUpReader(ctx) @@ -279,6 +315,7 @@ func (r *Relay) process(ctx context.Context) error { func (r *Relay) PurgeRelayDir() error { dir := r.cfg.RelayDir d, err := os.Open(dir) + r.logger.Info("will try purge whole relay dir for new relay log", zap.String("relayDir", dir)) // fail to open dir, return directly if err != nil { if err == os.ErrNotExist { @@ -334,26 +371,27 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser // NOTE: recover a relay log file with too many binlog events may take a little long time. 
result, err := writer2.Recover(ctx) if err == nil { - if result.Recovered { + relayLogHasMore := result.LatestPos.Compare(latestPos) > 0 || + (result.LatestGTIDs != nil && !result.LatestGTIDs.Equal(latestGTID) && result.LatestGTIDs.Contain(latestGTID)) + + if result.Truncated || relayLogHasMore { r.logger.Warn("relay log file recovered", zap.Stringer("from position", latestPos), zap.Stringer("to position", result.LatestPos), log.WrapStringerField("from GTID set", latestGTID), log.WrapStringerField("to GTID set", result.LatestGTIDs)) if result.LatestGTIDs != nil { - if mysqlGS, ok := result.LatestGTIDs.(*gtid.MySQLGTIDSet); ok { - // in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes, - // e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`, - // but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`. - // and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like - // `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.` - // may occur, so we force to reset the START part of any GTID set. - oldGs1 := mysqlGS.Clone() - if mysqlGS.ResetStart() { - r.logger.Warn("force to reset the start part of recovered GTID sets", zap.Stringer("from GTID set", oldGs1), zap.Stringer("to GTID set", mysqlGS)) - // also need to reset start for `latestGTID`. 
- oldGs2 := latestGTID.Clone() - if latestGTID.(*gtid.MySQLGTIDSet).ResetStart() { - r.logger.Warn("force to reset the start part of latest GTID sets", zap.Stringer("from GTID set", oldGs2), zap.Stringer("to GTID set", latestGTID)) - } + gs := result.LatestGTIDs + oldGs1 := gs.Clone() + // in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes, + // e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`, + // but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`. + // and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like + // `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.` + // may occur, so we force to reset the START part of any GTID set. + if gs.ResetStart() { + r.logger.Warn("force to reset the start part of recovered GTID sets", zap.Stringer("from GTID set", oldGs1), zap.Stringer("to GTID set", gs)) + oldGs2 := latestGTID.Clone() + if latestGTID.ResetStart() { + r.logger.Warn("force to reset the start part of latest GTID sets", zap.Stringer("from GTID set", oldGs2), zap.Stringer("to GTID set", latestGTID)) } } } @@ -369,12 +407,7 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser if err != nil { return terror.Annotatef(err, "save position %s, GTID sets %v after recovered", result.LatestPos, result.LatestGTIDs) } - } else if result.LatestPos.Compare(latestPos) > 0 || - (result.LatestGTIDs != nil && !result.LatestGTIDs.Equal(latestGTID) && result.LatestGTIDs.Contain(latestGTID)) { - r.logger.Warn("relay log file have more events", - zap.Stringer("after position", latestPos), zap.Stringer("until position", result.LatestPos), log.WrapStringerField("after GTID set", latestGTID), 
log.WrapStringerField("until GTID set", result.LatestGTIDs)) } - } return terror.Annotatef(err, "recover for UUID %s with config %+v", uuid, cfg) } @@ -585,21 +618,21 @@ func (r *Relay) reSetupMeta(ctx context.Context) error { } _, pos := r.meta.Pos() - _, gtid := r.meta.GTID() + _, gs := r.meta.GTID() if r.cfg.EnableGTID { // Adjust given gtid // This means we always pull the binlog from the beginning of file. - gtid, err = r.adjustGTID(ctx, gtid) + gs, err = r.adjustGTID(ctx, gs) if err != nil { return terror.Annotate(err, "fail to adjust gtid for relay") } - err = r.SaveMeta(pos, gtid) + err = r.SaveMeta(pos, gs) if err != nil { return err } } - r.logger.Info("adjusted meta to start pos", zap.Reflect("start pos", pos), zap.Stringer("start pos's binlog gtid", gtid)) + r.logger.Info("adjusted meta to start pos", zap.Reflect("start pos", pos), zap.Stringer("start pos's binlog gtid", gs)) r.updateMetricsRelaySubDirIndex() r.logger.Info("resetup meta", zap.String("uuid", uuid)) @@ -629,6 +662,11 @@ func (r *Relay) doIntervalOps(ctx context.Context) { for { select { case <-flushTicker.C: + r.RLock() + if r.closed.Get() { + r.RUnlock() + return + } if r.meta.Dirty() { err := r.FlushMeta() if err != nil { @@ -637,28 +675,43 @@ func (r *Relay) doIntervalOps(ctx context.Context) { r.logger.Info("flush meta finished", zap.Stringer("meta", r.meta)) } } + r.RUnlock() case <-masterStatusTicker.C: + r.RLock() + if r.closed.Get() { + r.RUnlock() + return + } ctx2, cancel2 := context.WithTimeout(ctx, utils.DefaultDBTimeout) pos, _, err := utils.GetMasterStatus(ctx2, r.db, r.cfg.Flavor) cancel2() if err != nil { r.logger.Warn("get master status", zap.Error(err)) + r.RUnlock() continue } index, err := binlog.GetFilenameIndex(pos.Name) if err != nil { r.logger.Error("parse binlog file name", zap.String("file name", pos.Name), log.ShortError(err)) + r.RUnlock() continue } relayLogFileGauge.WithLabelValues("master").Set(float64(index)) 
relayLogPosGauge.WithLabelValues("master").Set(float64(pos.Pos)) + r.RUnlock() case <-trimUUIDsTicker.C: + r.RLock() + if r.closed.Get() { + r.RUnlock() + return + } trimmed, err := r.meta.TrimUUIDs() if err != nil { r.logger.Error("trim UUIDs", zap.Error(err)) } else if len(trimmed) > 0 { r.logger.Info("trim UUIDs", zap.String("UUIDs", strings.Join(trimmed, ";"))) } + r.RUnlock() case <-ctx.Done(): return } @@ -936,6 +989,8 @@ func (r *Relay) setSyncConfig() error { } // AdjustGTID implements Relay.AdjustGTID +// starting sync at returned gset will wholly fetch a binlog from beginning of the file. +// TODO: check if starting fetch at the middle of binlog is also acceptable func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error) { // setup a TCP binlog reader (because no relay can be used when upgrading). syncCfg := r.syncerCfg @@ -946,5 +1001,20 @@ func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error) syncCfg.ServerID = randomServerID tcpReader := binlogReader.NewTCPReader(syncCfg) - return binlogReader.GetPreviousGTIDFromGTIDSet(ctx, tcpReader, gset) + resultGs, err := binlogReader.GetPreviousGTIDFromGTIDSet(ctx, tcpReader, gset) + if err != nil { + return nil, err + } + + // in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes, + // e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`, + // but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`. + // and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like + // `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.` + // may occur, so we force to reset the START part of any GTID set. 
+ oldGs := resultGs.Clone() + if resultGs.ResetStart() { + r.logger.Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", resultGs)) + } + return resultGs, nil } diff --git a/relay/relay_test.go b/relay/relay_test.go index 3816e66f0d..ed06e41621 100644 --- a/relay/relay_test.go +++ b/relay/relay_test.go @@ -206,7 +206,7 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) { c.Assert(err, IsNil) latestGTID2, err := gtid.ParserGTID(relayCfg.Flavor, latestGTIDStr2) c.Assert(err, IsNil) - g, _, data := genBinlogEventsWithGTIDs(c, relayCfg.Flavor, previousGTIDSet, latestGTID1, latestGTID2) + g, events, data := genBinlogEventsWithGTIDs(c, relayCfg.Flavor, previousGTIDSet, latestGTID1, latestGTID2) // write events into relay log file err = ioutil.WriteFile(filepath.Join(r.meta.Dir(), filename), data, 0600) @@ -214,8 +214,11 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) { // all events/transactions are complete, no need to recover c.Assert(r.tryRecoverLatestFile(context.Background(), parser2), IsNil) - // now, we do not update position/GTID set in meta if not recovered - t.verifyMetadata(c, r, uuidWithSuffix, startPos, "", []string{uuidWithSuffix}) + // now, we will update position/GTID set in meta to latest location in relay logs + lastEvent := events[len(events)-1] + pos := startPos + pos.Pos = lastEvent.Header.LogPos + t.verifyMetadata(c, r, uuidWithSuffix, pos, recoverGTIDSetStr, []string{uuidWithSuffix}) // write some invalid data into the relay log file f, err = os.OpenFile(filepath.Join(r.meta.Dir(), filename), os.O_WRONLY|os.O_APPEND, 0600) diff --git a/relay/writer/file.go b/relay/writer/file.go index 7c56904dff..a74679047c 100644 --- a/relay/writer/file.go +++ b/relay/writer/file.go @@ -411,7 +411,7 @@ func (w *FileWriter) doRecovering(ctx context.Context) (RecoverResult, error) { // in most cases, we think the file is fine, so compare the size is simpler. 
if fs.Size() == latestPos { return RecoverResult{ - Recovered: false, // no recovering for the file + Truncated: false, LatestPos: gmysql.Position{Name: w.filename.Get(), Pos: uint32(latestPos)}, LatestGTIDs: latestGTIDs, }, nil @@ -431,7 +431,7 @@ func (w *FileWriter) doRecovering(ctx context.Context) (RecoverResult, error) { } return RecoverResult{ - Recovered: true, + Truncated: true, LatestPos: gmysql.Position{Name: w.filename.Get(), Pos: uint32(latestPos)}, LatestGTIDs: latestGTIDs, }, nil diff --git a/relay/writer/file_test.go b/relay/writer/file_test.go index 7d00dc1617..e98599e96e 100644 --- a/relay/writer/file_test.go +++ b/relay/writer/file_test.go @@ -88,7 +88,7 @@ func (t *testFileWriterSuite) TestInterfaceMethods(c *check.C) { // recover rres, err := w.Recover(context.Background()) c.Assert(err, check.IsNil) - c.Assert(rres.Recovered, check.IsFalse) + c.Assert(rres.Truncated, check.IsFalse) // write event res, err := w.WriteEvent(ev) @@ -599,7 +599,7 @@ func (t *testFileWriterSuite) TestRecoverMySQL(c *check.C) { // try recover, but in fact do nothing result, err := w.Recover(context.Background()) c.Assert(err, check.IsNil) - c.Assert(result.Recovered, check.IsFalse) + c.Assert(result.Truncated, check.IsFalse) c.Assert(result.LatestPos, check.DeepEquals, expectedPos) c.Assert(result.LatestGTIDs, check.DeepEquals, expectedGTIDs) @@ -629,7 +629,7 @@ func (t *testFileWriterSuite) TestRecoverMySQL(c *check.C) { // try recover, truncate the incomplete event result, err = w.Recover(context.Background()) c.Assert(err, check.IsNil) - c.Assert(result.Recovered, check.IsTrue) + c.Assert(result.Truncated, check.IsTrue) c.Assert(result.LatestPos, check.DeepEquals, expectedPos) c.Assert(result.LatestGTIDs, check.DeepEquals, expectedGTIDs) @@ -657,7 +657,7 @@ func (t *testFileWriterSuite) TestRecoverMySQL(c *check.C) { // try recover, truncate the incomplete transaction result, err = w.Recover(context.Background()) c.Assert(err, check.IsNil) - 
c.Assert(result.Recovered, check.IsTrue) + c.Assert(result.Truncated, check.IsTrue) c.Assert(result.LatestPos, check.DeepEquals, expectedPos) c.Assert(result.LatestGTIDs, check.DeepEquals, expectedGTIDs) @@ -688,7 +688,7 @@ func (t *testFileWriterSuite) TestRecoverMySQL(c *check.C) { c.Assert(err, check.IsNil) result, err = w.Recover(context.Background()) c.Assert(err, check.IsNil) - c.Assert(result.Recovered, check.IsFalse) + c.Assert(result.Truncated, check.IsFalse) c.Assert(result.LatestPos, check.DeepEquals, expectedPos) c.Assert(result.LatestGTIDs, check.DeepEquals, expectedGTIDs) @@ -715,7 +715,7 @@ func (t *testFileWriterSuite) TestRecoverMySQLNone(c *check.C) { // no file specified to recover result, err := w1.Recover(context.Background()) c.Assert(err, check.IsNil) - c.Assert(result.Recovered, check.IsFalse) + c.Assert(result.Truncated, check.IsFalse) cfg.Filename = "mysql-bin.000001" w2 := NewFileWriter(log.L(), cfg, t.parser) @@ -725,5 +725,5 @@ func (t *testFileWriterSuite) TestRecoverMySQLNone(c *check.C) { // file not exist, no need to recover result, err = w2.Recover(context.Background()) c.Assert(err, check.IsNil) - c.Assert(result.Recovered, check.IsFalse) + c.Assert(result.Truncated, check.IsFalse) } diff --git a/relay/writer/writer.go b/relay/writer/writer.go index 2d0c7c5010..07fff224db 100644 --- a/relay/writer/writer.go +++ b/relay/writer/writer.go @@ -35,9 +35,8 @@ type Result struct { // RecoverResult represents a result for a binlog recover operation. type RecoverResult struct { - // true if recover operation has done and successfully. - // false if no recover operation has done or unsuccessfully. - Recovered bool + // Truncated indicates whether trailing incomplete events were truncated during recovering the relay log + Truncated bool // the latest binlog position after recover operation has done. LatestPos gmysql.Position // the latest binlog GTID set after recover operation has done. 
diff --git a/tests/ha_cases/run.sh b/tests/ha_cases/run.sh index f8cdcfc73a..9689ed2a82 100755 --- a/tests/ha_cases/run.sh +++ b/tests/ha_cases/run.sh @@ -720,6 +720,9 @@ function test_last_bound() { start_2_worker_ensure_bound 1 2 check_bound + # only contains 1 "will try purge ..." which is printed the first time dm worker starts + check_log_contains $WORK_DIR/worker1/log/dm-worker.log "will try purge whole relay dir for new relay log" 1 + check_log_contains $WORK_DIR/worker2/log/dm-worker.log "will try purge whole relay dir for new relay log" 1 kill_2_worker_ensure_unbound 1 2 @@ -727,10 +730,17 @@ start_2_worker_ensure_bound 2 1 check_bound + check_log_contains $WORK_DIR/worker1/log/dm-worker.log "will try purge whole relay dir for new relay log" 1 + check_log_contains $WORK_DIR/worker2/log/dm-worker.log "will try purge whole relay dir for new relay log" 1 # kill 12, start 34, kill 34 kill_2_worker_ensure_unbound 1 2 start_2_worker_ensure_bound 3 4 + # let other workers rather than 1 2 forward the syncer's progress + run_sql_file_withdb $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 $ha_test + run_sql "flush logs;" $MYSQL_PORT2 $MYSQL_PASSWORD2 + run_sql_file_withdb $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 $ha_test + sleep 1 kill_2_worker_ensure_unbound 3 4 # start 1 then 2 @@ -738,6 +748,11 @@ # check check_bound + # other workers have forwarded the sync progress, if moved to a new binlog file, original relay log could be removed + num1=`grep "will try purge whole relay dir for new relay log" $WORK_DIR/worker1/log/dm-worker.log | wc -l` + num2=`grep "will try purge whole relay dir for new relay log" $WORK_DIR/worker2/log/dm-worker.log | wc -l` + echo "num1$num1 num2$num2" + [[ $num1+$num2 -eq 3 ]] echo "[$(date)] <<<<<< finish test_last_bound >>>>>>" } diff --git a/tests/relay_interrupt/run.sh b/tests/relay_interrupt/run.sh index 
00ae4f6ba7..fab8593b85 100644 --- a/tests/relay_interrupt/run.sh +++ b/tests/relay_interrupt/run.sh @@ -91,25 +91,24 @@ function run() { "\"result\": false" 1 \ "subtasks with name test for sources \[mysql-replica-01\] already exist" 1 -# TODO(csuzhangxc): support relay log again. -# run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ -# "query-status test" \ -# "\"binlogType\": \"local\"" 1 -# -# check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - -# prepare_data2 $i -# echo "read binlog from relay log failed, and will use remote binlog" + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"binlogType\": \"local\"" 1 + + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + echo "read binlog from relay log failed, and will use remote binlog" kill_dm_worker export GO_FAILPOINTS="github.com/pingcap/dm/pkg/streamer/GetEventFromLocalFailed=return()" run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + prepare_data2 $i sleep 8 -# run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ -# "query-status test" \ -# "\"binlogType\": \"remote\"" 1 -# -# check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"binlogType\": \"remote\"" 1 + + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml export GO_FAILPOINTS='' cleanup_process