Skip to content

Commit

Permalink
cdc: fix a bug that DDLs can be executed multiple times incorrectly (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu authored and 3AceShowHand committed Sep 5, 2022
1 parent e132ace commit 1b0105b
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 17 deletions.
44 changes: 28 additions & 16 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ type changefeed struct {

newSink func() DDLSink
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error)

lastDDLTs uint64 // Timestamp of the last executed DDL. Only used for tests.
}

func newChangefeed(
Expand Down Expand Up @@ -369,6 +371,16 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
if newResolvedTs < prevResolvedTs {
newResolvedTs = prevResolvedTs
}
failpoint.Inject("ChangefeedOwnerDontUpdateCheckpoint", func() {
if c.lastDDLTs != 0 && c.state.Status.CheckpointTs >= c.lastDDLTs {
log.Info("owner won't update checkpoint because of failpoint",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Uint64("keepCheckpoint", c.state.Status.CheckpointTs),
zap.Uint64("skipCheckpoint", newCheckpointTs))
newCheckpointTs = c.state.Status.CheckpointTs
}
})
c.updateStatus(newCheckpointTs, newResolvedTs)
c.updateMetrics(currentTs, newCheckpointTs, newResolvedTs)
} else if c.state.Status != nil {
Expand Down Expand Up @@ -447,38 +459,39 @@ LOOP:
return errors.Trace(err)
}
}

// if resolvedTs == checkpointTs it means owner can't tell whether the DDL on checkpointTs has
// been executed or not. So the DDL puller must start at checkpointTs-1.
var ddlStartTs uint64
if resolvedTs > checkpointTs {
ddlStartTs = checkpointTs
} else {
ddlStartTs = checkpointTs - 1
}

c.barriers = newBarriers()
if c.state.Info.SyncPointEnabled {
c.barriers.Update(syncPointBarrier, resolvedTs)
}
// Since we are starting DDL puller from (checkpointTs-1) to make
// the DDL committed at checkpointTs executable by CDC, we need to set
// the DDL barrier to the correct start point.
// TODO: get DDL barrier based on resolvedTs.
c.barriers.Update(ddlJobBarrier, checkpointTs-1)
c.barriers.Update(ddlJobBarrier, ddlStartTs)
c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs())

// Note that (checkpointTs == ddl.FinishedTs) DOES NOT imply that the DDL has been completed executed.
// So we need to process all DDLs from the range [checkpointTs, ...), but since the semantics of start-ts requires
// the lower bound of an open interval, i.e. (startTs, ...), we pass checkpointTs-1 as the start-ts to initialize
// the schema cache.
c.schema, err = newSchemaWrap4Owner(c.upstream.KVStorage,
checkpointTs-1, c.state.Info.Config, c.id)
c.schema, err = newSchemaWrap4Owner(c.upstream.KVStorage, ddlStartTs, c.state.Info.Config, c.id)
if err != nil {
return errors.Trace(err)
}

cancelCtx, cancel := cdcContext.WithCancel(ctx)
c.cancel = cancel

c.sink = c.newSink()
c.sink.run(cancelCtx, c.id, c.state.Info)

// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx,
c.state.Info.Config, c.upstream, checkpointTs-1, c.id)
c.ddlPuller, err = c.newDDLPuller(cancelCtx, c.state.Info.Config, c.upstream, ddlStartTs, c.id)
if err != nil {
return errors.Trace(err)
}

c.ddlWg.Add(1)
go func() {
defer c.ddlWg.Done()
Expand Down Expand Up @@ -716,10 +729,10 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
if !done {
return barrierTs, nil
}
c.lastDDLTs = ddlResolvedTs
c.ddlPuller.PopFrontDDL()
newDDLResolvedTs, _ := c.ddlPuller.FrontDDL()
c.barriers.Update(ddlJobBarrier, newDDLResolvedTs)

case syncPointBarrier:
if !blocked {
return barrierTs, nil
Expand All @@ -729,7 +742,6 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
return 0, errors.Trace(err)
}
c.barriers.Update(syncPointBarrier, nextSyncPointTs)

case finishBarrier:
if blocked {
c.feedStateManager.MarkFinished()
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ func TestBarrierAdvance(t *testing.T) {
// Then the first tick barrier won't be changed.
barrier, err := cf.handleBarrier(ctx)
require.Nil(t, err)
require.Equal(t, cf.state.Info.StartTs-1, barrier)
require.Equal(t, cf.state.Info.StartTs, barrier)

// If sync-point is enabled, must tick more 1 time to advance barrier.
if i == 1 {
Expand Down
29 changes: 29 additions & 0 deletions tests/integration_tests/owner_resign/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 1

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/owner_resign/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["owner_resign.t?*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
66 changes: 66 additions & 0 deletions tests/integration_tests/owner_resign/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
# TODO: kafka sink has bug with this case, remove this after bug is fixed
if [ "$SINK_TYPE" == "kafka" ]; then
return
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR
cd $WORK_DIR

export GO_FAILPOINTS=''
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8301" --pd "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}"
ensure 10 "cdc cli capture list --server http://127.0.0.1:8301 |jq '.|length'|grep -E '^1$'"

TOPIC_NAME="ticdc-owner-resign-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://kafka01:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:[email protected]:3306/" ;;
esac
run_cdc_cli changefeed create --sink-uri="$SINK_URI" --server="127.0.0.1:8301"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

run_sql "CREATE database owner_resign;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "CREATE table owner_resign.t1(id int not null primary key, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# wait table t1 is processed by cdc server
ensure 10 "cdc cli processor list --server http://127.0.0.1:8301 |jq '.|length'|grep -E '^1$'"
# check the t1 is replicated to downstream to make sure the t1 is dispatched to cdc1
check_table_exists "owner_resign.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_sql "INSERT INTO owner_resign.t1 (id, val) values (1, 1);"
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedOwnerDontUpdateCheckpoint=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "2" --addr "127.0.0.1:8302" --pd "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}"
ensure 10 "cdc cli capture list --server http://127.0.0.1:8302 |jq '.|length'|grep -E '^2$'"

curl -X POST http://127.0.0.1:8301/api/v1/owner/resign
sleep 3

run_sql "TRUNCATE TABLE owner_resign.t1;"
run_sql "INSERT INTO owner_resign.t1 (id, val) values (2, 2);"
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

curl -X POST http://127.0.0.1:8302/api/v1/owner/resign
sleep 10
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

0 comments on commit 1b0105b

Please sign in to comment.