From 2e1a3380e4fb0e388c5f0c02ce755d62071777d4 Mon Sep 17 00:00:00 2001
From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com>
Date: Thu, 27 Apr 2023 12:41:52 +0800
Subject: [PATCH] tests(ticdc): add storage integration tests (#8685)

close pingcap/tiflow#8826
---
 tests/integration_tests/api_v2/run.sh         |  8 +--
 tests/integration_tests/bank/run.sh           |  6 +-
 .../batch_update_to_no_batch/run.sh           |  2 +-
 tests/integration_tests/bdr_mode/run.sh       |  2 +-
 .../canal_json_adapter_compatibility/run.sh   |  2 +-
 .../integration_tests/canal_json_basic/run.sh |  2 +-
 .../canal_json_storage_basic/run.sh           |  4 +-
 .../canal_json_storage_partition_table/run.sh |  4 +-
 tests/integration_tests/cdc/cdc.go            |  2 +-
 .../{ => cdc}/dailytest/case.go               |  0
 .../{ => cdc}/dailytest/dailytest.go          |  0
 .../{ => cdc}/dailytest/data.go               |  0
 .../{ => cdc}/dailytest/db.go                 |  0
 .../{ => cdc}/dailytest/exector.go            |  0
 .../{ => cdc}/dailytest/job.go                |  0
 .../{ => cdc}/dailytest/parser.go             |  0
 .../{ => cdc}/dailytest/rand.go               |  0
 .../changefeed_fast_fail/run.sh               |  2 +-
 .../run.sh                                    | 15 ++---
 .../integration_tests/clustered_index/run.sh  |  4 +-
 .../consistent_replicate_ddl/run.sh           |  2 +-
 .../consistent_replicate_gbk/run.sh           |  2 +-
 .../consistent_replicate_nfs/run.sh           |  2 +-
 .../consistent_replicate_storage_file/run.sh  |  2 +-
 .../consistent_replicate_storage_s3/run.sh    |  2 +-
 .../csv_storage_basic/run.sh                  |  4 +-
 .../csv_storage_multi_tables_ddl/run.sh       |  2 +-
 .../csv_storage_partition_table/run.sh        |  4 +-
 tests/integration_tests/ddl_reentrant/run.sh  |  2 +-
 tests/integration_tests/http_api/run.sh       |  4 +-
 tests/integration_tests/http_api_tls/run.sh   |  4 +-
 .../kafka_big_messages/run.sh                 |  2 +-
 .../kafka_big_messages_v2/run.sh              |  2 +-
 .../conf/changefeed.toml                      |  0
 .../conf/diff_config.toml                     |  2 +-
 .../conf/workload                             |  0
 .../{big_txn_v2 => kafka_big_txn_v2}/run.sh   |  3 +-
 .../kafka_compression/run.sh                  |  2 +-
 tests/integration_tests/kafka_messages/run.sh | 22 +++----
 .../kafka_sink_error_resume/run.sh            |  2 +-
 .../kill_owner_with_ddl/run.sh                |  2 +-
 .../multi_cdc_cluster/run.sh                  |  2 +-
 .../integration_tests/multi_changefeed/run.sh |  2 +-
 .../multi_tables_ddl_v2/run.sh                |  3 +-
 tests/integration_tests/multi_topics/run.sh   |  2 +-
 .../integration_tests/multi_topics_v2/run.sh  |  3 +-
 .../owner_remove_table_error/run.sh           |  2 +-
 .../processor_etcd_worker_delay/run.sh        |  2 +-
 .../processor_resolved_ts_fallback/run.sh     |  2 +-
 tests/integration_tests/run_group.sh          | 64 ++++++++++++-------
 tests/integration_tests/sequence/run.sh       |  2 +-
 tests/integration_tests/sink_hang/run.sh      |  2 +-
 52 files changed, 102 insertions(+), 104 deletions(-)
 rename tests/integration_tests/{ => cdc}/dailytest/case.go (100%)
 rename tests/integration_tests/{ => cdc}/dailytest/dailytest.go (100%)
 rename tests/integration_tests/{ => cdc}/dailytest/data.go (100%)
 rename tests/integration_tests/{ => cdc}/dailytest/db.go (100%)
 rename tests/integration_tests/{ => cdc}/dailytest/exector.go (100%)
 rename tests/integration_tests/{ => cdc}/dailytest/job.go (100%)
 rename tests/integration_tests/{ => cdc}/dailytest/parser.go (100%)
 rename tests/integration_tests/{ => cdc}/dailytest/rand.go (100%)
 rename tests/integration_tests/{big_txn_v2 => kafka_big_txn_v2}/conf/changefeed.toml (100%)
 rename tests/integration_tests/{big_txn_v2 => kafka_big_txn_v2}/conf/diff_config.toml (85%)
 rename tests/integration_tests/{big_txn_v2 => kafka_big_txn_v2}/conf/workload (100%)
 rename tests/integration_tests/{big_txn_v2 => kafka_big_txn_v2}/run.sh (96%)

diff --git a/tests/integration_tests/api_v2/run.sh b/tests/integration_tests/api_v2/run.sh
index 4fd76f36a50..865ee8ae66b 100644
--- a/tests/integration_tests/api_v2/run.sh
+++ b/tests/integration_tests/api_v2/run.sh
@@ -9,10 +9,6 @@ CDC_BINARY=cdc.test
 SINK_TYPE=$1
 
 function prepare() {
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		return
-	fi
-
 	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
 
 	start_tidb_cluster --workdir $WORK_DIR
@@ -29,8 +25,8 @@ function prepare() {
 }
 
 trap stop_tidb_cluster EXIT
-# kafka is not supported yet.
-if [ "$SINK_TYPE" != "kafka" ]; then
+# kafka and storage is not supported yet.
+if [ "$SINK_TYPE" == "mysql" ]; then
 	prepare $*
 
 	cd "$(dirname "$0")"
diff --git a/tests/integration_tests/bank/run.sh b/tests/integration_tests/bank/run.sh
index 9aa463a0030..eb213c25ae4 100644
--- a/tests/integration_tests/bank/run.sh
+++ b/tests/integration_tests/bank/run.sh
@@ -9,10 +9,6 @@ CDC_BINARY=cdc.test
 SINK_TYPE=$1
 
 function prepare() {
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		return
-	fi
-
 	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
 
 	start_tidb_cluster --workdir $WORK_DIR
@@ -30,7 +26,7 @@ function prepare() {
 
 trap stop_tidb_cluster EXIT
 # kafka is not supported yet.
-if [ "$SINK_TYPE" != "kafka" ]; then
+if [ "$SINK_TYPE" == "mysql" ]; then
 	prepare $*
 
 	cd "$(dirname "$0")"
diff --git a/tests/integration_tests/batch_update_to_no_batch/run.sh b/tests/integration_tests/batch_update_to_no_batch/run.sh
index ee5865267fa..15a9b26f340 100644
--- a/tests/integration_tests/batch_update_to_no_batch/run.sh
+++ b/tests/integration_tests/batch_update_to_no_batch/run.sh
@@ -14,7 +14,7 @@ SINK_TYPE=$1
 # 3. cdc can switch from batch mode to no-batch mode and vice versa and works well
 function run() {
 	# batch mode only supports mysql sink
-	if [ "$SINK_TYPE" == "kafka" ]; then
+	if [ "$SINK_TYPE" != "mysql" ]; then
 		return
 	fi
 
diff --git a/tests/integration_tests/bdr_mode/run.sh b/tests/integration_tests/bdr_mode/run.sh
index 2c77256ec75..25007391f56 100644
--- a/tests/integration_tests/bdr_mode/run.sh
+++ b/tests/integration_tests/bdr_mode/run.sh
@@ -10,7 +10,7 @@ SINK_TYPE=$1
 
 function run() {
 	# BDR mode only supports mysql sink
-	if [ "$SINK_TYPE" == "kafka" ]; then
+	if [ "$SINK_TYPE" != "mysql" ]; then
 		return
 	fi
 
diff --git a/tests/integration_tests/canal_json_adapter_compatibility/run.sh b/tests/integration_tests/canal_json_adapter_compatibility/run.sh
index 877adee988d..f9c45af1dc3 100644
--- a/tests/integration_tests/canal_json_adapter_compatibility/run.sh
+++ b/tests/integration_tests/canal_json_adapter_compatibility/run.sh
@@ -11,7 +11,7 @@ SINK_TYPE=$1
 # use canal-adapter to sync data from kafka to mysql,
 # make sure that `canal-json` output can be consumed by the canal-adapter.
 function run() {
-	if [ "$SINK_TYPE" == "mysql" ]; then
+	if [ "$SINK_TYPE" != "kafka" ]; then
 		return
 	fi
 
diff --git a/tests/integration_tests/canal_json_basic/run.sh b/tests/integration_tests/canal_json_basic/run.sh
index 56affd90ece..9551a4b7e51 100644
--- a/tests/integration_tests/canal_json_basic/run.sh
+++ b/tests/integration_tests/canal_json_basic/run.sh
@@ -10,7 +10,7 @@ SINK_TYPE=$1
 
 # use kafka-consumer with canal-json decoder to sync data from kafka to mysql
 function run() {
-	if [ "$SINK_TYPE" == "mysql" ]; then
+	if [ "$SINK_TYPE" != "kafka" ]; then
 		return
 	fi
 
diff --git a/tests/integration_tests/canal_json_storage_basic/run.sh b/tests/integration_tests/canal_json_storage_basic/run.sh
index 97e6cb03098..dfb413c899f 100644
--- a/tests/integration_tests/canal_json_storage_basic/run.sh
+++ b/tests/integration_tests/canal_json_storage_basic/run.sh
@@ -9,9 +9,7 @@ CDC_BINARY=cdc.test
 SINK_TYPE=$1
 
 function run() {
-	# Now, we run the storage tests in mysql sink tests.
-	# It's a temporary solution, we will move it to a new test pipeline later.
-	if [ "$SINK_TYPE" != "mysql" ]; then
+	if [ "$SINK_TYPE" != "storage" ]; then
 		return
 	fi
 
diff --git a/tests/integration_tests/canal_json_storage_partition_table/run.sh b/tests/integration_tests/canal_json_storage_partition_table/run.sh
index e730c57dcb4..63899b6c0da 100644
--- a/tests/integration_tests/canal_json_storage_partition_table/run.sh
+++ b/tests/integration_tests/canal_json_storage_partition_table/run.sh
@@ -9,9 +9,7 @@ CDC_BINARY=cdc.test
 SINK_TYPE=$1
 
 function run() {
-	# Now, we run the storage tests in mysql sink tests.
-	# It's a temporary solution, we will move it to a new test pipeline later.
-	if [ "$SINK_TYPE" != "mysql" ]; then
+	if [ "$SINK_TYPE" != "storage" ]; then
 		return
 	fi
 
diff --git a/tests/integration_tests/cdc/cdc.go b/tests/integration_tests/cdc/cdc.go
index 6b7f56cb36f..5e224609302 100644
--- a/tests/integration_tests/cdc/cdc.go
+++ b/tests/integration_tests/cdc/cdc.go
@@ -20,7 +20,7 @@ import (
 	_ "github.com/go-sql-driver/mysql"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
-	"github.com/pingcap/tiflow/tests/integration_tests/dailytest"
+	"github.com/pingcap/tiflow/tests/integration_tests/cdc/dailytest"
 	"github.com/pingcap/tiflow/tests/integration_tests/util"
 )
 
diff --git a/tests/integration_tests/dailytest/case.go b/tests/integration_tests/cdc/dailytest/case.go
similarity index 100%
rename from tests/integration_tests/dailytest/case.go
rename to tests/integration_tests/cdc/dailytest/case.go
diff --git a/tests/integration_tests/dailytest/dailytest.go b/tests/integration_tests/cdc/dailytest/dailytest.go
similarity index 100%
rename from tests/integration_tests/dailytest/dailytest.go
rename to tests/integration_tests/cdc/dailytest/dailytest.go
diff --git a/tests/integration_tests/dailytest/data.go b/tests/integration_tests/cdc/dailytest/data.go
similarity index 100%
rename from tests/integration_tests/dailytest/data.go
rename to tests/integration_tests/cdc/dailytest/data.go
diff --git a/tests/integration_tests/dailytest/db.go b/tests/integration_tests/cdc/dailytest/db.go
similarity index 100%
rename from tests/integration_tests/dailytest/db.go
rename to tests/integration_tests/cdc/dailytest/db.go
diff --git a/tests/integration_tests/dailytest/exector.go b/tests/integration_tests/cdc/dailytest/exector.go
similarity index 100%
rename from tests/integration_tests/dailytest/exector.go
rename to tests/integration_tests/cdc/dailytest/exector.go
diff --git a/tests/integration_tests/dailytest/job.go b/tests/integration_tests/cdc/dailytest/job.go
similarity index 100%
rename from tests/integration_tests/dailytest/job.go
rename to tests/integration_tests/cdc/dailytest/job.go
diff --git a/tests/integration_tests/dailytest/parser.go b/tests/integration_tests/cdc/dailytest/parser.go
similarity index 100%
rename from tests/integration_tests/dailytest/parser.go
rename to tests/integration_tests/cdc/dailytest/parser.go
diff --git a/tests/integration_tests/dailytest/rand.go b/tests/integration_tests/cdc/dailytest/rand.go
similarity index 100%
rename from tests/integration_tests/dailytest/rand.go
rename to tests/integration_tests/cdc/dailytest/rand.go
diff --git a/tests/integration_tests/changefeed_fast_fail/run.sh b/tests/integration_tests/changefeed_fast_fail/run.sh
index 183b586515e..3a280665de5 100644
--- a/tests/integration_tests/changefeed_fast_fail/run.sh
+++ b/tests/integration_tests/changefeed_fast_fail/run.sh
@@ -12,7 +12,7 @@ MAX_RETRIES=20
 function run() {
 	# it is no need to test kafka
 	# the logic are all the same
-	if [ "$SINK_TYPE" == "kafka" ]; then
+	if [ "$SINK_TYPE" != "mysql" ]; then
 		return
 	fi
 
diff --git a/tests/integration_tests/changefeed_resume_with_checkpoint_ts/run.sh b/tests/integration_tests/changefeed_resume_with_checkpoint_ts/run.sh
index 8181d1e07ca..d0319e4c794 100644
--- a/tests/integration_tests/changefeed_resume_with_checkpoint_ts/run.sh
+++ b/tests/integration_tests/changefeed_resume_with_checkpoint_ts/run.sh
@@ -10,11 +10,6 @@ SINK_TYPE=$1
 MAX_RETRIES=20
 
 function prepare() {
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
-		exit 0
-	fi
-
 	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
 	start_tidb_cluster --workdir $WORK_DIR
 	cd $WORK_DIR
@@ -132,9 +127,11 @@ function resume_changefeed_in_failed_state() {
 }
 
 trap stop_tidb_cluster EXIT
-prepare
-resume_changefeed_in_stopped_state $*
-resume_changefeed_in_failed_state $*
-check_logs $WORK_DIR
+if [ "$SINK_TYPE" == "mysql" ]; then
+	prepare
+	resume_changefeed_in_stopped_state $*
+	resume_changefeed_in_failed_state $*
+	check_logs $WORK_DIR
+fi
 
 echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
diff --git a/tests/integration_tests/clustered_index/run.sh b/tests/integration_tests/clustered_index/run.sh
index afde9e62284..ca3b9db047c 100755
--- a/tests/integration_tests/clustered_index/run.sh
+++ b/tests/integration_tests/clustered_index/run.sh
@@ -46,8 +46,8 @@ function run() {
 
 # kafka is not supported yet.
 # ref to issue: https://github.com/pingcap/tiflow/issues/3421
-if [ "$SINK_TYPE" = "kafka" ]; then
-	echo "[$(date)] <<<<<< skip test case $TEST_NAME for kafka! >>>>>>"
+if [ "$SINK_TYPE" != "mysql" ]; then
+	echo "[$(date)] <<<<<< skip test case $TEST_NAME for $SINK_TYPE! >>>>>>"
 	exit 0
 fi
 trap stop_tidb_cluster EXIT
diff --git a/tests/integration_tests/consistent_replicate_ddl/run.sh b/tests/integration_tests/consistent_replicate_ddl/run.sh
index 6000c481202..c144c623819 100644
--- a/tests/integration_tests/consistent_replicate_ddl/run.sh
+++ b/tests/integration_tests/consistent_replicate_ddl/run.sh
@@ -19,7 +19,7 @@ stop() {
 
 function run() {
 	# we only support eventually consistent replication with MySQL sink
-	if [ "$SINK_TYPE" == "kafka" ]; then
+	if [ "$SINK_TYPE" != "mysql" ]; then
 		return
 	fi
 
diff --git a/tests/integration_tests/consistent_replicate_gbk/run.sh b/tests/integration_tests/consistent_replicate_gbk/run.sh
index 2054f741116..1359e3abc7c 100644
--- a/tests/integration_tests/consistent_replicate_gbk/run.sh
+++ b/tests/integration_tests/consistent_replicate_gbk/run.sh
@@ -43,7 +43,7 @@ s3cmd --access_key=$MINIO_ACCESS_KEY --secret_key=$MINIO_SECRET_KEY --host=$S3_E
 
 function run() {
 	# we only support eventually consistent replication with MySQL sink
-	if [ "$SINK_TYPE" == "kafka" ]; then
+	if [ "$SINK_TYPE" != "mysql" ]; then
 		return
 	fi
 
diff --git a/tests/integration_tests/consistent_replicate_nfs/run.sh b/tests/integration_tests/consistent_replicate_nfs/run.sh
index 35579801da9..f2b9a6f6612 100644
--- a/tests/integration_tests/consistent_replicate_nfs/run.sh
+++ b/tests/integration_tests/consistent_replicate_nfs/run.sh
@@ -16,7 +16,7 @@ stop() {
 
 function run() {
 	# we only support eventually consistent replication with MySQL sink
-	if [ "$SINK_TYPE" == "kafka" ]; then
+	if [ "$SINK_TYPE" != "mysql" ]; then
 		return
 	fi
 
diff --git a/tests/integration_tests/consistent_replicate_storage_file/run.sh b/tests/integration_tests/consistent_replicate_storage_file/run.sh
index 73ebab005fa..5e03d111e43 100644
--- a/tests/integration_tests/consistent_replicate_storage_file/run.sh
+++ b/tests/integration_tests/consistent_replicate_storage_file/run.sh
@@ -19,7 +19,7 @@ stop() {
 
 function run() {
 	# we only support eventually consistent replication with MySQL sink
-	if [ "$SINK_TYPE" == "kafka" ]; then
+	if [ "$SINK_TYPE" != "mysql" ]; then
 		return
 	fi
 
diff --git a/tests/integration_tests/consistent_replicate_storage_s3/run.sh b/tests/integration_tests/consistent_replicate_storage_s3/run.sh
index 9ce5048b9dc..4ec5d429f26 100644
--- a/tests/integration_tests/consistent_replicate_storage_s3/run.sh
+++ b/tests/integration_tests/consistent_replicate_storage_s3/run.sh
@@ -45,7 +45,7 @@ s3cmd --access_key=$MINIO_ACCESS_KEY --secret_key=$MINIO_SECRET_KEY --host=$S3_E
 
 function run() {
 	# we only support eventually consistent replication with MySQL sink
-	if [ "$SINK_TYPE" == "kafka" ]; then
+	if [ "$SINK_TYPE" != "mysql" ]; then
 		return
 	fi
 
diff --git a/tests/integration_tests/csv_storage_basic/run.sh b/tests/integration_tests/csv_storage_basic/run.sh
index b0af86bfb43..ac7a2b6164e 100644
--- a/tests/integration_tests/csv_storage_basic/run.sh
+++ b/tests/integration_tests/csv_storage_basic/run.sh
@@ -9,9 +9,7 @@ CDC_BINARY=cdc.test
 SINK_TYPE=$1
 
 function run() {
-	# Now, we run the storage tests in mysql sink tests.
-	# It's a temporary solution, we will move it to a new test pipeline later.
- if [ "$SINK_TYPE" != "mysql" ]; then + if [ "$SINK_TYPE" != "storage" ]; then return fi diff --git a/tests/integration_tests/csv_storage_multi_tables_ddl/run.sh b/tests/integration_tests/csv_storage_multi_tables_ddl/run.sh index 1a847a66f66..f0f774b3dd5 100755 --- a/tests/integration_tests/csv_storage_multi_tables_ddl/run.sh +++ b/tests/integration_tests/csv_storage_multi_tables_ddl/run.sh @@ -11,7 +11,7 @@ SINK_TYPE=$1 function run() { # Now, we run the storage tests in mysql sink tests. # It's a temporary solution, we will move it to a new test pipeline later. - if [ "$SINK_TYPE" != "mysql" ]; then + if [ "$SINK_TYPE" != "storage" ]; then return fi diff --git a/tests/integration_tests/csv_storage_partition_table/run.sh b/tests/integration_tests/csv_storage_partition_table/run.sh index a192f50a706..8d47dad4d41 100644 --- a/tests/integration_tests/csv_storage_partition_table/run.sh +++ b/tests/integration_tests/csv_storage_partition_table/run.sh @@ -9,9 +9,7 @@ CDC_BINARY=cdc.test SINK_TYPE=$1 function run() { - # Now, we run the storage tests in mysql sink tests. - # It's a temporary solution, we will move it to a new test pipeline later. - if [ "$SINK_TYPE" != "mysql" ]; then + if [ "$SINK_TYPE" != "storage" ]; then return fi diff --git a/tests/integration_tests/ddl_reentrant/run.sh b/tests/integration_tests/ddl_reentrant/run.sh index 44410a72e31..a719c36604a 100644 --- a/tests/integration_tests/ddl_reentrant/run.sh +++ b/tests/integration_tests/ddl_reentrant/run.sh @@ -119,7 +119,7 @@ function ddl_test() { function run() { # don't test kafka in this case - if [ "$SINK_TYPE" == "kafka" ]; then + if [ "$SINK_TYPE" != "mysql" ]; then return fi diff --git a/tests/integration_tests/http_api/run.sh b/tests/integration_tests/http_api/run.sh index e202b502370..c95795971f3 100644 --- a/tests/integration_tests/http_api/run.sh +++ b/tests/integration_tests/http_api/run.sh @@ -10,8 +10,8 @@ SINK_TYPE=$1 MAX_RETRIES=50 function run() { - # mysql and kafka are the same - if [ "$SINK_TYPE" == "kafka" ]; then + # storage and kafka are the same as mysql + if [ "$SINK_TYPE" != "mysql" ]; then return fi diff --git a/tests/integration_tests/http_api_tls/run.sh b/tests/integration_tests/http_api_tls/run.sh index e16d4c69463..5a1aec78f8d 100644 --- a/tests/integration_tests/http_api_tls/run.sh +++ b/tests/integration_tests/http_api_tls/run.sh @@ -11,8 +11,8 @@ TLS_DIR=$(cd $CUR/../_certificates && pwd) MAX_RETRIES=20 function run() { - # mysql and kafka are the same - if [ "$SINK_TYPE" == "kafka" ]; then + # storage and kafka are the same as mysql + if [ "$SINK_TYPE" != "mysql" ]; then return fi diff --git a/tests/integration_tests/kafka_big_messages/run.sh b/tests/integration_tests/kafka_big_messages/run.sh index 40698eb0466..d0904360006 100755 --- a/tests/integration_tests/kafka_big_messages/run.sh +++ b/tests/integration_tests/kafka_big_messages/run.sh @@ -10,7 +10,7 @@ SINK_TYPE=$1 function run() { # test kafka sink only in this case - if [ "$SINK_TYPE" == "mysql" ]; then + if [ "$SINK_TYPE" != "kafka" ]; then return fi rm -rf $WORK_DIR && mkdir -p $WORK_DIR diff --git a/tests/integration_tests/kafka_big_messages_v2/run.sh b/tests/integration_tests/kafka_big_messages_v2/run.sh index 60da2498fc8..d1f861fdeee 100755 --- a/tests/integration_tests/kafka_big_messages_v2/run.sh +++ b/tests/integration_tests/kafka_big_messages_v2/run.sh @@ -10,7 +10,7 @@ SINK_TYPE=$1 function run() { # test kafka sink only in this case - if [ "$SINK_TYPE" == "mysql" ]; then + if [ "$SINK_TYPE" != "kafka" ]; then return fi rm 
-rf $WORK_DIR && mkdir -p $WORK_DIR diff --git a/tests/integration_tests/big_txn_v2/conf/changefeed.toml b/tests/integration_tests/kafka_big_txn_v2/conf/changefeed.toml similarity index 100% rename from tests/integration_tests/big_txn_v2/conf/changefeed.toml rename to tests/integration_tests/kafka_big_txn_v2/conf/changefeed.toml diff --git a/tests/integration_tests/big_txn_v2/conf/diff_config.toml b/tests/integration_tests/kafka_big_txn_v2/conf/diff_config.toml similarity index 85% rename from tests/integration_tests/big_txn_v2/conf/diff_config.toml rename to tests/integration_tests/kafka_big_txn_v2/conf/diff_config.toml index 367c21817c8..71521d8b006 100644 --- a/tests/integration_tests/big_txn_v2/conf/diff_config.toml +++ b/tests/integration_tests/kafka_big_txn_v2/conf/diff_config.toml @@ -7,7 +7,7 @@ export-fix-sql = true check-struct-only = false [task] - output-dir = "/tmp/tidb_cdc_test/big_txn/sync_diff/output" + output-dir = "/tmp/tidb_cdc_test/kafka_big_txn_v2/sync_diff/output" source-instances = ["tidb"] diff --git a/tests/integration_tests/big_txn_v2/conf/workload b/tests/integration_tests/kafka_big_txn_v2/conf/workload similarity index 100% rename from tests/integration_tests/big_txn_v2/conf/workload rename to tests/integration_tests/kafka_big_txn_v2/conf/workload diff --git a/tests/integration_tests/big_txn_v2/run.sh b/tests/integration_tests/kafka_big_txn_v2/run.sh similarity index 96% rename from tests/integration_tests/big_txn_v2/run.sh rename to tests/integration_tests/kafka_big_txn_v2/run.sh index 63142119583..bd449ff6ccb 100755 --- a/tests/integration_tests/big_txn_v2/run.sh +++ b/tests/integration_tests/kafka_big_txn_v2/run.sh @@ -12,7 +12,8 @@ CDC_COUNT=3 DB_COUNT=4 function run() { - if [ "$SINK_TYPE" == "mysql" ]; then + # test kafka sink only in this case + if [ "$SINK_TYPE" != "kafka" ]; then return fi rm -rf $WORK_DIR && mkdir -p $WORK_DIR diff --git a/tests/integration_tests/kafka_compression/run.sh b/tests/integration_tests/kafka_compression/run.sh index ed779827eff..64f5544781e 100644 --- a/tests/integration_tests/kafka_compression/run.sh +++ b/tests/integration_tests/kafka_compression/run.sh @@ -30,7 +30,7 @@ function test_compression() { } function run() { - if [ "$SINK_TYPE" == "mysql" ]; then + if [ "$SINK_TYPE" != "kafka" ]; then return fi diff --git a/tests/integration_tests/kafka_messages/run.sh b/tests/integration_tests/kafka_messages/run.sh index f2659b79900..88562a92725 100755 --- a/tests/integration_tests/kafka_messages/run.sh +++ b/tests/integration_tests/kafka_messages/run.sh @@ -12,11 +12,6 @@ CDC_COUNT=3 DB_COUNT=4 function run_length_limit() { - # test kafka sink only in this case - if [ "$SINK_TYPE" == "mysql" ]; then - return - fi - rm -rf $WORK_DIR && mkdir -p $WORK_DIR start_tidb_cluster --workdir $WORK_DIR @@ -71,11 +66,6 @@ function run_length_limit() { } function run_batch_size_limit() { - # test kafka sink only in this case - if [ "$SINK_TYPE" == "mysql" ]; then - return - fi - rm -rf $WORK_DIR && mkdir -p $WORK_DIR start_tidb_cluster --workdir $WORK_DIR @@ -126,8 +116,16 @@ function run_batch_size_limit() { stop_tidb_cluster } +function run() { + # test kafka sink only in this case + if [ "$SINK_TYPE" != "kafka" ]; then + return + fi + + run_length_limit $* + run_batch_size_limit $* +} + trap stop_tidb_cluster EXIT -run_length_limit $* -run_batch_size_limit $* check_logs $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! 
>>>>>>" diff --git a/tests/integration_tests/kafka_sink_error_resume/run.sh b/tests/integration_tests/kafka_sink_error_resume/run.sh index 2e6326ea4ec..c5e44a3acdc 100755 --- a/tests/integration_tests/kafka_sink_error_resume/run.sh +++ b/tests/integration_tests/kafka_sink_error_resume/run.sh @@ -14,7 +14,7 @@ MAX_RETRIES=20 function run() { # test kafka sink only in this case - if [ "$SINK_TYPE" == "mysql" ]; then + if [ "$SINK_TYPE" != "kafka" ]; then return fi diff --git a/tests/integration_tests/kill_owner_with_ddl/run.sh b/tests/integration_tests/kill_owner_with_ddl/run.sh index 60c7c6a69c3..39475021a06 100755 --- a/tests/integration_tests/kill_owner_with_ddl/run.sh +++ b/tests/integration_tests/kill_owner_with_ddl/run.sh @@ -39,7 +39,7 @@ export -f kill_cdc_and_restart function run() { # kafka is not supported yet. - if [ "$SINK_TYPE" == "kafka" ]; then + if [ "$SINK_TYPE" != "mysql" ]; then return fi diff --git a/tests/integration_tests/multi_cdc_cluster/run.sh b/tests/integration_tests/multi_cdc_cluster/run.sh index 862f3874688..8b17118462f 100644 --- a/tests/integration_tests/multi_cdc_cluster/run.sh +++ b/tests/integration_tests/multi_cdc_cluster/run.sh @@ -10,7 +10,7 @@ SINK_TYPE=$1 function run() { # test mysql sink only in this case - if [ "$SINK_TYPE" == "kafka" ]; then + if [ "$SINK_TYPE" != "mysql" ]; then return fi rm -rf $WORK_DIR && mkdir -p $WORK_DIR diff --git a/tests/integration_tests/multi_changefeed/run.sh b/tests/integration_tests/multi_changefeed/run.sh index f4522be3d63..f6850b5d594 100755 --- a/tests/integration_tests/multi_changefeed/run.sh +++ b/tests/integration_tests/multi_changefeed/run.sh @@ -57,7 +57,7 @@ export -f check_old_value_enabled function run() { # kafka is not supported yet. - if [ "$SINK_TYPE" == "kafka" ]; then + if [ "$SINK_TYPE" != "mysql" ]; then return fi diff --git a/tests/integration_tests/multi_tables_ddl_v2/run.sh b/tests/integration_tests/multi_tables_ddl_v2/run.sh index 608084cfa85..70ce52bdba9 100755 --- a/tests/integration_tests/multi_tables_ddl_v2/run.sh +++ b/tests/integration_tests/multi_tables_ddl_v2/run.sh @@ -9,7 +9,8 @@ CDC_BINARY=cdc.test SINK_TYPE=$1 function run() { - if [ "$SINK_TYPE" == "mysql" ]; then + # test kafka sink only in this case + if [ "$SINK_TYPE" != "kafka" ]; then return fi diff --git a/tests/integration_tests/multi_topics/run.sh b/tests/integration_tests/multi_topics/run.sh index f7257f0fbcc..fa38b2f1037 100644 --- a/tests/integration_tests/multi_topics/run.sh +++ b/tests/integration_tests/multi_topics/run.sh @@ -9,7 +9,7 @@ CDC_BINARY=cdc.test SINK_TYPE=$1 function run() { - if [ "$SINK_TYPE" == "mysql" ]; then + if [ "$SINK_TYPE" != "kafka" ]; then return fi diff --git a/tests/integration_tests/multi_topics_v2/run.sh b/tests/integration_tests/multi_topics_v2/run.sh index f7257f0fbcc..615e577e900 100644 --- a/tests/integration_tests/multi_topics_v2/run.sh +++ b/tests/integration_tests/multi_topics_v2/run.sh @@ -9,7 +9,8 @@ CDC_BINARY=cdc.test SINK_TYPE=$1 function run() { - if [ "$SINK_TYPE" == "mysql" ]; then + # test kafka sink only in this case + if [ "$SINK_TYPE" != "kafka" ]; then return fi diff --git a/tests/integration_tests/owner_remove_table_error/run.sh b/tests/integration_tests/owner_remove_table_error/run.sh index d28160059ba..22d5a21c8fa 100644 --- a/tests/integration_tests/owner_remove_table_error/run.sh +++ b/tests/integration_tests/owner_remove_table_error/run.sh @@ -14,7 +14,7 @@ MAX_RETRIES=20 function run() { # kafka is not supported yet. 
- if [ "$SINK_TYPE" == "kafka" ]; then + if [ "$SINK_TYPE" != "mysql" ]; then return fi diff --git a/tests/integration_tests/processor_etcd_worker_delay/run.sh b/tests/integration_tests/processor_etcd_worker_delay/run.sh index 24107fefd9d..0dacb38e5f4 100644 --- a/tests/integration_tests/processor_etcd_worker_delay/run.sh +++ b/tests/integration_tests/processor_etcd_worker_delay/run.sh @@ -14,7 +14,7 @@ MAX_RETRIES=20 function run() { # kafka is not supported yet. - if [ "$SINK_TYPE" == "kafka" ]; then + if [ "$SINK_TYPE" != "mysql" ]; then return fi diff --git a/tests/integration_tests/processor_resolved_ts_fallback/run.sh b/tests/integration_tests/processor_resolved_ts_fallback/run.sh index 732039c7b51..0b494ff7272 100755 --- a/tests/integration_tests/processor_resolved_ts_fallback/run.sh +++ b/tests/integration_tests/processor_resolved_ts_fallback/run.sh @@ -10,7 +10,7 @@ SINK_TYPE=$1 function run() { # TODO: kafka sink has bug with this case, remove this after bug is fixed - if [ "$SINK_TYPE" == "kafka" ]; then + if [ "$SINK_TYPE" != "mysql" ]; then return fi diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index becd070f264..adbd9027dbe 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -7,6 +7,23 @@ CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) sink_type=$1 group=$2 +# Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant +# changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence +# multi_cdc_cluster capture_suicide_while_balance_table +mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint" +mysql_only_http="http_api http_api_tls api_v2" +mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_s3" + +# Tests that need to support kafka: bank kill_owner_with_ddl owner_remove_table_error +# owner_resign processor_etcd_worker_delay processor_resolved_ts_fallback +# multi_changefeed clustered_index sink_hang +kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume" +kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic multi_topics" +kafka_only_v2="kafka_big_txn_v2 kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2" + +storage_only_csv="csv_storage_basic csv_storage_multi_tables_ddl csv_storage_partition_table" +storage_only_canal_json="canal_json_storage_basic canal_json_storage_partition_table" + # Define groups # Note: If new group is added, the group name must also be added to CI # * https://github.com/PingCAP-QE/ci/blob/main/pipelines/pingcap/tiflow/latest/pull_cdc_integration_kafka_test.groovy @@ -15,32 +32,31 @@ group=$2 # Putting multiple light tests together and heavy tests in a separate group. 
 declare -A groups
 groups=(
-	["G00"]='changefeed_error ddl_sequence force_replicate_table'
-	["G01"]='multi_capture kafka_big_messages cdc'
-	["G02"]='drop_many_tables multi_cdc_cluster processor_stop_delay'
-	["G03"]='capture_suicide_while_balance_table row_format ddl_only_block_related_table ddl_manager'
-	["G04"]='foreign_key canal_json_basic ddl_puller_lag owner_resign'
+	["G00"]="$mysql_only $kafka_only $storage_only_csv"
+	["G01"]="$mysql_only_http $kafka_only_protocol $storage_only_canal_json"
+	["G02"]="$mysql_only_consistent_replicate $kafka_only_v2"
+	["G03"]='row_format drop_many_tables processor_stop_delay'
+	["G04"]='foreign_key ddl_puller_lag ddl_only_block_related_table'
 	["G05"]='partition_table changefeed_auto_stop'
-	["G06"]='charset_gbk owner_remove_table_error bdr_mode'
-	["G07"]='clustered_index multi_tables_ddl big_txn_v2'
-	["G08"]='bank multi_source kafka_sink_error_resume'
-	["G09"]='capture_suicide_while_balance_table'
-	["G10"]='multi_topics_v2 consistent_replicate_storage_s3 sink_retry'
-	["G11"]='consistent_replicate_storage_file kv_client_stream_reconnect consistent_replicate_gbk'
-	["G12"]='http_api changefeed_fast_fail tidb_mysql_test server_config_compatibility'
-	["G13"]='canal_json_adapter_compatibility resourcecontrol processor_etcd_worker_delay'
-	["G14"]='batch_update_to_no_batch gc_safepoint default_value changefeed_pause_resume'
-	["G15"]='cli simple cdc_server_tips changefeed_resume_with_checkpoint_ts ddl_reentrant'
-	["G16"]='processor_err_chan resolve_lock move_table kafka_compression autorandom'
-	["G17"]='ddl_attributes many_pk_or_uk kafka_messages capture_session_done_during_task http_api_tls'
+	["G06"]='charset_gbk owner_remove_table_error ddl_manager'
+	["G07"]='clustered_index multi_tables_ddl'
+	["G08"]='bank multi_source multi_capture'
+	["G09"]='ddl_reentrant multi_cdc_cluster'
+	["G10"]='sink_retry changefeed_error ddl_sequence'
+	["G11"]='kv_client_stream_reconnect cdc default_value'
+	["G12"]='changefeed_fast_fail tidb_mysql_test server_config_compatibility'
+	["G13"]='resourcecontrol processor_etcd_worker_delay'
+	["G14"]='batch_update_to_no_batch gc_safepoint changefeed_pause_resume'
+	["G15"]='cli simple cdc_server_tips changefeed_resume_with_checkpoint_ts'
+	["G16"]='processor_err_chan resolve_lock move_table autorandom'
+	["G17"]='ddl_attributes many_pk_or_uk capture_session_done_during_task'
 	["G18"]='tiflash new_ci_collation_without_old_value region_merge common_1'
-	["G19"]='kafka_big_messages_v2 multi_tables_ddl_v2 split_region availability'
-	["G20"]='changefeed_reconstruct http_proxies kill_owner_with_ddl savepoint'
-	["G21"]='event_filter generate_column syncpoint sequence processor_resolved_ts_fallback'
-	["G22"]='big_txn csv_storage_basic changefeed_finish sink_hang canal_json_storage_basic'
-	["G23"]='multi_topics new_ci_collation_with_old_value batch_add_table multi_changefeed'
-	["G24"]='consistent_replicate_nfs consistent_replicate_ddl owner_resign api_v2'
-	["G25"]='canal_json_storage_partition_table csv_storage_partition_table csv_storage_multi_tables_ddl'
+	["G19"]='split_region availability kill_owner_with_ddl'
+	["G20"]='changefeed_reconstruct http_proxies savepoint'
+	["G21"]='event_filter generate_column sequence processor_resolved_ts_fallback'
+	["G22"]='big_txn changefeed_finish sink_hang'
+	["G23"]='new_ci_collation_with_old_value batch_add_table '
+	["G24"]='owner_resign force_replicate_table multi_changefeed'
 )
 
 # Get other cases not in groups, to avoid missing any case
diff --git a/tests/integration_tests/sequence/run.sh b/tests/integration_tests/sequence/run.sh
index 702131d6015..ff90a243dac 100755
--- a/tests/integration_tests/sequence/run.sh
+++ b/tests/integration_tests/sequence/run.sh
@@ -10,7 +10,7 @@ SINK_TYPE=$1
 
 function run() {
 	# No need to test kafka.
-	if [ "$SINK_TYPE" == "kafka" ]; then
+	if [ "$SINK_TYPE" != "mysql" ]; then
 		return
 	fi
 
diff --git a/tests/integration_tests/sink_hang/run.sh b/tests/integration_tests/sink_hang/run.sh
index d00e04557c8..3c33ad17f08 100644
--- a/tests/integration_tests/sink_hang/run.sh
+++ b/tests/integration_tests/sink_hang/run.sh
@@ -14,7 +14,7 @@ MAX_RETRIES=20
 
 function run() {
 	# kafka is not supported yet.
-	if [ "$SINK_TYPE" == "kafka" ]; then
+	if [ "$SINK_TYPE" != "mysql" ]; then
 		return
 	fi