Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/gtid: remove gtid.Set interface #5246

Merged
merged 11 commits into from
May 23, 2022
3 changes: 1 addition & 2 deletions dm/dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tiflow/dm/dm/pb"
"github.com/pingcap/tiflow/dm/dm/unit"
"github.com/pingcap/tiflow/dm/pkg/binlog"
"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/log"
pkgstreamer "github.com/pingcap/tiflow/dm/pkg/streamer"
"github.com/pingcap/tiflow/dm/pkg/utils"
Expand Down Expand Up @@ -132,7 +131,7 @@ func (d *DummyRelay) Close() {}
func (d *DummyRelay) IsClosed() bool { return false }

// SaveMeta implements Process interface.
func (d *DummyRelay) SaveMeta(pos mysql.Position, gset gtid.Set) error {
func (d *DummyRelay) SaveMeta(pos mysql.Position, gset mysql.GTIDSet) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions dm/dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,21 +670,21 @@ func getFakeLocForSubTask(ctx context.Context, subTaskCfg config.SubTaskConfig)
gset1, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-30")
gset2, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-50")
gset3, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-50,ba8f633f-1f15-11eb-b1c7-0242ac110002:1")
loc1 := binlog.InitLocation(
loc1 := binlog.NewLocation(
mysql.Position{
Name: "mysql-binlog.00001",
Pos: 123,
},
gset1,
)
loc2 := binlog.InitLocation(
loc2 := binlog.NewLocation(
mysql.Position{
Name: "mysql-binlog.00001",
Pos: 12,
},
gset2,
)
loc3 := binlog.InitLocation(
loc3 := binlog.NewLocation(
mysql.Position{
Name: "mysql-binlog.00003",
},
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (w *SourceWorker) updateSourceStatus(ctx context.Context, needLock bool) er
if err != nil {
return err
}
status.Location = binlog.InitLocation(pos, gtidSet)
status.Location = binlog.NewLocation(pos, gtidSet)
ctx2, cancel2 := context.WithTimeout(ctx, utils.DefaultDBTimeout)
defer cancel2()
binlogs, err := binlog.GetBinaryLogs(ctx2, w.sourceDB.DB)
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,8 +748,8 @@ func (st *SubTask) ShardDDLOperation() *pessimism.Operation {
// from Load unit to Sync unit, wait for relay-log catched up with mydumper binlog position.
func (st *SubTask) unitTransWaitCondition(subTaskCtx context.Context) error {
var (
gset1 gtid.Set
gset2 gtid.Set
gset1 mysql.GTIDSet
gset2 mysql.GTIDSet
pos1 *mysql.Position
pos2 *mysql.Position
err error
Expand Down
4 changes: 3 additions & 1 deletion dm/dm/worker/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"
"time"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/dm/pb"
"github.com/pingcap/tiflow/dm/dm/unit"
Expand Down Expand Up @@ -48,7 +49,8 @@ var _ = Suite(&testSubTask{})

func (t *testSubTask) TestCreateUnits(c *C) {
cfg := &config.SubTaskConfig{
Mode: "xxx",
Mode: "xxx",
Flavor: mysql.MySQLFlavor,
}
worker := "worker"
c.Assert(createUnits(cfg, nil, worker, nil), HasLen, 0)
Expand Down
23 changes: 9 additions & 14 deletions dm/pkg/binlog/event/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"

"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

Expand All @@ -29,7 +28,7 @@ type DDLDMLResult struct {
Events []*replication.BinlogEvent
Data []byte // data contain all events
LatestPos uint32
LatestGTID gtid.Set
LatestGTID gmysql.GTIDSet
}

// GenCommonFileHeader generates a common binlog file header.
Expand All @@ -42,7 +41,7 @@ type DDLDMLResult struct {
// 2. FormatDescriptionEvent
// 3. MariadbGTIDListEvent, depends on genGTID
// -. MariadbBinlogCheckPointEvent, not added yet
func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set, genGTID bool, ts int64) ([]*replication.BinlogEvent, []byte, error) {
func GenCommonFileHeader(flavor string, serverID uint32, gSet gmysql.GTIDSet, genGTID bool, ts int64) ([]*replication.BinlogEvent, []byte, error) {
if ts == 0 {
ts = time.Now().Unix()
}
Expand Down Expand Up @@ -101,7 +100,7 @@ func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set, genGTID
}

// GenCommonGTIDEvent generates a common GTID event.
func GenCommonGTIDEvent(flavor string, serverID uint32, latestPos uint32, gSet gtid.Set, anonymous bool, ts int64) (*replication.BinlogEvent, error) {
func GenCommonGTIDEvent(flavor string, serverID uint32, latestPos uint32, gSet gmysql.GTIDSet, anonymous bool, ts int64) (*replication.BinlogEvent, error) {
singleGTID, err := verifySingleGTID(flavor, gSet)
if err != nil {
return nil, terror.Annotate(err, "verify single GTID in set")
Expand Down Expand Up @@ -147,7 +146,7 @@ func GenCommonGTIDEvent(flavor string, serverID uint32, latestPos uint32, gSet g
}

// GTIDIncrease returns a new GTID with GNO/SequenceNumber +1.
func GTIDIncrease(flavor string, gSet gtid.Set) (gtid.Set, error) {
func GTIDIncrease(flavor string, gSet gmysql.GTIDSet) (gmysql.GTIDSet, error) {
singleGTID, err := verifySingleGTID(flavor, gSet)
if err != nil {
return nil, terror.Annotate(err, "verify single GTID in set")
Expand All @@ -161,32 +160,28 @@ func GTIDIncrease(flavor string, gSet gtid.Set) (gtid.Set, error) {
uuidSet.Intervals[0].Stop++
gtidSet := new(gmysql.MysqlGTIDSet)
gtidSet.Sets = map[string]*gmysql.UUIDSet{uuidSet.SID.String(): uuidSet}
err = clone.Set(gtidSet)
clone = gtidSet
case gmysql.MariaDBFlavor:
mariaGTID := singleGTID.(*gmysql.MariadbGTID)
mariaGTID.SequenceNumber++
gtidSet := new(gmysql.MariadbGTIDSet)
gtidSet.Sets = map[uint32]*gmysql.MariadbGTID{mariaGTID.DomainID: mariaGTID}
err = clone.Set(gtidSet)
clone = gtidSet
default:
err = terror.ErrBinlogGTIDSetNotValid.Generate(gSet, flavor)
}
return clone, err
}

// verifySingleGTID verifies gSet whether only containing a single valid GTID.
func verifySingleGTID(flavor string, gSet gtid.Set) (interface{}, error) {
func verifySingleGTID(flavor string, gSet gmysql.GTIDSet) (interface{}, error) {
if gSet == nil || len(gSet.String()) == 0 {
return nil, terror.ErrBinlogEmptyGTID.Generate()
}
origin := gSet.Origin()
if origin == nil {
return nil, terror.ErrBinlogGTIDMySQLNotValid.Generate(gSet)
}

switch flavor {
case gmysql.MySQLFlavor:
mysqlGTIDs, ok := origin.(*gmysql.MysqlGTIDSet)
mysqlGTIDs, ok := gSet.(*gmysql.MysqlGTIDSet)
if !ok {
return nil, terror.ErrBinlogGTIDMySQLNotValid.Generate(gSet)
}
Expand All @@ -206,7 +201,7 @@ func verifySingleGTID(flavor string, gSet gtid.Set) (interface{}, error) {
}
return uuidSet, nil
case gmysql.MariaDBFlavor:
mariaGTIDs, ok := origin.(*gmysql.MariadbGTIDSet)
mariaGTIDs, ok := gSet.(*gmysql.MariadbGTIDSet)
if !ok {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(gSet)
}
Expand Down
8 changes: 4 additions & 4 deletions dm/pkg/binlog/event/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (t *testCommonSuite) TestGenCommonFileHeader(c *C) {
flavor = gmysql.MySQLFlavor
serverID uint32 = 101
gSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,406a3f61-690d-11e7-87c5-6c92bf46f384:1-94321383,53bfca22-690d-11e7-8a62-18ded7a37b78:1-495,686e1ab6-c47e-11e7-a42c-6c92bf46f384:1-34981190,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-7041423,05474d3c-28c7-11e7-8352-203db246dd3d:1-170,10b039fc-c843-11e7-8f6a-1866daf8d810:1-308290454"
gSet gtid.Set
gSet gmysql.GTIDSet
)
gSet, err := gtid.ParserGTID(flavor, gSetStr)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -102,7 +102,7 @@ func (t *testCommonSuite) TestGenCommonGTIDEvent(c *C) {
var (
flavor = gmysql.MySQLFlavor
serverID uint32 = 101
gSet gtid.Set
gSet gmysql.GTIDSet
latestPos uint32 = 123
)

Expand Down Expand Up @@ -201,8 +201,8 @@ func (t *testCommonSuite) TestGTIDIncrease(c *C) {
var (
flavor = gmysql.MySQLFlavor
gSetStr = "03fc0263-28c7-11e7-a653-6c0b84d59f30:123"
gSetIn gtid.Set
gSetOut gtid.Set
gSetIn gmysql.GTIDSet
gSetOut gmysql.GTIDSet
)

// increase for MySQL
Expand Down
4 changes: 2 additions & 2 deletions dm/pkg/binlog/event/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ import (
"bytes"
"time"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"

"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

// GenDDLEvents generates binlog events for DDL statements.
// events: [GTIDEvent, QueryEvent]
func GenDDLEvents(flavor string, serverID, latestPos uint32, latestGTID gtid.Set, schema, query string, genGTID, anonymousGTID bool, ts int64) (*DDLDMLResult, error) {
func GenDDLEvents(flavor string, serverID, latestPos uint32, latestGTID mysql.GTIDSet, schema, query string, genGTID, anonymousGTID bool, ts int64) (*DDLDMLResult, error) {
if ts == 0 {
ts = time.Now().Unix()
}
Expand Down
4 changes: 2 additions & 2 deletions dm/pkg/binlog/event/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
"bytes"
"time"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"

"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

Expand All @@ -42,7 +42,7 @@ type DMLData struct {
// if DMLData.Query is not empty:
// events: [GTIDEvent, QueryEvent, QueryEvent, ..., XIDEvent]
// NOTE: multi <QueryEvent> can be in events.
func GenDMLEvents(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, eventType replication.EventType, xid uint64, dmlData []*DMLData, genGTID, anonymousGTID bool, ts int64) (*DDLDMLResult, error) {
func GenDMLEvents(flavor string, serverID uint32, latestPos uint32, latestGTID mysql.GTIDSet, eventType replication.EventType, xid uint64, dmlData []*DMLData, genGTID, anonymousGTID bool, ts int64) (*DDLDMLResult, error) {
if len(dmlData) == 0 {
return nil, terror.ErrBinlogDMLEmptyData.Generate()
}
Expand Down
18 changes: 4 additions & 14 deletions dm/pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"

"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

Expand Down Expand Up @@ -200,18 +199,13 @@ func GenRotateEvent(header *replication.EventHeader, latestPos uint32, nextLogNa
// we ref:
// a. https://github.com/vitessio/vitess/blob/28e7e5503a6c3d3b18d4925d95f23ebcb6f25c8e/go/mysql/binlog_event_mysql56.go#L56
// b. https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
func GenPreviousGTIDsEvent(header *replication.EventHeader, latestPos uint32, gSet gtid.Set) (*replication.BinlogEvent, error) {
func GenPreviousGTIDsEvent(header *replication.EventHeader, latestPos uint32, gSet gmysql.GTIDSet) (*replication.BinlogEvent, error) {
if gSet == nil {
return nil, terror.ErrBinlogEmptyGTID.Generate()
}

origin := gSet.Origin()
if origin == nil {
return nil, terror.ErrBinlogGTIDMySQLNotValid.Generate(gSet.String())
}

// event payload, GTID set encoded in it
payload := origin.Encode()
payload := gSet.Encode()

buf := new(bytes.Buffer)
event := &replication.PreviousGTIDsEvent{}
Expand Down Expand Up @@ -720,16 +714,12 @@ func GenXIDEvent(header *replication.EventHeader, latestPos uint32, xid uint64)

// GenMariaDBGTIDListEvent generates a MariadbGTIDListEvent.
// ref: https://mariadb.com/kb/en/library/gtid_list_event/
func GenMariaDBGTIDListEvent(header *replication.EventHeader, latestPos uint32, gSet gtid.Set) (*replication.BinlogEvent, error) {
func GenMariaDBGTIDListEvent(header *replication.EventHeader, latestPos uint32, gSet gmysql.GTIDSet) (*replication.BinlogEvent, error) {
if gSet == nil || len(gSet.String()) == 0 {
return nil, terror.ErrBinlogEmptyGTID.Generate()
}

origin := gSet.Origin()
if origin == nil {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(gSet.String())
}
mariaDBGSet, ok := origin.(*gmysql.MariadbGTIDSet)
mariaDBGSet, ok := gSet.(*gmysql.MariadbGTIDSet)
if !ok {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(gSet.String())
}
Expand Down
8 changes: 4 additions & 4 deletions dm/pkg/binlog/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,8 @@ func (t *testEventSuite) TestGenMariaDBGTIDListEvent(c *C) {
ServerID: 11,
Flags: 0x01,
}
latestPos uint32 = 4
gSet gtid.Set // invalid
latestPos uint32 = 4
gSet gmysql.GTIDSet // invalid
)

// invalid gSet
Expand All @@ -654,7 +654,7 @@ func (t *testEventSuite) TestGenMariaDBGTIDListEvent(c *C) {
gSet, err = gtid.ParserGTID(gmysql.MariaDBFlavor, "1-2-3")
c.Assert(err, IsNil)
c.Assert(gSet, NotNil)
mGSet, ok := gSet.Origin().(*gmysql.MariadbGTIDSet)
mGSet, ok := gSet.(*gmysql.MariadbGTIDSet)
c.Assert(ok, IsTrue)
c.Assert(mGSet, NotNil)

Expand All @@ -676,7 +676,7 @@ func (t *testEventSuite) TestGenMariaDBGTIDListEvent(c *C) {
gSet, err = gtid.ParserGTID(gmysql.MariaDBFlavor, "1-2-12,2-2-3,3-3-8,4-4-4")
c.Assert(err, IsNil)
c.Assert(gSet, NotNil)
mGSet, ok = gSet.Origin().(*gmysql.MariadbGTIDSet)
mGSet, ok = gSet.(*gmysql.MariadbGTIDSet)
c.Assert(ok, IsTrue)
c.Assert(mGSet, NotNil)

Expand Down
19 changes: 7 additions & 12 deletions dm/pkg/binlog/event/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ type Generator struct {
Flavor string
ServerID uint32
LatestPos uint32
LatestGTID gtid.Set
ExecutedGTIDs gtid.Set
LatestGTID gmysql.GTIDSet
ExecutedGTIDs gmysql.GTIDSet
LatestXID uint64

GenGTID bool
AnonymousGTID bool
}

// NewGenerator creates a new instance of Generator.
func NewGenerator(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, previousGTIDs gtid.Set, latestXID uint64) (*Generator, error) {
func NewGenerator(flavor string, serverID uint32, latestPos uint32, latestGTID gmysql.GTIDSet, previousGTIDs gmysql.GTIDSet, latestXID uint64) (*Generator, error) {
return newGenerator(flavor, "5.7.0", serverID, latestPos, latestGTID, previousGTIDs, latestXID, true)
}

Expand All @@ -49,12 +49,7 @@ func NewGeneratorV2(flavor, version, latestGTIDStr string, enableGTID bool) (*Ge
return newGenerator(flavor, version, 1, 0, latestGTID, previousGTIDSet, 0, enableGTID)
}

func newGenerator(flavor, version string, serverID uint32, latestPos uint32, latestGTID gtid.Set, previousGTIDs gtid.Set, latestXID uint64, genGTID bool) (*Generator, error) {
prevOrigin := previousGTIDs.Origin()
if prevOrigin == nil {
return nil, terror.ErrPreviousGTIDsNotValid.Generate(previousGTIDs)
}

func newGenerator(flavor, version string, serverID uint32, latestPos uint32, latestGTID gmysql.GTIDSet, previousGTIDs gmysql.GTIDSet, latestXID uint64, genGTID bool) (*Generator, error) {
singleGTID, err := verifySingleGTID(flavor, latestGTID)
if err != nil {
return nil, terror.Annotate(err, "verify single latest GTID in set")
Expand All @@ -63,7 +58,7 @@ func newGenerator(flavor, version string, serverID uint32, latestPos uint32, lat
switch flavor {
case gmysql.MySQLFlavor:
uuidSet := singleGTID.(*gmysql.UUIDSet)
prevGSet, ok := prevOrigin.(*gmysql.MysqlGTIDSet)
prevGSet, ok := previousGTIDs.(*gmysql.MysqlGTIDSet)
if !ok || prevGSet == nil {
return nil, terror.ErrBinlogGTIDMySQLNotValid.Generate(previousGTIDs)
}
Expand All @@ -88,7 +83,7 @@ func newGenerator(flavor, version string, serverID uint32, latestPos uint32, lat
return nil, terror.ErrBinlogMariaDBServerIDMismatch.Generate(mariaGTID.ServerID, serverID)
}
// latestGTID should be one of previousGTIDs
prevGSet, ok := prevOrigin.(*gmysql.MariadbGTIDSet)
prevGSet, ok := previousGTIDs.(*gmysql.MariadbGTIDSet)
if !ok || prevGSet == nil {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(previousGTIDs)
}
Expand Down Expand Up @@ -220,7 +215,7 @@ func (g *Generator) Rotate(nextName string, ts int64) (*replication.BinlogEvent,
return ev, ev.RawData, nil
}

func (g *Generator) updateLatestPosGTID(latestPos uint32, latestGTID gtid.Set) {
func (g *Generator) updateLatestPosGTID(latestPos uint32, latestGTID gmysql.GTIDSet) {
g.LatestPos = latestPos
if latestGTID != nil {
g.LatestGTID = latestGTID
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/binlog/event/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (t *testGeneratorSuite) TestGenerateForMariaDB(c *C) {
t.testGenerate(c, flavor, serverID, latestGTID, previousGTIDSet, latestXID)
}

func (t *testGeneratorSuite) testGenerate(c *C, flavor string, serverID uint32, latestGTID gtid.Set, previousGTIDSet gtid.Set, latestXID uint64) {
func (t *testGeneratorSuite) testGenerate(c *C, flavor string, serverID uint32, latestGTID gmysql.GTIDSet, previousGTIDSet gmysql.GTIDSet, latestXID uint64) {
// write some events to file
dir := c.MkDir()
filename := filepath.Join(dir, "mysql-bin-test.000001")
Expand Down
Loading