Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
relay: don't purge relay logs when connected to last source (#1400)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Feb 4, 2021
1 parent 1acfc29 commit efa5b30
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 84 deletions.
6 changes: 3 additions & 3 deletions dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (h *realRelayHolder) Operate(ctx context.Context, op pb.RelayOp) error {
return terror.ErrWorkerRelayOperNotSupport.Generate(op.String())
}

func (h *realRelayHolder) pauseRelay(ctx context.Context, op pb.RelayOp) error {
func (h *realRelayHolder) pauseRelay(_ context.Context, op pb.RelayOp) error {
h.Lock()
if h.stage != pb.Stage_Running {
h.Unlock()
Expand All @@ -210,7 +210,7 @@ func (h *realRelayHolder) pauseRelay(ctx context.Context, op pb.RelayOp) error {
return nil
}

func (h *realRelayHolder) resumeRelay(ctx context.Context, op pb.RelayOp) error {
func (h *realRelayHolder) resumeRelay(_ context.Context, op pb.RelayOp) error {
h.Lock()
defer h.Unlock()
if h.stage != pb.Stage_Paused {
Expand All @@ -225,7 +225,7 @@ func (h *realRelayHolder) resumeRelay(ctx context.Context, op pb.RelayOp) error
return nil
}

func (h *realRelayHolder) stopRelay(ctx context.Context, op pb.RelayOp) error {
func (h *realRelayHolder) stopRelay(_ context.Context, op pb.RelayOp) error {
h.Lock()
if h.stage == pb.Stage_Stopped {
h.Unlock()
Expand Down
9 changes: 8 additions & 1 deletion pkg/gtid/gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type Set interface {
// should become `00c04543-f584-11e9-a765-0242ac120002:1-60`.
Truncate(end Set) error

// ResetStart reset the start of interval to 1 (only meaningful for MySQLGTIDSet), returns true if Set is changed
ResetStart() bool

String() string
}

Expand Down Expand Up @@ -260,7 +263,6 @@ func (g *MySQLGTIDSet) Truncate(end Set) error {
// ResetStart resets the start part of GTID sets,
// like `00c04543-f584-11e9-a765-0242ac120002:40-60` will be reset to `00c04543-f584-11e9-a765-0242ac120002:1-60`.
// return `true` if reset real happen.
// NOTE: for MariaDB GTID, no this function exists because its format is `domainID-serverID-SeqNum`.
func (g *MySQLGTIDSet) ResetStart() bool {
if g == nil || g.set == nil {
return false
Expand Down Expand Up @@ -442,6 +444,11 @@ func (m *MariadbGTIDSet) Truncate(end Set) error {
return nil
}

// ResetStart does nothing because for MariaDB its GTID set format is `domainID-serverID-SeqNum`.
func (m *MariadbGTIDSet) ResetStart() bool {
return false
}

func (m *MariadbGTIDSet) String() string {
if m.set == nil {
return ""
Expand Down
22 changes: 14 additions & 8 deletions pkg/gtid/gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,9 @@ func (s *testGTIDSuite) TestMariaDBGTIDTruncate(c *C) {
}
}

func (s *testGTIDSuite) TestMySQLGTIDResetStart(c *C) {
func (s *testGTIDSuite) TestGTIDSetResetStart(c *C) {
var (
gMaria, _ = ParserGTID("", "1-2-3")
flavor = "mysql"
gNil *MySQLGTIDSet
gEmpty, _ = ParserGTID(flavor, "")
Expand All @@ -413,24 +414,29 @@ func (s *testGTIDSuite) TestMySQLGTIDResetStart(c *C) {
g4, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100")
g5, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100")
g6, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:40-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100")
g7, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:10-20:30-100")
)

c.Assert(gMaria.ResetStart(), IsFalse)
c.Assert(gNil.ResetStart(), IsFalse)
c.Assert(gEmpty.(*MySQLGTIDSet).ResetStart(), IsFalse)
c.Assert(gEmpty.ResetStart(), IsFalse)

c.Assert(g1.(*MySQLGTIDSet).ResetStart(), IsFalse)
c.Assert(g1.ResetStart(), IsFalse)

c.Assert(g2.(*MySQLGTIDSet).ResetStart(), IsTrue)
c.Assert(g2.ResetStart(), IsTrue)
c.Assert(g2.Equal(g1), IsTrue)

c.Assert(g3.(*MySQLGTIDSet).ResetStart(), IsTrue)
c.Assert(g3.ResetStart(), IsTrue)
c.Assert(g3.Equal(g1), IsTrue)

c.Assert(g4.(*MySQLGTIDSet).ResetStart(), IsFalse)
c.Assert(g4.ResetStart(), IsFalse)

c.Assert(g5.(*MySQLGTIDSet).ResetStart(), IsTrue)
c.Assert(g5.ResetStart(), IsTrue)
c.Assert(g5.Equal(g4), IsTrue)

c.Assert(g6.(*MySQLGTIDSet).ResetStart(), IsTrue)
c.Assert(g6.ResetStart(), IsTrue)
c.Assert(g6.Equal(g4), IsTrue)

c.Assert(g7.ResetStart(), IsTrue)
// TODO: currently g7 will become "00c04543-f584-11e9-a765-0242ac120002:1-20:1-100", will fix soon
}
6 changes: 2 additions & 4 deletions pkg/v1dbschema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,8 @@ func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reade
defer func() {
if err == nil && gs != nil {
oldGs := gs.Clone()
if mysqlGs, ok := gs.(*gtid.MySQLGTIDSet); ok {
if mysqlGs.ResetStart() {
tctx.L().Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", mysqlGs))
}
if gs.ResetStart() {
tctx.L().Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", gs))
}
}
}()
Expand Down
6 changes: 3 additions & 3 deletions relay/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (lm *LocalMeta) AdjustWithStartPos(binlogName string, binlogGTID string, en
lm.BinlogGTID = binlogGTID
lm.gset = gset

return true, nil
return true, lm.doFlush()
}

// Save implements Meta.Save
Expand Down Expand Up @@ -222,8 +222,8 @@ func (lm *LocalMeta) Save(pos mysql.Position, gset gtid.Set) error {

// Flush implements Meta.Flush
func (lm *LocalMeta) Flush() error {
lm.RLock()
defer lm.RUnlock()
lm.Lock()
defer lm.Unlock()

return lm.doFlush()
}
Expand Down
34 changes: 25 additions & 9 deletions relay/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package relay
import (
"io/ioutil"
"os"
"path"
"strings"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -96,52 +97,67 @@ func (r *testMetaSuite) TestLocalMeta(c *C) {
dirty := lm.Dirty()
c.Assert(dirty, IsFalse)

// set currentUUID because lm.doFlush need it
currentUUID := "uuid.000001"
c.Assert(os.MkdirAll(path.Join(dir, currentUUID), 0777), IsNil)
setLocalMetaWithCurrentUUID := func() {
lm = NewLocalMeta("mysql", dir)
lm.(*LocalMeta).currentUUID = currentUUID
}

// adjust to start pos
setLocalMetaWithCurrentUUID()
latestBinlogName := "mysql-bin.000009"
latestGTIDStr := "85ab69d1-b21f-11e6-9c5e-64006a8978d2:45-57"
cs0 := cases[0]
adjusted, err := lm.AdjustWithStartPos(cs0.pos.Name, cs0.gset.String(), false, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(pos.Name, Equals, cs0.pos.Name)
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(gset.String(), Equals, "")

// adjust to start pos with enableGTID
setLocalMetaWithCurrentUUID()
adjusted, err = lm.AdjustWithStartPos(cs0.pos.Name, cs0.gset.String(), true, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(pos.Name, Equals, cs0.pos.Name)
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(gset, DeepEquals, cs0.gset)

// adjust to the last binlog if start pos is empty
setLocalMetaWithCurrentUUID()
adjusted, err = lm.AdjustWithStartPos("", cs0.gset.String(), false, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(pos.Name, Equals, latestBinlogName)
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(gset.String(), Equals, "")

setLocalMetaWithCurrentUUID()
adjusted, err = lm.AdjustWithStartPos("", "", true, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(pos.Name, Equals, latestBinlogName)
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(gset.String(), Equals, latestGTIDStr)

// reset
lm.(*LocalMeta).currentUUID = ""

for _, cs := range cases {
err = lm.AddDir(cs.uuid, nil, nil, 0)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -209,7 +225,7 @@ func (r *testMetaSuite) TestLocalMeta(c *C) {
dirty = lm.Dirty()
c.Assert(dirty, IsFalse)

currentUUID, pos := lm.Pos()
currentUUID, pos = lm.Pos()
c.Assert(currentUUID, Equals, cs.uuidWithSuffix)
c.Assert(pos, DeepEquals, cs.pos)

Expand Down
Loading

0 comments on commit efa5b30

Please sign in to comment.