Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
relay: begin with the latest binlog file if relay position is not set (
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored Oct 17, 2019
1 parent 812c714 commit 5ba776e
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 38 deletions.
6 changes: 0 additions & 6 deletions pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,6 @@ func GetMasterStatus(db *sql.DB, flavor string) (gmysql.Position, gtid.Set, erro
)

rows, err := db.Query(`SHOW MASTER STATUS`)

failpoint.Inject("GetMasterStatusFailed", func(val failpoint.Value) {
err = tmysql.NewErr(uint16(val.(int)))
log.L().Warn("GetMasterStatus failed", zap.String("failpoint", "GetMasterStatusFailed"), zap.Error(err))
})

if err != nil {
return binlogPos, gs, err
}
Expand Down
40 changes: 20 additions & 20 deletions relay/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ type Meta interface {
Load() error

// AdjustWithStartPos adjusts current pos / GTID with start pos
// if current pos / GTID is meaningless, update to start pos
// if current pos / GTID is meaningless, update to start pos or last pos when start pos is meaningless
// else do nothing
AdjustWithStartPos(binlogName string, binlogGTID string, enableGTID bool) (bool, error)
AdjustWithStartPos(binlogName string, binlogGTID string, enableGTID bool, latestBinlogName string, latestBinlogGTID string) (bool, error)

// Save saves meta information
Save(pos mysql.Position, gset gtid.Set) error
Expand Down Expand Up @@ -152,7 +152,7 @@ func (lm *LocalMeta) Load() error {
}

// AdjustWithStartPos implements Meta.AdjustWithStartPos, return whether adjusted
func (lm *LocalMeta) AdjustWithStartPos(binlogName string, binlogGTID string, enableGTID bool) (bool, error) {
func (lm *LocalMeta) AdjustWithStartPos(binlogName string, binlogGTID string, enableGTID bool, latestBinlogName string, latestBinlogGTID string) (bool, error) {
lm.Lock()
defer lm.Unlock()

Expand All @@ -165,32 +165,32 @@ func (lm *LocalMeta) AdjustWithStartPos(binlogName string, binlogGTID string, en
}
}

if (enableGTID && len(binlogGTID) == 0) || (!enableGTID && len(binlogName) == 0) {
return false, nil // no meaningful start pos specified
}
var gset = lm.emptyGSet.Clone()
var err error

if !enableGTID && len(binlogName) > 0 {
if !binlog.VerifyFilename(binlogName) {
return false, terror.ErrRelayBinlogNameNotValid.Generate(binlogName)
if enableGTID {
if len(binlogGTID) == 0 {
binlogGTID = latestBinlogGTID
binlogName = latestBinlogName
}
}
var gset = lm.emptyGSet.Clone()
if enableGTID && len(binlogGTID) > 0 {
var err error
gset, err = gtid.ParserGTID(lm.flavor, binlogGTID)
if err != nil {
return false, terror.Annotatef(err, "relay-binlog-gtid %s", binlogGTID)
}
}

// verified, update them
if enableGTID {
lm.BinLogName = minCheckpoint.Name
} else {
lm.BinLogName = binlogName
if len(binlogName) == 0 { // no meaningful start pos specified
binlogGTID = latestBinlogGTID
binlogName = latestBinlogName
} else {
if !binlog.VerifyFilename(binlogName) {
return false, terror.ErrRelayBinlogNameNotValid.Generate(binlogName)
}
}
}

lm.BinLogName = binlogName
lm.BinLogPos = minCheckpoint.Pos // always set pos to 4
lm.BinlogGTID = gset.String()
lm.BinlogGTID = binlogGTID
lm.gset = gset

return true, nil
Expand Down
32 changes: 28 additions & 4 deletions relay/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@ func (r *testMetaSuite) TestLocalMeta(c *C) {
c.Assert(dirty, IsFalse)

// adjust to start pos
latestBinlogName := "mysql-bin.000009"
latestGTIDStr := "85ab69d1-b21f-11e6-9c5e-64006a8978d2:45-57"
cs0 := cases[0]
adjusted, err := lm.AdjustWithStartPos(cs0.pos.Name, cs0.gset.String(), false)
adjusted, err := lm.AdjustWithStartPos(cs0.pos.Name, cs0.gset.String(), false, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
Expand All @@ -107,17 +109,39 @@ func (r *testMetaSuite) TestLocalMeta(c *C) {
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(gset.String(), Equals, "")

// adjust to start pos with enableGTID
adjusted, err = lm.AdjustWithStartPos(cs0.pos.Name, cs0.gset.String(), true)
adjusted, err = lm.AdjustWithStartPos(cs0.pos.Name, cs0.gset.String(), true, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
c.Assert(uuid, Equals, "")
c.Assert(pos.Name, Equals, minCheckpoint.Name)
c.Assert(pos.Name, Equals, cs0.pos.Name)
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(gset, DeepEquals, cs0.gset)

// adjust to the last binlog if start pos is empty
adjusted, err = lm.AdjustWithStartPos("", cs0.gset.String(), false, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
c.Assert(uuid, Equals, "")
c.Assert(pos.Name, Equals, latestBinlogName)
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(gset.String(), Equals, "")

adjusted, err = lm.AdjustWithStartPos("", "", true, latestBinlogName, latestGTIDStr)
c.Assert(err, IsNil)
c.Assert(adjusted, IsTrue)
uuid, pos = lm.Pos()
c.Assert(uuid, Equals, "")
c.Assert(pos.Name, Equals, latestBinlogName)
uuid, gset = lm.GTID()
c.Assert(uuid, Equals, "")
c.Assert(gset.String(), Equals, latestGTIDStr)

for _, cs := range cases {
err = lm.AddDir(cs.uuid, nil, nil)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -148,7 +172,7 @@ func (r *testMetaSuite) TestLocalMeta(c *C) {

// try adjust to start pos again
csn1 := cases[len(cases)-1]
adjusted, err = lm.AdjustWithStartPos(cs0.pos.Name, cs0.gset.String(), false)
adjusted, err = lm.AdjustWithStartPos(cs0.pos.Name, cs0.gset.String(), false, "", "")
c.Assert(err, IsNil)
c.Assert(adjusted, IsFalse)
uuid, pos = lm.Pos()
Expand Down
22 changes: 17 additions & 5 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,14 +498,26 @@ func (r *Relay) reSetupMeta() error {
return err
}

// try adjust meta with start pos from config
if (r.cfg.EnableGTID && len(r.cfg.BinlogGTID) > 0) || len(r.cfg.BinLogName) > 0 {
adjusted, err := r.meta.AdjustWithStartPos(r.cfg.BinLogName, r.cfg.BinlogGTID, r.cfg.EnableGTID)
var latestPosName, latestGTIDStr string
if (r.cfg.EnableGTID && len(r.cfg.BinlogGTID) == 0) || (!r.cfg.EnableGTID && len(r.cfg.BinLogName) == 0) {
latestPos, latestGTID, err := utils.GetMasterStatus(r.db, r.cfg.Flavor)
if err != nil {
return err
} else if adjusted {
r.tctx.L().Info("adjusted meta to start pos", zap.String("start pos's binlog name", r.cfg.BinLogName), zap.String("start pos's binlog gtid", r.cfg.BinlogGTID))
}
latestPosName = latestPos.Name
latestGTIDStr = latestGTID.String()
}

// try adjust meta with start pos from config
adjusted, err := r.meta.AdjustWithStartPos(r.cfg.BinLogName, r.cfg.BinlogGTID, r.cfg.EnableGTID, latestPosName, latestGTIDStr)
if err != nil {
return err
}

if adjusted {
_, pos := r.meta.Pos()
_, gtid := r.meta.GTID()
r.tctx.L().Info("adjusted meta to start pos", zap.Reflect("start pos", pos), zap.Stringer("start pos's binlog gtid", gtid))
}

r.updateMetricsRelaySubDirIndex()
Expand Down
2 changes: 1 addition & 1 deletion relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func (t *testRelaySuite) TestReSetupMeta(c *C) {
r.cfg.BinLogName = "mysql-bin.000005"
c.Assert(r.reSetupMeta(), IsNil)
uuid001 := fmt.Sprintf("%s.000001", uuid)
t.verifyMetadata(c, r, uuid001, minCheckpoint, r.cfg.BinlogGTID, []string{uuid001})
t.verifyMetadata(c, r, uuid001, gmysql.Position{Name: r.cfg.BinLogName, Pos: 4}, r.cfg.BinlogGTID, []string{uuid001})

// re-setup meta again, often happen when connecting a server behind a VIP.
c.Assert(r.reSetupMeta(), IsNil)
Expand Down
11 changes: 10 additions & 1 deletion syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"

"github.com/pingcap/failpoint"
"github.com/pingcap/parser"
tmysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/siddontang/go-mysql/mysql"
"go.uber.org/zap"
Expand Down Expand Up @@ -84,7 +86,14 @@ func (conn *Conn) ResetConn(tctx *tcontext.Context) error {
}

func (conn *Conn) getMasterStatus(flavor string) (mysql.Position, gtid.Set, error) {
return utils.GetMasterStatus(conn.baseConn.DB, flavor)
pos, gtidSet, err := utils.GetMasterStatus(conn.baseConn.DB, flavor)

failpoint.Inject("GetMasterStatusFailed", func(val failpoint.Value) {
err = tmysql.NewErr(uint16(val.(int)))
log.L().Warn("GetMasterStatus failed", zap.String("failpoint", "GetMasterStatusFailed"), zap.Error(err))
})

return pos, gtidSet, err
}

func (conn *Conn) getServerUUID(flavor string) (string, error) {
Expand Down
2 changes: 1 addition & 1 deletion tests/initial_unit/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ function run() {
failpoints=(
# 1152 is ErrAbortingConnection
"github.com/pingcap/dm/syncer/LoadCheckpointFailed=return(1152)"
"github.com/pingcap/dm/pkg/utils/GetMasterStatusFailed=return(1152)"
"github.com/pingcap/dm/syncer/GetMasterStatusFailed=return(1152)"
)

for(( i=0;i<${#failpoints[@]};i++)) do
Expand Down

0 comments on commit 5ba776e

Please sign in to comment.