Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay: don't purge relay logs when connected to last source #1400

Merged
merged 19 commits into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (h *realRelayHolder) Operate(ctx context.Context, op pb.RelayOp) error {
return terror.ErrWorkerRelayOperNotSupport.Generate(op.String())
}

func (h *realRelayHolder) pauseRelay(ctx context.Context, op pb.RelayOp) error {
func (h *realRelayHolder) pauseRelay(_ context.Context, op pb.RelayOp) error {
h.Lock()
if h.stage != pb.Stage_Running {
h.Unlock()
Expand All @@ -210,7 +210,7 @@ func (h *realRelayHolder) pauseRelay(ctx context.Context, op pb.RelayOp) error {
return nil
}

func (h *realRelayHolder) resumeRelay(ctx context.Context, op pb.RelayOp) error {
func (h *realRelayHolder) resumeRelay(_ context.Context, op pb.RelayOp) error {
h.Lock()
defer h.Unlock()
if h.stage != pb.Stage_Paused {
Expand All @@ -225,7 +225,7 @@ func (h *realRelayHolder) resumeRelay(ctx context.Context, op pb.RelayOp) error
return nil
}

func (h *realRelayHolder) stopRelay(ctx context.Context, op pb.RelayOp) error {
func (h *realRelayHolder) stopRelay(_ context.Context, op pb.RelayOp) error {
h.Lock()
if h.stage == pb.Stage_Stopped {
h.Unlock()
Expand Down
9 changes: 8 additions & 1 deletion pkg/gtid/gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type Set interface {
// should become `00c04543-f584-11e9-a765-0242ac120002:1-60`.
Truncate(end Set) error

// ResetStart reset the start of interval to 1 (only meaningful for MySQLGTIDSet), returns true if Set is changed
ResetStart() bool

String() string
}

Expand Down Expand Up @@ -260,7 +263,6 @@ func (g *MySQLGTIDSet) Truncate(end Set) error {
// ResetStart resets the start part of GTID sets,
// like `00c04543-f584-11e9-a765-0242ac120002:40-60` will be reset to `00c04543-f584-11e9-a765-0242ac120002:1-60`.
// return `true` if reset real happen.
// NOTE: for MariaDB GTID, no this function exists because its format is `domainID-serverID-SeqNum`.
func (g *MySQLGTIDSet) ResetStart() bool {
if g == nil || g.set == nil {
return false
Expand Down Expand Up @@ -442,6 +444,11 @@ func (m *MariadbGTIDSet) Truncate(end Set) error {
return nil
}

// ResetStart does nothing because for MariaDB its GTID set format is `domainID-serverID-SeqNum`.
func (m *MariadbGTIDSet) ResetStart() bool {
return false
}

func (m *MariadbGTIDSet) String() string {
if m.set == nil {
return ""
Expand Down
22 changes: 14 additions & 8 deletions pkg/gtid/gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,9 @@ func (s *testGTIDSuite) TestMariaDBGTIDTruncate(c *C) {
}
}

func (s *testGTIDSuite) TestMySQLGTIDResetStart(c *C) {
func (s *testGTIDSuite) TestGTIDSetResetStart(c *C) {
var (
gMaria, _ = ParserGTID("", "1-2-3")
flavor = "mysql"
gNil *MySQLGTIDSet
gEmpty, _ = ParserGTID(flavor, "")
Expand All @@ -403,24 +404,29 @@ func (s *testGTIDSuite) TestMySQLGTIDResetStart(c *C) {
g4, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100")
g5, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100")
g6, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:40-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100")
g7, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:10-20:30-100")
)

c.Assert(gMaria.ResetStart(), IsFalse)
c.Assert(gNil.ResetStart(), IsFalse)
c.Assert(gEmpty.(*MySQLGTIDSet).ResetStart(), IsFalse)
c.Assert(gEmpty.ResetStart(), IsFalse)

c.Assert(g1.(*MySQLGTIDSet).ResetStart(), IsFalse)
c.Assert(g1.ResetStart(), IsFalse)

c.Assert(g2.(*MySQLGTIDSet).ResetStart(), IsTrue)
c.Assert(g2.ResetStart(), IsTrue)
c.Assert(g2.Equal(g1), IsTrue)

c.Assert(g3.(*MySQLGTIDSet).ResetStart(), IsTrue)
c.Assert(g3.ResetStart(), IsTrue)
c.Assert(g3.Equal(g1), IsTrue)

c.Assert(g4.(*MySQLGTIDSet).ResetStart(), IsFalse)
c.Assert(g4.ResetStart(), IsFalse)

c.Assert(g5.(*MySQLGTIDSet).ResetStart(), IsTrue)
c.Assert(g5.ResetStart(), IsTrue)
c.Assert(g5.Equal(g4), IsTrue)

c.Assert(g6.(*MySQLGTIDSet).ResetStart(), IsTrue)
c.Assert(g6.ResetStart(), IsTrue)
c.Assert(g6.Equal(g4), IsTrue)

c.Assert(g7.ResetStart(), IsTrue)
// TODO: currently g7 will become "00c04543-f584-11e9-a765-0242ac120002:1-20:1-100", will fix soon
}
6 changes: 2 additions & 4 deletions pkg/v1dbschema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,8 @@ func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reade
defer func() {
if err == nil && gs != nil {
oldGs := gs.Clone()
if mysqlGs, ok := gs.(*gtid.MySQLGTIDSet); ok {
if mysqlGs.ResetStart() {
tctx.L().Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", mysqlGs))
}
if gs.ResetStart() {
tctx.L().Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", gs))
}
}
}()
Expand Down
6 changes: 3 additions & 3 deletions relay/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (lm *LocalMeta) AdjustWithStartPos(binlogName string, binlogGTID string, en
lm.BinlogGTID = binlogGTID
lm.gset = gset

return true, nil
return true, lm.doFlush()
}

// Save implements Meta.Save
Expand Down Expand Up @@ -222,8 +222,8 @@ func (lm *LocalMeta) Save(pos mysql.Position, gset gtid.Set) error {

// Flush implements Meta.Flush
func (lm *LocalMeta) Flush() error {
lm.RLock()
defer lm.RUnlock()
lm.Lock()
defer lm.Unlock()

return lm.doFlush()
}
Expand Down
34 changes: 25 additions & 9 deletions relay/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package relay
import (
"io/ioutil"
"os"
"path"
"strings"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -96,52 +97,67 @@ func (r *testMetaSuite) TestLocalMeta(c *C) {
dirty := lm.Dirty()
c.Assert(dirty, IsFalse)

// set currentUUID because lm.doFlush need it
currentUUID := "uuid.000001"
c.Assert(os.MkdirAll(path.Join(dir, currentUUID), 0777), IsNil)
setLocalMetaWithCurrentUUID := func() {
lm = NewLocalMeta("mysql", dir)
lm.(*LocalMeta).currentUUID = currentUUID
}

// adjust to start pos
setLocalMetaWithCurrentUUID()
latestBinlogName := "mysql-bin.000009"
latestGTIDStr := "85ab69d1-b21f-11e6-9c5e-64006a8978d2:45-57"
cs0 := cases[0]
adjusted, err := lm.AdjustWithStartPos(cs0.pos.Name, cs0.gset.String(), false, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(pos.Name, Equals, cs0.pos.Name)
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(gset.String(), Equals, "")

// adjust to start pos with enableGTID
setLocalMetaWithCurrentUUID()
adjusted, err = lm.AdjustWithStartPos(cs0.pos.Name, cs0.gset.String(), true, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(pos.Name, Equals, cs0.pos.Name)
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(gset, DeepEquals, cs0.gset)

// adjust to the last binlog if start pos is empty
setLocalMetaWithCurrentUUID()
adjusted, err = lm.AdjustWithStartPos("", cs0.gset.String(), false, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(pos.Name, Equals, latestBinlogName)
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(gset.String(), Equals, "")

setLocalMetaWithCurrentUUID()
adjusted, err = lm.AdjustWithStartPos("", "", true, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(pos.Name, Equals, latestBinlogName)
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(uuid, Equals, currentUUID)
c.Assert(gset.String(), Equals, latestGTIDStr)

// reset
lm.(*LocalMeta).currentUUID = ""

for _, cs := range cases {
err = lm.AddDir(cs.uuid, nil, nil, 0)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -209,7 +225,7 @@ func (r *testMetaSuite) TestLocalMeta(c *C) {
dirty = lm.Dirty()
c.Assert(dirty, IsFalse)

currentUUID, pos := lm.Pos()
currentUUID, pos = lm.Pos()
c.Assert(currentUUID, Equals, cs.uuidWithSuffix)
c.Assert(pos, DeepEquals, cs.pos)

Expand Down
Loading