diff --git a/tests/integration_tests/autorandom/run.sh b/tests/integration_tests/autorandom/run.sh
index a811c5b1048..8c5bdcbe238 100644
--- a/tests/integration_tests/autorandom/run.sh
+++ b/tests/integration_tests/autorandom/run.sh
@@ -23,12 +23,14 @@ function run() {
 	TOPIC_NAME="ticdc-autorandom-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 	run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
 	check_table_exists autorandom_test.table_a ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
diff --git a/tests/integration_tests/availability/run.sh b/tests/integration_tests/availability/run.sh
index a22464580e1..1e1434125e4 100644
--- a/tests/integration_tests/availability/run.sh
+++ b/tests/integration_tests/availability/run.sh
@@ -9,6 +9,7 @@ source $CUR/capture.sh
 source $CUR/processor.sh
 WORK_DIR=$OUT_DIR/$TEST_NAME
 CDC_BINARY=cdc.test
+SINK_TYPE=$1
 
 export DOWN_TIDB_HOST
 export DOWN_TIDB_PORT
@@ -22,9 +23,11 @@ function prepare() {
 }
 
 trap stop_tidb_cluster EXIT
-prepare $*
-test_owner_ha $*
-test_capture_ha $*
-test_processor_ha $*
+if [ "$SINK_TYPE" == "mysql" ]; then
+	prepare $*
+	test_owner_ha $*
+	test_capture_ha $*
+	test_processor_ha $*
+fi
 check_logs $WORK_DIR
 echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
diff --git a/tests/integration_tests/bank/run.sh b/tests/integration_tests/bank/run.sh
index eb213c25ae4..e0c1a5cd6de 100644
--- a/tests/integration_tests/bank/run.sh
+++ b/tests/integration_tests/bank/run.sh
@@ -25,7 +25,7 @@ function prepare() {
 }
 
 trap stop_tidb_cluster EXIT
-# kafka is not supported yet.
+# No need to support kafka and storage sinks.
 if [ "$SINK_TYPE" == "mysql" ]; then
 	prepare $*
 
diff --git a/tests/integration_tests/batch_add_table/run.sh b/tests/integration_tests/batch_add_table/run.sh
index 3cf1e9cd090..9d8ccf63236 100644
--- a/tests/integration_tests/batch_add_table/run.sh
+++ b/tests/integration_tests/batch_add_table/run.sh
@@ -25,12 +25,14 @@ function run() {
 	TOPIC_NAME="ticdc-batch-add-table-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
diff --git a/tests/integration_tests/big_txn/run.sh b/tests/integration_tests/big_txn/run.sh
index 8cd6ce355ea..e8c36eb277e 100755
--- a/tests/integration_tests/big_txn/run.sh
+++ b/tests/integration_tests/big_txn/run.sh
@@ -12,6 +12,11 @@ CDC_COUNT=3
 DB_COUNT=4
 
 function run() {
+	# storage is not supported yet.
+	if [ "$SINK_TYPE" == "storage" ]; then
+		return
+	fi
+
 	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
 	start_tidb_cluster --workdir $WORK_DIR
 
@@ -25,6 +30,7 @@ function run() {
 	TOPIC_NAME="ticdc-big-txn-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=none" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
@@ -32,9 +38,10 @@ function run() {
 	run_sql "CREATE DATABASE big_txn;"
 	go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=big_txn
 
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 	check_table_exists "big_txn.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
 
 	run_sql "CREATE TABLE big_txn.usertable1 LIKE big_txn.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
diff --git a/tests/integration_tests/capture_session_done_during_task/run.sh b/tests/integration_tests/capture_session_done_during_task/run.sh
index c4fd0284178..6e2f4148ea5 100644
--- a/tests/integration_tests/capture_session_done_during_task/run.sh
+++ b/tests/integration_tests/capture_session_done_during_task/run.sh
@@ -17,6 +17,7 @@ function run() {
 	TOPIC_NAME="ticdc-capture-session-done-during-task-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
 	esac
 
@@ -34,9 +35,10 @@ function run() {
 	# wait task is dispatched
 	sleep 1
 
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	capture_key=$(ETCDCTL_API=3 etcdctl get /tidb/cdc/default/__cdc_meta__/capture --prefix | head -n 1)
 	lease=$(ETCDCTL_API=3 etcdctl get $capture_key -w json | grep -o 'lease":[0-9]*' | awk -F: '{print $2}')
diff --git a/tests/integration_tests/cdc/run.sh b/tests/integration_tests/cdc/run.sh
index 430fd4ff295..1feec288bcf 100755
--- a/tests/integration_tests/cdc/run.sh
+++ b/tests/integration_tests/cdc/run.sh
@@ -24,20 +24,24 @@ function prepare() {
 	TOPIC_NAME="ticdc-cdc-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 }
 
 trap stop_tidb_cluster EXIT
-prepare $*
-
-cd "$(dirname "$0")"
-set -o pipefail
-GO111MODULE=on go run cdc.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log
-cleanup_process $CDC_BINARY
-check_logs $WORK_DIR
+# storage is not supported yet.
+if [ "$SINK_TYPE" != "storage" ]; then
+	prepare $*
+	cd "$(dirname "$0")"
+	set -o pipefail
+	GO111MODULE=on go run cdc.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log
+	cleanup_process $CDC_BINARY
+	check_logs $WORK_DIR
+fi
 echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
diff --git a/tests/integration_tests/cdc_server_tips/run.sh b/tests/integration_tests/cdc_server_tips/run.sh
index 3c830dfd937..43d642474f1 100644
--- a/tests/integration_tests/cdc_server_tips/run.sh
+++ b/tests/integration_tests/cdc_server_tips/run.sh
@@ -48,12 +48,14 @@ function try_to_run_cdc() {
 	TOPIC_NAME="ticdc-server-tips-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-client-id=cdc_server_tips&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql+ssl://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	echo 'Succeed to create a changefeed, no usage tips should be printed'
 }
diff --git a/tests/integration_tests/changefeed_auto_stop/run.sh b/tests/integration_tests/changefeed_auto_stop/run.sh
index 5cfd628681e..ec38b91653a 100755
--- a/tests/integration_tests/changefeed_auto_stop/run.sh
+++ b/tests/integration_tests/changefeed_auto_stop/run.sh
@@ -31,12 +31,14 @@ function run() {
 	TOPIC_NAME="ticdc-changefeed-auto-stop-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	changefeedid=$(cdc cli changefeed create --pd="http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	ensure $MAX_RETRIES check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" ${changefeedid} "normal" "null" ""
diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh
index b81ce6d9c30..62ce051bb4b 100755
--- a/tests/integration_tests/changefeed_error/run.sh
+++ b/tests/integration_tests/changefeed_error/run.sh
@@ -45,13 +45,15 @@ function run() {
 	TOPIC_NAME="ticdc-sink-retry-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
 	esac
 	changefeedid="changefeed-error"
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "[CDC:ErrStartTsBeforeGC]" ""
 	run_cdc_cli changefeed resume -c $changefeedid
diff --git a/tests/integration_tests/changefeed_fast_fail/run.sh b/tests/integration_tests/changefeed_fast_fail/run.sh
index 881ee79ff25..e097bd01415 100644
--- a/tests/integration_tests/changefeed_fast_fail/run.sh
+++ b/tests/integration_tests/changefeed_fast_fail/run.sh
@@ -10,8 +10,7 @@ SINK_TYPE=$1
 MAX_RETRIES=20
 
 function run() {
-	# it is no need to test kafka
-	# the logic are all the same
+	# No need to test kafka and storage sinks, since the logic is all the same.
 	if [ "$SINK_TYPE" != "mysql" ]; then
 		return
 	fi
diff --git a/tests/integration_tests/changefeed_finish/run.sh b/tests/integration_tests/changefeed_finish/run.sh
index 121fae56825..e7e9e378a10 100755
--- a/tests/integration_tests/changefeed_finish/run.sh
+++ b/tests/integration_tests/changefeed_finish/run.sh
@@ -18,6 +18,7 @@ function run() {
 	TOPIC_NAME="ticdc-changefeed-pause-resume-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
 	esac
 
@@ -27,9 +28,10 @@ function run() {
 	target_ts=$(($now + 90 * 10 ** 3 * 2 ** 18))
 	changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --target-ts=$target_ts 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
 
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	run_sql "CREATE DATABASE changefeed_finish;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	run_sql "CREATE table changefeed_finish.t (id int primary key auto_increment, t datetime DEFAULT CURRENT_TIMESTAMP)"
diff --git a/tests/integration_tests/changefeed_pause_resume/run.sh b/tests/integration_tests/changefeed_pause_resume/run.sh
index 4dcdb074ef5..8b0781adf62 100755
--- a/tests/integration_tests/changefeed_pause_resume/run.sh
+++ b/tests/integration_tests/changefeed_pause_resume/run.sh
@@ -18,14 +18,16 @@ function run() {
 	TOPIC_NAME="ticdc-changefeed-pause-resume-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
 	esac
 
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
 	changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	run_sql "CREATE DATABASE changefeed_pause_resume;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	for i in $(seq 1 $TABLE_COUNT); do
diff --git a/tests/integration_tests/changefeed_reconstruct/run.sh b/tests/integration_tests/changefeed_reconstruct/run.sh
index 447acbd78bd..6d9d6e23688 100755
--- a/tests/integration_tests/changefeed_reconstruct/run.sh
+++ b/tests/integration_tests/changefeed_reconstruct/run.sh
@@ -41,15 +41,17 @@ function run() {
 	TOPIC_NAME="ticdc-changefeed-reconstruct-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
 	esac
 
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --logsuffix server1 --pd $pd_addr
 	owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')
 	changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	run_sql "CREATE DATABASE changefeed_reconstruct;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_reconstruct
diff --git a/tests/integration_tests/charset_gbk/run.sh b/tests/integration_tests/charset_gbk/run.sh
index 300a81b0058..aa99a4f1a26 100755
--- a/tests/integration_tests/charset_gbk/run.sh
+++ b/tests/integration_tests/charset_gbk/run.sh
@@ -23,13 +23,15 @@ function run() {
 	TOPIC_NAME="ticdc-charset-gbk-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	SINK_URI="mysql://normal:123456@127.0.0.1:3306/"
 	cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
diff --git a/tests/integration_tests/cli/run.sh b/tests/integration_tests/cli/run.sh
index 91cd90b4f34..e1f8c95f3d7 100644
--- a/tests/integration_tests/cli/run.sh
+++ b/tests/integration_tests/cli/run.sh
@@ -38,14 +38,16 @@ function run() {
 	TOPIC_NAME="ticdc-cli-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 
 	uuid="custom-changefeed-name"
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --tz="Asia/Shanghai" -c="$uuid"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	# Make sure changefeed is created.
 	check_table_exists test.simple ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
diff --git a/tests/integration_tests/clustered_index/run.sh b/tests/integration_tests/clustered_index/run.sh
index ca3b9db047c..1bee510b33d 100755
--- a/tests/integration_tests/clustered_index/run.sh
+++ b/tests/integration_tests/clustered_index/run.sh
@@ -23,12 +23,14 @@ function run() {
 	TOPIC_NAME="ticdc-clustered-index-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 	run_sql "set global tidb_enable_clustered_index=1;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	# TiDB global variables cache 2 seconds at most
 	sleep 2
@@ -46,6 +48,7 @@ function run() {
 
 # kafka is not supported yet.
 # ref to issue: https://github.com/pingcap/tiflow/issues/3421
+# TODO: enable this test for kafka and storage sinks.
 if [ "$SINK_TYPE" != "mysql" ]; then
 	echo "[$(date)] <<<<<< skip test case $TEST_NAME for $SINK_TYPE! >>>>>>"
 	exit 0
diff --git a/tests/integration_tests/common_1/run.sh b/tests/integration_tests/common_1/run.sh
index 8bc55a8354f..bb2fd82c0ba 100644
--- a/tests/integration_tests/common_1/run.sh
+++ b/tests/integration_tests/common_1/run.sh
@@ -9,6 +9,11 @@ CDC_BINARY=cdc.test
 SINK_TYPE=$1
 
 function run() {
+	# storage is not supported yet.
+	if [ "$SINK_TYPE" == "storage" ]; then
+		return
+	fi
+
 	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
 	start_tidb_cluster --workdir $WORK_DIR
 
@@ -38,12 +43,14 @@ function run() {
 	TOPIC_NAME="ticdc-common-1-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://root@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 	run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 
 	# TODO: refine the release detection after 5.0 tag of TiDB is ready
diff --git a/tests/integration_tests/ddl_attributes/run.sh b/tests/integration_tests/ddl_attributes/run.sh
index eaf6e263eee..04e315d723f 100644
--- a/tests/integration_tests/ddl_attributes/run.sh
+++ b/tests/integration_tests/ddl_attributes/run.sh
@@ -23,12 +23,16 @@ function run() {
 	TOPIC_NAME="ticdc-ddl-attributes-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://root@127.0.0.1:3306/" ;;
 	esac
+
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}"
-	fi
+
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
 	sleep 3
diff --git a/tests/integration_tests/ddl_manager/run.sh b/tests/integration_tests/ddl_manager/run.sh
index 29dbb7452b1..d7b7b16a4e5 100755
--- a/tests/integration_tests/ddl_manager/run.sh
+++ b/tests/integration_tests/ddl_manager/run.sh
@@ -24,14 +24,16 @@ function run() {
 	TOPIC_NAME="ticdc-ddl-mamager-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://root@127.0.0.1:3306/" ;;
 	esac
 	changefeed_id="ddl-manager"
 	run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id}
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
diff --git a/tests/integration_tests/ddl_only_block_related_table/run.sh b/tests/integration_tests/ddl_only_block_related_table/run.sh
index 1af74ce455a..caf8e913ef4 100755
--- a/tests/integration_tests/ddl_only_block_related_table/run.sh
+++ b/tests/integration_tests/ddl_only_block_related_table/run.sh
@@ -62,14 +62,16 @@ function run() {
 	TOPIC_NAME="ticdc-common-1-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://root@127.0.0.1:3306/" ;;
 	esac
 	changefeed_id="ddl-only-block-related-table"
 	run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id}
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	run_sql_file $CUR/data/start.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
diff --git a/tests/integration_tests/ddl_puller_lag/run.sh b/tests/integration_tests/ddl_puller_lag/run.sh
index 6b7e790acce..ca2c47c28a1 100644
--- a/tests/integration_tests/ddl_puller_lag/run.sh
+++ b/tests/integration_tests/ddl_puller_lag/run.sh
@@ -26,12 +26,14 @@ function prepare() {
 	TOPIC_NAME="ticdc-ddl-puller-lag-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-client-id=ddl_puller_lag&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql+ssl://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 }
 
 function sql_check() {
diff --git a/tests/integration_tests/ddl_reentrant/run.sh b/tests/integration_tests/ddl_reentrant/run.sh
index 60995d62123..12b3b8500cd 100644
--- a/tests/integration_tests/ddl_reentrant/run.sh
+++ b/tests/integration_tests/ddl_reentrant/run.sh
@@ -118,7 +118,7 @@ function ddl_test() {
 }
 
 function run() {
-	# don't test kafka in this case
+	# No need to test kafka and storage sinks.
 	if [ "$SINK_TYPE" != "mysql" ]; then
 		return
 	fi
diff --git a/tests/integration_tests/ddl_sequence/run.sh b/tests/integration_tests/ddl_sequence/run.sh
index 3808a551dc9..5070e33b126 100644
--- a/tests/integration_tests/ddl_sequence/run.sh
+++ b/tests/integration_tests/ddl_sequence/run.sh
@@ -23,12 +23,14 @@ function run() {
 	TOPIC_NAME="ticdc-ddl-sequence-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
 	check_table_exists ddl_sequence.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
diff --git a/tests/integration_tests/default_value/run.sh b/tests/integration_tests/default_value/run.sh
index 37b668c7b51..b019f17fbef 100755
--- a/tests/integration_tests/default_value/run.sh
+++ b/tests/integration_tests/default_value/run.sh
@@ -24,26 +24,31 @@ function prepare() {
 	TOPIC_NAME="ticdc-default-value-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 }
 
 trap stop_tidb_cluster EXIT
-prepare $*
-
-cd "$(dirname "$0")"
-set -o pipefail
-GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log
-check_table_exists mark.finish_mark_1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
-check_table_exists mark.finish_mark_2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
-# ticdc cost too much sink DDL, just leave more time here
-check_table_exists mark.finish_mark_3 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 900
-check_table_exists mark.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
-check_sync_diff $WORK_DIR $CUR/diff_config.toml
-cleanup_process $CDC_BINARY
-check_logs $WORK_DIR
+
+# storage is not supported yet.
+if [ "$SINK_TYPE" != "storage" ]; then
+	prepare $*
+	cd "$(dirname "$0")"
+	set -o pipefail
+	GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log
+	check_table_exists mark.finish_mark_1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
+	check_table_exists mark.finish_mark_2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
+	# ticdc costs too much time on sink DDL, just leave more time here
+	check_table_exists mark.finish_mark_3 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 900
+	check_table_exists mark.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
+	check_sync_diff $WORK_DIR $CUR/diff_config.toml
+	cleanup_process $CDC_BINARY
+	check_logs $WORK_DIR
+fi
 echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
diff --git a/tests/integration_tests/drop_many_tables/run.sh b/tests/integration_tests/drop_many_tables/run.sh
index 295e604a2f3..735334e5cf4 100644
--- a/tests/integration_tests/drop_many_tables/run.sh
+++ b/tests/integration_tests/drop_many_tables/run.sh
@@ -23,12 +23,14 @@ function run() {
 	TOPIC_NAME="ticdc-drop-tables-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
 	check_table_exists drop_tables.c ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
diff --git a/tests/integration_tests/event_filter/run.sh b/tests/integration_tests/event_filter/run.sh
index c25f06a23a4..b996adc4db7 100644
--- a/tests/integration_tests/event_filter/run.sh
+++ b/tests/integration_tests/event_filter/run.sh
@@ -19,15 +19,17 @@ function run() {
 	TOPIC_NAME="ticdc-event-filter-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 
 	# create changefeed
 	run_cdc_cli changefeed create --sink-uri="$SINK_URI" --server="127.0.0.1:8300" --config=$CUR/conf/cf.toml
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
diff --git a/tests/integration_tests/force_replicate_table/run.sh b/tests/integration_tests/force_replicate_table/run.sh
index aeceb4637dd..f386ee5bd86 100755
--- a/tests/integration_tests/force_replicate_table/run.sh
+++ b/tests/integration_tests/force_replicate_table/run.sh
@@ -60,12 +60,16 @@ function run() {
 	TOPIC_NAME="ticdc-force_replicate_table-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?safe-mode=true" ;;
 	esac
+
 	cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config $CUR/conf/changefeed.toml
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" $CUR/conf/changefeed.toml
-	fi
+
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" $CUR/conf/changefeed.toml ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml "" ;;
+	esac
 	run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	for i in $(seq 0 6); do
diff --git a/tests/integration_tests/foreign_key/run.sh b/tests/integration_tests/foreign_key/run.sh
index 0112881e8a9..78a3fea069e 100644
--- a/tests/integration_tests/foreign_key/run.sh
+++ b/tests/integration_tests/foreign_key/run.sh
@@ -23,12 +23,14 @@ function run() {
 	TOPIC_NAME="ticdc-foreign-key-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
 	check_table_exists foreign_key.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
diff --git a/tests/integration_tests/gc_safepoint/run.sh b/tests/integration_tests/gc_safepoint/run.sh
index 45f6097ac7c..58a09b49e0b 100755
--- a/tests/integration_tests/gc_safepoint/run.sh
+++ b/tests/integration_tests/gc_safepoint/run.sh
@@ -72,14 +72,16 @@ function run() {
 	TOPIC_NAME="ticdc-gc-safepoint-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
 	esac
 	export GO_FAILPOINTS='github.com/pingcap/tiflow/pkg/txnutil/gc/InjectGcSafepointUpdateInterval=return(500)'
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
 	changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	pd_cluster_id=$(curl -s $pd_addr/pd/api/v1/cluster | grep -oE "id\":\s[0-9]+" | grep -oE "[0-9]+")
 	clear_gc_worker_safepoint $pd_addr $pd_cluster_id
diff --git a/tests/integration_tests/generate_column/run.sh b/tests/integration_tests/generate_column/run.sh
index a6d864a62c3..bd9a0cca0e6 100644
--- a/tests/integration_tests/generate_column/run.sh
+++ b/tests/integration_tests/generate_column/run.sh
@@ -9,6 +9,11 @@ CDC_BINARY=cdc.test
 SINK_TYPE=$1
 
 function run() {
+	# storage is not supported yet.
+	if [ "$SINK_TYPE" == "storage" ]; then
+		return
+	fi
+
 	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
 	start_tidb_cluster --workdir $WORK_DIR
 
@@ -23,12 +28,14 @@ function run() {
 	TOPIC_NAME="ticdc-generate-column-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
 	check_table_exists generate_column.t ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
diff --git a/tests/integration_tests/http_proxies/run.sh b/tests/integration_tests/http_proxies/run.sh
index 74239cf6161..7ef620df91f 100644
--- a/tests/integration_tests/http_proxies/run.sh
+++ b/tests/integration_tests/http_proxies/run.sh
@@ -6,6 +6,8 @@ CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 source $CUR/../_utils/test_prepare
 WORK_DIR=$OUT_DIR/$TEST_NAME
 CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
 TEST_HOST_LIST=(UP_TIDB_HOST UP_PD_HOST_{1..3} UP_TIKV_HOST_{1..3})
 # FIXME: hostname in macOS doesn't support -I option.
 lan_addrs=($(hostname -I))
@@ -83,9 +85,10 @@ function check() {
 
 trap "stop_tidb_cluster && stop_proxy" EXIT
 
-prepare
-sleep 5
-check
-
-check_logs $WORK_DIR
+if [ "$SINK_TYPE" == "mysql" ]; then
+	prepare
+	sleep 5
+	check
+	check_logs $WORK_DIR
+fi
 echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
diff --git a/tests/integration_tests/kafka_big_txn_v2/run.sh b/tests/integration_tests/kafka_big_txn_v2/run.sh
index bd449ff6ccb..f0c88efa38a 100755
--- a/tests/integration_tests/kafka_big_txn_v2/run.sh
+++ b/tests/integration_tests/kafka_big_txn_v2/run.sh
@@ -29,6 +29,7 @@ function run() {
 	TOPIC_NAME="ticdc-big-txn-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=none" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml"
@@ -36,9 +37,10 @@ function run() {
 	run_sql "CREATE DATABASE big_txn;"
 	go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=big_txn
 
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 	check_table_exists "big_txn.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
 
 	run_sql "CREATE TABLE big_txn.usertable1 LIKE big_txn.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
diff --git a/tests/integration_tests/kv_client_stream_reconnect/run.sh b/tests/integration_tests/kv_client_stream_reconnect/run.sh
index 3989786cb4e..627635d6700 100644
--- a/tests/integration_tests/kv_client_stream_reconnect/run.sh
+++ b/tests/integration_tests/kv_client_stream_reconnect/run.sh
@@ -20,6 +20,7 @@ function run() {
 	TOPIC_NAME="kv-client-stream-reconnect-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
 	esac
 
@@ -27,9 +28,10 @@ function run() {
 	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/kv/kvClientForceReconnect=return(true)'
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
 	changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	run_sql "CREATE DATABASE kv_client_stream_reconnect;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
diff --git a/tests/integration_tests/many_pk_or_uk/run.sh b/tests/integration_tests/many_pk_or_uk/run.sh
index 4cb7e823647..8421346f54e 100755
--- a/tests/integration_tests/many_pk_or_uk/run.sh
+++ b/tests/integration_tests/many_pk_or_uk/run.sh
@@ -24,22 +24,27 @@ function prepare() {
 	TOPIC_NAME="ticdc-many-pk-or-uk-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 }
 
 trap stop_tidb_cluster EXIT
-prepare $*
-
-cd "$(dirname "$0")"
-set -o pipefail
-GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log
-check_table_exists test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
-check_sync_diff $WORK_DIR $CUR/diff_config.toml
-cleanup_process $CDC_BINARY
-check_logs $WORK_DIR
+
+# storage is not supported yet.
+if [ "$SINK_TYPE" != "storage" ]; then
+	prepare $*
+	cd "$(dirname "$0")"
+	set -o pipefail
+	GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log
+	check_table_exists test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	check_sync_diff $WORK_DIR $CUR/diff_config.toml
+	cleanup_process $CDC_BINARY
+	check_logs $WORK_DIR
+fi
 echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
diff --git a/tests/integration_tests/move_table/run.sh b/tests/integration_tests/move_table/run.sh
index 5e80ed53b9b..a0482102af5 100644
--- a/tests/integration_tests/move_table/run.sh
+++ b/tests/integration_tests/move_table/run.sh
@@ -24,13 +24,15 @@ function run() {
 	TOPIC_NAME="ticdc-move-table-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "2" --addr 127.0.0.1:8301
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "3" --addr 127.0.0.1:8302
diff --git a/tests/integration_tests/multi_capture/run.sh b/tests/integration_tests/multi_capture/run.sh
index 7beb0087292..711fe22ba02 100755
--- a/tests/integration_tests/multi_capture/run.sh
+++ b/tests/integration_tests/multi_capture/run.sh
@@ -36,12 +36,14 @@ function run() {
 	TOPIC_NAME="ticdc-multi-capture-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --server="127.0.0.1:8301"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 
 	# check tables are created and data is synchronized
 	for i in $(seq $DB_COUNT); do
diff --git a/tests/integration_tests/multi_changefeed/run.sh b/tests/integration_tests/multi_changefeed/run.sh
index f6850b5d594..df4b84fb41f 100755
--- a/tests/integration_tests/multi_changefeed/run.sh
+++ b/tests/integration_tests/multi_changefeed/run.sh
@@ -56,7 +56,6 @@ function check_old_value_enabled() {
 export -f check_old_value_enabled
 
 function run() {
-	# kafka is not supported yet.
 	if [ "$SINK_TYPE" != "mysql" ]; then
 		return
 	fi
diff --git a/tests/integration_tests/multi_source/run.sh b/tests/integration_tests/multi_source/run.sh
index 615deae94f2..ae658f2d5a8 100755
--- a/tests/integration_tests/multi_source/run.sh
+++ b/tests/integration_tests/multi_source/run.sh
@@ -24,27 +24,31 @@ function prepare() {
 	TOPIC_NAME="ticdc-multi-source-test-$RANDOM"
 	case $SINK_TYPE in
 	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
-	fi
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	esac
 }
 
 trap stop_tidb_cluster EXIT
-prepare $*
-
-cd "$(dirname "$0")"
-set -o pipefail
-GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log
-check_table_exists mark.finish_mark_0 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
-check_table_exists mark.finish_mark_1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
-check_table_exists mark.finish_mark_2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
-check_table_exists mark.finish_mark_3 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
-check_table_exists mark.finish_mark_4 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
-check_table_exists mark.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
-check_sync_diff $WORK_DIR $CUR/diff_config.toml
-cleanup_process $CDC_BINARY
-check_logs $WORK_DIR
+# storage is not supported yet.
+if [ "$SINK_TYPE" != "storage" ]; then + prepare $* + cd "$(dirname "$0")" + set -o pipefail + GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log + check_table_exists mark.finish_mark_0 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + check_table_exists mark.finish_mark_1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + check_table_exists mark.finish_mark_2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + check_table_exists mark.finish_mark_3 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + check_table_exists mark.finish_mark_4 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + check_table_exists mark.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + check_sync_diff $WORK_DIR $CUR/diff_config.toml + cleanup_process $CDC_BINARY + check_logs $WORK_DIR +fi echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/multi_tables_ddl/run.sh b/tests/integration_tests/multi_tables_ddl/run.sh index 2ca909bf873..6b59c01a030 100755 --- a/tests/integration_tests/multi_tables_ddl/run.sh +++ b/tests/integration_tests/multi_tables_ddl/run.sh @@ -41,6 +41,10 @@ stop() { s3cmd --access_key=$MINIO_ACCESS_KEY --secret_key=$MINIO_SECRET_KEY --host=$S3_ENDPOINT --host-bucket=$S3_ENDPOINT --no-ssl mb s3://logbucket function run() { + if [ "$SINK_TYPE" == "storage" ]; then + return + fi + start_tidb_cluster --workdir $WORK_DIR cd $WORK_DIR diff --git a/tests/integration_tests/new_ci_collation_with_old_value/run.sh b/tests/integration_tests/new_ci_collation_with_old_value/run.sh index f7963d1ef9d..a246b518a0f 100755 --- a/tests/integration_tests/new_ci_collation_with_old_value/run.sh +++ b/tests/integration_tests/new_ci_collation_with_old_value/run.sh @@ -9,6 +9,11 @@ CDC_BINARY=cdc.test SINK_TYPE=$1 function run() { + # storage is not supported yet. 
+ if [ "$SINK_TYPE" == "storage" ]; then + return + fi + rm -rf $WORK_DIR && mkdir -p $WORK_DIR start_tidb_cluster --workdir $WORK_DIR --tidb-config $CUR/conf/tidb_config.toml @@ -23,12 +28,16 @@ function run() { TOPIC_NAME="ticdc-new_ci_collation_with_old_value-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?safe-mode=true" ;; esac + cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config $CUR/conf/changefeed.toml - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" $CUR/conf/changefeed.toml - fi + + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" $CUR/conf/changefeed.toml ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml "" ;; + esac run_sql_file $CUR/data/test1.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} for i in $(seq 1 5); do diff --git a/tests/integration_tests/new_ci_collation_without_old_value/run.sh b/tests/integration_tests/new_ci_collation_without_old_value/run.sh index 24534849954..d3c868fa13d 100755 --- a/tests/integration_tests/new_ci_collation_without_old_value/run.sh +++ b/tests/integration_tests/new_ci_collation_without_old_value/run.sh @@ -9,6 +9,11 @@ CDC_BINARY=cdc.test SINK_TYPE=$1 function run() { + # storage is not supported yet. 
+ if [ "$SINK_TYPE" == "storage" ]; then + return + fi + rm -rf $WORK_DIR && mkdir -p $WORK_DIR start_tidb_cluster --workdir $WORK_DIR --tidb-config $CUR/conf/tidb_config.toml @@ -23,12 +28,14 @@ function run() { TOPIC_NAME="ticdc-new_ci_collation_without_old_value-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?safe-mode=true" ;; esac cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config $CUR/conf/changefeed.toml - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac run_sql_file $CUR/data/test1.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} for i in $(seq 1 5); do diff --git a/tests/integration_tests/owner_resign/run.sh b/tests/integration_tests/owner_resign/run.sh index 76a07d4983d..8932d34da43 100755 --- a/tests/integration_tests/owner_resign/run.sh +++ b/tests/integration_tests/owner_resign/run.sh @@ -25,12 +25,14 @@ function run() { TOPIC_NAME="ticdc-owner-resign-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://kafka01:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --sink-uri="$SINK_URI" --server="127.0.0.1:8301" - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac run_sql "CREATE database owner_resign;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "CREATE table owner_resign.t1(id int not null primary key, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/partition_table/run.sh b/tests/integration_tests/partition_table/run.sh index 3945c76df39..c0cc2daf8f4 100644 --- a/tests/integration_tests/partition_table/run.sh +++ b/tests/integration_tests/partition_table/run.sh @@ -24,12 +24,14 @@ function run() { TOPIC_NAME="ticdc-partition-table-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR 
"kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} # sync_diff can't check non-exist table, so we check expected tables are created in downstream first check_table_exists partition_table.t ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} diff --git a/tests/integration_tests/processor_err_chan/run.sh b/tests/integration_tests/processor_err_chan/run.sh index 3919bab5e7b..392d37828a2 100644 --- a/tests/integration_tests/processor_err_chan/run.sh +++ b/tests/integration_tests/processor_err_chan/run.sh @@ -17,6 +17,7 @@ function run() { TOPIC_NAME="ticdc-processor-err-chan-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac @@ -31,9 +32,10 @@ function run() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac retry_time=10 ensure $retry_time check_changefeed_state $pd_addr $changefeed_id "normal" "null" "" diff --git a/tests/integration_tests/processor_etcd_worker_delay/run.sh b/tests/integration_tests/processor_etcd_worker_delay/run.sh index 0dacb38e5f4..5c0b54d55f6 100644 --- a/tests/integration_tests/processor_etcd_worker_delay/run.sh +++ b/tests/integration_tests/processor_etcd_worker_delay/run.sh @@ -14,7 +14,7 @@ MAX_RETRIES=20 function run() { # kafka is not supported yet. 
- if [ "$SINK_TYPE" != "mysql" ]; then + if [ "$SINK_TYPE" == "kafka" ]; then return fi @@ -26,15 +26,17 @@ function run() { TOPIC_NAME="processor-etcd-delay-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac export GO_FAILPOINTS='github.com/pingcap/tiflow/pkg/orchestrator/ProcessorEtcdDelay=return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac run_sql "CREATE DATABASE processor_delay;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} for i in {1..50}; do diff --git a/tests/integration_tests/processor_resolved_ts_fallback/run.sh b/tests/integration_tests/processor_resolved_ts_fallback/run.sh index 0b494ff7272..c4e305ba893 100755 --- a/tests/integration_tests/processor_resolved_ts_fallback/run.sh +++ b/tests/integration_tests/processor_resolved_ts_fallback/run.sh @@ -10,7 +10,7 @@ SINK_TYPE=$1 function run() { # TODO: kafka sink has bug with this case, remove this after bug is fixed - if [ "$SINK_TYPE" != "mysql" ]; then + if [ "$SINK_TYPE" == "kafka" ]; then return fi @@ -24,12 +24,14 @@ function run() { TOPIC_NAME="ticdc-processor-resolved-ts-fallback-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://kafka01:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --sink-uri="$SINK_URI" --server="127.0.0.1:8301" - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac run_sql "CREATE database processor_resolved_ts_fallback;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "CREATE table processor_resolved_ts_fallback.t1(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/processor_stop_delay/run.sh b/tests/integration_tests/processor_stop_delay/run.sh index 57a2c371016..360a7a7d76a 100644 --- a/tests/integration_tests/processor_stop_delay/run.sh +++ b/tests/integration_tests/processor_stop_delay/run.sh @@ -18,15 +18,17 @@ function run() { TOPIC_NAME="ticdc-processor-stop-delay-$RANDOM" case $SINK_TYPE in kafka) 
SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/processor/processorStopDelay=1*sleep(10000)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac run_sql "CREATE DATABASE processor_stop_delay;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "CREATE table processor_stop_delay.t (id int primary key auto_increment, t datetime DEFAULT CURRENT_TIMESTAMP)" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/region_merge/run.sh b/tests/integration_tests/region_merge/run.sh index bd80d8260cc..8ac809f91ec 100644 --- a/tests/integration_tests/region_merge/run.sh +++ b/tests/integration_tests/region_merge/run.sh @@ -38,12 +38,14 @@ function run() { TOPIC_NAME="ticdc-region-merge-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --sink-uri="$SINK_URI" - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac # set max_execution_time to 30s, because split region could block even region has been split. 
run_sql "SET @@global.MAX_EXECUTION_TIME = 30000;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/resolve_lock/run.sh b/tests/integration_tests/resolve_lock/run.sh index 9e9cd41e547..1c7e7fcdaba 100755 --- a/tests/integration_tests/resolve_lock/run.sh +++ b/tests/integration_tests/resolve_lock/run.sh @@ -24,12 +24,14 @@ function prepare() { TOPIC_NAME="ticdc-resolve-lock-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/tidb-txn-mode=pessimistic" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac } trap stop_tidb_cluster EXIT diff --git a/tests/integration_tests/resourcecontrol/run.sh b/tests/integration_tests/resourcecontrol/run.sh index 8dbec3abb86..53838813a5e 100644 --- a/tests/integration_tests/resourcecontrol/run.sh +++ b/tests/integration_tests/resourcecontrol/run.sh @@ -23,12 +23,14 @@ function run() { TOPIC_NAME="ticdc-resourcecontrol-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} # sync_diff can't check non-exist table, so we check expected tables are created in downstream first check_table_exists resourcecontrol.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} diff --git a/tests/integration_tests/row_format/run.sh b/tests/integration_tests/row_format/run.sh index c327b2643a5..c37a1a7dbe0 100644 --- a/tests/integration_tests/row_format/run.sh +++ b/tests/integration_tests/row_format/run.sh @@ -23,12 +23,14 @@ function run() { TOPIC_NAME="ticdc-row-format-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" - if 
[ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac run_sql "SET GLOBAL tidb_row_format_version = 1;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} sleep 2 diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 00d7a7f0084..818ff06b2f2 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -14,9 +14,6 @@ mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint" mysql_only_http="http_api http_api_tls api_v2" mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3" -# Tests that need to support kafka: bank kill_owner_with_ddl owner_remove_table_error -# owner_resign processor_etcd_worker_delay processor_resolved_ts_fallback -# multi_changefeed clustered_index sink_hang kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume" kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic multi_topics" kafka_only_v2="kafka_big_txn_v2 kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2" @@ -33,31 +30,33 @@ storage_only_canal_json="canal_json_storage_basic canal_json_storage_partition_t declare -A groups groups=( # Note: only the tests in the first three groups are running in storage sink pipeline. 
- ["G00"]="$mysql_only $kafka_only $storage_only_csv" - ["G01"]="$mysql_only_http $kafka_only_protocol $storage_only_canal_json" - ["G02"]="$mysql_only_consistent_replicate $kafka_only_v2" - ["G03"]='row_format drop_many_tables processor_stop_delay' - ["G04"]='foreign_key ddl_puller_lag ddl_only_block_related_table' - ["G05"]='partition_table changefeed_auto_stop' - ["G06"]='charset_gbk owner_remove_table_error ddl_manager' - ["G07"]='clustered_index multi_tables_ddl' - ["G08"]='bank multi_source multi_capture' - ["G09"]='ddl_reentrant multi_cdc_cluster' - ["G10"]='sink_retry changefeed_error ddl_sequence' - ["G11"]='kv_client_stream_reconnect cdc default_value' - ["G12"]='changefeed_fast_fail tidb_mysql_test server_config_compatibility' - ["G13"]='resourcecontrol processor_etcd_worker_delay' - ["G14"]='batch_update_to_no_batch gc_safepoint changefeed_pause_resume' - ["G15"]='cli simple cdc_server_tips changefeed_resume_with_checkpoint_ts' - ["G16"]='processor_err_chan resolve_lock move_table autorandom' - ["G17"]='ddl_attributes many_pk_or_uk capture_session_done_during_task' - ["G18"]='tiflash new_ci_collation_without_old_value region_merge common_1' - ["G19"]='split_region availability kill_owner_with_ddl' - ["G20"]='changefeed_reconstruct http_proxies savepoint' - ["G21"]='event_filter generate_column sequence processor_resolved_ts_fallback' - ["G22"]='big_txn changefeed_finish sink_hang' - ["G23"]='new_ci_collation_with_old_value batch_add_table ' - ["G24"]='owner_resign force_replicate_table multi_changefeed' + ["G00"]="$mysql_only $kafka_only $storage_only" + ["G01"]="$mysql_only_http $kafka_only_protocol $storage_only_canal_json multi_tables_ddl" + ["G02"]="$mysql_only_consistent_replicate $kafka_only_v2 $storage_only_csv" + ["G03"]='row_format drop_many_tables processor_stop_delay partition_table' + ["G04"]='foreign_key ddl_puller_lag ddl_only_block_related_table changefeed_auto_stop' + ["G05"]='charset_gbk ddl_manager multi_source' + ["G06"]='sink_retry changefeed_error ddl_sequence resourcecontrol' + ["G07"]='kv_client_stream_reconnect cdc split_region' + ["G08"]='server_config_compatibility processor_err_chan changefeed_reconstruct multi_capture' + ["G09"]='gc_safepoint changefeed_pause_resume cli savepoint' + ["G10"]='default_value simple cdc_server_tips event_filter' + ["G11"]='resolve_lock move_table autorandom generate_column' + ["G12"]='many_pk_or_uk capture_session_done_during_task ddl_attributes' + ["G13"]='tiflash new_ci_collation_without_old_value region_merge common_1' + ["G14"]='big_txn changefeed_finish force_replicate_table' + ["G15"]='new_ci_collation_with_old_value batch_add_table' + # currently G16 is not running in kafka pipeline + ["G16"]='owner_resign processor_etcd_worker_delay sink_hang' + ["G17"]='clustered_index processor_resolved_ts_fallback' + # only run the following tests in mysql pipeline + ["G18"]='availability http_proxies sequence' + ["G19"]='changefeed_fast_fail batch_update_to_no_batch changefeed_resume_with_checkpoint_ts' + ["G20"]='tidb_mysql_test ddl_reentrant multi_cdc_cluster multi_changefeed' + ["G21"]='bank kill_owner_with_ddl owner_remove_table_error' + ["G22"]='' + ["G23"]='' + ["G24"]='' ) # Get other cases not in groups, to avoid missing any case diff --git a/tests/integration_tests/savepoint/run.sh b/tests/integration_tests/savepoint/run.sh index dabb4695676..883a9624689 100644 --- a/tests/integration_tests/savepoint/run.sh +++ b/tests/integration_tests/savepoint/run.sh @@ -23,12 +23,14 @@ function run() { 
TOPIC_NAME="ticdc-savepoint-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} # sync_diff can't check non-exist table, so we check expected tables are created in downstream first check_table_exists savepoint.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} diff --git a/tests/integration_tests/sequence/run.sh b/tests/integration_tests/sequence/run.sh index ff90a243dac..3bc45696b37 100755 --- a/tests/integration_tests/sequence/run.sh +++ b/tests/integration_tests/sequence/run.sh @@ -9,7 +9,7 @@ CDC_BINARY=cdc.test SINK_TYPE=$1 function run() { - # No need to test kafka. + # No need to test kafka and storage sink. if [ "$SINK_TYPE" != "mysql" ]; then return fi diff --git a/tests/integration_tests/server_config_compatibility/run.sh b/tests/integration_tests/server_config_compatibility/run.sh index b7ba7c96b08..8d44e514b8c 100644 --- a/tests/integration_tests/server_config_compatibility/run.sh +++ b/tests/integration_tests/server_config_compatibility/run.sh @@ -26,12 +26,14 @@ function prepare() { TOPIC_NAME="ticdc-simple-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-client-id=cdc_test_simple&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql+ssl://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac } function sql_check() { diff --git a/tests/integration_tests/simple/run.sh b/tests/integration_tests/simple/run.sh index 7a50b2e6037..bb1a76f67ff 100644 --- a/tests/integration_tests/simple/run.sh +++ b/tests/integration_tests/simple/run.sh @@ -26,12 +26,14 @@ function prepare() { TOPIC_NAME="ticdc-simple-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-client-id=cdc_test_simple&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) 
SINK_URI="mysql+ssl://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac } function sql_check() { diff --git a/tests/integration_tests/sink_hang/run.sh b/tests/integration_tests/sink_hang/run.sh index 0de76e0a3aa..89c05e80b29 100644 --- a/tests/integration_tests/sink_hang/run.sh +++ b/tests/integration_tests/sink_hang/run.sh @@ -14,7 +14,7 @@ MAX_RETRIES=20 function run() { # kafka is not supported yet. - if [ "$SINK_TYPE" != "mysql" ]; then + if [ "$SINK_TYPE" == "kafka" ]; then return fi @@ -26,15 +26,17 @@ function run() { TOPIC_NAME="ticdc-sink-hang-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkExecDMLError=2*return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac run_sql "CREATE DATABASE sink_hang;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "CREATE table sink_hang.t1(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/sink_retry/run.sh b/tests/integration_tests/sink_retry/run.sh index aca983c18c5..025f5ae0d4f 100755 --- a/tests/integration_tests/sink_retry/run.sh +++ b/tests/integration_tests/sink_retry/run.sh @@ -27,12 +27,14 @@ function run() { TOPIC_NAME="ticdc-sink-retry-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR 
"kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac run_sql "CREATE TABLE sink_retry.finish_mark_1 (a int primary key);" sleep 30 diff --git a/tests/integration_tests/split_region/run.sh b/tests/integration_tests/split_region/run.sh index 6b7b63bc9c8..6f8983e08c8 100755 --- a/tests/integration_tests/split_region/run.sh +++ b/tests/integration_tests/split_region/run.sh @@ -25,12 +25,14 @@ function run() { TOPIC_NAME="ticdc-split-region-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac # sync_diff can't check non-exist table, so we check expected tables are created in downstream first check_table_exists split_region.test1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} diff --git a/tests/integration_tests/tidb_mysql_test/run.sh b/tests/integration_tests/tidb_mysql_test/run.sh index d14c93d118d..177a26e80d9 100755 --- a/tests/integration_tests/tidb_mysql_test/run.sh +++ b/tests/integration_tests/tidb_mysql_test/run.sh @@ -24,20 +24,22 @@ function prepare() { TOPIC_NAME="ticdc-default-value-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac #run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" SINK_PARA="{\"force_replicate\":true, \"changefeed_id\":\"tidb-mysql-test\", \"sink_uri\":\"$SINK_URI\", \"start_ts\":$start_ts}" curl -X POST -H "'Content-type':'application/json'" http://127.0.0.1:8300/api/v1/changefeeds -d "$SINK_PARA" - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac } -# kafka test is not supported yet. +# kafka and storage test is not supported yet. # Because:(1) most cases has no pk/uk, consumer will receive more than one same DML # (2) kafka consumer need support force_replicate -if [ "$SINK_TYPE" = "kafka" ]; then +if [ "$SINK_TYPE" != "mysql" ]; then echo "[$(date)] <<<<<< skip test case $TEST_NAME for kafka! 
>>>>>>" exit 0 fi diff --git a/tests/integration_tests/tiflash/run.sh b/tests/integration_tests/tiflash/run.sh index bd263e03ff0..2d0ab2294fc 100644 --- a/tests/integration_tests/tiflash/run.sh +++ b/tests/integration_tests/tiflash/run.sh @@ -23,12 +23,14 @@ function run() { TOPIC_NAME="ticdc-tiflash-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" - if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" - fi + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} # sync_diff can't check non-exist table, so we check expected tables are created in downstream first check_table_exists cdc_tiflash_test.multi_data_type ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}