sink(ticdc): add some integration tests for storage sink (#9124)
ref #8772
CharlesCheung96 committed Jul 3, 2023
1 parent 5afd635 commit 5b876a0
Showing 62 changed files with 402 additions and 250 deletions.
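Most of the scripts below receive the same two edits: the case block that builds SINK_URI gains a storage arm pointing at a local file:// directory with the canal-json protocol, and the kafka-only consumer if-block becomes a case that can also start a storage consumer. A minimal sketch of the resulting pattern, with query parameters abbreviated (note the consumer URI uses version= where the sink URI uses kafka-version=):

	case $SINK_TYPE in
	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&..." ;;
	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
	esac
	cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
	case $SINK_TYPE in
	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&...&version=${KAFKA_VERSION}" ;;
	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
	esac

The two empty positional arguments to run_storage_consumer appear to be a consumer config file and a log suffix; the helper itself is defined in the shared test harness, not in this diff.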
8 changes: 5 additions & 3 deletions tests/integration_tests/autorandom/run.sh
@@ -23,12 +23,14 @@ function run() {
TOPIC_NAME="ticdc-autorandom-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
*) SINK_URI="mysql://normal:[email protected]:3306/" ;;
esac
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
esac
run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
check_table_exists autorandom_test.table_a ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
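For reference, a plausible shape for the run_storage_consumer helper (an assumption: its real definition lives in the shared test harness, not in this diff). It replays the canal-json files that the changefeed writes under the file:// sink into the downstream TiDB, so sync_diff can compare the two clusters:

	# Hypothetical sketch; argument names and the downstream URI are assumptions.
	function run_storage_consumer() {
		local workdir=$1 sink_uri=$2 consumer_config=$3 log_suffix=$4
		# replay canal-json files from the sink directory into downstream TiDB
		cdc_storage_consumer \
			--log-file "$workdir/cdc_storage_consumer$log_suffix.log" \
			--upstream-uri "$sink_uri" \
			--downstream-uri "mysql://root@${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT}/" \
			>>"$workdir/cdc_storage_consumer_stdout$log_suffix.log" 2>&1 &
	}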
11 changes: 7 additions & 4 deletions tests/integration_tests/availability/run.sh
@@ -9,6 +9,7 @@ source $CUR/capture.sh
source $CUR/processor.sh
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

export DOWN_TIDB_HOST
export DOWN_TIDB_PORT
@@ -22,9 +23,11 @@ function prepare() {
}

trap stop_tidb_cluster EXIT
prepare $*
test_owner_ha $*
test_capture_ha $*
test_processor_ha $*
if [ "$SINK_TYPE" == "mysql" ]; then
prepare $*
test_owner_ha $*
test_capture_ha $*
test_processor_ha $*
fi
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
2 changes: 1 addition & 1 deletion tests/integration_tests/bank/run.sh
@@ -25,7 +25,7 @@ function prepare() {
}

trap stop_tidb_cluster EXIT
# kafka is not supported yet.
# No need to support kafka and storage sink.
if [ "$SINK_TYPE" == "mysql" ]; then
prepare $*

8 changes: 5 additions & 3 deletions tests/integration_tests/batch_add_table/run.sh
@@ -25,12 +25,14 @@ function run() {
TOPIC_NAME="ticdc-batch-add-table-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
*) SINK_URI="mysql://normal:[email protected]:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
esac

run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
13 changes: 10 additions & 3 deletions tests/integration_tests/big_txn/run.sh
@@ -12,6 +12,11 @@ CDC_COUNT=3
DB_COUNT=4

function run() {
# storage is not supported yet.
if [ "$SINK_TYPE" == "storage" ]; then
return
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR
@@ -25,16 +30,18 @@ function run() {
TOPIC_NAME="ticdc-big-txn-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
*) SINK_URI="mysql://normal:[email protected]:3306?transaction-atomicity=none" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"

run_sql "CREATE DATABASE big_txn;"
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=big_txn

if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
esac

check_table_exists "big_txn.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql "CREATE TABLE big_txn.usertable1 LIKE big_txn.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
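go-ycsb reads its workload from the properties file passed with -P; the conf/workload used by big_txn is not shown in this diff, but a YCSB-style properties file has roughly this shape (values are illustrative placeholders, not the test's actual settings):

	# Hypothetical workload file for go-ycsb (values are placeholders)
	workload=core
	threadcount=10
	recordcount=5000
	operationcount=0
	fieldcount=3
	requestdistribution=uniform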
@@ -17,6 +17,7 @@ function run() {
TOPIC_NAME="ticdc-capture-session-done-during-task-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
*) SINK_URI="mysql://normal:[email protected]:3306/?max-txn-row=1" ;;
esac

@@ -34,9 +35,10 @@
# wait task is dispatched
sleep 1

if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
esac

capture_key=$(ETCDCTL_API=3 etcdctl get /tidb/cdc/default/__cdc_meta__/capture --prefix | head -n 1)
lease=$(ETCDCTL_API=3 etcdctl get $capture_key -w json | grep -o 'lease":[0-9]*' | awk -F: '{print $2}')
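The lease parsed above identifies the capture's etcd session. The hunk is truncated here, but the natural continuation (presumably what the test does next) is to revoke that lease so the capture's session expires while its task is still dispatched:

	# Presumed continuation; etcdctl prints and accepts lease IDs in hex,
	# while the JSON field extracted above is decimal.
	ETCDCTL_API=3 etcdctl lease revoke $(printf '%x' $lease)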
24 changes: 14 additions & 10 deletions tests/integration_tests/cdc/run.sh
@@ -24,20 +24,24 @@ function prepare() {
TOPIC_NAME="ticdc-cdc-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
*) SINK_URI="mysql://normal:[email protected]:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
esac
}

trap stop_tidb_cluster EXIT
prepare $*

cd "$(dirname "$0")"
set -o pipefail
GO111MODULE=on go run cdc.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log
cleanup_process $CDC_BINARY
check_logs $WORK_DIR
# storage is not supported yet.
if [ "$SINK_TYPE" != "storage" ]; then
prepare $*
cd "$(dirname "$0")"
set -o pipefail
GO111MODULE=on go run cdc.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log
cleanup_process $CDC_BINARY
check_logs $WORK_DIR
fi
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
8 changes: 5 additions & 3 deletions tests/integration_tests/cdc_server_tips/run.sh
@@ -48,12 +48,14 @@ function try_to_run_cdc() {
TOPIC_NAME="ticdc-server-tips-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-client-id=cdc_server_tips&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
*) SINK_URI="mysql+ssl://normal:[email protected]:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
esac
echo 'Succeed to create a changefeed, no usage tips should be printed'
}

8 changes: 5 additions & 3 deletions tests/integration_tests/changefeed_auto_stop/run.sh
@@ -31,12 +31,14 @@ function run() {
TOPIC_NAME="ticdc-changefeed-auto-stop-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
*) SINK_URI="mysql://normal:[email protected]:3306/" ;;
esac
changefeedid=$(cdc cli changefeed create --pd="http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
esac

ensure $MAX_RETRIES check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" ${changefeedid} "normal" "null" ""

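The tail -n2 | head -n1 | awk '{print $2}' chain above depends on the shape of the CLI's success output; assuming it ends with lines like the following, the chain isolates the changefeed ID:

	Create changefeed successfully!
	ID: 5b2f0c4e-...     <- tail -n2 | head -n1 keeps this line; awk prints field 2
	Info: {"sink-uri":"...","state":"normal",...}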
8 changes: 5 additions & 3 deletions tests/integration_tests/changefeed_error/run.sh
@@ -45,13 +45,15 @@ function run() {
TOPIC_NAME="ticdc-sink-retry-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
*) SINK_URI="mysql://normal:[email protected]:3306/?max-txn-row=1" ;;
esac
changefeedid="changefeed-error"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
esac

ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "[CDC:ErrStartTsBeforeGC]" ""
run_cdc_cli changefeed resume -c $changefeedid
3 changes: 1 addition & 2 deletions tests/integration_tests/changefeed_fast_fail/run.sh
@@ -10,8 +10,7 @@ SINK_TYPE=$1
MAX_RETRIES=20

function run() {
# it is no need to test kafka
# the logic are all the same
# No need to test kafka and storage sink, since the logic is all the same.
if [ "$SINK_TYPE" != "mysql" ]; then
return
fi
8 changes: 5 additions & 3 deletions tests/integration_tests/changefeed_finish/run.sh
@@ -18,6 +18,7 @@ function run() {
TOPIC_NAME="ticdc-changefeed-pause-resume-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
*) SINK_URI="mysql://normal:[email protected]:3306/?max-txn-row=1" ;;
esac

@@ -27,9 +28,10 @@
target_ts=$(($now + 90 * 10 ** 3 * 2 ** 18))
changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --target-ts=$target_ts 2>&1 | tail -n2 | head -n1 | awk '{print $2}')

if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
esac

run_sql "CREATE DATABASE changefeed_finish;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "CREATE table changefeed_finish.t (id int primary key auto_increment, t datetime DEFAULT CURRENT_TIMESTAMP)"
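The target_ts arithmetic above relies on TiDB's TSO layout: the upper bits hold a physical timestamp in milliseconds, shifted left past an 18-bit logical counter. Adding 90 * 10^3 * 2^18 therefore advances the timestamp by 90 seconds of physical time:

	# now is a TSO: (physical_ms << 18) | logical
	physical_ms=$((now >> 18))                   # recover wall-clock milliseconds
	target_ts=$((now + 90 * 1000 * (1 << 18)))   # the same instant + 90s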
8 changes: 5 additions & 3 deletions tests/integration_tests/changefeed_pause_resume/run.sh
@@ -18,14 +18,16 @@ function run() {
TOPIC_NAME="ticdc-changefeed-pause-resume-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
*) SINK_URI="mysql://normal:[email protected]:3306/?max-txn-row=1" ;;
esac

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
esac

run_sql "CREATE DATABASE changefeed_pause_resume;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
for i in $(seq 1 $TABLE_COUNT); do
8 changes: 5 additions & 3 deletions tests/integration_tests/changefeed_reconstruct/run.sh
@@ -41,15 +41,17 @@ function run() {
TOPIC_NAME="ticdc-changefeed-reconstruct-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
*) SINK_URI="mysql://normal:[email protected]:3306/?max-txn-row=1" ;;
esac

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --logsuffix server1 --pd $pd_addr
owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
esac

run_sql "CREATE DATABASE changefeed_reconstruct;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_reconstruct
8 changes: 5 additions & 3 deletions tests/integration_tests/charset_gbk/run.sh
@@ -23,13 +23,15 @@ function run() {
TOPIC_NAME="ticdc-charset-gbk-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
*) SINK_URI="mysql://normal:[email protected]:3306/" ;;
esac
SINK_URI="mysql://normal:[email protected]:3306/"
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
esac
run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# sync_diff can't check non-exist table, so we check expected tables are created in downstream first

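One oddity in this hunk: the unconditional SINK_URI="mysql://normal:123456@127.0.0.1:3306/" line right after the case block overwrites whatever the case selected, so the changefeed always targets the mysql sink even though a kafka or storage consumer may still be started. If that is unintentional, the fix would simply be to drop the override and let the case stand:

	case $SINK_TYPE in
	kafka) SINK_URI="kafka://..." ;;
	storage) SINK_URI="file://..." ;;
	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
	esac
	# SINK_URI="mysql://normal:123456@127.0.0.1:3306/"   <- override removed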
8 changes: 5 additions & 3 deletions tests/integration_tests/cli/run.sh
@@ -38,14 +38,16 @@ function run() {
TOPIC_NAME="ticdc-cli-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
*) SINK_URI="mysql://normal:[email protected]:3306/" ;;
esac

uuid="custom-changefeed-name"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --tz="Asia/Shanghai" -c="$uuid"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
esac

# Make sure changefeed is created.
check_table_exists test.simple ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
9 changes: 6 additions & 3 deletions tests/integration_tests/clustered_index/run.sh
@@ -23,12 +23,14 @@ function run() {
TOPIC_NAME="ticdc-clustered-index-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
*) SINK_URI="mysql://normal:[email protected]:3306/" ;;
esac
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
esac
run_sql "set global tidb_enable_clustered_index=1;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# TiDB global variables cache 2 seconds at most
sleep 2
@@ -46,6 +48,7 @@ function run() {

# kafka is not supported yet.
# ref to issue: https://github.com/pingcap/tiflow/issues/3421
# TODO: enable this test for kafka and storage sink.
if [ "$SINK_TYPE" != "mysql" ]; then
echo "[$(date)] <<<<<< skip test case $TEST_NAME for $SINK_TYPE! >>>>>>"
exit 0
13 changes: 10 additions & 3 deletions tests/integration_tests/common_1/run.sh
@@ -9,6 +9,11 @@ CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
# storage is not supported yet.
if [ "$SINK_TYPE" == "storage" ]; then
return
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR
@@ -38,12 +43,14 @@ function run() {
TOPIC_NAME="ticdc-common-1-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
*) SINK_URI="mysql://[email protected]:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
esac

run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# TODO: refine the release detection after 5.0 tag of TiDB is ready