Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

worker: use random value as server-id if server-id is not set #329

Merged
merged 32 commits into from
Oct 25, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3f9b12a
use random server-id if not set
WangXiangUSTC Oct 21, 2019
6b1ee0a
remove server-id in integration test
WangXiangUSTC Oct 21, 2019
177dbdf
add unit test for get slave hosts
WangXiangUSTC Oct 21, 2019
ce20637
add unit test for config
WangXiangUSTC Oct 21, 2019
8b8453c
remove useless code
WangXiangUSTC Oct 21, 2019
2474fe0
update variable name
WangXiangUSTC Oct 21, 2019
2a1e83c
Merge branch 'master' into xiang/server_id
csuzhangxc Oct 22, 2019
394d4c2
address comment
WangXiangUSTC Oct 22, 2019
d14bd4c
minor update
WangXiangUSTC Oct 22, 2019
84efa8c
fix function name
WangXiangUSTC Oct 22, 2019
b389461
Update dm/worker/config.go
WangXiangUSTC Oct 22, 2019
4830291
address comment
WangXiangUSTC Oct 22, 2019
38f6c59
Merge branch 'xiang/server_id' of https://github.com/pingcap/dm into …
WangXiangUSTC Oct 22, 2019
457bbbf
change server id to uint32
WangXiangUSTC Oct 22, 2019
ec9bb07
fix
WangXiangUSTC Oct 22, 2019
3fb9a44
update comment
WangXiangUSTC Oct 22, 2019
e668f68
address comnemnt
WangXiangUSTC Oct 22, 2019
0c4b592
fix test
WangXiangUSTC Oct 22, 2019
7d0f754
format code
WangXiangUSTC Oct 22, 2019
3bc5c5f
fix test
WangXiangUSTC Oct 22, 2019
3aedf77
Merge branch 'master' into xiang/server_id
WangXiangUSTC Oct 23, 2019
0a4ce74
fix test
WangXiangUSTC Oct 23, 2019
aa2b214
Merge branch 'xiang/server_id' of https://github.com/pingcap/dm into …
WangXiangUSTC Oct 23, 2019
02a6fcb
resolve conflit
WangXiangUSTC Oct 23, 2019
6ca9c90
address comment
WangXiangUSTC Oct 23, 2019
898754f
add rand seed
WangXiangUSTC Oct 23, 2019
69ca67e
fix unit for heartbeat
WangXiangUSTC Oct 23, 2019
7b923f4
fix heartbeat
WangXiangUSTC Oct 23, 2019
5bbc661
fix
WangXiangUSTC Oct 23, 2019
354a2fc
address comment
WangXiangUSTC Oct 25, 2019
b1a8fbd
address comment
WangXiangUSTC Oct 25, 2019
9b89300
address comemnt
WangXiangUSTC Oct 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type SubTaskConfig struct {
IgnoreCheckingItems []string `toml:"ignore-checking-items" json:"ignore-checking-items"`
// it represents a MySQL/MariaDB instance or a replica group
SourceID string `toml:"source-id" json:"source-id"`
ServerID int `toml:"server-id" json:"server-id"`
ServerID uint32 `toml:"server-id" json:"server-id"`
Flavor string `toml:"flavor" json:"flavor"`
MetaSchema string `toml:"meta-schema" json:"meta-schema"`
RemoveMeta bool `toml:"remove-meta" json:"remove-meta"`
Expand Down
9 changes: 5 additions & 4 deletions dm/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"flag"
"fmt"
"io/ioutil"
"math"
"math/rand"
"strings"
"time"
Expand All @@ -46,7 +47,7 @@ const (
// flavorGetTimeout is timeout for getting some information from DB
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
dbGetTimeout = 30 * time.Second

maxServerID = 1<<32 - 1
maxServerID uint32 = math.MaxUint32
)

// SampleConfigFile is sample config file of dm-worker
Expand Down Expand Up @@ -100,7 +101,7 @@ type Config struct {
AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
RelayDir string `toml:"relay-dir" json:"relay-dir"`
MetaDir string `toml:"meta-dir" json:"meta-dir"`
ServerID int `toml:"server-id" json:"server-id"`
ServerID uint32 `toml:"server-id" json:"server-id"`
Flavor string `toml:"flavor" json:"flavor"`
Charset string `toml:"charset" json:"charset"`

Expand Down Expand Up @@ -342,9 +343,9 @@ func (c *Config) adjustServerID(ctx context.Context, db *sql.DB) error {
}

for i := 0; i < 5; i++ {
randomValue := rand.Intn(100000)
randomValue := uint32(rand.Intn(100000))
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
randomServerID := maxServerID/10 + randomValue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sames that the only use of maxServerID is maxServerID/10. How about set maxServerID directly to math.MaxUint32/10?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update 354a2fc

Copy link
Contributor

@lichunzhu lichunzhu Oct 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about storing math.MaxUint32/10 into a variable so that we don't need to compute it every time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done 9b89300

if _, ok := serverIDs[int64(randomServerID)]; ok {
if _, ok := serverIDs[randomServerID]; ok {
continue
}

Expand Down
36 changes: 22 additions & 14 deletions pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,23 @@ func GetFlavor(ctx context.Context, db *sql.DB) (string, error) {
}

// GetAllServerID gets all slave server id and master server id
func GetAllServerID(ctx context.Context, db *sql.DB) (map[int64]interface{}, error) {
func GetAllServerID(ctx context.Context, db *sql.DB) (map[uint32]interface{}, error) {
serverIDs, err := GetSlaveServerID(ctx, db)
if err != nil {
return nil, err
}

masterServerID, err := GetServerID(db)
if err != nil {
return nil, err
}

serverIDs[masterServerID] = struct{}{}
return serverIDs, nil
}

// GetSlaveServerID gets all slave server id
func GetSlaveServerID(ctx context.Context, db *sql.DB) (map[uint32]interface{}, error) {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
rows, err := db.QueryContext(ctx, `SHOW SLAVE HOSTS`)
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
Expand Down Expand Up @@ -92,7 +108,7 @@ func GetAllServerID(ctx context.Context, db *sql.DB) (map[int64]interface{}, err
masterID sql.NullInt64
slaveUUID sql.NullString
)
serverIDs := make(map[int64]interface{})
serverIDs := make(map[uint32]interface{})
for rows.Next() {
if len(rowColumns) == 5 {
err = rows.Scan(&serverID, &host, &port, &masterID, &slaveUUID)
Expand All @@ -104,20 +120,12 @@ func GetAllServerID(ctx context.Context, db *sql.DB) (map[int64]interface{}, err
}

if serverID.Valid {
serverIDs[serverID.Int64] = struct{}{}
serverIDs[uint32(serverID.Int64)] = struct{}{}
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
} else {
// should never happened
log.L().Warn("get invalid server_id when execute `SHOW SLAVE HOSTS;`")
continue
}

if masterID.Valid {
serverIDs[masterID.Int64] = struct{}{}
} else {
// should never happened
log.L().Warn("get invalid master_id when execute `SHOW SLAVE HOSTS;`")
continue
}
}

if rows.Err() != nil {
Expand Down Expand Up @@ -260,14 +268,14 @@ func GetGlobalVariable(db *sql.DB, variable string) (value string, err error) {
}

// GetServerID gets server's `server_id`
func GetServerID(db *sql.DB) (int64, error) {
func GetServerID(db *sql.DB) (uint32, error) {
serverIDStr, err := GetGlobalVariable(db, "server_id")
if err != nil {
return 0, err
}

serverID, err := strconv.ParseInt(serverIDStr, 10, 64)
return serverID, terror.ErrInvalidServerID.Delegate(err, serverIDStr)
serverID, err := strconv.ParseInt(serverIDStr, 10, 32)
return uint32(serverID), terror.ErrInvalidServerID.Delegate(err, serverIDStr)
}

// GetMariaDBGtidDomainID gets MariaDB server's `gtid_domain_id`
Expand Down
23 changes: 17 additions & 6 deletions pkg/utils/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import (

func (t *testUtilsSuite) TestGetAllServerID(c *C) {
testCases := []struct {
masterID int64
serverIDs []int64
}{
{
[]int64{1, 2, 3},
1,
[]int64{2, 3, 4},
}, {
2,
[]int64{},
},
}
Expand All @@ -39,24 +42,29 @@ func (t *testUtilsSuite) TestGetAllServerID(c *C) {

for _, testCase := range testCases {
for _, flavor := range flavors {
t.createMockResult(mock, testCase.serverIDs, flavor)
slaveHosts, err := GetAllServerID(context.Background(), db)
t.createMockResult(mock, testCase.masterID, testCase.serverIDs, flavor)
serverIDs, err := GetAllServerID(context.Background(), db)
c.Assert(err, IsNil)

for _, serverID := range testCase.serverIDs {
_, ok := slaveHosts[serverID]
_, ok := serverIDs[serverID]
c.Assert(ok, IsTrue)
}

_, ok := serverIDs[testCase.masterID]
c.Assert(ok, IsTrue)
}
}

err = mock.ExpectationsWereMet()
c.Assert(err, IsNil)
}

func (t *testUtilsSuite) createMockResult(mock sqlmock.Sqlmock, serverIDs []int64, flavor string) {
func (t *testUtilsSuite) createMockResult(mock sqlmock.Sqlmock, masterID int64, serverIDs []int64, flavor string) {
expectQuery := mock.ExpectQuery("SHOW SLAVE HOSTS")

host := "test"
port := 3306
masterID := 1
slaveUUID := "test"

if flavor == mysql.MariaDBFlavor {
Expand All @@ -74,5 +82,8 @@ func (t *testUtilsSuite) createMockResult(mock sqlmock.Sqlmock, serverIDs []int6
expectQuery.WillReturnRows(rows)
}

mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'server_id'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("server_id", masterID))


return
}
2 changes: 1 addition & 1 deletion relay/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Config struct {
EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"`
AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
RelayDir string `toml:"relay-dir" json:"relay-dir"`
ServerID int `toml:"server-id" json:"server-id"`
ServerID uint32 `toml:"server-id" json:"server-id"`
Flavor string `toml:"flavor" json:"flavor"`
Charset string `toml:"charset" json:"charset"`
From DBConfig `toml:"data-source" json:"data-source"`
Expand Down
10 changes: 5 additions & 5 deletions syncer/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type HeartbeatConfig struct {
// serverID from dm-worker (relay)
// now, heartbeat not be synced to downstream
// so it will not be used by user directly and also enough to differ from other dm-worker's
serverID int
serverID uint32
masterCfg config.DBConfig // master server's DBConfig
}

Expand Down Expand Up @@ -191,13 +191,13 @@ func (h *Heartbeat) TryUpdateTaskTs(taskName, schema, table string, data [][]int
}

latest := data[len(data)-1]
serverID, ok := latest[1].(int32)
serverID, ok := latest[1].(uint32)
if !ok {
h.logger.Warn("invalid data server_id for heartbeat", zap.Reflect("server ID", latest[1]))
return
}
if int(serverID) != h.cfg.serverID {
h.logger.Debug("ignore mismatched server_id for heartbeat", zap.Int32("obtained server ID", serverID), zap.Int("excepted server ID", h.cfg.serverID))
if serverID != h.cfg.serverID {
h.logger.Debug("ignore mismatched server_id for heartbeat", zap.Uint32("obtained server ID", serverID), zap.Uint32("excepted server ID", h.cfg.serverID))
return // only ignore
}

Expand Down Expand Up @@ -295,7 +295,7 @@ func (h *Heartbeat) updateTS() error {
// when we not need to support MySQL <=5.5, we can replace with `UTC_TIMESTAMP(6)`
query := fmt.Sprintf("REPLACE INTO `%s`.`%s` (`ts`, `server_id`) VALUES(UTC_TIMESTAMP(), ?)", h.schema, h.table)
_, err := h.master.Exec(query, h.cfg.serverID)
h.logger.Debug("update ts", zap.String("sql", query), zap.Int("server ID", h.cfg.serverID))
h.logger.Debug("update ts", zap.String("sql", query), zap.Uint32("server ID", h.cfg.serverID))
return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeUpstream)
}

Expand Down
2 changes: 1 addition & 1 deletion syncer/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewOnlineDDLStorage(newtctx *tcontext.Context, cfg *config.SubTaskConfig) *
cfg: cfg,
schema: cfg.MetaSchema,
table: fmt.Sprintf("%s_onlineddl", cfg.Name),
id: strconv.Itoa(cfg.ServerID),
id: strconv.Itoa(int(cfg.ServerID)),
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
ddls: make(map[string]map[string]*GhostDDLInfo),
tctx: newtctx,
}
Expand Down
2 changes: 1 addition & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2396,7 +2396,7 @@ func (s *Syncer) checkpointID() string {
if len(s.cfg.SourceID) > 0 {
return s.cfg.SourceID
}
return strconv.Itoa(s.cfg.ServerID)
return strconv.Itoa(int(s.cfg.ServerID))
}

// DDLInfo returns a chan from which can receive DDLInfo
Expand Down