Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay: don't purge relay logs when connected to last source #1400

Merged
merged 19 commits into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ func (h *realRelayHolder) stopRelay(ctx context.Context, op pb.RelayOp) error {
h.stage = pb.Stage_Stopped
h.Unlock() // unlock to make `run` can return

// purge relay dir when delete source
if err := h.relay.PurgeRelayDir(); err != nil {
return err
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a bit aggressive.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how to change 🤔

Copy link
Contributor

@lichunzhu lichunzhu Jan 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest moving relay dir to another name (relay_dir -> relay_dir_deprecated) instead of deleting it directly. We have purge-relay command and I think the purge action should only happen in this command.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relay_dir_deprecated is outside of relay_dir and thus can't be deleted using purge-relay, so the disk space is not freed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's a good idea. `stop source` is a normal operation; it shouldn't purge data or move it to another directory.

// now, when try to stop relay unit, we close relay holder
h.Close()
return nil
Expand Down
1 change: 1 addition & 0 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
return err
}
startRelay = !relayStage.IsDeleted && relayStage.Expect == pb.Stage_Running
// TODO: PurgeRelayDir if not found relay, not if not found, that's not bound !
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}
go func() {
w.Start(startRelay)
Expand Down
6 changes: 3 additions & 3 deletions relay/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (lm *LocalMeta) AdjustWithStartPos(binlogName string, binlogGTID string, en
lm.BinlogGTID = binlogGTID
lm.gset = gset

return true, nil
return true, lm.doFlush()
}

// Save implements Meta.Save
Expand Down Expand Up @@ -222,8 +222,8 @@ func (lm *LocalMeta) Save(pos mysql.Position, gset gtid.Set) error {

// Flush implements Meta.Flush
func (lm *LocalMeta) Flush() error {
lm.RLock()
defer lm.RUnlock()
lm.Lock()
defer lm.Unlock()

return lm.doFlush()
}
Expand Down
34 changes: 25 additions & 9 deletions relay/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package relay
import (
"io/ioutil"
"os"
"path"
"strings"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -96,52 +97,67 @@ func (r *testMetaSuite) TestLocalMeta(c *C) {
dirty := lm.Dirty()
c.Assert(dirty, IsFalse)

// set currentUUID because lm.doFlush need it
currentUUID := "uuid.000001"
c.Assert(os.MkdirAll(path.Join(dir, currentUUID), 0777), IsNil)
setLocalMetaWithCurrentUUID := func() {
lm = NewLocalMeta("mysql", dir)
lm.(*LocalMeta).currentUUID = currentUUID
}

// adjust to start pos
setLocalMetaWithCurrentUUID()
latestBinlogName := "mysql-bin.000009"
latestGTIDStr := "85ab69d1-b21f-11e6-9c5e-64006a8978d2:45-57"
cs0 := cases[0]
adjusted, err := lm.AdjustWithStartPos(cs0.pos.Name, cs0.gset.String(), false, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(pos.Name, Equals, cs0.pos.Name)
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(gset.String(), Equals, "")

// adjust to start pos with enableGTID
setLocalMetaWithCurrentUUID()
adjusted, err = lm.AdjustWithStartPos(cs0.pos.Name, cs0.gset.String(), true, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(pos.Name, Equals, cs0.pos.Name)
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(gset, DeepEquals, cs0.gset)

// adjust to the last binlog if start pos is empty
setLocalMetaWithCurrentUUID()
adjusted, err = lm.AdjustWithStartPos("", cs0.gset.String(), false, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(pos.Name, Equals, latestBinlogName)
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(gset.String(), Equals, "")

setLocalMetaWithCurrentUUID()
adjusted, err = lm.AdjustWithStartPos("", "", true, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(pos.Name, Equals, latestBinlogName)
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(gset.String(), Equals, latestGTIDStr)

// reset
lm.(*LocalMeta).currentUUID = ""

for _, cs := range cases {
err = lm.AddDir(cs.uuid, nil, nil, 0)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -209,7 +225,7 @@ func (r *testMetaSuite) TestLocalMeta(c *C) {
dirty = lm.Dirty()
c.Assert(dirty, IsFalse)

currentUUID, pos := lm.Pos()
currentUUID, pos = lm.Pos()
c.Assert(currentUUID, Equals, cs.uuidWithSuffix)
c.Assert(pos, DeepEquals, cs.pos)

Expand Down
53 changes: 42 additions & 11 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (r *Relay) process(ctx context.Context) error {
return err
}

if isNew || r.cfg.UUIDSuffix > 0 {
if isNew {
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
// re-setup meta for new server or new source
err = r.reSetupMeta(ctx)
if err != nil {
Expand Down Expand Up @@ -279,6 +279,7 @@ func (r *Relay) process(ctx context.Context) error {
func (r *Relay) PurgeRelayDir() error {
dir := r.cfg.RelayDir
d, err := os.Open(dir)
r.logger.Info("will try purge whole relay dir for new relay log", zap.String("relayDir", dir))
// fail to open dir, return directly
if err != nil {
if err == os.ErrNotExist {
Expand Down Expand Up @@ -334,7 +335,10 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser
// NOTE: recover a relay log file with too many binlog events may take a little long time.
result, err := writer2.Recover(ctx)
if err == nil {
if result.Recovered {
relayLogHasMore := result.LatestPos.Compare(latestPos) > 0 ||
(result.LatestGTIDs != nil && !result.LatestGTIDs.Equal(latestGTID) && result.LatestGTIDs.Contain(latestGTID))

if result.Truncated || relayLogHasMore {
r.logger.Warn("relay log file recovered",
zap.Stringer("from position", latestPos), zap.Stringer("to position", result.LatestPos), log.WrapStringerField("from GTID set", latestGTID), log.WrapStringerField("to GTID set", result.LatestGTIDs))

Expand Down Expand Up @@ -369,12 +373,7 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser
if err != nil {
return terror.Annotatef(err, "save position %s, GTID sets %v after recovered", result.LatestPos, result.LatestGTIDs)
}
} else if result.LatestPos.Compare(latestPos) > 0 ||
(result.LatestGTIDs != nil && !result.LatestGTIDs.Equal(latestGTID) && result.LatestGTIDs.Contain(latestGTID)) {
r.logger.Warn("relay log file have more events",
zap.Stringer("after position", latestPos), zap.Stringer("until position", result.LatestPos), log.WrapStringerField("after GTID set", latestGTID), log.WrapStringerField("until GTID set", result.LatestGTIDs))
}

}
return terror.Annotatef(err, "recover for UUID %s with config %+v", uuid, cfg)
}
Expand Down Expand Up @@ -572,21 +571,33 @@ func (r *Relay) reSetupMeta(ctx context.Context) error {
}

_, pos := r.meta.Pos()
_, gtid := r.meta.GTID()
_, gs := r.meta.GTID()
if r.cfg.EnableGTID {
// Adjust given gtid
// This means we always pull the binlog from the beginning of file.
gtid, err = r.adjustGTID(ctx, gtid)
gs, err = r.adjustGTID(ctx, gs)
if err != nil {
return terror.Annotate(err, "fail to adjust gtid for relay")
}
err = r.SaveMeta(pos, gtid)
// in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes,
// e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`,
// but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`.
// and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like
// `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.`
// may occur, so we force to reset the START part of any GTID set.
oldGs := gs.Clone()
if mysqlGs, ok := gs.(*gtid.MySQLGTIDSet); ok {
if mysqlGs.ResetStart() {
r.logger.Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", mysqlGs))
}
}
err = r.SaveMeta(pos, gs)
if err != nil {
return err
}
}

r.logger.Info("adjusted meta to start pos", zap.Reflect("start pos", pos), zap.Stringer("start pos's binlog gtid", gtid))
r.logger.Info("adjusted meta to start pos", zap.Reflect("start pos", pos), zap.Stringer("start pos's binlog gtid", gs))
r.updateMetricsRelaySubDirIndex()

return nil
Expand Down Expand Up @@ -615,6 +626,11 @@ func (r *Relay) doIntervalOps(ctx context.Context) {
for {
select {
case <-flushTicker.C:
r.RLock()
if r.closed.Get() {
r.RUnlock()
return
}
if r.meta.Dirty() {
err := r.FlushMeta()
if err != nil {
Expand All @@ -623,28 +639,43 @@ func (r *Relay) doIntervalOps(ctx context.Context) {
r.logger.Info("flush meta finished", zap.Stringer("meta", r.meta))
}
}
r.RUnlock()
case <-masterStatusTicker.C:
r.RLock()
if r.closed.Get() {
r.RUnlock()
return
}
ctx2, cancel2 := context.WithTimeout(ctx, utils.DefaultDBTimeout)
pos, _, err := utils.GetMasterStatus(ctx2, r.db, r.cfg.Flavor)
cancel2()
if err != nil {
r.logger.Warn("get master status", zap.Error(err))
r.RUnlock()
continue
}
index, err := binlog.GetFilenameIndex(pos.Name)
if err != nil {
r.logger.Error("parse binlog file name", zap.String("file name", pos.Name), log.ShortError(err))
r.RUnlock()
continue
}
relayLogFileGauge.WithLabelValues("master").Set(float64(index))
relayLogPosGauge.WithLabelValues("master").Set(float64(pos.Pos))
r.RUnlock()
case <-trimUUIDsTicker.C:
r.RLock()
if r.closed.Get() {
r.RUnlock()
return
}
trimmed, err := r.meta.TrimUUIDs()
if err != nil {
r.logger.Error("trim UUIDs", zap.Error(err))
} else if len(trimmed) > 0 {
r.logger.Info("trim UUIDs", zap.String("UUIDs", strings.Join(trimmed, ";")))
}
r.RUnlock()
case <-ctx.Done():
return
}
Expand Down
9 changes: 6 additions & 3 deletions relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,19 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) {
c.Assert(err, IsNil)
latestGTID2, err := gtid.ParserGTID(relayCfg.Flavor, latestGTIDStr2)
c.Assert(err, IsNil)
g, _, data := genBinlogEventsWithGTIDs(c, relayCfg.Flavor, previousGTIDSet, latestGTID1, latestGTID2)
g, events, data := genBinlogEventsWithGTIDs(c, relayCfg.Flavor, previousGTIDSet, latestGTID1, latestGTID2)

// write events into relay log file
err = ioutil.WriteFile(filepath.Join(r.meta.Dir(), filename), data, 0600)
c.Assert(err, IsNil)

// all events/transactions are complete, no need to recover
c.Assert(r.tryRecoverLatestFile(context.Background(), parser2), IsNil)
// now, we do not update position/GTID set in meta if not recovered
t.verifyMetadata(c, r, uuidWithSuffix, startPos, "", []string{uuidWithSuffix})
// now, we will update position/GTID set in meta to latest location in relay logs
lastEvent := events[len(events)-1]
pos := startPos
pos.Pos = lastEvent.Header.LogPos
t.verifyMetadata(c, r, uuidWithSuffix, pos, recoverGTIDSetStr, []string{uuidWithSuffix})

// write some invalid data into the relay log file
f, err = os.OpenFile(filepath.Join(r.meta.Dir(), filename), os.O_WRONLY|os.O_APPEND, 0600)
Expand Down
4 changes: 2 additions & 2 deletions relay/writer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (w *FileWriter) doRecovering(ctx context.Context) (RecoverResult, error) {
// in most cases, we think the file is fine, so compare the size is simpler.
if fs.Size() == latestPos {
return RecoverResult{
Recovered: false, // no recovering for the file
Truncated: false,
LatestPos: gmysql.Position{Name: w.filename.Get(), Pos: uint32(latestPos)},
LatestGTIDs: latestGTIDs,
}, nil
Expand All @@ -431,7 +431,7 @@ func (w *FileWriter) doRecovering(ctx context.Context) (RecoverResult, error) {
}

return RecoverResult{
Recovered: true,
Truncated: true,
LatestPos: gmysql.Position{Name: w.filename.Get(), Pos: uint32(latestPos)},
LatestGTIDs: latestGTIDs,
}, nil
Expand Down
14 changes: 7 additions & 7 deletions relay/writer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (t *testFileWriterSuite) TestInterfaceMethods(c *check.C) {
// recover
rres, err := w.Recover(context.Background())
c.Assert(err, check.IsNil)
c.Assert(rres.Recovered, check.IsFalse)
c.Assert(rres.Truncated, check.IsFalse)

// write event
res, err := w.WriteEvent(ev)
Expand Down Expand Up @@ -599,7 +599,7 @@ func (t *testFileWriterSuite) TestRecoverMySQL(c *check.C) {
// try recover, but in fact do nothing
result, err := w.Recover(context.Background())
c.Assert(err, check.IsNil)
c.Assert(result.Recovered, check.IsFalse)
c.Assert(result.Truncated, check.IsFalse)
c.Assert(result.LatestPos, check.DeepEquals, expectedPos)
c.Assert(result.LatestGTIDs, check.DeepEquals, expectedGTIDs)

Expand Down Expand Up @@ -629,7 +629,7 @@ func (t *testFileWriterSuite) TestRecoverMySQL(c *check.C) {
// try recover, truncate the incomplete event
result, err = w.Recover(context.Background())
c.Assert(err, check.IsNil)
c.Assert(result.Recovered, check.IsTrue)
c.Assert(result.Truncated, check.IsTrue)
c.Assert(result.LatestPos, check.DeepEquals, expectedPos)
c.Assert(result.LatestGTIDs, check.DeepEquals, expectedGTIDs)

Expand Down Expand Up @@ -657,7 +657,7 @@ func (t *testFileWriterSuite) TestRecoverMySQL(c *check.C) {
// try recover, truncate the incomplete transaction
result, err = w.Recover(context.Background())
c.Assert(err, check.IsNil)
c.Assert(result.Recovered, check.IsTrue)
c.Assert(result.Truncated, check.IsTrue)
c.Assert(result.LatestPos, check.DeepEquals, expectedPos)
c.Assert(result.LatestGTIDs, check.DeepEquals, expectedGTIDs)

Expand Down Expand Up @@ -688,7 +688,7 @@ func (t *testFileWriterSuite) TestRecoverMySQL(c *check.C) {
c.Assert(err, check.IsNil)
result, err = w.Recover(context.Background())
c.Assert(err, check.IsNil)
c.Assert(result.Recovered, check.IsFalse)
c.Assert(result.Truncated, check.IsFalse)
c.Assert(result.LatestPos, check.DeepEquals, expectedPos)
c.Assert(result.LatestGTIDs, check.DeepEquals, expectedGTIDs)

Expand All @@ -715,7 +715,7 @@ func (t *testFileWriterSuite) TestRecoverMySQLNone(c *check.C) {
// no file specified to recover
result, err := w1.Recover(context.Background())
c.Assert(err, check.IsNil)
c.Assert(result.Recovered, check.IsFalse)
c.Assert(result.Truncated, check.IsFalse)

cfg.Filename = "mysql-bin.000001"
w2 := NewFileWriter(log.L(), cfg, t.parser)
Expand All @@ -725,5 +725,5 @@ func (t *testFileWriterSuite) TestRecoverMySQLNone(c *check.C) {
// file not exist, no need to recover
result, err = w2.Recover(context.Background())
c.Assert(err, check.IsNil)
c.Assert(result.Recovered, check.IsFalse)
c.Assert(result.Truncated, check.IsFalse)
}
5 changes: 2 additions & 3 deletions relay/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ type Result struct {

// RecoverResult represents a result for a binlog recover operation.
type RecoverResult struct {
// true if recover operation has done and successfully.
// false if no recover operation has done or unsuccessfully.
Recovered bool
// if truncate trailing incomplete events during recovering in relay log
Truncated bool
// the latest binlog position after recover operation has done.
LatestPos gmysql.Position
// the latest binlog GTID set after recover operation has done.
Expand Down
Loading