Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

worker: use random value as server-id if server-id is not set #329

Merged
merged 32 commits into from
Oct 25, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3f9b12a
use random server-id if not set
WangXiangUSTC Oct 21, 2019
6b1ee0a
remove server-id in integration test
WangXiangUSTC Oct 21, 2019
177dbdf
add unit test for get slave hosts
WangXiangUSTC Oct 21, 2019
ce20637
add unit test for config
WangXiangUSTC Oct 21, 2019
8b8453c
remove useless code
WangXiangUSTC Oct 21, 2019
2474fe0
update variable name
WangXiangUSTC Oct 21, 2019
2a1e83c
Merge branch 'master' into xiang/server_id
csuzhangxc Oct 22, 2019
394d4c2
address comment
WangXiangUSTC Oct 22, 2019
d14bd4c
minor update
WangXiangUSTC Oct 22, 2019
84efa8c
fix function name
WangXiangUSTC Oct 22, 2019
b389461
Update dm/worker/config.go
WangXiangUSTC Oct 22, 2019
4830291
address comment
WangXiangUSTC Oct 22, 2019
38f6c59
Merge branch 'xiang/server_id' of https://github.com/pingcap/dm into …
WangXiangUSTC Oct 22, 2019
457bbbf
change server id to uint32
WangXiangUSTC Oct 22, 2019
ec9bb07
fix
WangXiangUSTC Oct 22, 2019
3fb9a44
update comment
WangXiangUSTC Oct 22, 2019
e668f68
address comnemnt
WangXiangUSTC Oct 22, 2019
0c4b592
fix test
WangXiangUSTC Oct 22, 2019
7d0f754
format code
WangXiangUSTC Oct 22, 2019
3bc5c5f
fix test
WangXiangUSTC Oct 22, 2019
3aedf77
Merge branch 'master' into xiang/server_id
WangXiangUSTC Oct 23, 2019
0a4ce74
fix test
WangXiangUSTC Oct 23, 2019
aa2b214
Merge branch 'xiang/server_id' of https://github.com/pingcap/dm into …
WangXiangUSTC Oct 23, 2019
02a6fcb
resolve conflit
WangXiangUSTC Oct 23, 2019
6ca9c90
address comment
WangXiangUSTC Oct 23, 2019
898754f
add rand seed
WangXiangUSTC Oct 23, 2019
69ca67e
fix unit for heartbeat
WangXiangUSTC Oct 23, 2019
7b923f4
fix heartbeat
WangXiangUSTC Oct 23, 2019
5bbc661
fix
WangXiangUSTC Oct 23, 2019
354a2fc
address comment
WangXiangUSTC Oct 25, 2019
b1a8fbd
address comment
WangXiangUSTC Oct 25, 2019
9b89300
address comemnt
WangXiangUSTC Oct 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 90 additions & 23 deletions dm/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ package worker
import (
"bytes"
"context"
"database/sql"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"math/rand"
"strings"
"time"

Expand All @@ -39,18 +41,22 @@ import (
)

const (
// flavorReadTimeout is readTimeout for DB connection in adjustFlavor
flavorReadTimeout = "30s"
// flavorGetTimeout is timeout for getting version info from DB
flavorGetTimeout = 30 * time.Second
// dbReadTimeout is readTimeout for DB connection in adjust
dbReadTimeout = "30s"
// flavorGetTimeout is timeout for getting some information from DB
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
dbGetTimeout = 30 * time.Second

maxServerID = 1<<32 - 1
)

// SampleConfigFile is sample config file of dm-worker
// later we can read it from dm/worker/dm-worker.toml
// and assign it to SampleConfigFile while we build dm-worker
var SampleConfigFile string

var applyNewBaseDB = conn.DefaultDBProvider.Apply
var (
getAllServerIDFunc = utils.GetAllServerID
)

// NewConfig creates a new base config for worker.
func NewConfig() *Config {
Expand Down Expand Up @@ -199,9 +205,7 @@ func (c *Config) Parse(arguments []string) error {
// assign tracer id to source id
c.Tracer.Source = c.SourceID

c.From.Adjust()
c.Checker.adjust()
err = c.adjustFlavor()
err = c.adjust()
if err != nil {
return err
}
Expand Down Expand Up @@ -256,8 +260,54 @@ func (c *Config) configFromFile(path string) error {
return c.verify()
}

func (c *Config) adjust() error {
c.From.Adjust()
c.Checker.adjust()

if c.Flavor == "" || c.ServerID == 0 {
return nil
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved

fromDB, err := c.createBaseDB()
if err != nil {
return err
}
defer fromDB.Close()

ctx, cancel := context.WithTimeout(context.Background(), dbGetTimeout)
defer cancel()

err = c.adjustFlavor(ctx, fromDB.DB)
if err != nil {
return err
}

err = c.adjustServerID(ctx, fromDB.DB)
if err != nil {
return err
}
}

return nil
}

func (c *Config) createBaseDB() (*conn.BaseDB, error) {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
// decrypt password
clone, err := c.DecryptPassword()
if err != nil {
return nil, err
}
from := clone.From
from.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(dbReadTimeout)
fromDB, err := conn.DefaultDBProvider.Apply(from)
if err != nil {
return nil, terror.WithScope(err, terror.ScopeUpstream)
}

return fromDB, nil
}

// adjustFlavor adjusts flavor through querying from given database
func (c *Config) adjustFlavor() error {
func (c *Config) adjustFlavor(ctx context.Context, db *sql.DB) (err error) {
if c.Flavor != "" {
switch c.Flavor {
case mysql.MariaDBFlavor, mysql.MySQLFlavor:
Expand All @@ -266,26 +316,43 @@ func (c *Config) adjustFlavor() error {
return terror.ErrNotSupportedFlavor.Generate(c.Flavor)
}
}
// decrypt password
clone, err := c.DecryptPassword()
if err != nil {
return err

c.Flavor, err = utils.GetFlavor(ctx, db)
if ctx.Err() != nil {
err = terror.Annotatef(err, "time cost to get flavor info exceeds %s", dbGetTimeout)
}
return terror.WithScope(err, terror.ScopeUpstream)
}

func (c *Config) adjustServerID(ctx context.Context, db *sql.DB) error {
if c.ServerID != 0 {
if c.ServerID < 0 || c.ServerID > maxServerID {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c.ServerID's type is uint32, which means it will never overflows math.MaxUint32 or less than 0. IMO, this condition will never be true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I will remove this judgement

return terror.ErrInvalidServerID.Generate(c.ServerID)
}

return nil
}

serverIDs, err := getAllServerIDFunc(ctx, db)
if ctx.Err() != nil {
err = terror.Annotatef(err, "time cost to get flavor info exceeds %s", dbGetTimeout)
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
}
from := clone.From
from.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(flavorReadTimeout)
fromDB, err := applyNewBaseDB(from)
if err != nil {
return terror.WithScope(err, terror.ScopeUpstream)
}
defer fromDB.Close()

ctx, cancel := context.WithTimeout(context.Background(), flavorGetTimeout)
defer cancel()
c.Flavor, err = utils.GetFlavor(ctx, fromDB.DB)
if ctx.Err() != nil {
err = terror.Annotatef(err, "time cost to get flavor info exceeds %s", flavorGetTimeout)
for i := 0; i < 5; i++ {
randomValue := rand.Intn(100000)
randomServerID := maxServerID/10 + randomValue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sames that the only use of maxServerID is maxServerID/10. How about set maxServerID directly to math.MaxUint32/10?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update 354a2fc

Copy link
Contributor

@lichunzhu lichunzhu Oct 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about storing math.MaxUint32/10 into a variable so that we don't need to compute it every time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done 9b89300

if _, ok := serverIDs[int64(randomServerID)]; ok {
continue
}

c.ServerID = randomServerID
return nil
}
return terror.WithScope(err, terror.ScopeUpstream)

return terror.ErrInvalidServerID.Generatef("can't find a random available server ID")
}

// UpdateConfigFile write configure to local file
Expand Down
43 changes: 31 additions & 12 deletions dm/worker/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@
package worker

import (
"context"
"database/sql"
"fmt"
"io/ioutil"
"path"
"strings"

"github.com/DATA-DOG/go-sqlmock"
sqlmock "github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/conn"
)

func (t *testServer) TestConfig(c *C) {
Expand Down Expand Up @@ -172,10 +173,8 @@ func subtestFlavor(c *C, cfg *Config, sqlInfo, expectedFlavor, expectedError str
WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", sqlInfo))
mock.ExpectClose()
applyNewBaseDB = func(config config.DBConfig) (*conn.BaseDB, error) {
return &conn.BaseDB{DB: db}, nil
}
err = cfg.adjustFlavor()

err = cfg.adjustFlavor(context.Background(), db)
if expectedError == "" {
c.Assert(err, IsNil)
c.Assert(cfg.Flavor, Equals, expectedFlavor)
Expand All @@ -189,18 +188,38 @@ func (t *testServer) TestAdjustFlavor(c *C) {
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml", "-relay-dir=./xx"}), IsNil)

cfg.Flavor = "mariadb"
err := cfg.adjustFlavor()
err := cfg.adjustFlavor(context.Background(), nil)
c.Assert(err, IsNil)
c.Assert(cfg.Flavor, Equals, mysql.MariaDBFlavor)
cfg.Flavor = "MongoDB"
err = cfg.adjustFlavor()
err = cfg.adjustFlavor(context.Background(), nil)
c.Assert(err, ErrorMatches, ".*flavor MongoDB not supported")

var origApplyNewBaseDB = applyNewBaseDB
subtestFlavor(c, cfg, "10.4.8-MariaDB-1:10.4.8+maria~bionic", mysql.MariaDBFlavor, "")
subtestFlavor(c, cfg, "5.7.26-log", mysql.MySQLFlavor, "")
}

func (t *testServer) TestAdjustServerID(c *C) {
var originGetAllServerIDFunc = getAllServerIDFunc
defer func() {
applyNewBaseDB = origApplyNewBaseDB
getAllServerIDFunc = originGetAllServerIDFunc
}()
getAllServerIDFunc = getMockServerIDs

subtestFlavor(c, cfg, "10.4.8-MariaDB-1:10.4.8+maria~bionic", mysql.MariaDBFlavor, "")
subtestFlavor(c, cfg, "5.7.26-log", mysql.MySQLFlavor, "")
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml", "-relay-dir=./xx"}), IsNil)

cfg.adjustServerID(context.Background(), nil)
c.Assert(cfg.ServerID, Equals, 101)

cfg.ServerID = 0
cfg.adjustServerID(context.Background(), nil)
c.Assert(cfg.ServerID, Not(Equals), 0)
}

func getMockServerIDs(ctx context.Context, db *sql.DB) (map[int64]interface{}, error) {
return map[int64]interface{}{
1: struct{}{},
2: struct{}{},
}, nil
}
77 changes: 76 additions & 1 deletion pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,81 @@ func GetFlavor(ctx context.Context, db *sql.DB) (string, error) {
return gmysql.MySQLFlavor, nil
}

// GetAllServerID gets all slave server id and master server id
func GetAllServerID(ctx context.Context, db *sql.DB) (map[int64]interface{}, error) {
rows, err := db.QueryContext(ctx, `SHOW SLAVE HOSTS`)
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
defer rows.Close()

rowColumns, err := rows.Columns()
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}

/*
in MySQL:
mysql> SHOW SLAVE HOSTS;
+------------+-----------+------+-----------+--------------------------------------+
| Server_id | Host | Port | Master_id | Slave_UUID |
+------------+-----------+------+-----------+--------------------------------------+
| 192168010 | iconnect2 | 3306 | 192168011 | 14cb6624-7f93-11e0-b2c0-c80aa9429562 |
| 1921680101 | athena | 3306 | 192168011 | 07af4990-f41f-11df-a566-7ac56fdaf645 |
+------------+-----------+------+-----------+--------------------------------------+

in MariaDB:
mysql> SHOW SLAVE HOSTS;
+------------+-----------+------+-----------+
| Server_id | Host | Port | Master_id |
+------------+-----------+------+-----------+
| 192168010 | iconnect2 | 3306 | 192168011 |
| 1921680101 | athena | 3306 | 192168011 |
+------------+-----------+------+-----------+
*/

var (
serverID sql.NullInt64
host sql.NullString
port sql.NullInt64
masterID sql.NullInt64
slaveUUID sql.NullString
)
serverIDs := make(map[int64]interface{})
for rows.Next() {
if len(rowColumns) == 5 {
err = rows.Scan(&serverID, &host, &port, &masterID, &slaveUUID)
} else {
err = rows.Scan(&serverID, &host, &port, &masterID)
}
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}

if serverID.Valid {
serverIDs[serverID.Int64] = struct{}{}
} else {
// should never happened
log.L().Warn("get invalid server_id when execute `SHOW SLAVE HOSTS;`")
continue
}

if masterID.Valid {
serverIDs[masterID.Int64] = struct{}{}
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
} else {
// should never happened
log.L().Warn("get invalid master_id when execute `SHOW SLAVE HOSTS;`")
continue
}
}

if rows.Err() != nil {
return nil, terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError)
}

return serverIDs, nil
}

// GetMasterStatus gets status from master
func GetMasterStatus(db *sql.DB, flavor string) (gmysql.Position, gtid.Set, error) {
var (
Expand All @@ -61,7 +136,7 @@ func GetMasterStatus(db *sql.DB, flavor string) (gmysql.Position, gtid.Set, erro

rows, err := db.Query(`SHOW MASTER STATUS`)
if err != nil {
return binlogPos, gs, err
return binlogPos, gs, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
defer rows.Close()

Expand Down
Loading