From 8d6a9e154fd161df7e6d87786ed1d7fab4f36cdc Mon Sep 17 00:00:00 2001
From: hongyunyan <649330952@qq.com>
Date: Tue, 7 May 2024 18:31:07 +0800
Subject: [PATCH] ticdc: Support `create table` ddl only appear in tidb_ddl_history instead of tidb_ddl_job table (#10907)

close pingcap/tiflow#10908
---
 cdc/entry/mounter.go                          | 88 ++++++++++++++++---
 cdc/puller/ddl_puller.go                      | 38 +++++---
 pkg/spanz/span.go                             |  9 +-
 .../integration_tests/batch_add_table/run.sh  | 47 +++++++++-
 tests/integration_tests/multi_source/run.sh   | 25 ++++++
 5 files changed, 181 insertions(+), 26 deletions(-)

diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go
index 0a9cb75cd72..82be98ea63e 100644
--- a/cdc/entry/mounter.go
+++ b/cdc/entry/mounter.go
@@ -38,6 +38,7 @@ import (
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	pfilter "github.com/pingcap/tiflow/pkg/filter"
 	"github.com/pingcap/tiflow/pkg/integrity"
+	"github.com/pingcap/tiflow/pkg/spanz"
 	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
 )
@@ -64,6 +65,19 @@ type rowKVEntry struct {
 	PreRowExist bool
 }
 
+// DDLTableInfo contains the table info of tidb_ddl_job and tidb_ddl_history
+// and the column id of `job_meta` in these two tables.
+type DDLTableInfo struct {
+	// DDLJobTable is used to parse all ddl jobs except `create table`.
+	DDLJobTable *model.TableInfo
+	// It holds the column id of `job_meta` in table `tidb_ddl_jobs`.
+	JobMetaColumnIDinJobTable int64
+	// DDLHistoryTable is only used to parse the `create table` ddl job.
+	DDLHistoryTable *model.TableInfo
+	// It holds the column id of `job_meta` in table `tidb_ddl_history`.
+	JobMetaColumnIDinHistoryTable int64
+}
+
 // Mounter is used to parse SQL events from KV events
 type Mounter interface {
 	// DecodeEvent accepts `model.PolymorphicEvent` with `RawKVEntry` filled and
@@ -298,39 +312,89 @@ func IsLegacyFormatJob(rawKV *model.RawKVEntry) bool {
 	return bytes.HasPrefix(rawKV.Key, metaPrefix)
 }
 
-// ParseDDLJob parses the job from the raw KV entry. id is the column id of `job_meta`.
-func ParseDDLJob(tblInfo *model.TableInfo, rawKV *model.RawKVEntry, id int64) (*timodel.Job, error) {
+// ParseDDLJob parses the job from the raw KV entry.
+func ParseDDLJob(rawKV *model.RawKVEntry, ddlTableInfo *DDLTableInfo) (*timodel.Job, error) {
 	var v []byte
+	var datum types.Datum
+
+	// for test case only
 	if bytes.HasPrefix(rawKV.Key, metaPrefix) {
-		// old queue base job.
 		v = rawKV.Value
-	} else {
-		// DDL job comes from `tidb_ddl_job` table after we support concurrent DDL. We should decode the job from the column.
-		recordID, err := tablecodec.DecodeRowKey(rawKV.Key)
+		job, err := parseJob(v, rawKV.StartTs, rawKV.CRTs, false)
+		if err != nil || job == nil {
+			job, err = parseJob(v, rawKV.StartTs, rawKV.CRTs, true)
+		}
+		return job, err
+	}
+
+	recordID, err := tablecodec.DecodeRowKey(rawKV.Key)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	tableID := tablecodec.DecodeTableID(rawKV.Key)
+
+	// parse it with tidb_ddl_job
+	if tableID == spanz.JobTableID {
+		row, err := decodeRow(rawKV.Value, recordID, ddlTableInfo.DDLJobTable, time.UTC)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
-		row, err := decodeRow(rawKV.Value, recordID, tblInfo, time.UTC)
+		datum = row[ddlTableInfo.JobMetaColumnIDinJobTable]
+		v = datum.GetBytes()
+
+		return parseJob(v, rawKV.StartTs, rawKV.CRTs, false)
+	} else if tableID == spanz.JobHistoryID {
+		// parse it with tidb_ddl_history
+		row, err := decodeRow(rawKV.Value, recordID, ddlTableInfo.DDLHistoryTable, time.UTC)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
-		datum := row[id]
+		datum = row[ddlTableInfo.JobMetaColumnIDinHistoryTable]
 		v = datum.GetBytes()
+
+		return parseJob(v, rawKV.StartTs, rawKV.CRTs, true)
 	}
-	return parseJob(v, rawKV.StartTs, rawKV.CRTs)
+
+	return nil, fmt.Errorf("invalid tableID %v in rawKV.Key", tableID)
 }
 
 // parseJob unmarshal the job from "v".
-func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {
+// fromHistoryTable is used to distinguish whether the job comes from tidb_ddl_job or tidb_ddl_history.
+// We need to be compatible with two modes: enable_fast_create_table=on and enable_fast_create_table=off.
+// When enable_fast_create_table=on, `create table` will only be inserted into tidb_ddl_history after being executed successfully.
+// When enable_fast_create_table=off, `create table`, just like other ddls, is first inserted into tidb_ddl_job,
+// and then inserted into tidb_ddl_history after being executed successfully.
+// In both modes, all other ddls are first inserted into tidb_ddl_job, and then inserted into tidb_ddl_history after being executed successfully.
+//
+// To be compatible with these two modes, we get the `create table` ddl from tidb_ddl_history, and all ddls from tidb_ddl_job.
+// When enable_fast_create_table=off, we will therefore get each `create table` ddl twice (once from tidb_ddl_history, once from tidb_ddl_job).
+// Because `handleJob` skips repeated ddls, it's ok for us to get `create table` twice.
+// Besides, the `create table` from tidb_ddl_job always has an earlier commitTs than the one from tidb_ddl_history.
+// Therefore, we always use the commitTs of the ddl from `tidb_ddl_job` as StartTs, which ensures we can get all the dmls.
+func parseJob(v []byte, startTs, CRTs uint64, fromHistoryTable bool) (*timodel.Job, error) {
 	var job timodel.Job
 	err := json.Unmarshal(v, &job)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	if !job.IsDone() {
-		return nil, nil
+
+	if fromHistoryTable {
+		// We only want to get the `create table` ddl from tidb_ddl_history, so we just discard other ddls.
+		// We only want the job with `JobStateSynced`, which means the ddl job is done successfully.
+		// Besides, to satisfy the subsequent processing,
+		// we need to set the job to Done so that it will be replayed in schemaStorage.
+		if job.Type != timodel.ActionCreateTable || job.State != timodel.JobStateSynced {
+			return nil, nil
+		}
+		job.State = timodel.JobStateDone
+	} else {
+		// we need to get all the ddl jobs that are done from tidb_ddl_job
+		if !job.IsDone() {
+			return nil, nil
+		}
 	}
+
 	// FinishedTS is only set when the job is synced,
 	// but we can use the entry's ts here
 	job.StartTS = startTs
diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go
index 009256bcf7b..e50ad06bf72 100644
--- a/cdc/puller/ddl_puller.go
+++ b/cdc/puller/ddl_puller.go
@@ -75,11 +75,8 @@ type ddlJobPullerImpl struct {
 	resolvedTs    uint64
 	schemaVersion int64
 	filter        filter.Filter
-	// ddlJobsTable is initialized when receive the first concurrent DDL job.
-	// It holds the info of table `tidb_ddl_jobs` of upstream TiDB.
-	ddlJobsTable *model.TableInfo
-	// It holds the column id of `job_meta` in table `tidb_ddl_jobs`.
-	jobMetaColumnID int64
+	// ddlTableInfo is initialized when receiving the first concurrent DDL job.
+	ddlTableInfo *entry.DDLTableInfo
 	// outputCh sends the DDL job entries to the caller.
 	outputCh chan *model.DDLJobEntry
 }
@@ -239,13 +236,14 @@ func (p *ddlJobPullerImpl) unmarshalDDL(rawKV *model.RawKVEntry) (*timodel.Job,
 	if rawKV.OpType != model.OpTypePut {
 		return nil, nil
 	}
-	if p.ddlJobsTable == nil && !entry.IsLegacyFormatJob(rawKV) {
-		err := p.initJobTableMeta()
+	if p.ddlTableInfo == nil && !entry.IsLegacyFormatJob(rawKV) {
+		err := p.initDDLTableInfo()
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
 	}
-	return entry.ParseDDLJob(p.ddlJobsTable, rawKV, p.jobMetaColumnID)
+
+	return entry.ParseDDLJob(rawKV, p.ddlTableInfo)
 }
 
 func (p *ddlJobPullerImpl) getResolvedTs() uint64 {
@@ -256,7 +254,7 @@ func (p *ddlJobPullerImpl) setResolvedTs(ts uint64) {
 	atomic.StoreUint64(&p.resolvedTs, ts)
 }
 
-func (p *ddlJobPullerImpl) initJobTableMeta() error {
+func (p *ddlJobPullerImpl) initDDLTableInfo() error {
 	version, err := p.kvStorage.CurrentVersion(tidbkv.GlobalTxnScope)
 	if err != nil {
 		return errors.Trace(err)
@@ -277,6 +275,8 @@ func (p *ddlJobPullerImpl) initJobTableMeta() error {
 	if err != nil {
 		return errors.Trace(err)
 	}
+
+	// for tidb_ddl_job
 	tableInfo, err := findTableByName(tbls, "tidb_ddl_job")
 	if err != nil {
 		return errors.Trace(err)
@@ -287,8 +287,24 @@ func (p *ddlJobPullerImpl) initJobTableMeta() error {
 		return errors.Trace(err)
 	}
 
-	p.ddlJobsTable = model.WrapTableInfo(db.ID, db.Name.L, 0, tableInfo)
-	p.jobMetaColumnID = col.ID
+	p.ddlTableInfo = &entry.DDLTableInfo{}
+	p.ddlTableInfo.DDLJobTable = model.WrapTableInfo(db.ID, db.Name.L, 0, tableInfo)
+	p.ddlTableInfo.JobMetaColumnIDinJobTable = col.ID
+
+	// for tidb_ddl_history
+	historyTableInfo, err := findTableByName(tbls, "tidb_ddl_history")
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	historyTableCol, err := findColumnByName(historyTableInfo.Columns, "job_meta")
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	p.ddlTableInfo.DDLHistoryTable = model.WrapTableInfo(db.ID, db.Name.L, 0, historyTableInfo)
+	p.ddlTableInfo.JobMetaColumnIDinHistoryTable = historyTableCol.ID
+
 	return nil
 }
 
diff --git a/pkg/spanz/span.go b/pkg/spanz/span.go
index 1eed64e1a4c..43b715c1a93 100644
--- a/pkg/spanz/span.go
+++ b/pkg/spanz/span.go
@@ -29,6 +29,8 @@ import (
 const (
 	// JobTableID is the id of `tidb_ddl_job`.
 	JobTableID = ddl.JobTableID
+	// JobHistoryID is the id of `tidb_ddl_history`.
+	JobHistoryID = ddl.HistoryTableID
 )
 
 // UpperBoundKey represents the maximum value.
@@ -62,12 +64,17 @@ func GetTableRange(tableID int64) (startKey, endKey []byte) {
 
 // GetAllDDLSpan return all cdc interested spans for DDL.
 func GetAllDDLSpan() []tablepb.Span {
-	spans := make([]tablepb.Span, 0, 1)
+	spans := make([]tablepb.Span, 0, 2)
 	start, end := GetTableRange(JobTableID)
 	spans = append(spans, tablepb.Span{
 		StartKey: ToComparableKey(start),
 		EndKey:   ToComparableKey(end),
 	})
+	start, end = GetTableRange(JobHistoryID)
+	spans = append(spans, tablepb.Span{
+		StartKey: ToComparableKey(start),
+		EndKey:   ToComparableKey(end),
+	})
 	return spans
 }
 
diff --git a/tests/integration_tests/batch_add_table/run.sh b/tests/integration_tests/batch_add_table/run.sh
index 1154958e9a2..9cce075c532 100644
--- a/tests/integration_tests/batch_add_table/run.sh
+++ b/tests/integration_tests/batch_add_table/run.sh
@@ -8,13 +8,54 @@ WORK_DIR=$OUT_DIR/$TEST_NAME
 CDC_BINARY=cdc.test
 SINK_TYPE=$1
 
-function run() {
+function run_with_fast_create_table() {
 	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
 
 	start_tidb_cluster --workdir $WORK_DIR
 
 	cd $WORK_DIR
 
+	run_sql "set global tidb_enable_fast_create_table=on" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql_file $CUR/data/prepare.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	TOPIC_NAME="ticdc-batch-add-table-test-$RANDOM"
+	case $SINK_TYPE in
+	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
+	pulsar)
+		run_pulsar_cluster $WORK_DIR normal
+		SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
+		;;
+	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
+	esac
+	run_cdc_cli changefeed create --sink-uri="$SINK_URI"
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
+	esac
+
+	run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	# sync_diff can't check non-existent tables, so we first check that the expected tables are created downstream
+	check_table_exists batch_add_table.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+	cleanup_process $CDC_BINARY
+}
+
+function run_without_fast_create_table() {
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+
+	run_sql "set global tidb_enable_fast_create_table=off" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
 	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	run_sql_file $CUR/data/prepare.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
 
@@ -46,6 +87,8 @@ function run() {
 }
 
 trap stop_tidb_cluster EXIT
-run $*
+run_without_fast_create_table $*
+stop_tidb_cluster
+run_with_fast_create_table $*
 check_logs $WORK_DIR
 echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
diff --git a/tests/integration_tests/multi_source/run.sh b/tests/integration_tests/multi_source/run.sh
index 891970eea25..f3017fe579e 100755
--- a/tests/integration_tests/multi_source/run.sh
+++ b/tests/integration_tests/multi_source/run.sh
@@ -41,12 +41,14 @@ function prepare() {
 trap stop_tidb_cluster EXIT
 
 # storage is not supported yet.
+# test without fast create table
 if [ "$SINK_TYPE" != "storage" ]; then
 	# TODO(dongmen): enable pulsar in the future.
 	if [ "$SINK_TYPE" == "pulsar" ]; then
 		exit 0
 	fi
 	prepare $*
+	run_sql "set global tidb_enable_fast_create_table=off" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	cd "$(dirname "$0")"
 	set -o pipefail
 	GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log
@@ -55,6 +57,29 @@ if [ "$SINK_TYPE" != "storage" ]; then
 	check_table_exists mark.finish_mark_2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
 	check_table_exists mark.finish_mark_3 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
 	check_table_exists mark.finish_mark_4 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
+	check_table_exists mark.finish_mark_5 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
+	check_table_exists mark.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
+	check_sync_diff $WORK_DIR $CUR/diff_config.toml
+	cleanup_process $CDC_BINARY
+	check_logs $WORK_DIR
+fi
+
+# test with fast create table
+if [ "$SINK_TYPE" != "storage" ]; then
+	# TODO(dongmen): enable pulsar in the future.
+	if [ "$SINK_TYPE" == "pulsar" ]; then
+		exit 0
+	fi
+	prepare $*
+	run_sql "set global tidb_enable_fast_create_table=on" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	cd "$(dirname "$0")"
+	set -o pipefail
+	GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log
+	check_table_exists mark.finish_mark_0 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
+	check_table_exists mark.finish_mark_1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
+	check_table_exists mark.finish_mark_2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
+	check_table_exists mark.finish_mark_3 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
+	check_table_exists mark.finish_mark_4 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
+	check_table_exists mark.finish_mark_5 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
 	check_table_exists mark.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
 	check_sync_diff $WORK_DIR $CUR/diff_config.toml
 	cleanup_process $CDC_BINARY