Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into PKUKcheck
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD committed Aug 10, 2020
2 parents 37e0cdd + 1dc9d10 commit aabdf91
Show file tree
Hide file tree
Showing 18 changed files with 111 additions and 29 deletions.
2 changes: 1 addition & 1 deletion _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ ErrRelayTrimUUIDNotFound,[code=30040:class=relay-unit:scope=internal:level=high]
ErrRelayRemoveFileFail,[code=30041:class=relay-unit:scope=internal:level=high], "Message: remove relay log %s %s"
ErrRelayPurgeArgsNotValid,[code=30042:class=relay-unit:scope=internal:level=high], "Message: args (%T) %+v not valid"
ErrPreviousGTIDsNotValid,[code=30043:class=relay-unit:scope=internal:level=high], "Message: previousGTIDs %s not valid"
ErrDumpUnitRuntime,[code=32001:class=dump-unit:scope=internal:level=high], "Message: mydumper runs with error"
ErrDumpUnitRuntime,[code=32001:class=dump-unit:scope=internal:level=high], "Message: mydumper runs with error, with output (may empty): %s"
ErrDumpUnitGenTableRouter,[code=32002:class=dump-unit:scope=internal:level=high], "Message: generate table router, Workaround: Please check `routes` config in task configuration file."
ErrDumpUnitGenBAList,[code=32003:class=dump-unit:scope=internal:level=high], "Message: generate block allow list, Workaround: Please check the `block-allow-list` config in task configuration file."
ErrDumpUnitGlobalLock,[code=32004:class=dump-unit:scope=internal:level=high], "Message: Couldn't acquire global lock, Workaround: Please check upstream privilege about FTWRL, or add `--no-locks` to extra-args of mydumpers"
Expand Down
1 change: 1 addition & 0 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,7 @@ func (t *testMaster) TestOperateSource(c *check.C) {
resp, err = s1.OperateSource(ctx, req)
c.Assert(err, check.IsNil)
c.Assert(resp.Result, check.Equals, true)
c.Logf("%s, %s", resp.Sources[0].String(), resp.Sources[1].String())
c.Assert(resp.Sources, check.DeepEquals, []*pb.CommonWorkerResponse{{
Result: true,
Msg: "source is added but there is no free worker to bound",
Expand Down
5 changes: 4 additions & 1 deletion dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,10 @@ func (s *Server) handleSourceBound(ctx context.Context, boundCh chan ha.SourceBo
case <-ctx.Done():
log.L().Info("worker server is closed, handleSourceBound will quit now")
return nil
case bound := <-boundCh:
case bound, ok := <-boundCh:
if !ok {
continue
}
err := s.operateSourceBound(bound)
s.setSourceStatus(bound.Source, err, true)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions dm/worker/task_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus,
// TODO: use different strategies based on the error detail
for _, processErr := range stStatus.Result.Errors {
if !isResumableError(processErr) {
failpoint.Inject("TaskCheckInterval", func(_ failpoint.Value) {
tsc.l.Info("error is not resumable", zap.Stringer("error", processErr))
})
return ResumeNoSense
}
}
Expand Down
5 changes: 3 additions & 2 deletions dumpling/dumpling.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"

"github.com/pingcap/dumpling/v4/export"
Expand Down Expand Up @@ -90,7 +91,7 @@ func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) {
err := os.RemoveAll(m.cfg.Dir)
if err != nil {
m.logger.Error("fail to remove output directory", zap.String("directory", m.cfg.Dir), log.ShortError(err))
errs = append(errs, unit.NewProcessError(err))
errs = append(errs, unit.NewProcessError(terror.ErrDumpUnitRuntime.Delegate(err, "fail to remove output directory: "+m.cfg.Dir)))
pr <- pb.ProcessResult{
IsCanceled: false,
Errors: errs,
Expand All @@ -103,7 +104,7 @@ func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) {

if err != nil {
dumplingExitWithErrorCounter.WithLabelValues(m.cfg.Name, m.cfg.SourceID).Inc()
errs = append(errs, unit.NewProcessError(err))
errs = append(errs, unit.NewProcessError(terror.ErrDumpUnitRuntime.Delegate(err, "")))
}

if len(errs) == 0 {
Expand Down
18 changes: 9 additions & 9 deletions dumpling/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ func parseExtraArgs(dumpCfg *export.Config, args []string) error {
fileSizeStr string
)

dumplingFlagSet.StringSliceVarP(&dumpCfg.Databases, "database", "B", nil, "Database to dump")
dumplingFlagSet.IntVarP(&dumpCfg.Threads, "threads", "t", 4, "Number of goroutines to use, default 4")
dumplingFlagSet.StringSliceVarP(&dumpCfg.Databases, "database", "B", dumpCfg.Databases, "Database to dump")
dumplingFlagSet.IntVarP(&dumpCfg.Threads, "threads", "t", dumpCfg.Threads, "Number of goroutines to use, default 4")
dumplingFlagSet.StringVarP(&fileSizeStr, "filesize", "F", "", "The approximate size of output file")
dumplingFlagSet.Uint64VarP(&dumpCfg.StatementSize, "statement-size", "S", export.UnspecifiedSize, "Attempted size of INSERT statement in bytes")
dumplingFlagSet.StringVar(&dumpCfg.Consistency, "consistency", "auto", "Consistency level during dumping: {auto|none|flush|lock|snapshot}")
dumplingFlagSet.StringVar(&dumpCfg.Snapshot, "snapshot", "", "Snapshot position. Valid only when consistency=snapshot")
dumplingFlagSet.BoolVarP(&dumpCfg.NoViews, "no-views", "W", true, "Do not dump views")
dumplingFlagSet.Uint64VarP(&dumpCfg.Rows, "rows", "r", export.UnspecifiedSize, "Split table into chunks of this many rows, default unlimited")
dumplingFlagSet.StringVar(&dumpCfg.Where, "where", "", "Dump only selected records")
dumplingFlagSet.BoolVar(&dumpCfg.EscapeBackslash, "escape-backslash", true, "use backslash to escape quotation marks")
dumplingFlagSet.Uint64VarP(&dumpCfg.StatementSize, "statement-size", "S", dumpCfg.StatementSize, "Attempted size of INSERT statement in bytes")
dumplingFlagSet.StringVar(&dumpCfg.Consistency, "consistency", dumpCfg.Consistency, "Consistency level during dumping: {auto|none|flush|lock|snapshot}")
dumplingFlagSet.StringVar(&dumpCfg.Snapshot, "snapshot", dumpCfg.Snapshot, "Snapshot position. Valid only when consistency=snapshot")
dumplingFlagSet.BoolVarP(&dumpCfg.NoViews, "no-views", "W", dumpCfg.NoViews, "Do not dump views")
dumplingFlagSet.Uint64VarP(&dumpCfg.Rows, "rows", "r", dumpCfg.Rows, "Split table into chunks of this many rows, default unlimited")
dumplingFlagSet.StringVar(&dumpCfg.Where, "where", dumpCfg.Where, "Dump only selected records")
dumplingFlagSet.BoolVar(&dumpCfg.EscapeBackslash, "escape-backslash", dumpCfg.EscapeBackslash, "use backslash to escape quotation marks")

err := dumplingFlagSet.Parse(args)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion dumpling/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ func (m *testDumplingSuite) TestParseArgs(c *C) {
c.Assert(exportCfg.Threads, Equals, 8)
c.Assert(exportCfg.FileSize, Equals, uint64(50))

extraArgs := `--statement-size=100 --skip-tz-utc`
extraArgs := `--threads 16 --skip-tz-utc`
err = parseExtraArgs(exportCfg, strings.Fields(extraArgs))
c.Assert(err, NotNil)
c.Assert(exportCfg.Threads, Equals, 16)
c.Assert(exportCfg.StatementSize, Equals, uint64(100))
}
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,7 @@ workaround = ""
tags = ["internal", "high"]

[error.DM-dump-unit-32001]
message = "mydumper runs with error"
message = "mydumper runs with error, with output (may empty): %s"
description = ""
workaround = ""
tags = ["internal", "high"]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.14.3
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d
github.com/lance6716/retool v1.3.7
github.com/lance6716/retool v1.3.8-0.20200806070832-3469f70b2afe
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/dumpling v0.0.0-20200605144140-0175843056a6
github.com/pingcap/errcode v0.3.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lance6716/retool v1.3.7 h1:KDdLkDqp/8+Wgnr0OQ6C8Pru8upEAreYay41hyazUw4=
github.com/lance6716/retool v1.3.7/go.mod h1:9nFHbMjlFhh2msJ6vuHgpz7OM7G9RO0wSLO/w1u2nhM=
github.com/lance6716/retool v1.3.8-0.20200806070832-3469f70b2afe h1:TzBkezwbsiqkytxItaOYwjtXymmwCRAfoyPGAbPfYGw=
github.com/lance6716/retool v1.3.8-0.20200806070832-3469f70b2afe/go.mod h1:9nFHbMjlFhh2msJ6vuHgpz7OM7G9RO0wSLO/w1u2nhM=
github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
Expand Down
28 changes: 20 additions & 8 deletions mydumper/mydumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"bufio"
"bytes"
"context"
"fmt"
"os"
"os/exec"
"regexp"
Expand Down Expand Up @@ -99,11 +98,11 @@ func (m *Mydumper) Process(ctx context.Context, pr chan pb.ProcessResult) {
}

// Cmd cannot be reused, so we create a new cmd when begin processing
output, err := m.spawn(ctx)
err = m.spawn(ctx)

if err != nil {
mydumperExitWithErrorCounter.WithLabelValues(m.cfg.Name, m.cfg.SourceID).Inc()
errs = append(errs, unit.NewProcessError(fmt.Errorf("%s. %s", err.Error(), output)))
errs = append(errs, unit.NewProcessError(err))
} else {
select {
case <-ctx.Done():
Expand All @@ -129,7 +128,7 @@ var mydumperLogRegexp = regexp.MustCompile(
`^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} \[(DEBUG|INFO|WARNING|ERROR)\] - `,
)

func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) {
func (m *Mydumper) spawn(ctx context.Context) error {
var (
stdout bytes.Buffer
stderr bytes.Buffer
Expand All @@ -138,10 +137,10 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) {
cmd.Stdout = &stdout
stderrPipe, err := cmd.StderrPipe()
if err != nil {
return nil, terror.ErrDumpUnitRuntime.Delegate(err)
return terror.ErrDumpUnitRuntime.Delegate(err, "")
}
if err = cmd.Start(); err != nil {
return nil, terror.ErrDumpUnitRuntime.Delegate(err)
return terror.ErrDumpUnitRuntime.Delegate(err, "")
}

// Read the stderr from mydumper, which contained the logs.
Expand All @@ -151,6 +150,11 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) {
//
// so we parse all these lines and translate into our own logs.
scanner := bufio.NewScanner(stderrPipe)
// store first error detected in mydumper's log
// TODO(lance6716): if mydumper will not exit when detected error happens, we should return firstErr earlier
// and using non-block IO to drain and output mydumper's stderr
var firstErr error

for scanner.Scan() {
line := scanner.Bytes()
if loc := mydumperLogRegexp.FindSubmatchIndex(line); len(loc) == 4 {
Expand All @@ -168,20 +172,28 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) {
continue
case "ERROR":
m.logger.Error(string(msg))
if firstErr == nil && strings.HasPrefix(string(msg), "Couldn't acquire global lock") {
firstErr = terror.ErrDumpUnitGlobalLock
}
continue
}
}
stderr.Write(line)
stderr.WriteByte('\n')
}

if firstErr != nil {
return firstErr
}

if err = scanner.Err(); err != nil {
stdout.Write(stderr.Bytes())
return stdout.Bytes(), terror.ErrDumpUnitRuntime.Delegate(err)
return terror.ErrDumpUnitRuntime.Delegate(err, stdout.Bytes())
}

err = cmd.Wait()
stdout.Write(stderr.Bytes())
return stdout.Bytes(), terror.ErrDumpUnitRuntime.Delegate(err)
return terror.ErrDumpUnitRuntime.Delegate(err, stdout.Bytes())
}

// Close implements Unit.Close
Expand Down
2 changes: 2 additions & 0 deletions pkg/retry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ var (
UnresumableErrCodes = map[int32]struct{}{
int32(terror.ErrSyncUnitDDLWrongSequence.Code()): {},
int32(terror.ErrSyncerShardDDLConflict.Code()): {},
int32(terror.ErrDumpUnitGlobalLock.Code()): {},
int32(terror.ErrDumpUnitRuntime.Code()): {},
}
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ var (
ErrPreviousGTIDsNotValid = New(codePreviousGTIDsNotValid, ClassRelayUnit, ScopeInternal, LevelHigh, "previousGTIDs %s not valid", "")

// Dump unit error
ErrDumpUnitRuntime = New(codeDumpUnitRuntime, ClassDumpUnit, ScopeInternal, LevelHigh, "mydumper runs with error", "")
ErrDumpUnitRuntime = New(codeDumpUnitRuntime, ClassDumpUnit, ScopeInternal, LevelHigh, "mydumper runs with error, with output (may empty): %s", "")
ErrDumpUnitGenTableRouter = New(codeDumpUnitGenTableRouter, ClassDumpUnit, ScopeInternal, LevelHigh, "generate table router", "Please check `routes` config in task configuration file.")
ErrDumpUnitGenBAList = New(codeDumpUnitGenBAList, ClassDumpUnit, ScopeInternal, LevelHigh, "generate block allow list", "Please check the `block-allow-list` config in task configuration file.")
ErrDumpUnitGlobalLock = New(codeDumpUnitGlobalLock, ClassDumpUnit, ScopeInternal, LevelHigh, "Couldn't acquire global lock", "Please check upstream privilege about FTWRL, or add `--no-locks` to extra-args of mydumpers")
Expand Down
56 changes: 56 additions & 0 deletions tests/full_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,63 @@ cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $cur/../_utils/test_prepare
WORK_DIR=$TEST_DIR/$TEST_NAME

function fail_acquire_global_lock() {
export GO_FAILPOINTS="github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"

run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'

cp $cur/data/db1.prepare.user.sql $WORK_DIR/db1.prepare.user.sql
sed -i "/revoke create temporary/i\revoke reload on *.* from 'dm_full'@'%';" $WORK_DIR/db1.prepare.user.sql
run_sql_file $WORK_DIR/db1.prepare.user.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_count 'Query OK, 0 rows affected' 8
cp $cur/data/db2.prepare.user.sql $WORK_DIR/db2.prepare.user.sql
sed -i "/revoke create temporary/i\revoke reload on *.* from 'dm_full'@'%';" $WORK_DIR/db2.prepare.user.sql
run_sql_file $WORK_DIR/db2.prepare.user.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_count 'Query OK, 0 rows affected' 8

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml
sed -i '/timezone/i\ignore-checking-items: ["dump_privilege"]' $WORK_DIR/dm-task.yaml
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $WORK_DIR/dm-task.yaml --remove-meta" \
"\"result\": true" 1

# TaskCheckInterval set to 500ms
sleep 1

check_log_contains $WORK_DIR/worker1/log/dm-worker.log "you need (at least one of) the RELOAD privilege(s) for this operation"
check_log_contains $WORK_DIR/worker1/log/dm-worker.log "error is not resumable"
check_log_contains $WORK_DIR/worker2/log/dm-worker.log "you need (at least one of) the RELOAD privilege(s) for this operation"
check_log_contains $WORK_DIR/worker2/log/dm-worker.log "error is not resumable"

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"stage\": \"Paused\"" 2 \
"you need (at least one of) the RELOAD privilege(s) for this operation" 2

cleanup_data full_mode
cleanup_process $*
}

function run() {
fail_acquire_global_lock

run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
Expand Down
2 changes: 1 addition & 1 deletion tests/shardddl1/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ function DM_001_CASE() {
}

function DM_001() {
run_case 001 "no-sharding" "init_table 111 112" "clean_table" ""
run_case 001 "single-source-no-sharding" "init_table 111 112" "clean_table" ""
}

function DM_002_CASE() {
Expand Down
2 changes: 2 additions & 0 deletions tests/sharding/conf/source2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ from:
user: root
password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
port: 3307
checker:
check-enable: false
2 changes: 1 addition & 1 deletion tests/sharding/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ function run() {
run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2

# the task should paused by `FlushCheckpointStage` failpont before flush old checkpoint.
# the task should paused by `FlushCheckpointStage` failpoint before flush old checkpoint.
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"failpoint error for FlushCheckpointStage before flush old checkpoint" 1
Expand Down

0 comments on commit aabdf91

Please sign in to comment.