Skip to content

Commit

Permalink
mydumper, worker: don't auto-resume when can't acquire global lock (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Aug 5, 2020
1 parent 83e7e7d commit 8de7dcc
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 1 deletion.
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ ErrPreviousGTIDsNotValid,[code=30043:class=relay-unit:scope=internal:level=high]
ErrDumpUnitRuntime,[code=32001:class=dump-unit:scope=internal:level=high], "Message: mydumper runs with error"
ErrDumpUnitGenTableRouter,[code=32002:class=dump-unit:scope=internal:level=high], "Message: generate table router, Workaround: Please check `routes` config in task configuration file."
ErrDumpUnitGenBAList,[code=32003:class=dump-unit:scope=internal:level=high], "Message: generate block allow list, Workaround: Please check the `block-allow-list` config in task configuration file."
ErrDumpUnitGlobalLock,[code=32004:class=dump-unit:scope=internal:level=high], "Message: Couldn't acquire global lock, Workaround: Please check upstream privilege about FTWRL, or add `--no-locks` to extra-args of mydumpers"
ErrLoadUnitCreateSchemaFile,[code=34001:class=load-unit:scope=internal:level=medium], "Message: generate schema file, Workaround: Please check the `loaders` config in task configuration file."
ErrLoadUnitInvalidFileEnding,[code=34002:class=load-unit:scope=internal:level=high], "Message: corresponding ending of sql: ')' not found"
ErrLoadUnitParseQuoteValues,[code=34003:class=load-unit:scope=internal:level=high], "Message: parse quote values error"
Expand Down
6 changes: 5 additions & 1 deletion dm/worker/task_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,16 @@ func isResumableError(err *pb.ProcessError) bool {
return false
}
}
if err.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) {

switch err.ErrCode {
case int32(terror.ErrParserParseRelayLog.Code()):
for _, msg := range retry.ParseRelayLogErrMsgs {
if strings.Contains(strings.ToLower(err.Message), strings.ToLower(msg)) {
return false
}
}
case int32(terror.ErrDumpUnitGlobalLock.Code()):
return false
}

if _, ok := retry.UnresumableErrCodes[err.ErrCode]; ok {
Expand Down
16 changes: 16 additions & 0 deletions mydumper/mydumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) {
//
// so we parse all these lines and translate into our own logs.
scanner := bufio.NewScanner(stderrPipe)
// store first error detected in mydumper's log
// TODO(lance6716): if mydumper will not exit when detected error happens, we should return firstErr earlier
// and using non-block IO to drain and output mydumper's stderr
var (
firstErr error
errMsg []byte
)
for scanner.Scan() {
line := scanner.Bytes()
if loc := mydumperLogRegexp.FindSubmatchIndex(line); len(loc) == 4 {
Expand All @@ -168,12 +175,21 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) {
continue
case "ERROR":
m.logger.Error(string(msg))
if firstErr == nil && strings.HasPrefix(string(msg), "Couldn't acquire global lock") {
firstErr = terror.ErrDumpUnitGlobalLock
errMsg = msg
}
continue
}
}
stderr.Write(line)
stderr.WriteByte('\n')
}

if firstErr != nil {
return errMsg, firstErr
}

if err = scanner.Err(); err != nil {
stdout.Write(stderr.Bytes())
return stdout.Bytes(), terror.ErrDumpUnitRuntime.Delegate(err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ const (
codeDumpUnitRuntime ErrCode = iota + 32001
codeDumpUnitGenTableRouter
codeDumpUnitGenBAList
codeDumpUnitGlobalLock
)

// Load unit error code
Expand Down Expand Up @@ -808,6 +809,7 @@ var (
ErrDumpUnitRuntime = New(codeDumpUnitRuntime, ClassDumpUnit, ScopeInternal, LevelHigh, "mydumper runs with error", "")
ErrDumpUnitGenTableRouter = New(codeDumpUnitGenTableRouter, ClassDumpUnit, ScopeInternal, LevelHigh, "generate table router", "Please check `routes` config in task configuration file.")
ErrDumpUnitGenBAList = New(codeDumpUnitGenBAList, ClassDumpUnit, ScopeInternal, LevelHigh, "generate block allow list", "Please check the `block-allow-list` config in task configuration file.")
ErrDumpUnitGlobalLock = New(codeDumpUnitGlobalLock, ClassDumpUnit, ScopeInternal, LevelHigh, "Couldn't acquire global lock", "Please check upstream privilege about FTWRL, or add `--no-locks` to extra-args of mydumpers")

// Load unit error
ErrLoadUnitCreateSchemaFile = New(codeLoadUnitCreateSchemaFile, ClassLoadUnit, ScopeInternal, LevelMedium, "generate schema file", "Please check the `loaders` config in task configuration file.")
Expand Down
39 changes: 39 additions & 0 deletions tests/full_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,46 @@ cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $cur/../_utils/test_prepare
WORK_DIR=$TEST_DIR/$TEST_NAME

function fail_acquire_global_lock() {
run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'

cp $cur/data/db1.prepare.user.sql $WORK_DIR/db1.prepare.user.sql
sed -i "/revoke create temporary/i\revoke reload on *.* from 'dm_full'@'%';" $WORK_DIR/db1.prepare.user.sql
run_sql_file $WORK_DIR/db1.prepare.user.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_count 'Query OK, 0 rows affected' 8
cp $cur/data/db2.prepare.user.sql $WORK_DIR/db2.prepare.user.sql
sed -i "/revoke create temporary/i\revoke reload on *.* from 'dm_full'@'%';" $WORK_DIR/db2.prepare.user.sql
run_sql_file $WORK_DIR/db2.prepare.user.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_count 'Query OK, 0 rows affected' 8

run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT

cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml
sed -i '/timezone/i\ignore-checking-items: ["dump_privilege"]' $WORK_DIR/dm-task.yaml
dmctl_start_task $WORK_DIR/dm-task.yaml

check_log_contains $WORK_DIR/worker1/log/dm-worker.log "Couldn't acquire global lock"
check_log_contains $WORK_DIR/worker2/log/dm-worker.log "Couldn't acquire global lock"
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"stage\": \"Paused\"" 4 \
"Please check upstream privilege about FTWRL" 2

cleanup_data full_mode
cleanup_process $*
}

function run() {
fail_acquire_global_lock

run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
Expand Down

0 comments on commit 8de7dcc

Please sign in to comment.