Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
cherry pick #1430 to release-2.0 (#1441)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>

Co-authored-by: lance6716 <[email protected]>
  • Loading branch information
ti-srebot and lance6716 authored Feb 19, 2021
1 parent 0429f76 commit 1235c74
Show file tree
Hide file tree
Showing 21 changed files with 331 additions and 116 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check-and-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
uses: actions/checkout@v2

- name: Cache go modules
uses: actions/cache@v2
uses: actions/cache@v2.1.3 # latest v2 can't work in macOS https://github.com/actions/cache/issues/527
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-dm-${{ hashFiles('**/go.sum') }}
Expand Down
30 changes: 0 additions & 30 deletions pkg/gtid/gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ type Set interface {
// should become `00c04543-f584-11e9-a765-0242ac120002:1-60`.
Truncate(end Set) error

// ResetStart reset the start of interval to 1 (only meaningful for MySQLGTIDSet), returns true if Set is changed
ResetStart() bool

String() string
}

Expand Down Expand Up @@ -260,28 +257,6 @@ func (g *MySQLGTIDSet) Truncate(end Set) error {
return nil
}

// ResetStart resets the start part of GTID sets,
// like `00c04543-f584-11e9-a765-0242ac120002:40-60` will be reset to `00c04543-f584-11e9-a765-0242ac120002:1-60`.
// return `true` if reset real happen.
func (g *MySQLGTIDSet) ResetStart() bool {
if g == nil || g.set == nil {
return false
}

reset := false
for _, set := range g.set.Sets {
for i, inter := range set.Intervals {
if inter.Start > 1 {
inter.Start = 1
set.Intervals[i] = inter // re-assign
reset = true
}
}
}

return reset
}

