Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

worker: use random value as server-id if server-id is not set #329

Merged
merged 32 commits into from
Oct 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3f9b12a
use random server-id if not set
WangXiangUSTC Oct 21, 2019
6b1ee0a
remove server-id in integration test
WangXiangUSTC Oct 21, 2019
177dbdf
add unit test for get slave hosts
WangXiangUSTC Oct 21, 2019
ce20637
add unit test for config
WangXiangUSTC Oct 21, 2019
8b8453c
remove useless code
WangXiangUSTC Oct 21, 2019
2474fe0
update variable name
WangXiangUSTC Oct 21, 2019
2a1e83c
Merge branch 'master' into xiang/server_id
csuzhangxc Oct 22, 2019
394d4c2
address comment
WangXiangUSTC Oct 22, 2019
d14bd4c
minor update
WangXiangUSTC Oct 22, 2019
84efa8c
fix function name
WangXiangUSTC Oct 22, 2019
b389461
Update dm/worker/config.go
WangXiangUSTC Oct 22, 2019
4830291
address comment
WangXiangUSTC Oct 22, 2019
38f6c59
Merge branch 'xiang/server_id' of https://github.com/pingcap/dm into …
WangXiangUSTC Oct 22, 2019
457bbbf
change server id to uint32
WangXiangUSTC Oct 22, 2019
ec9bb07
fix
WangXiangUSTC Oct 22, 2019
3fb9a44
update comment
WangXiangUSTC Oct 22, 2019
e668f68
address comnemnt
WangXiangUSTC Oct 22, 2019
0c4b592
fix test
WangXiangUSTC Oct 22, 2019
7d0f754
format code
WangXiangUSTC Oct 22, 2019
3bc5c5f
fix test
WangXiangUSTC Oct 22, 2019
3aedf77
Merge branch 'master' into xiang/server_id
WangXiangUSTC Oct 23, 2019
0a4ce74
fix test
WangXiangUSTC Oct 23, 2019
aa2b214
Merge branch 'xiang/server_id' of https://github.com/pingcap/dm into …
WangXiangUSTC Oct 23, 2019
02a6fcb
resolve conflit
WangXiangUSTC Oct 23, 2019
6ca9c90
address comment
WangXiangUSTC Oct 23, 2019
898754f
add rand seed
WangXiangUSTC Oct 23, 2019
69ca67e
fix unit for heartbeat
WangXiangUSTC Oct 23, 2019
7b923f4
fix heartbeat
WangXiangUSTC Oct 23, 2019
5bbc661
fix
WangXiangUSTC Oct 23, 2019
354a2fc
address comment
WangXiangUSTC Oct 25, 2019
b1a8fbd
address comment
WangXiangUSTC Oct 25, 2019
9b89300
address comemnt
WangXiangUSTC Oct 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/dm-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package main
import (
"flag"
"fmt"
"math/rand"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/pingcap/dm/dm/worker"
"github.com/pingcap/dm/pkg/log"
Expand All @@ -30,6 +32,8 @@ import (
)

func main() {
rand.Seed(time.Now().UnixNano())

cfg := worker.NewConfig()
err := cfg.Parse(os.Args[1:])
switch errors.Cause(err) {
Expand Down
2 changes: 1 addition & 1 deletion dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type SubTaskConfig struct {
IgnoreCheckingItems []string `toml:"ignore-checking-items" json:"ignore-checking-items"`
// it represents a MySQL/MariaDB instance or a replica group
SourceID string `toml:"source-id" json:"source-id"`
ServerID int `toml:"server-id" json:"server-id"`
ServerID uint32 `toml:"server-id" json:"server-id"`
Flavor string `toml:"flavor" json:"flavor"`
MetaSchema string `toml:"meta-schema" json:"meta-schema"`
RemoveMeta bool `toml:"remove-meta" json:"remove-meta"`
Expand Down
111 changes: 87 additions & 24 deletions dm/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ package worker
import (
"bytes"
"context"
"database/sql"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"math"
"math/rand"
"strings"
"time"

Expand All @@ -39,18 +42,23 @@ import (
)

const (
// flavorReadTimeout is readTimeout for DB connection in adjustFlavor
flavorReadTimeout = "30s"
// flavorGetTimeout is timeout for getting version info from DB
flavorGetTimeout = 30 * time.Second
// dbReadTimeout is readTimeout for DB connection in adjust
dbReadTimeout = "30s"
// dbGetTimeout is timeout for getting some information from DB
dbGetTimeout = 30 * time.Second

// the default base(min) server id generated by random
defaultBaseServerID = math.MaxUint32 / 10
)

// SampleConfigFile is sample config file of dm-worker
// later we can read it from dm/worker/dm-worker.toml
// and assign it to SampleConfigFile while we build dm-worker
var SampleConfigFile string

var applyNewBaseDB = conn.DefaultDBProvider.Apply
var (
getAllServerIDFunc = utils.GetAllServerID
)

// NewConfig creates a new base config for worker.
func NewConfig() *Config {
Expand Down Expand Up @@ -94,7 +102,7 @@ type Config struct {
AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
RelayDir string `toml:"relay-dir" json:"relay-dir"`
MetaDir string `toml:"meta-dir" json:"meta-dir"`
ServerID int `toml:"server-id" json:"server-id"`
ServerID uint32 `toml:"server-id" json:"server-id"`
Flavor string `toml:"flavor" json:"flavor"`
Charset string `toml:"charset" json:"charset"`

Expand Down Expand Up @@ -199,9 +207,7 @@ func (c *Config) Parse(arguments []string) error {
// assign tracer id to source id
c.Tracer.Source = c.SourceID

c.From.Adjust()
c.Checker.adjust()
err = c.adjustFlavor()
err = c.adjust()
if err != nil {
return err
}
Expand Down Expand Up @@ -256,8 +262,52 @@ func (c *Config) configFromFile(path string) error {
return c.verify()
}

func (c *Config) adjust() error {
c.From.Adjust()
c.Checker.adjust()

if c.Flavor == "" || c.ServerID == 0 {
fromDB, err := c.createFromDB()
if err != nil {
return err
}
defer fromDB.Close()

ctx, cancel := context.WithTimeout(context.Background(), dbGetTimeout)
defer cancel()

err = c.adjustFlavor(ctx, fromDB.DB)
if err != nil {
return err
}

err = c.adjustServerID(ctx, fromDB.DB)
if err != nil {
return err
}
}

return nil
}

func (c *Config) createFromDB() (*conn.BaseDB, error) {
// decrypt password
clone, err := c.DecryptPassword()
if err != nil {
return nil, err
}
from := clone.From
from.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(dbReadTimeout)
fromDB, err := conn.DefaultDBProvider.Apply(from)
if err != nil {
return nil, terror.WithScope(err, terror.ScopeUpstream)
}

return fromDB, nil
}

// adjustFlavor adjusts flavor through querying from given database
func (c *Config) adjustFlavor() error {
func (c *Config) adjustFlavor(ctx context.Context, db *sql.DB) (err error) {
if c.Flavor != "" {
switch c.Flavor {
case mysql.MariaDBFlavor, mysql.MySQLFlavor:
Expand All @@ -266,26 +316,39 @@ func (c *Config) adjustFlavor() error {
return terror.ErrNotSupportedFlavor.Generate(c.Flavor)
}
}
// decrypt password
clone, err := c.DecryptPassword()
if err != nil {
return err

c.Flavor, err = utils.GetFlavor(ctx, db)
if ctx.Err() != nil {
err = terror.Annotatef(err, "time cost to get flavor info exceeds %s", dbGetTimeout)
}
return terror.WithScope(err, terror.ScopeUpstream)
}

func (c *Config) adjustServerID(ctx context.Context, db *sql.DB) error {
if c.ServerID != 0 {
return nil
}

serverIDs, err := getAllServerIDFunc(ctx, db)
if ctx.Err() != nil {
err = terror.Annotatef(err, "time cost to get server-id info exceeds %s", dbGetTimeout)
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
}
from := clone.From
from.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(flavorReadTimeout)
fromDB, err := applyNewBaseDB(from)
if err != nil {
return terror.WithScope(err, terror.ScopeUpstream)
}
defer fromDB.Close()

ctx, cancel := context.WithTimeout(context.Background(), flavorGetTimeout)
defer cancel()
c.Flavor, err = utils.GetFlavor(ctx, fromDB.DB)
if ctx.Err() != nil {
err = terror.Annotatef(err, "time cost to get flavor info exceeds %s", flavorGetTimeout)
for i := 0; i < 5; i++ {
randomValue := uint32(rand.Intn(100000))
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
randomServerID := defaultBaseServerID + randomValue
if _, ok := serverIDs[randomServerID]; ok {
continue
}

c.ServerID = randomServerID
return nil
}
return terror.WithScope(err, terror.ScopeUpstream)

return terror.ErrInvalidServerID.Generatef("can't find a random available server ID")
}

// UpdateConfigFile write configure to local file
Expand Down
51 changes: 35 additions & 16 deletions dm/worker/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,26 @@
package worker

import (
"context"
"database/sql"
"fmt"
"io/ioutil"
"path"
"strings"

"github.com/DATA-DOG/go-sqlmock"
sqlmock "github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/conn"
)

func (t *testServer) TestConfig(c *C) {
cfg := NewConfig()

c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml", "-relay-dir=./xx"}), IsNil)
c.Assert(cfg.RelayDir, Equals, "./xx")
c.Assert(cfg.ServerID, Equals, 101)
c.Assert(cfg.ServerID, Equals, uint32(101))

dir := c.MkDir()
cfg.ConfigFile = path.Join(dir, "dm-worker.toml")
Expand All @@ -41,7 +42,7 @@ func (t *testServer) TestConfig(c *C) {
clone1 := cfg.Clone()
c.Assert(cfg, DeepEquals, clone1)
clone1.ServerID = 100
c.Assert(cfg.ServerID, Equals, 101)
c.Assert(cfg.ServerID, Equals, uint32(101))

// test format
c.Assert(cfg.String(), Matches, `.*"server-id":101.*`)
Expand All @@ -55,10 +56,10 @@ func (t *testServer) TestConfig(c *C) {
// test update config file and reload
c.Assert(cfg.UpdateConfigFile(tomlStr), IsNil)
c.Assert(cfg.Reload(), IsNil)
c.Assert(cfg.ServerID, Equals, 100)
c.Assert(cfg.ServerID, Equals, uint32(100))
c.Assert(cfg.UpdateConfigFile(originCfgStr), IsNil)
c.Assert(cfg.Reload(), IsNil)
c.Assert(cfg.ServerID, Equals, 101)
c.Assert(cfg.ServerID, Equals, uint32(101))

// test decrypt password
clone1.From.Password = "1234"
Expand Down Expand Up @@ -170,10 +171,8 @@ func subtestFlavor(c *C, cfg *Config, sqlInfo, expectedFlavor, expectedError str
WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", sqlInfo))
mock.ExpectClose()
applyNewBaseDB = func(config config.DBConfig) (*conn.BaseDB, error) {
return &conn.BaseDB{DB: db}, nil
}
err = cfg.adjustFlavor()

err = cfg.adjustFlavor(context.Background(), db)
if expectedError == "" {
c.Assert(err, IsNil)
c.Assert(cfg.Flavor, Equals, expectedFlavor)
Expand All @@ -187,18 +186,38 @@ func (t *testServer) TestAdjustFlavor(c *C) {
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml", "-relay-dir=./xx"}), IsNil)

cfg.Flavor = "mariadb"
err := cfg.adjustFlavor()
err := cfg.adjustFlavor(context.Background(), nil)
c.Assert(err, IsNil)
c.Assert(cfg.Flavor, Equals, mysql.MariaDBFlavor)
cfg.Flavor = "MongoDB"
err = cfg.adjustFlavor()
err = cfg.adjustFlavor(context.Background(), nil)
c.Assert(err, ErrorMatches, ".*flavor MongoDB not supported")

var origApplyNewBaseDB = applyNewBaseDB
subtestFlavor(c, cfg, "10.4.8-MariaDB-1:10.4.8+maria~bionic", mysql.MariaDBFlavor, "")
subtestFlavor(c, cfg, "5.7.26-log", mysql.MySQLFlavor, "")
}

func (t *testServer) TestAdjustServerID(c *C) {
var originGetAllServerIDFunc = getAllServerIDFunc
defer func() {
applyNewBaseDB = origApplyNewBaseDB
getAllServerIDFunc = originGetAllServerIDFunc
}()
getAllServerIDFunc = getMockServerIDs

subtestFlavor(c, cfg, "10.4.8-MariaDB-1:10.4.8+maria~bionic", mysql.MariaDBFlavor, "")
subtestFlavor(c, cfg, "5.7.26-log", mysql.MySQLFlavor, "")
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml", "-relay-dir=./xx"}), IsNil)

cfg.adjustServerID(context.Background(), nil)
c.Assert(cfg.ServerID, Equals, uint32(101))

cfg.ServerID = 0
cfg.adjustServerID(context.Background(), nil)
c.Assert(cfg.ServerID, Not(Equals), 0)
}

func getMockServerIDs(ctx context.Context, db *sql.DB) (map[uint32]struct{}, error) {
return map[uint32]struct{}{
1: {},
2: {},
}, nil
}
Loading