Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
*: add start-relay/stop-relay command (#1515)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Apr 6, 2021
1 parent b6346dc commit dda908d
Show file tree
Hide file tree
Showing 51 changed files with 1,732 additions and 327 deletions.
8 changes: 6 additions & 2 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ ErrSchedulerWorkerInvalidTrans,[code=46006:class=scheduler:scope=internal:level=
ErrSchedulerSourceCfgExist,[code=46007:class=scheduler:scope=internal:level=medium], "Message: source config with ID %s already exists"
ErrSchedulerSourceCfgNotExist,[code=46008:class=scheduler:scope=internal:level=medium], "Message: source config with ID %s not exists"
ErrSchedulerSourcesUnbound,[code=46009:class=dm-master:scope=internal:level=medium], "Message: sources %v have not bound"
ErrSchedulerSourceOpTaskExist,[code=46010:class=dm-master:scope=internal:level=medium], "Message: source with name %s need to operate with tasks %v exist"
ErrSchedulerSourceOpTaskExist,[code=46010:class=dm-master:scope=internal:level=medium], "Message: source with name %s need to operate has existing tasks %v, Workaround: Please `stop-task` first."
ErrSchedulerRelayStageInvalidUpdate,[code=46011:class=scheduler:scope=internal:level=medium], "Message: invalid new expectant relay stage %s"
ErrSchedulerRelayStageSourceNotExist,[code=46012:class=scheduler:scope=internal:level=medium], "Message: sources %v need to update expectant relay stage not exist"
ErrSchedulerMultiTask,[code=46013:class=scheduler:scope=internal:level=medium], "Message: the scheduler cannot perform multiple different tasks %v in one operation"
Expand All @@ -493,7 +493,11 @@ ErrSchedulerSubTaskStageInvalidUpdate,[code=46015:class=dm-master:scope=internal
ErrSchedulerSubTaskOpTaskNotExist,[code=46016:class=dm-master:scope=internal:level=medium], "Message: subtasks with name %s need to be operate not exist, Workaround: Please use `query-status` command to see tasks."
ErrSchedulerSubTaskOpSourceNotExist,[code=46017:class=dm-master:scope=internal:level=medium], "Message: sources %v need to be operate not exist"
ErrSchedulerTaskNotExist,[code=46018:class=scheduler:scope=internal:level=medium], "Message: task with name %s not exist, Workaround: Please use `query-status` command to see tasks."
ErrSchedulerRequireNotRunning,[code=46019:class=scheduler:scope=internal:level=high], "Message: tasks %v on source %s should not be running, Workaround: Please use `pause-task [-s source ...] task` to pause them first"
ErrSchedulerRequireNotRunning,[code=46019:class=scheduler:scope=internal:level=high], "Message: tasks %v on source %s should not be running, Workaround: Please use `pause-task [-s source ...] task` to pause them first."
ErrSchedulerRelayWorkersBusy,[code=46020:class=scheduler:scope=internal:level=high], "Message: these workers %s have started relay for sources %s respectively, Workaround: Please use `stop-relay` to stop them, or change your topology."
ErrSchedulerRelayWorkersWrongBound,[code=46021:class=scheduler:scope=internal:level=high], "Message: these workers %s have bound for another sources %s respectively, Workaround: Please `start-relay` on free or same source workers."
ErrSchedulerRelayWorkersWrongRelay,[code=46022:class=scheduler:scope=internal:level=high], "Message: these workers %s have started relay for another sources %s respectively, Workaround: Please correct sources in `stop-relay`."
ErrSchedulerSourceOpRelayExist,[code=46023:class=scheduler:scope=internal:level=high], "Message: source with name %s need to operate has existing relay workers %s, Workaround: Please `stop-relay` first."
ErrCtlGRPCCreateConn,[code=48001:class=dmctl:scope=internal:level=high], "Message: can not create grpc connection, Workaround: Please check your network connection."
ErrCtlInvalidTLSCfg,[code=48002:class=dmctl:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in command line."
ErrNotSet,[code=50000:class=not-set:scope=not-set:level=high]
20 changes: 9 additions & 11 deletions dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,15 @@ func (c *SourceConfig) Verify() error {
}

var err error
if c.EnableRelay {
if len(c.RelayBinLogName) > 0 {
if !binlog.VerifyFilename(c.RelayBinLogName) {
return terror.ErrWorkerRelayBinlogName.Generate(c.RelayBinLogName)
}
if len(c.RelayBinLogName) > 0 {
if !binlog.VerifyFilename(c.RelayBinLogName) {
return terror.ErrWorkerRelayBinlogName.Generate(c.RelayBinLogName)
}
if len(c.RelayBinlogGTID) > 0 {
_, err = gtid.ParserGTID(c.Flavor, c.RelayBinlogGTID)
if err != nil {
return terror.WithClass(terror.Annotatef(err, "relay-binlog-gtid %s", c.RelayBinlogGTID), terror.ClassDMWorker)
}
}
if len(c.RelayBinlogGTID) > 0 {
_, err = gtid.ParserGTID(c.Flavor, c.RelayBinlogGTID)
if err != nil {
return terror.WithClass(terror.Annotatef(err, "relay-binlog-gtid %s", c.RelayBinlogGTID), terror.ClassDMWorker)
}
}

Expand Down Expand Up @@ -254,7 +252,7 @@ func (c *SourceConfig) Adjust(ctx context.Context, db *sql.DB) (err error) {
}
}

if c.EnableRelay && len(c.RelayDir) == 0 {
if len(c.RelayDir) == 0 {
c.RelayDir = defaultRelayDir
}

Expand Down
3 changes: 2 additions & 1 deletion dm/config/source_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,13 @@ func (t *testConfig) TestConfigVerify(c *C) {
".*not valid.*",
},
{
// after support `start-relay`, we always check Relay related config
func() *SourceConfig {
cfg := newConfig()
cfg.RelayBinLogName = "mysql-binlog"
return cfg
},
"",
".*not valid.*",
},
{
func() *SourceConfig {
Expand Down
2 changes: 2 additions & 0 deletions dm/ctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func NewRootCmd() *cobra.Command {
master.NewGetCfgCmd(),
master.NewHandleErrorCmd(),
master.NewTransferSourceCmd(),
master.NewStartRelayCmd(),
master.NewStopRelayCmd(),
)
// copied from (*cobra.Command).InitDefaultHelpCmd
helpCmd := &cobra.Command{
Expand Down
84 changes: 84 additions & 0 deletions dm/ctl/master/start_relay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package master

import (
"context"
"errors"
"os"

"github.com/pingcap/dm/dm/ctl/common"
"github.com/pingcap/dm/dm/pb"

"github.com/spf13/cobra"
)

// NewStartRelayCmd creates a StartRelay command
func NewStartRelayCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "start-relay <-s source-id> <worker-name> [...worker-name]",
Short: "Starts workers pulling relay log for a source.",
RunE: startRelayFunc,
}
return cmd
}

func startRelayFunc(cmd *cobra.Command, _ []string) (err error) {
sources, err := common.GetSourceArgs(cmd)
if err != nil {
return err
}

if len(cmd.Flags().Args()) == 0 {
cmd.SetOut(os.Stdout)
if len(sources) == 0 {
// all args empty
common.PrintCmdUsage(cmd)
} else {
common.PrintLines("must specify at least one worker")
}
err = errors.New("please check output to see error")
return
}

if len(sources) != 1 {
common.PrintLines("must specify one source (`-s` / `--source`)")
err = errors.New("please check output to see error")
return
}

workers := cmd.Flags().Args()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

resp := &pb.OperateRelayResponse{}
err = common.SendRequest(
ctx,
"OperateRelay",
&pb.OperateRelayRequest{
Op: pb.RelayOpV2_StartRelayV2,
Source: sources[0],
Worker: workers,
},
&resp,
)

if err != nil {
return
}

common.PrettyPrintResponse(resp)
return
}
84 changes: 84 additions & 0 deletions dm/ctl/master/stop_relay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package master

import (
"context"
"errors"
"os"

"github.com/pingcap/dm/dm/ctl/common"
"github.com/pingcap/dm/dm/pb"

"github.com/spf13/cobra"
)

// NewStopRelayCmd creates a StartRelay command
func NewStopRelayCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "stop-relay <-s source-id> <worker-name> [...worker-name]",
Short: "Stops workers pulling relay log for a source.",
RunE: stopRelayFunc,
}
return cmd
}

func stopRelayFunc(cmd *cobra.Command, _ []string) (err error) {
sources, err := common.GetSourceArgs(cmd)
if err != nil {
return err
}

if len(cmd.Flags().Args()) == 0 {
cmd.SetOut(os.Stdout)
if len(sources) == 0 {
// all args empty
common.PrintCmdUsage(cmd)
} else {
common.PrintLines("must specify at least one worker")
}
err = errors.New("please check output to see error")
return
}

if len(sources) != 1 {
common.PrintLines("must specify one source (`-s` / `--source`)")
err = errors.New("please check output to see error")
return
}

workers := cmd.Flags().Args()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

resp := &pb.OperateRelayResponse{}
err = common.SendRequest(
ctx,
"OperateRelay",
&pb.OperateRelayRequest{
Op: pb.RelayOpV2_StopRelayV2,
Source: sources[0],
Worker: workers,
},
&resp,
)

if err != nil {
return
}

common.PrettyPrintResponse(resp)
return
}
1 change: 1 addition & 0 deletions dm/master/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (t *testMaster) TestCollectSourceConfigFilesV1Import(c *C) {
cfg1.From.Port = port
cfg1.From.User = user
cfg1.From.Password = password
cfg1.RelayDir = "relay-dir"
cfg2 := cfg1.Clone()
cfg2.SourceID = "mysql-replica-02"

Expand Down
Loading

0 comments on commit dda908d

Please sign in to comment.