func (g *MySQLGTIDSet) String() string {
if g.set == nil {
return ""
Expand Down Expand Up @@ -444,11 +419,6 @@ func (m *MariadbGTIDSet) Truncate(end Set) error {
return nil
}

// ResetStart does nothing because for MariaDB its GTID set format is `domainID-serverID-SeqNum`.
func (m *MariadbGTIDSet) ResetStart() bool {
return false
}

func (m *MariadbGTIDSet) String() string {
if m.set == nil {
return ""
Expand Down
39 changes: 0 additions & 39 deletions pkg/gtid/gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,42 +401,3 @@ func (s *testGTIDSuite) TestMariaDBGTIDTruncate(c *C) {
}
}
}

func (s *testGTIDSuite) TestGTIDSetResetStart(c *C) {
var (
gMaria, _ = ParserGTID("", "1-2-3")
flavor = "mysql"
gNil *MySQLGTIDSet
gEmpty, _ = ParserGTID(flavor, "")
g1, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100")
g2, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:100")
g3, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:50-100")
g4, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100")
g5, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100")
g6, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:40-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100")
g7, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:10-20:30-100")
)

c.Assert(gMaria.ResetStart(), IsFalse)
c.Assert(gNil.ResetStart(), IsFalse)
c.Assert(gEmpty.ResetStart(), IsFalse)

c.Assert(g1.ResetStart(), IsFalse)

c.Assert(g2.ResetStart(), IsTrue)
c.Assert(g2.Equal(g1), IsTrue)

c.Assert(g3.ResetStart(), IsTrue)
c.Assert(g3.Equal(g1), IsTrue)

c.Assert(g4.ResetStart(), IsFalse)

c.Assert(g5.ResetStart(), IsTrue)
c.Assert(g5.Equal(g4), IsTrue)

c.Assert(g6.ResetStart(), IsTrue)
c.Assert(g6.Equal(g4), IsTrue)

c.Assert(g7.ResetStart(), IsTrue)
// TODO: currently g7 will become "00c04543-f584-11e9-a765-0242ac120002:1-20:1-100", will fix soon
}
42 changes: 42 additions & 0 deletions pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,3 +473,45 @@ func ExtractTiDBVersion(version string) (*semver.Version, error) {
rawVersion = strings.TrimPrefix(rawVersion, "v")
return semver.NewVersion(rawVersion)
}

// AddGSetWithPurged is used to handle this case: https://github.com/pingcap/dm/issues/1418
// we might get a gtid set from Previous_gtids event in binlog, but that gtid set can't be used to start a gtid sync
// because it doesn't cover all gtid_purged. The error of using it will be
// ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.
// so we add gtid_purged to it.
func AddGSetWithPurged(ctx context.Context, gset gtid.Set, conn *sql.Conn) (gtid.Set, error) {
if _, ok := gset.(*gtid.MariadbGTIDSet); ok {
return gset, nil
}

var (
gtidStr string
row *sql.Row
err error
)

failpoint.Inject("GetGTIDPurged", func(val failpoint.Value) {
str := val.(string)
gtidStr = str
failpoint.Goto("bypass")
})
row = conn.QueryRowContext(ctx, "select @@GLOBAL.gtid_purged")
err = row.Scan(&gtidStr)
if err != nil {
log.L().Error("can't get @@GLOBAL.gtid_purged when try to add it to gtid set", zap.Error(err))
return gset, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
failpoint.Label("bypass")
if gtidStr == "" {
return gset, nil
}

newGset := gset.Origin()
err = newGset.Update(gtidStr)
if err != nil {
return nil, err
}
ret := &gtid.MySQLGTIDSet{}
_ = ret.Set(newGset)
return ret, nil
}
19 changes: 4 additions & 15 deletions pkg/v1dbschema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskN
if err != nil {
return terror.Annotatef(err, "get GTID sets for position %s", pos)
}
gs, err = utils.AddGSetWithPurged(tctx.Context(), gs, dbConn.DBConn)
if err != nil {
return terror.Annotatef(err, "get GTID sets for position %s", pos)
}
logger.Info("got global checkpoint GTID sets", log.WrapStringerField("GTID sets", gs))
}
}
Expand Down Expand Up @@ -176,21 +180,6 @@ func getGlobalPos(tctx *tcontext.Context, dbConn *conn.BaseConn, tableName, sour

// getGTIDsForPos gets the GTID sets for the position.
func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reader.Reader) (gs gtid.Set, err error) {
// in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes,
// e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`,
// but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`.
// and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like
// `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.`
// may occur, so we force to reset the START part of any GTID set.
defer func() {
if err == nil && gs != nil {
oldGs := gs.Clone()
if gs.ResetStart() {
tctx.L().Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", gs))
}
}
}()

// NOTE: because we have multiple unit test cases updating/clearing binlog in the upstream,
// we may encounter errors when reading binlog event but cleared by another test case.
failpoint.Inject("MockGetGTIDsForPos", func(val failpoint.Value) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/v1dbschema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,15 @@ func (t *testSchema) TestSchemaV106ToV20x(c *C) {
endGS, _ = gtid.ParserGTID(gmysql.MySQLFlavor, "ccb992ad-a557-11ea-ba6a-0242ac140002:1-16")
)

c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos", `return("ccb992ad-a557-11ea-ba6a-0242ac140002:10-16")`), IsNil) // need `ResetStart`.
c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos", `return("ccb992ad-a557-11ea-ba6a-0242ac140002:10-16")`), IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos")
c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged", `return("ccb992ad-a557-11ea-ba6a-0242ac140002:1-9")`), IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged")

dbConn, err := t.db.GetBaseConn(tctx.Ctx)
c.Assert(err, IsNil)

defer func() {
_, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{
`DROP DATABASE ` + cfg.MetaSchema,
Expand Down
44 changes: 18 additions & 26 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,20 +379,18 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser
zap.Stringer("from position", latestPos), zap.Stringer("to position", result.LatestPos), log.WrapStringerField("from GTID set", latestGTID), log.WrapStringerField("to GTID set", result.LatestGTIDs))

if result.LatestGTIDs != nil {
gs := result.LatestGTIDs
oldGs1 := gs.Clone()
// in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes,
// e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`,
// but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`.
// and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like
// `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.`
// may occur, so we force to reset the START part of any GTID set.
if gs.ResetStart() {
r.logger.Warn("force to reset the start part of recovered GTID sets", zap.Stringer("from GTID set", oldGs1), zap.Stringer("to GTID set", gs))
oldGs2 := latestGTID.Clone()
if latestGTID.ResetStart() {
r.logger.Warn("force to reset the start part of latest GTID sets", zap.Stringer("from GTID set", oldGs2), zap.Stringer("to GTID set", latestGTID))
}
dbConn, err2 := r.db.Conn(ctx)
if err2 != nil {
return err2
}
defer dbConn.Close()
result.LatestGTIDs, err2 = utils.AddGSetWithPurged(ctx, result.LatestGTIDs, dbConn)
if err2 != nil {
return err2
}
latestGTID, err2 = utils.AddGSetWithPurged(ctx, latestGTID, dbConn)
if err2 != nil {
return err2
}
}

Expand Down Expand Up @@ -965,7 +963,7 @@ func (r *Relay) setSyncConfig() error {
}

syncerCfg := replication.BinlogSyncerConfig{
ServerID: uint32(r.cfg.ServerID),
ServerID: r.cfg.ServerID,
Flavor: r.cfg.Flavor,
Host: r.cfg.From.Host,
Port: uint16(r.cfg.From.Port),
Expand All @@ -990,7 +988,6 @@ func (r *Relay) setSyncConfig() error {

// AdjustGTID implements Relay.AdjustGTID
// starting sync at returned gset will wholly fetch a binlog from beginning of the file.
// TODO: check if starting fetch at the middle of binlog is also acceptable
func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error) {
// setup a TCP binlog reader (because no relay can be used when upgrading).
syncCfg := r.syncerCfg
Expand All @@ -1006,15 +1003,10 @@ func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error)
return nil, err
}

// in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes,
// e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`,
// but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`.
// and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like
// `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.`
// may occur, so we force to reset the START part of any GTID set.
oldGs := resultGs.Clone()
if resultGs.ResetStart() {
r.logger.Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", resultGs))
dbConn, err2 := r.db.Conn(ctx)
if err2 != nil {
return nil, err2
}
return resultGs, nil
defer dbConn.Close()
return utils.AddGSetWithPurged(ctx, resultGs, dbConn)
}
11 changes: 8 additions & 3 deletions relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser"
"github.com/siddontang/go-mysql/mysql"
gmysql "github.com/siddontang/go-mysql/mysql"
Expand Down Expand Up @@ -178,6 +179,9 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) {
relayCfg = newRelayCfg(c, mysql.MySQLFlavor)
r = NewRelay(relayCfg).(*Relay)
)
c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged", `return("406a3f61-690d-11e7-87c5-6c92bf46f384:1-122")`), IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged")
c.Assert(r.Init(context.Background()), IsNil)
// purge old relay dir
f, err := os.Create(filepath.Join(r.cfg.RelayDir, "old_relay_log"))
Expand Down Expand Up @@ -256,9 +260,10 @@ func (t *testRelaySuite) TestTryRecoverMeta(c *C) {
previousGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,53bfca22-690d-11e7-8a62-18ded7a37b78:1-495,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456"
latestGTIDStr1 = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14"
latestGTIDStr2 = "53bfca22-690d-11e7-8a62-18ded7a37b78:495"
recoverGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:1-456" // 406a3f61-690d-11e7-87c5-6c92bf46f384:123-456 --> 406a3f61-690d-11e7-87c5-6c92bf46f384:1-456
filename = "mysql-bin.000001"
startPos = gmysql.Position{Name: filename, Pos: 123}
// if no @@gtid_purged, 406a3f61-690d-11e7-87c5-6c92bf46f384:123-456 should be not changed
recoverGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456"
filename = "mysql-bin.000001"
startPos = gmysql.Position{Name: filename, Pos: 123}

parser2 = parser.New()
relayCfg = newRelayCfg(c, mysql.MySQLFlavor)
Expand Down
56 changes: 56 additions & 0 deletions tests/gtid/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# diff Configuration.

log-level = "info"

chunk-size = 1000

check-thread-count = 4

sample-percent = 100

use-checksum = true

fix-sql-file = "fix.sql"

# tables need to check.
[[check-tables]]
schema = "gtid"
tables = ["~t.*"]

[[table-config]]
schema = "gtid"
table = "t1"

[[table-config.source-tables]]
instance-id = "source-1"
schema = "gtid"
table = "t1"

[[table-config]]
schema = "gtid"
table = "t2"

[[table-config.source-tables]]
instance-id = "source-2"
schema = "gtid"
table = "t2"

[[source-db]]
host = "127.0.0.1"
port = 3306
user = "root"
password = "123456"
instance-id = "source-1"

[[source-db]]
host = "127.0.0.1"
port = 3307
user = "root"
password = "123456"
instance-id = "source-2"

[target-db]
host = "127.0.0.1"
port = 4000
user = "test"
password = "123456"
5 changes: 5 additions & 0 deletions tests/gtid/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Master Configuration.
master-addr = ":8261"
advertise-addr = "127.0.0.1:8261"

rpc-timeout = "30s"
23 changes: 23 additions & 0 deletions tests/gtid/conf/dm-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
name: test
task-mode: all
is-sharding: false
clean-dump-file: false

target-database:
host: "127.0.0.1"
port: 4000
user: "root"
password: ""

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"

- source-id: "mysql-replica-02"
block-allow-list: "instance"

black-white-list: # compatible with deprecated config
instance:
do-dbs: ["gtid"]

2 changes: 2 additions & 0 deletions tests/gtid/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name = "worker1"
join = "127.0.0.1:8261"
2 changes: 2 additions & 0 deletions tests/gtid/conf/dm-worker2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name = "worker2"
join = "127.0.0.1:8261"
Loading

0 comments on commit 1235c74

Please sign in to comment.