This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Merge branch 'master' into add-test-2
GMHDBJD authored Apr 2, 2021
2 parents 37703e1 + 8cd3e8a commit 556b916
Showing 125 changed files with 2,542 additions and 581 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -16,3 +16,4 @@ vendor
tidb-slow.log
/monitoring/dashboards/dm.json
/monitoring/rules/dm_worker.rules.yml
+ mysql.*.log
2 changes: 2 additions & 0 deletions Dockerfile
@@ -19,6 +19,8 @@ RUN GO111MODULE=on go mod download

COPY . .

+ RUN apk update && apk add bash

RUN make dm-worker dm-master dmctl

FROM alpine:3.10
4 changes: 3 additions & 1 deletion _utils/terror_gen/errors_release.txt
@@ -312,12 +312,13 @@ ErrSyncerUnitUpdateConfigInSharding,[code=36058:class=sync-unit:scope=internal:l
ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:level=high], "Message: process unit not waiting for sharding DDL to sync"
ErrSyncerUnitGenBAList,[code=36060:class=sync-unit:scope=internal:level=high], "Message: generate block allow list, Workaround: Please check the `block-allow-list` config in task configuration file."
ErrSyncerUnitHandleDDLFailed,[code=36061:class=sync-unit:scope=internal:level=high], "Message: fail to handle ddl job for %s"
- ErrSyncerShardDDLConflict,[code=36062:class=sync-unit:scope=internal:level=high], "Message: fail to handle shard ddl %v in optimistic mode, because schema conflict detected, Workaround: Please use show-ddl-locks command for more details."
+ ErrSyncerShardDDLConflict,[code=36062:class=sync-unit:scope=internal:level=high], "Message: fail to handle shard ddl %v in optimistic mode, because schema conflict detected, conflict error: %s, Workaround: Please use show-ddl-locks command for more details."
ErrSyncerFailpoint,[code=36063:class=sync-unit:scope=internal:level=low], "Message: failpoint specified error"
ErrSyncerReplaceEvent,[code=36064:class=sync-unit:scope=internal:level=high]
ErrSyncerOperatorNotExist,[code=36065:class=sync-unit:scope=internal:level=low], "Message: error operator not exist, position: %s"
ErrSyncerReplaceEventNotExist,[code=36066:class=sync-unit:scope=internal:level=high], "Message: replace event not exist, location: %s"
ErrSyncerParseDDL,[code=36067:class=sync-unit:scope=internal:level=high], "Message: parse DDL: %s, Workaround: Please confirm your DDL statement is correct and needed. For TiDB compatible DDL, see https://docs.pingcap.com/tidb/stable/mysql-compatibility#ddl. You can use `handle-error` command to skip or replace the DDL or add a binlog filter rule to ignore it if the DDL is not needed."
+ ErrSyncerUnsupportedStmt,[code=36068:class=sync-unit:scope=internal:level=high], "Message: `%s` statement not supported in %s mode"
ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium], "Message: nil request not valid"
ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium], "Message: op %s not supported"
ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium], "Message: operate request without --sharding specified not valid"
@@ -372,6 +373,7 @@ ErrMasterTLSConfigNotValid,[code=38051:class=dm-master:scope=internal:level=high
ErrMasterBoundChanging,[code=38052:class=dm-master:scope=internal:level=low], "Message: source bound is changed too frequently, last old bound %s:, new bound %s, Workaround: Please try again later"
ErrMasterFailToImportFromV10x,[code=38053:class=dm-master:scope=internal:level=high], "Message: fail to import DM cluster from v1.0.x, Workaround: Please confirm that you have not violated any restrictions in the upgrade documentation."
ErrMasterInconsistentOptimisticDDLsAndInfo,[code=38054:class=dm-master:scope=internal:level=high], "Message: inconsistent count of optimistic ddls and table infos, ddls: %d, table info: %d"
+ ErrMasterOptimisticTableInfoBeforeNotExist,[code=38055:class=dm-master:scope=internal:level=high], "Message: table-info-before not exist in optimistic ddls: %v"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium], "Message: parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium], "Message: '%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium], "Message: toml decode file, Workaround: Please check the configuration file has correct TOML format."
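The amended ErrSyncerShardDDLConflict message threads the concrete conflict through a new `%s` placeholder. A tiny illustration of how the template renders (plain fmt here; DM's terror machinery is not part of this diff, and the conflict detail below is hypothetical):

```go
package main

import "fmt"

func main() {
	// Message template of ErrSyncerShardDDLConflict after this change;
	// rendering shown with plain fmt, DM's terror wrapper omitted.
	const tmpl = "fail to handle shard ddl %v in optimistic mode, because schema conflict detected, conflict error: %s"
	ddls := []string{"ALTER TABLE tbl ADD COLUMN c INT"}
	conflict := "column c already exists with a different type" // hypothetical detail
	fmt.Printf("[code=36062:class=sync-unit] %s\n", fmt.Sprintf(tmpl, ddls, conflict))
}
```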
12 changes: 11 additions & 1 deletion dm/common/common.go
@@ -79,6 +79,15 @@ var (
// ShardDDLOptimismInitSchemaKeyAdapter is used to store the initial schema (before constructed the lock) of merged tables.
// k/v: Encode(task-name, downstream-schema-name, downstream-table-name) -> table schema.
ShardDDLOptimismInitSchemaKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-optimism/init-schema/")
+ // ShardDDLOptimismDroppedColumnsKeyAdapter is used to store the columns that are not fully dropped
+ // k/v: Encode(task-name, downstream-schema-name, downstream-table-name, column-name, source-id, upstream-schema-name, upstream-table-name) -> empty
+ // If we don't identify different upstream tables, we may report an error for tb2 in the following case.
+ // Time series: (+a/-a means add/drop column a)
+ // older ----------------> newer
+ // tb1: +a +b +c -c
+ // tb2: +a +b +c
+ // tb3: +a +b +c
+ ShardDDLOptimismDroppedColumnsKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-optimism/dropped-columns/")
)

func keyAdapterKeysLen(s KeyAdapter) int {
@@ -95,7 +104,8 @@ func keyAdapterKeysLen(s KeyAdapter) int {
return 3
case ShardDDLOptimismInfoKeyAdapter, ShardDDLOptimismOperationKeyAdapter:
return 4

+ case ShardDDLOptimismDroppedColumnsKeyAdapter:
+ return 7
}
return -1
}
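Note on the key layout: encoding seven components gives one etcd key per (task, downstream table, column, source, upstream table), which is what lets DM distinguish tb2/tb3 from tb1 in the scenario above. A minimal sketch of how such hex-per-component keys can be built and decoded (illustrative helpers only; DM's actual unexported keyHexEncoderDecoder is not shown in this diff):

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

const droppedColumnsPrefix = "/dm-master/shardddl-optimism/dropped-columns/"

// encodeKey hex-encodes every component before joining, so a "/" inside a
// table or column name can never collide with the separator.
func encodeKey(prefix string, keys ...string) string {
	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, hex.EncodeToString([]byte(k)))
	}
	return prefix + strings.Join(parts, "/")
}

// decodeKey reverses encodeKey.
func decodeKey(prefix, key string) ([]string, error) {
	parts := strings.Split(strings.TrimPrefix(key, prefix), "/")
	keys := make([]string, 0, len(parts))
	for _, p := range parts {
		b, err := hex.DecodeString(p)
		if err != nil {
			return nil, err
		}
		keys = append(keys, string(b))
	}
	return keys, nil
}

func main() {
	// Seven components, matching keyAdapterKeysLen returning 7.
	key := encodeKey(droppedColumnsPrefix,
		"task", "db", "tbl", "col", "mysql-01", "updb", "uptbl")
	fmt.Println(key)
	keys, _ := decodeKey(droppedColumnsPrefix, key)
	fmt.Println(keys) // [task db tbl col mysql-01 updb uptbl]
}
```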
44 changes: 32 additions & 12 deletions dm/master/config.go
@@ -37,11 +37,15 @@ import (
)

const (
- defaultRPCTimeout = "30s"
- defaultNamePrefix = "dm-master"
- defaultDataDirPrefix = "default"
- defaultPeerUrls = "http://127.0.0.1:8291"
- defaultInitialClusterState = embed.ClusterStateFlagNew
+ defaultRPCTimeout = "30s"
+ defaultNamePrefix = "dm-master"
+ defaultDataDirPrefix = "default"
+ defaultPeerUrls = "http://127.0.0.1:8291"
+ defaultInitialClusterState = embed.ClusterStateFlagNew
+ defaultAutoCompactionMode = "periodic"
+ defaultAutoCompactionRetention = "1h"
+ defaultQuotaBackendBytes = 2 * 1024 * 1024 * 1024 // 2GB
+ quotaBackendBytesLowerBound = 500 * 1024 * 1024 // 500MB
)

var (
@@ -75,6 +79,9 @@ func NewConfig() *Config {
fs.StringVar(&cfg.PeerUrls, "peer-urls", defaultPeerUrls, "URLs for peer traffic")
fs.StringVar(&cfg.AdvertisePeerUrls, "advertise-peer-urls", "", `advertise URLs for peer traffic (default "${peer-urls}")`)
fs.StringVar(&cfg.Join, "join", "", `join to an existing cluster (usage: cluster's "${master-addr}" list, e.g. "127.0.0.1:8261,127.0.0.1:18261"`)
+ fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", defaultAutoCompactionMode, `etcd's auto-compaction-mode, either 'periodic' or 'revision'`)
+ fs.StringVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", defaultAutoCompactionRetention, `etcd's auto-compaction-retention, accept values like '5h' or '5' (5 hours in 'periodic' mode or 5 revisions in 'revision')`)
+ fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", defaultQuotaBackendBytes, `etcd's storage quota in bytes`)

fs.StringVar(&cfg.SSLCA, "ssl-ca", "", "path of file that contains list of trusted SSL CAs for connection")
fs.StringVar(&cfg.SSLCert, "ssl-cert", "", "path of file that contains X509 certificate in PEM format for connection")
@@ -108,13 +115,16 @@ type Config struct {
// etcd relative config items
// NOTE: we use `MasterAddr` to generate `ClientUrls` and `AdvertiseClientUrls`
// NOTE: more items will be add when adding leader election
- Name string `toml:"name" json:"name"`
- DataDir string `toml:"data-dir" json:"data-dir"`
- PeerUrls string `toml:"peer-urls" json:"peer-urls"`
- AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"`
- InitialCluster string `toml:"initial-cluster" json:"initial-cluster"`
- InitialClusterState string `toml:"initial-cluster-state" json:"initial-cluster-state"`
- Join string `toml:"join" json:"join"` // cluster's client address (endpoints), not peer address
+ Name string `toml:"name" json:"name"`
+ DataDir string `toml:"data-dir" json:"data-dir"`
+ PeerUrls string `toml:"peer-urls" json:"peer-urls"`
+ AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"`
+ InitialCluster string `toml:"initial-cluster" json:"initial-cluster"`
+ InitialClusterState string `toml:"initial-cluster-state" json:"initial-cluster-state"`
+ Join string `toml:"join" json:"join"` // cluster's client address (endpoints), not peer address
+ AutoCompactionMode string `toml:"auto-compaction-mode" json:"auto-compaction-mode"`
+ AutoCompactionRetention string `toml:"auto-compaction-retention" json:"auto-compaction-retention"`
+ QuotaBackendBytes int64 `toml:"quota-backend-bytes" json:"quota-backend-bytes"`

// directory path used to store source config files when upgrading from v1.0.x.
// if this path set, DM-master leader will try to upgrade from v1.0.x to the current version.
@@ -301,6 +311,13 @@ func (c *Config) adjust() error {
c.Join = utils.WrapSchemes(c.Join, c.SSLCA != "")
}

+ if c.QuotaBackendBytes < quotaBackendBytesLowerBound {
+ log.L().Warn("quota-backend-bytes is too low, will adjust it",
+ zap.Int64("from", c.QuotaBackendBytes),
+ zap.Int64("to", quotaBackendBytesLowerBound))
+ c.QuotaBackendBytes = quotaBackendBytesLowerBound
+ }

return err
}

@@ -345,6 +362,9 @@ func (c *Config) genEmbedEtcdConfig(cfg *embed.Config) (*embed.Config, error) {

cfg.InitialCluster = c.InitialCluster
cfg.ClusterState = c.InitialClusterState
+ cfg.AutoCompactionMode = c.AutoCompactionMode
+ cfg.AutoCompactionRetention = c.AutoCompactionRetention
+ cfg.QuotaBackendBytes = c.QuotaBackendBytes

err = cfg.Validate() // verify & trigger the builder
if err != nil {
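The three new options map one-to-one onto fields of the embedded etcd server's config, with the quota clamped in adjust() before it reaches etcd. A rough sketch of that pass-through (same `go.etcd.io/etcd/embed` import path the diff itself uses; the helper below is illustrative, not dm-master's actual code):

```go
package main

import (
	"fmt"

	"go.etcd.io/etcd/embed" // same import path the diff itself uses
)

const quotaBackendBytesLowerBound = 500 * 1024 * 1024 // 500MB, as in adjust()

// buildEtcdConfig mirrors what genEmbedEtcdConfig now does with the
// three new options; hypothetical helper, not dm-master's actual code.
func buildEtcdConfig(mode, retention string, quota int64) *embed.Config {
	if quota < quotaBackendBytesLowerBound {
		quota = quotaBackendBytesLowerBound // the clamp from adjust()
	}
	cfg := embed.NewConfig()
	cfg.AutoCompactionMode = mode           // "periodic" or "revision"
	cfg.AutoCompactionRetention = retention // e.g. "1h", or "5" revisions
	cfg.QuotaBackendBytes = quota
	return cfg
}

func main() {
	cfg := buildEtcdConfig("periodic", "1h", 2*1024*1024*1024)
	fmt.Println(cfg.AutoCompactionMode, cfg.AutoCompactionRetention, cfg.QuotaBackendBytes)
}
```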
3 changes: 3 additions & 0 deletions dm/master/config_test.go
@@ -191,6 +191,9 @@ func (t *testConfigSuite) TestGenEmbedEtcdConfig(c *check.C) {
c.Assert(etcdCfg.APUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}})
c.Assert(etcdCfg.InitialCluster, check.DeepEquals, fmt.Sprintf("dm-master-%s=http://127.0.0.1:8291", hostname))
c.Assert(etcdCfg.ClusterState, check.Equals, embed.ClusterStateFlagExisting)
+ c.Assert(etcdCfg.AutoCompactionMode, check.Equals, "periodic")
+ c.Assert(etcdCfg.AutoCompactionRetention, check.Equals, "1h")
+ c.Assert(etcdCfg.QuotaBackendBytes, check.Equals, int64(2*1024*1024*1024))

cfg2 := *cfg1
cfg2.MasterAddr = "127.0.0.1\n:8261"
49 changes: 38 additions & 11 deletions dm/master/scheduler/scheduler.go
@@ -118,7 +118,7 @@ type Scheduler struct {

// expectant relay stages for sources, source ID -> stage.
// add:
- // - bound the source to a worker (at first time).
+ // - bound the source to a worker (at first time). // TODO: change this to add a relay-enabled source
// - recover from etcd (calling `recoverSources`).
// update:
// - update stage by user request (calling `UpdateExpectRelayStage`).
@@ -386,7 +386,7 @@ func (s *Scheduler) TransferSource(source, worker string) error {
s.logger.Warn("in transfer source, found a free worker and not bound source, which should not happened",
zap.String("source", source),
zap.String("worker", worker))
- err := s.boundSourceToWorker(source, w)
+ err := s.boundSourceToWorker(source, w, s.sourceCfgs[source].EnableRelay)
if err == nil {
delete(s.unbounds, source)
}
@@ -412,7 +412,8 @@
failpoint.Inject("failToReplaceSourceBound", func(_ failpoint.Value) {
failpoint.Return(errors.New("failToPutSourceBound"))
})
- _, err := ha.ReplaceSourceBound(s.etcdCli, source, oldWorker.BaseInfo().Name, worker)
+ enableRelay := s.sourceCfgs[source].EnableRelay
+ _, err := ha.ReplaceSourceBound(s.etcdCli, source, oldWorker.BaseInfo().Name, worker, enableRelay)
if err != nil {
return err
}
@@ -779,10 +780,16 @@ func (s *Scheduler) UpdateExpectRelayStage(newStage pb.Stage, sources ...string)
stages = make([]ha.Stage, 0, len(sources))
)
for _, source := range sources {
- if currStage, ok := s.expectRelayStages[source]; !ok {
+ if _, ok := s.sourceCfgs[source]; !ok {
notExistSourcesM[source] = struct{}{}
- } else {
+ continue
+ }
+
+ if currStage, ok := s.expectRelayStages[source]; ok {
currStagesM[currStage.Expect.String()] = struct{}{}
+ } else {
+ s.logger.Warn("will write relay stage for a source that doesn't have previous stage",
+ zap.String("source", source))
}
stages = append(stages, ha.NewRelayStage(newStage, source))
}
@@ -1036,6 +1043,13 @@ func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) {

// 6. put trigger source bounds info to etcd to order dm-workers to start source
if len(boundsToTrigger) > 0 {
+ for _, bound := range boundsToTrigger {
+ if s.sourceCfgs[bound.Source].EnableRelay {
+ if _, err2 := ha.PutRelayConfig(cli, bound.Source, bound.Worker); err2 != nil {
+ return 0, err2
+ }
+ }
+ }
_, err = ha.PutSourceBound(cli, boundsToTrigger...)
if err != nil {
return 0, nil
@@ -1180,6 +1194,11 @@ func (s *Scheduler) handleWorkerOnline(ev ha.WorkerEvent, toLock bool) error {

// 2. check whether is bound.
if w.Stage() == WorkerBound {
+ if s.sourceCfgs[w.Bound().Source].EnableRelay {
+ if _, err := ha.PutRelayConfig(s.etcdCli, w.Bound().Source, w.Bound().Worker); err != nil {
+ return err
+ }
+ }
// TODO: When dm-worker keepalive is broken, it will turn off its own running source
// After keepalive is restored, this dm-worker should continue to run the previously bound source
// So we PutSourceBound here to trigger dm-worker to get this event and start source again.
@@ -1282,7 +1301,7 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
}()

// 3. try to bound them.
- err = s.boundSourceToWorker(source, w)
+ err = s.boundSourceToWorker(source, w, s.sourceCfgs[source].EnableRelay)
if err != nil {
return false, err
}
@@ -1323,7 +1342,7 @@ func (s *Scheduler) tryBoundForSource(source string) (bool, error) {
}

// 2. try to bound them.
- err := s.boundSourceToWorker(source, worker)
+ err := s.boundSourceToWorker(source, worker, s.sourceCfgs[source].EnableRelay)
if err != nil {
return false, err
}
@@ -1332,23 +1351,31 @@

// boundSourceToWorker bounds the source and worker together.
// we should check the bound relationship of the source and the stage of the worker in the caller.
- func (s *Scheduler) boundSourceToWorker(source string, w *Worker) error {
+ func (s *Scheduler) boundSourceToWorker(source string, w *Worker, enableRelay bool) error {
// 1. put the bound relationship into etcd.
var err error
bound := ha.NewSourceBound(source, w.BaseInfo().Name)
if _, ok := s.expectRelayStages[source]; ok {
// the relay stage exists before, only put the bound relationship.
+ // TODO: we also put relay config for that worker temporary
+ _, err = ha.PutRelayConfig(s.etcdCli, bound.Source, bound.Worker)
+ if err != nil {
+ return err
+ }
_, err = ha.PutSourceBound(s.etcdCli, bound)
- } else {
- // no relay stage exists before, create a `Runnng` stage and put it with the bound relationship.
+ } else if enableRelay {
+ // no relay stage exists before, create a `Running` stage and put it with the bound relationship.
stage := ha.NewRelayStage(pb.Stage_Running, source)
- _, err = ha.PutRelayStageSourceBound(s.etcdCli, stage, bound)
+ _, err = ha.PutRelayStageRelayConfigSourceBound(s.etcdCli, stage, bound)
defer func() {
if err == nil {
// 1.1 if no error exist when returning, record the stage.
s.expectRelayStages[source] = stage
}
}()
+ } else {
+ // dont enable relay for it
+ _, err = ha.PutSourceBound(s.etcdCli, bound)
}
if err != nil {
return err
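Taken together, the scheduler changes make relay behavior explicit at bind time: boundSourceToWorker now chooses among three etcd writes instead of two. A condensed, self-contained model of that branch logic (the returned strings stand in for the ha.* helpers in the diff above; illustrative only, not DM's actual code):

```go
package main

import "fmt"

// bindWrites models the three branches of boundSourceToWorker above.
// The returned string names the ha.* etcd write(s) the real scheduler
// would issue; this is an illustrative model, not DM's actual code.
func bindWrites(relayStageExists, enableRelay bool) string {
	switch {
	case relayStageExists:
		// A relay stage is already recorded: refresh the worker's relay
		// config, then write the bound relationship.
		return "PutRelayConfig + PutSourceBound"
	case enableRelay:
		// First bound of a relay-enabled source: write a Running relay
		// stage, the relay config, and the bound together.
		return "PutRelayStageRelayConfigSourceBound"
	default:
		// Relay disabled: the bound relationship alone is enough.
		return "PutSourceBound"
	}
}

func main() {
	for _, c := range []struct{ stageExists, enableRelay bool }{
		{true, false}, {false, true}, {false, false},
	} {
		fmt.Printf("stageExists=%-5v enableRelay=%-5v -> %s\n",
			c.stageExists, c.enableRelay, bindWrites(c.stageExists, c.enableRelay))
	}
}
```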
5 changes: 3 additions & 2 deletions dm/master/scheduler/scheduler_test.go
@@ -118,6 +118,7 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) {
)
c.Assert(sourceCfg1.LoadFromFile(sourceSampleFile), IsNil)
sourceCfg1.SourceID = sourceID1
+ sourceCfg1.EnableRelay = true
sourceCfg2 := sourceCfg1
sourceCfg2.SourceID = sourceID2

@@ -1052,9 +1053,9 @@ func (t *testScheduler) TestTransferSource(c *C) {
s.sourceCfgs[sourceID2] = config.SourceConfig{}

worker1.ToFree()
- c.Assert(s.boundSourceToWorker(sourceID1, worker1), IsNil)
+ c.Assert(s.boundSourceToWorker(sourceID1, worker1, false), IsNil)
worker2.ToFree()
- c.Assert(s.boundSourceToWorker(sourceID2, worker2), IsNil)
+ c.Assert(s.boundSourceToWorker(sourceID2, worker2, false), IsNil)

c.Assert(s.bounds[sourceID1], DeepEquals, worker1)
c.Assert(s.bounds[sourceID2], DeepEquals, worker2)
47 changes: 44 additions & 3 deletions dm/master/server_test.go
@@ -634,14 +634,15 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
tiBefore = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
tiAfter1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT)`)
info1 = optimism.NewInfo(taskName, sources[0], "foo-1", "bar-1", schema, table, DDLs1, tiBefore, []*model.TableInfo{tiAfter1})
- op1 = optimism.NewOperation(ID, taskName, sources[0], info1.UpSchema, info1.UpTable, DDLs1, optimism.ConflictNone, false)
+ op1 = optimism.NewOperation(ID, taskName, sources[0], info1.UpSchema, info1.UpTable, DDLs1, optimism.ConflictNone, "", false)
)

st1.AddTable("foo-1", "bar-1", schema, table)
_, err = optimism.PutSourceTables(etcdTestCli, st1)
c.Assert(err, check.IsNil)
_, err = optimism.PutInfo(etcdTestCli, info1)
c.Assert(err, check.IsNil)
- _, succ, err = optimism.PutOperation(etcdTestCli, false, op1)
+ _, succ, err = optimism.PutOperation(etcdTestCli, false, op1, 0)
c.Assert(succ, check.IsTrue)
c.Assert(err, check.IsNil)

@@ -1470,7 +1471,7 @@ func (t *testMaster) TestOfflineMember(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(listResp.Members, check.HasLen, 3)

- // make sure s3 is not the leader, otherwise it will take some time to campain a new leader after close s3, and it may cause timeout
+ // make sure s3 is not the leader, otherwise it will take some time to campaign a new leader after close s3, and it may cause timeout
c.Assert(utils.WaitSomething(20, 500*time.Millisecond, func() bool {
_, leaderID, _, err = s1.election.LeaderInfo(ctx)
if err != nil {
@@ -1770,3 +1771,43 @@ func createTableInfo(c *check.C, p *parser.Parser, se sessionctx.Context, tableI
}
return info
}

+ type testEtcd struct {
+ }
+
+ var _ = check.Suite(&testEtcd{})
+
+ func (t *testEtcd) TestEtcdAutoCompaction(c *check.C) {
+ cfg := NewConfig()
+ c.Assert(cfg.Parse([]string{"-config=./dm-master.toml"}), check.IsNil)
+
+ cfg.DataDir = c.MkDir()
+ cfg.MasterAddr = tempurl.Alloc()[len("http://"):]
+ cfg.AutoCompactionRetention = "1s"
+
+ ctx, cancel := context.WithCancel(context.Background())
+ s := NewServer(cfg)
+ c.Assert(s.Start(ctx), check.IsNil)
+
+ etcdCli, err := clientv3.New(clientv3.Config{
+ Endpoints: []string{cfg.MasterAddr},
+ })
+ c.Assert(err, check.IsNil)
+
+ for i := 0; i < 100; i++ {
+ _, err = etcdCli.Put(ctx, "key", fmt.Sprintf("%03d", i))
+ c.Assert(err, check.IsNil)
+ }
+ time.Sleep(3 * time.Second)
+ resp, err := etcdCli.Get(ctx, "key")
+ c.Assert(err, check.IsNil)
+
+ utils.WaitSomething(10, time.Second, func() bool {
+ _, err = etcdCli.Get(ctx, "key", clientv3.WithRev(resp.Header.Revision-1))
+ return err != nil
+ })
+ c.Assert(err, check.ErrorMatches, ".*required revision has been compacted.*")
+
+ cancel()
+ s.Close()
+ }
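The test works because etcd rejects reads at revisions older than the last compaction with "required revision has been compacted". A stand-alone sketch of the same probe against any etcd endpoint (assumes a server at 127.0.0.1:2379; illustrative only, error matching by message rather than etcd's sentinel error):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3" // same import path the test uses
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // assumed local etcd
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx := context.Background()
	resp, err := cli.Get(ctx, "key")
	if err != nil {
		panic(err)
	}
	// Once the auto-compactor has compacted past it, reading even one
	// revision behind the current head fails.
	_, err = cli.Get(ctx, "key", clientv3.WithRev(resp.Header.Revision-1))
	fmt.Println(err) // e.g. "etcdserver: mvcc: required revision has been compacted"
}
```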