Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

worker: use random value as server-id if server-id is not set (#329) #337

Merged
merged 1 commit into from
Oct 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/dm-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package main
import (
"flag"
"fmt"
"math/rand"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/pingcap/dm/dm/worker"
"github.com/pingcap/dm/pkg/log"
Expand All @@ -30,6 +32,8 @@ import (
)

func main() {
rand.Seed(time.Now().UnixNano())

cfg := worker.NewConfig()
err := cfg.Parse(os.Args[1:])
switch errors.Cause(err) {
Expand Down
2 changes: 1 addition & 1 deletion dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type SubTaskConfig struct {
IgnoreCheckingItems []string `toml:"ignore-checking-items" json:"ignore-checking-items"`
// it represents a MySQL/MariaDB instance or a replica group
SourceID string `toml:"source-id" json:"source-id"`
ServerID int `toml:"server-id" json:"server-id"`
ServerID uint32 `toml:"server-id" json:"server-id"`
Flavor string `toml:"flavor" json:"flavor"`
MetaSchema string `toml:"meta-schema" json:"meta-schema"`
RemoveMeta bool `toml:"remove-meta" json:"remove-meta"`
Expand Down
111 changes: 87 additions & 24 deletions dm/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ package worker
import (
"bytes"
"context"
"database/sql"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"math"
"math/rand"
"strings"
"time"

Expand All @@ -39,18 +42,23 @@ import (
)

const (
// flavorReadTimeout is readTimeout for DB connection in adjustFlavor
flavorReadTimeout = "30s"
// flavorGetTimeout is timeout for getting version info from DB
flavorGetTimeout = 30 * time.Second
// dbReadTimeout is readTimeout for DB connection in adjust
dbReadTimeout = "30s"
// dbGetTimeout is timeout for getting some information from DB
dbGetTimeout = 30 * time.Second

// the default base(min) server id generated by random
defaultBaseServerID = math.MaxUint32 / 10
)

// SampleConfigFile is sample config file of dm-worker
// later we can read it from dm/worker/dm-worker.toml
// and assign it to SampleConfigFile while we build dm-worker
var SampleConfigFile string

var applyNewBaseDB = conn.DefaultDBProvider.Apply
var (
getAllServerIDFunc = utils.GetAllServerID
)

// NewConfig creates a new base config for worker.
func NewConfig() *Config {
Expand Down Expand Up @@ -94,7 +102,7 @@ type Config struct {
AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
RelayDir string `toml:"relay-dir" json:"relay-dir"`
MetaDir string `toml:"meta-dir" json:"meta-dir"`
ServerID int `toml:"server-id" json:"server-id"`
ServerID uint32 `toml:"server-id" json:"server-id"`
Flavor string `toml:"flavor" json:"flavor"`
Charset string `toml:"charset" json:"charset"`

Expand Down Expand Up @@ -199,9 +207,7 @@ func (c *Config) Parse(arguments []string) error {
// assign tracer id to source id
c.Tracer.Source = c.SourceID

c.From.Adjust()
c.Checker.adjust()
err = c.adjustFlavor()
err = c.adjust()
if err != nil {
return err
}
Expand Down Expand Up @@ -256,8 +262,52 @@ func (c *Config) configFromFile(path string) error {
return c.verify()
}

func (c *Config) adjust() error {
c.From.Adjust()
c.Checker.adjust()

if c.Flavor == "" || c.ServerID == 0 {
fromDB, err := c.createFromDB()
if err != nil {
return err
}
defer fromDB.Close()

ctx, cancel := context.WithTimeout(context.Background(), dbGetTimeout)
defer cancel()

err = c.adjustFlavor(ctx, fromDB.DB)
if err != nil {
return err
}

err = c.adjustServerID(ctx, fromDB.DB)
if err != nil {
return err
}
}

return nil
}

func (c *Config) createFromDB() (*conn.BaseDB, error) {
// decrypt password
clone, err := c.DecryptPassword()
if err != nil {
return nil, err
}
from := clone.From
from.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(dbReadTimeout)
fromDB, err := conn.DefaultDBProvider.Apply(from)
if err != nil {
return nil, terror.WithScope(err, terror.ScopeUpstream)
}

return fromDB, nil
}

// adjustFlavor adjusts flavor through querying from given database
func (c *Config) adjustFlavor() error {
func (c *Config) adjustFlavor(ctx context.Context, db *sql.DB) (err error) {
if c.Flavor != "" {
switch c.Flavor {
case mysql.MariaDBFlavor, mysql.MySQLFlavor:
Expand All @@ -266,26 +316,39 @@ func (c *Config) adjustFlavor() error {
return terror.ErrNotSupportedFlavor.Generate(c.Flavor)
}
}
// decrypt password
clone, err := c.DecryptPassword()
if err != nil {
return err

c.Flavor, err = utils.GetFlavor(ctx, db)
if ctx.Err() != nil {
err = terror.Annotatef(err, "time cost to get flavor info exceeds %s", dbGetTimeout)
}
return terror.WithScope(err, terror.ScopeUpstream)
}

func (c *Config) adjustServerID(ctx context.Context, db *sql.DB) error {
if c.ServerID != 0 {
return nil
}

serverIDs, err := getAllServerIDFunc(ctx, db)
if ctx.Err() != nil {
err = terror.Annotatef(err, "time cost to get server-id info exceeds %s", dbGetTimeout)
}
from := clone.From
from.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(flavorReadTimeout)
fromDB, err := applyNewBaseDB(from)
if err != nil {
return terror.WithScope(err, terror.ScopeUpstream)
}
defer fromDB.Close()

ctx, cancel := context.WithTimeout(context.Background(), flavorGetTimeout)
defer cancel()
c.Flavor, err = utils.GetFlavor(ctx, fromDB.DB)
if ctx.Err() != nil {
err = terror.Annotatef(err, "time cost to get flavor info exceeds %s", flavorGetTimeout)
for i := 0; i < 5; i++ {
randomValue := uint32(rand.Intn(100000))
randomServerID := defaultBaseServerID + randomValue
if _, ok := serverIDs[randomServerID]; ok {
continue
}

c.ServerID = randomServerID
return nil
}
return terror.WithScope(err, terror.ScopeUpstream)

return terror.ErrInvalidServerID.Generatef("can't find a random available server ID")
}

// UpdateConfigFile write configure to local file
Expand Down
51 changes: 35 additions & 16 deletions dm/worker/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,26 @@
package worker

import (
"context"
"database/sql"
"fmt"
"io/ioutil"
"path"
"strings"

"github.com/DATA-DOG/go-sqlmock"
sqlmock "github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/conn"
)

func (t *testServer) TestConfig(c *C) {
cfg := NewConfig()

c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml", "-relay-dir=./xx"}), IsNil)
c.Assert(cfg.RelayDir, Equals, "./xx")
c.Assert(cfg.ServerID, Equals, 101)
c.Assert(cfg.ServerID, Equals, uint32(101))

dir := c.MkDir()
cfg.ConfigFile = path.Join(dir, "dm-worker.toml")
Expand All @@ -41,7 +42,7 @@ func (t *testServer) TestConfig(c *C) {
clone1 := cfg.Clone()
c.Assert(cfg, DeepEquals, clone1)
clone1.ServerID = 100
c.Assert(cfg.ServerID, Equals, 101)
c.Assert(cfg.ServerID, Equals, uint32(101))

// test format
c.Assert(cfg.String(), Matches, `.*"server-id":101.*`)
Expand All @@ -55,10 +56,10 @@ func (t *testServer) TestConfig(c *C) {
// test update config file and reload
c.Assert(cfg.UpdateConfigFile(tomlStr), IsNil)
c.Assert(cfg.Reload(), IsNil)
c.Assert(cfg.ServerID, Equals, 100)
c.Assert(cfg.ServerID, Equals, uint32(100))
c.Assert(cfg.UpdateConfigFile(originCfgStr), IsNil)
c.Assert(cfg.Reload(), IsNil)
c.Assert(cfg.ServerID, Equals, 101)
c.Assert(cfg.ServerID, Equals, uint32(101))

// test decrypt password
clone1.From.Password = "1234"
Expand Down Expand Up @@ -170,10 +171,8 @@ func subtestFlavor(c *C, cfg *Config, sqlInfo, expectedFlavor, expectedError str
WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", sqlInfo))
mock.ExpectClose()
applyNewBaseDB = func(config config.DBConfig) (*conn.BaseDB, error) {
return &conn.BaseDB{DB: db}, nil
}
err = cfg.adjustFlavor()

err = cfg.adjustFlavor(context.Background(), db)
if expectedError == "" {
c.Assert(err, IsNil)
c.Assert(cfg.Flavor, Equals, expectedFlavor)
Expand All @@ -187,18 +186,38 @@ func (t *testServer) TestAdjustFlavor(c *C) {
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml", "-relay-dir=./xx"}), IsNil)

cfg.Flavor = "mariadb"
err := cfg.adjustFlavor()
err := cfg.adjustFlavor(context.Background(), nil)
c.Assert(err, IsNil)
c.Assert(cfg.Flavor, Equals, mysql.MariaDBFlavor)
cfg.Flavor = "MongoDB"
err = cfg.adjustFlavor()
err = cfg.adjustFlavor(context.Background(), nil)
c.Assert(err, ErrorMatches, ".*flavor MongoDB not supported")

var origApplyNewBaseDB = applyNewBaseDB
subtestFlavor(c, cfg, "10.4.8-MariaDB-1:10.4.8+maria~bionic", mysql.MariaDBFlavor, "")
subtestFlavor(c, cfg, "5.7.26-log", mysql.MySQLFlavor, "")
}

func (t *testServer) TestAdjustServerID(c *C) {
var originGetAllServerIDFunc = getAllServerIDFunc
defer func() {
applyNewBaseDB = origApplyNewBaseDB
getAllServerIDFunc = originGetAllServerIDFunc
}()
getAllServerIDFunc = getMockServerIDs

subtestFlavor(c, cfg, "10.4.8-MariaDB-1:10.4.8+maria~bionic", mysql.MariaDBFlavor, "")
subtestFlavor(c, cfg, "5.7.26-log", mysql.MySQLFlavor, "")
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml", "-relay-dir=./xx"}), IsNil)

cfg.adjustServerID(context.Background(), nil)
c.Assert(cfg.ServerID, Equals, uint32(101))

cfg.ServerID = 0
cfg.adjustServerID(context.Background(), nil)
c.Assert(cfg.ServerID, Not(Equals), 0)
}

func getMockServerIDs(ctx context.Context, db *sql.DB) (map[uint32]struct{}, error) {
return map[uint32]struct{}{
1: {},
2: {},
}, nil
}
Loading