Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
cherry pick #1528 to release-2.0 (#1562)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Apr 9, 2021
1 parent 3267bbe commit 7bcfad5
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 14 deletions.
9 changes: 8 additions & 1 deletion relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser"
toolutils "github.com/pingcap/tidb-tools/pkg/utils"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go/sync2"
Expand All @@ -49,7 +50,6 @@ import (
"github.com/pingcap/dm/relay/retry"
"github.com/pingcap/dm/relay/transformer"
"github.com/pingcap/dm/relay/writer"
toolutils "github.com/pingcap/tidb-tools/pkg/utils"
)

var (
Expand All @@ -70,6 +70,8 @@ const (
// NewRelay creates an instance of Relay.
var NewRelay = NewRealRelay

var _ Process = &Relay{}

// Process defines mysql-like relay log process unit
type Process interface {
// Init initializes the relay log unit
Expand Down Expand Up @@ -204,6 +206,11 @@ func (r *Relay) process(ctx context.Context) error {
return err
}

failpoint.Inject("NewUpstreamServer", func(_ failpoint.Value) {
// test a bug which was caused by upstream switching
isNew = true
})

if isNew {
// re-setup meta for new server or new source
err = r.reSetupMeta(ctx)
Expand Down
2 changes: 1 addition & 1 deletion tests/all_mode/conf/source1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

source-id: mysql-replica-01
flavor: ''
enable-gtid: false
enable-gtid: true
enable-relay: true
relay-binlog-name: ''
relay-binlog-gtid: ''
Expand Down
2 changes: 2 additions & 0 deletions tests/all_mode/data/db1.increment0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
use all_mode;
insert into t1 (id, name) values (100, 'Eddard Stark');
1 change: 1 addition & 0 deletions tests/all_mode/data/db2.increment0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
delete from all_mode.t2 where id = 1;
40 changes: 28 additions & 12 deletions tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,15 @@ function test_query_timeout(){
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

# start DM task only
Expand Down Expand Up @@ -177,7 +178,11 @@ function run() {

test_stop_task_before_checkpoint

export GO_FAILPOINTS="github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"
inject_points=(
"github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"
"github.com/pingcap/dm/relay/NewUpstreamServer=return(true)"
)
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"

run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
Expand Down Expand Up @@ -243,24 +248,35 @@ function run() {
pkill -hup tidb-server 2>/dev/null || true
wait_process_exit tidb-server

# test pause and resume relay
# dm-worker fails to execute sql, and will try to auto resume the task
run_sql_file $cur/data/db2.increment0.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
sleep 2
check_log_contains $WORK_DIR/worker2/log/dm-worker.log "dispatch auto resume task"

# restart tidb, and the task will recover successfully
run_tidb_server 4000 $TIDB_PASSWORD
sleep 2

# test after pause and resume relay, relay could continue from syncer's checkpoint
run_sql_source1 "flush logs"
run_sql_file $cur/data/db1.increment0.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"pause-relay -s mysql-replica-01" \
"\"result\": true" 2
# we used a failpoint to imitate an upstream switch, which purged the whole relay dir
run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-relay -s mysql-replica-01" \
"\"result\": true" 2

sleep 2
# dm-worker execute sql failed, and will try auto resume task
check_log_contains $WORK_DIR/worker1/log/dm-worker.log "dispatch auto resume task"
check_log_contains $WORK_DIR/worker2/log/dm-worker.log "dispatch auto resume task"

# restart tidb, and task will recover success
run_tidb_server 4000 $TIDB_PASSWORD
sleep 2
# relay should continue pulling from syncer's checkpoint, so only pull the latest binlog
server_uuid=$(tail -n 1 $WORK_DIR/worker1/relay_log/server-uuid.index)
relay_log_num=`ls $WORK_DIR/worker1/relay_log/$server_uuid | grep -v 'relay.meta' | wc -l`
echo "relay logs `ls $WORK_DIR/worker1/relay_log/$server_uuid`"
[ $relay_log_num -eq 1 ]

# use sync_diff_inspector to check data now!
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
Expand Down

0 comments on commit 7bcfad5

Please sign in to comment.