Skip to content

Commit

Permalink
dm: upgrade go-mysql library (#11043) (#11189)
Browse files Browse the repository at this point in the history
close #11041
  • Loading branch information
ti-chi-bot authored May 28, 2024
1 parent bf93d7d commit 26b901c
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 28 deletions.
2 changes: 1 addition & 1 deletion dm/pkg/binlog/common/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
var (
// MaxBinlogSyncerReconnect is the max reconnection times for binlog syncer in go-mysql.
MaxBinlogSyncerReconnect = 60
// SlaveReadTimeout is slave read binlog data timeout, ref: https://dev.mysql.com/doc/refman/8.0/en/replication-options-slave.html#sysvar_slave_net_timeout
// SlaveReadTimeout is slave read binlog data timeout, ref: https://dev.mysql.com/doc/refman/8.0/en/replication-options-replica.html#sysvar_slave_net_timeout
SlaveReadTimeout = 1 * time.Minute
// MasterHeartbeatPeriod is the master server send heartbeat period, ref: `MASTER_HEARTBEAT_PERIOD` in https://dev.mysql.com/doc/refman/8.0/en/change-master-to.html
MasterHeartbeatPeriod = 30 * time.Second
Expand Down
18 changes: 13 additions & 5 deletions dm/pkg/binlog/event/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,11 @@ func GTIDIncrease(flavor string, gSet gmysql.GTIDSet) (gmysql.GTIDSet, error) {
mariaGTID := singleGTID.(*gmysql.MariadbGTID)
mariaGTID.SequenceNumber++
gtidSet := new(gmysql.MariadbGTIDSet)
gtidSet.Sets = map[uint32]*gmysql.MariadbGTID{mariaGTID.DomainID: mariaGTID}
gtidSet.Sets = map[uint32]map[uint32]*gmysql.MariadbGTID{
mariaGTID.DomainID: {
mariaGTID.ServerID: mariaGTID,
},
}
clone = gtidSet
default:
err = terror.ErrBinlogGTIDSetNotValid.Generate(gSet, flavor)
Expand Down Expand Up @@ -203,11 +207,15 @@ func verifySingleGTID(flavor string, gSet gmysql.GTIDSet) (interface{}, error) {
if !ok {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(gSet)
}
if len(mariaGTIDs.Sets) != 1 {
return nil, terror.ErrBinlogOnlyOneGTIDSupport.Generate(len(mariaGTIDs.Sets), gSet)
}
gtidCount := 0
var mariaGTID *gmysql.MariadbGTID
for _, mariaGTID = range mariaGTIDs.Sets {
for _, set := range mariaGTIDs.Sets {
gtidCount += len(set)
for _, mariaGTID = range set {
}
}
if gtidCount != 1 {
return nil, terror.ErrBinlogOnlyOneGTIDSupport.Generate(gtidCount, gSet)
}
return mariaGTID, nil
default:
Expand Down
32 changes: 17 additions & 15 deletions dm/pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,21 +736,23 @@ func GenMariaDBGTIDListEvent(header *replication.EventHeader, latestPos uint32,
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Number of GTIDs %d", numOfGTIDs)
}

for _, mGTID := range mariaDBGSet.Sets {
// Replication Domain ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.DomainID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Replication Domain ID %d", mGTID.DomainID)
}
// Server_ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.ServerID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Server_ID %d", mGTID.ServerID)
}
// GTID sequence, 8 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.SequenceNumber)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write GTID sequence %d", mGTID.SequenceNumber)
for _, set := range mariaDBGSet.Sets {
for _, mGTID := range set {
// Replication Domain ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.DomainID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Replication Domain ID %d", mGTID.DomainID)
}
// Server_ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.ServerID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Server_ID %d", mGTID.ServerID)
}
// GTID sequence, 8 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.SequenceNumber)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write GTID sequence %d", mGTID.SequenceNumber)
}
}
}

Expand Down
7 changes: 4 additions & 3 deletions dm/pkg/binlog/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,6 @@ func TestGenRowsEvent(t *testing.T) {
require.Equal(t, tableID, rowsEvBody.TableID)
require.Equal(t, uint64(len(rows[0])), rowsEvBody.ColumnCount)
require.Equal(t, 0, rowsEvBody.Version) // WRITE_ROWS_EVENTv0
require.Nil(t, rowsEvBody.ExtraData)
require.Equal(t, rows, rowsEvBody.Rows)

// multi rows, with different length, invalid
Expand Down Expand Up @@ -664,7 +663,7 @@ func TestGenMariaDBGTIDListEvent(t *testing.T) {
require.True(t, ok)
require.NotNil(t, gtidListEvBody)
require.Len(t, gtidListEvBody.GTIDs, 1)
require.Equal(t, *mGSet.Sets[gtidListEvBody.GTIDs[0].DomainID], gtidListEvBody.GTIDs[0])
require.Equal(t, *mGSet.Sets[gtidListEvBody.GTIDs[0].DomainID][gtidListEvBody.GTIDs[0].ServerID], gtidListEvBody.GTIDs[0])

// valid gSet with multi GTIDs
gSet, err = gtid.ParserGTID(gmysql.MariaDBFlavor, "1-2-12,2-2-3,3-3-8,4-4-4")
Expand All @@ -684,7 +683,9 @@ func TestGenMariaDBGTIDListEvent(t *testing.T) {
require.NotNil(t, gtidListEvBody)
require.Len(t, gtidListEvBody.GTIDs, 4)
for _, mGTID := range gtidListEvBody.GTIDs {
mGTID2, ok := mGSet.Sets[mGTID.DomainID]
set, ok := mGSet.Sets[mGTID.DomainID]
require.True(t, ok)
mGTID2, ok := set[mGTID.ServerID]
require.True(t, ok)
require.Equal(t, *mGTID2, mGTID)
}
Expand Down
6 changes: 5 additions & 1 deletion dm/pkg/binlog/event/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ func newGenerator(flavor, version string, serverID uint32, latestPos uint32, lat
if !ok || prevGSet == nil {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(previousGTIDs)
}
prevGTID, ok := prevGSet.Sets[mariaGTID.DomainID]
set, ok := prevGSet.Sets[mariaGTID.DomainID]
if !ok {
return nil, terror.ErrBinlogLatestGTIDNotInPrev.Generate(latestGTID, previousGTIDs)
}
prevGTID, ok := set[mariaGTID.ServerID]
if !ok || prevGTID.ServerID != mariaGTID.ServerID || prevGTID.SequenceNumber != mariaGTID.SequenceNumber {
return nil, terror.ErrBinlogLatestGTIDNotInPrev.Generate(latestGTID, previousGTIDs)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/gin-gonic/gin v1.9.1
github.com/glebarez/go-sqlite v1.21.2
github.com/glebarez/sqlite v1.7.0
github.com/go-mysql-org/go-mysql v1.7.1-0.20230619063055-fd67d94318fd
github.com/go-mysql-org/go-mysql v1.7.1-0.20240507075657-2bd4573edde2
github.com/go-ozzo/ozzo-validation/v4 v4.3.0
github.com/go-sql-driver/mysql v1.7.1
github.com/goccy/go-json v0.10.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ4
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8=
github.com/go-mysql-org/go-mysql v1.7.1-0.20230619063055-fd67d94318fd h1:lqWdv8GEYqF1deivEmnSx81GfcAUZ/FoxilGxm/kwWs=
github.com/go-mysql-org/go-mysql v1.7.1-0.20230619063055-fd67d94318fd/go.mod h1:kOk/pFv3q5EPspyQfDRGLmEA6wfMvIeV4DmThwzkNzs=
github.com/go-mysql-org/go-mysql v1.7.1-0.20240507075657-2bd4573edde2 h1:DU4x9rUmzjuOnXg5hgX5kUqj1HmKzN+rU3bsIy7oWok=
github.com/go-mysql-org/go-mysql v1.7.1-0.20240507075657-2bd4573edde2/go.mod h1:kwbF156Z9Sy8amP3E1SZp7/s/0PuJj/xKaOWToQiq0Y=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
Expand Down

0 comments on commit 26b901c

Please sign in to comment.