forked from pingcap/tiflow
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
sink(ticdc): add some integration tests for storage sink (pingcap#9124)
ref pingcap#8772
- Loading branch information
1 parent
5afd635
commit 5b876a0
Showing
62 changed files
with
402 additions
and
250 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,12 +23,14 @@ function run() { | |
TOPIC_NAME="ticdc-autorandom-test-$RANDOM" | ||
case $SINK_TYPE in | ||
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; | ||
*) SINK_URI="mysql://normal:[email protected]:3306/" ;; | ||
esac | ||
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" | ||
if [ "$SINK_TYPE" == "kafka" ]; then | ||
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" | ||
fi | ||
case $SINK_TYPE in | ||
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; | ||
esac | ||
run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} | ||
# sync_diff can't check non-exist table, so we check expected tables are created in downstream first | ||
check_table_exists autorandom_test.table_a ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,12 +25,14 @@ function run() { | |
TOPIC_NAME="ticdc-batch-add-table-test-$RANDOM" | ||
case $SINK_TYPE in | ||
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; | ||
*) SINK_URI="mysql://normal:[email protected]:3306/" ;; | ||
esac | ||
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" | ||
if [ "$SINK_TYPE" == "kafka" ]; then | ||
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" | ||
fi | ||
case $SINK_TYPE in | ||
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; | ||
esac | ||
|
||
run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} | ||
# sync_diff can't check non-exist table, so we check expected tables are created in downstream first | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,11 @@ CDC_COUNT=3 | |
DB_COUNT=4 | ||
|
||
function run() { | ||
# storage is not supported yet. | ||
if [ "$SINK_TYPE" == "storage" ]; then | ||
return | ||
fi | ||
|
||
rm -rf $WORK_DIR && mkdir -p $WORK_DIR | ||
|
||
start_tidb_cluster --workdir $WORK_DIR | ||
|
@@ -25,16 +30,18 @@ function run() { | |
TOPIC_NAME="ticdc-big-txn-test-$RANDOM" | ||
case $SINK_TYPE in | ||
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; | ||
*) SINK_URI="mysql://normal:[email protected]:3306?transaction-atomicity=none" ;; | ||
esac | ||
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" | ||
|
||
run_sql "CREATE DATABASE big_txn;" | ||
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=big_txn | ||
|
||
if [ "$SINK_TYPE" == "kafka" ]; then | ||
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" | ||
fi | ||
case $SINK_TYPE in | ||
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; | ||
esac | ||
|
||
check_table_exists "big_txn.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} | ||
run_sql "CREATE TABLE big_txn.usertable1 LIKE big_txn.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ function run() { | |
TOPIC_NAME="ticdc-capture-session-done-during-task-$RANDOM" | ||
case $SINK_TYPE in | ||
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; | ||
*) SINK_URI="mysql://normal:[email protected]:3306/?max-txn-row=1" ;; | ||
esac | ||
|
||
|
@@ -34,9 +35,10 @@ function run() { | |
# wait task is dispatched | ||
sleep 1 | ||
|
||
if [ "$SINK_TYPE" == "kafka" ]; then | ||
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" | ||
fi | ||
case $SINK_TYPE in | ||
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; | ||
esac | ||
|
||
capture_key=$(ETCDCTL_API=3 etcdctl get /tidb/cdc/default/__cdc_meta__/capture --prefix | head -n 1) | ||
lease=$(ETCDCTL_API=3 etcdctl get $capture_key -w json | grep -o 'lease":[0-9]*' | awk -F: '{print $2}') | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,20 +24,24 @@ function prepare() { | |
TOPIC_NAME="ticdc-cdc-test-$RANDOM" | ||
case $SINK_TYPE in | ||
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; | ||
*) SINK_URI="mysql://normal:[email protected]:3306/" ;; | ||
esac | ||
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" | ||
if [ "$SINK_TYPE" == "kafka" ]; then | ||
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" | ||
fi | ||
case $SINK_TYPE in | ||
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; | ||
esac | ||
} | ||
|
||
trap stop_tidb_cluster EXIT | ||
prepare $* | ||
|
||
cd "$(dirname "$0")" | ||
set -o pipefail | ||
GO111MODULE=on go run cdc.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log | ||
cleanup_process $CDC_BINARY | ||
check_logs $WORK_DIR | ||
# storage is not supported yet. | ||
if [ "$SINK_TYPE" != "storage" ]; then | ||
prepare $* | ||
cd "$(dirname "$0")" | ||
set -o pipefail | ||
GO111MODULE=on go run cdc.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log | ||
cleanup_process $CDC_BINARY | ||
check_logs $WORK_DIR | ||
fi | ||
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,12 +48,14 @@ function try_to_run_cdc() { | |
TOPIC_NAME="ticdc-server-tips-test-$RANDOM" | ||
case $SINK_TYPE in | ||
kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-client-id=cdc_server_tips&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; | ||
*) SINK_URI="mysql+ssl://normal:[email protected]:3306/" ;; | ||
esac | ||
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" | ||
if [ "$SINK_TYPE" == "kafka" ]; then | ||
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" | ||
fi | ||
case $SINK_TYPE in | ||
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; | ||
esac | ||
echo 'Succeed to create a changefeed, no usage tips should be printed' | ||
} | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,12 +31,14 @@ function run() { | |
TOPIC_NAME="ticdc-changefeed-auto-stop-test-$RANDOM" | ||
case $SINK_TYPE in | ||
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; | ||
*) SINK_URI="mysql://normal:[email protected]:3306/" ;; | ||
esac | ||
changefeedid=$(cdc cli changefeed create --pd="http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') | ||
if [ "$SINK_TYPE" == "kafka" ]; then | ||
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" | ||
fi | ||
case $SINK_TYPE in | ||
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; | ||
esac | ||
|
||
ensure $MAX_RETRIES check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" ${changefeedid} "normal" "null" "" | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,13 +45,15 @@ function run() { | |
TOPIC_NAME="ticdc-sink-retry-test-$RANDOM" | ||
case $SINK_TYPE in | ||
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; | ||
*) SINK_URI="mysql://normal:[email protected]:3306/?max-txn-row=1" ;; | ||
esac | ||
changefeedid="changefeed-error" | ||
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid | ||
if [ "$SINK_TYPE" == "kafka" ]; then | ||
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" | ||
fi | ||
case $SINK_TYPE in | ||
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; | ||
esac | ||
|
||
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "[CDC:ErrStartTsBeforeGC]" "" | ||
run_cdc_cli changefeed resume -c $changefeedid | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ function run() { | |
TOPIC_NAME="ticdc-changefeed-pause-resume-$RANDOM" | ||
case $SINK_TYPE in | ||
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; | ||
*) SINK_URI="mysql://normal:[email protected]:3306/?max-txn-row=1" ;; | ||
esac | ||
|
||
|
@@ -27,9 +28,10 @@ function run() { | |
target_ts=$(($now + 90 * 10 ** 3 * 2 ** 18)) | ||
changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --target-ts=$target_ts 2>&1 | tail -n2 | head -n1 | awk '{print $2}') | ||
|
||
if [ "$SINK_TYPE" == "kafka" ]; then | ||
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" | ||
fi | ||
case $SINK_TYPE in | ||
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; | ||
esac | ||
|
||
run_sql "CREATE DATABASE changefeed_finish;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} | ||
run_sql "CREATE table changefeed_finish.t (id int primary key auto_increment, t datetime DEFAULT CURRENT_TIMESTAMP)" | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,14 +18,16 @@ function run() { | |
TOPIC_NAME="ticdc-changefeed-pause-resume-$RANDOM" | ||
case $SINK_TYPE in | ||
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; | ||
*) SINK_URI="mysql://normal:[email protected]:3306/?max-txn-row=1" ;; | ||
esac | ||
|
||
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr | ||
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') | ||
if [ "$SINK_TYPE" == "kafka" ]; then | ||
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" | ||
fi | ||
case $SINK_TYPE in | ||
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; | ||
esac | ||
|
||
run_sql "CREATE DATABASE changefeed_pause_resume;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} | ||
for i in $(seq 1 $TABLE_COUNT); do | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,15 +41,17 @@ function run() { | |
TOPIC_NAME="ticdc-changefeed-reconstruct-$RANDOM" | ||
case $SINK_TYPE in | ||
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; | ||
*) SINK_URI="mysql://normal:[email protected]:3306/?max-txn-row=1" ;; | ||
esac | ||
|
||
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --logsuffix server1 --pd $pd_addr | ||
owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') | ||
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') | ||
if [ "$SINK_TYPE" == "kafka" ]; then | ||
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" | ||
fi | ||
case $SINK_TYPE in | ||
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; | ||
esac | ||
|
||
run_sql "CREATE DATABASE changefeed_reconstruct;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} | ||
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_reconstruct | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,13 +23,15 @@ function run() { | |
TOPIC_NAME="ticdc-charset-gbk-$RANDOM" | ||
case $SINK_TYPE in | ||
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; | ||
*) SINK_URI="mysql://normal:[email protected]:3306/" ;; | ||
esac | ||
SINK_URI="mysql://normal:[email protected]:3306/" | ||
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" | ||
if [ "$SINK_TYPE" == "kafka" ]; then | ||
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" | ||
fi | ||
case $SINK_TYPE in | ||
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; | ||
esac | ||
run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} | ||
# sync_diff can't check non-exist table, so we check expected tables are created in downstream first | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,14 +38,16 @@ function run() { | |
TOPIC_NAME="ticdc-cli-test-$RANDOM" | ||
case $SINK_TYPE in | ||
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; | ||
*) SINK_URI="mysql://normal:[email protected]:3306/" ;; | ||
esac | ||
|
||
uuid="custom-changefeed-name" | ||
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --tz="Asia/Shanghai" -c="$uuid" | ||
if [ "$SINK_TYPE" == "kafka" ]; then | ||
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" | ||
fi | ||
case $SINK_TYPE in | ||
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; | ||
esac | ||
|
||
# Make sure changefeed is created. | ||
check_table_exists test.simple ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,12 +23,14 @@ function run() { | |
TOPIC_NAME="ticdc-clustered-index-test-$RANDOM" | ||
case $SINK_TYPE in | ||
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; | ||
*) SINK_URI="mysql://normal:[email protected]:3306/" ;; | ||
esac | ||
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" | ||
if [ "$SINK_TYPE" == "kafka" ]; then | ||
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" | ||
fi | ||
case $SINK_TYPE in | ||
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; | ||
esac | ||
run_sql "set global tidb_enable_clustered_index=1;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} | ||
# TiDB global variables cache 2 seconds at most | ||
sleep 2 | ||
|
@@ -46,6 +48,7 @@ function run() { | |
|
||
# kafka is not supported yet. | ||
# ref to issue: https://github.com/pingcap/tiflow/issues/3421 | ||
# TODO: enable this test for kafka and storage sink. | ||
if [ "$SINK_TYPE" != "mysql" ]; then | ||
echo "[$(date)] <<<<<< skip test case $TEST_NAME for $SINK_TYPE! >>>>>>" | ||
exit 0 | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,11 @@ CDC_BINARY=cdc.test | |
SINK_TYPE=$1 | ||
|
||
function run() { | ||
# storage is not supported yet. | ||
if [ "$SINK_TYPE" == "storage" ]; then | ||
return | ||
fi | ||
|
||
rm -rf $WORK_DIR && mkdir -p $WORK_DIR | ||
|
||
start_tidb_cluster --workdir $WORK_DIR | ||
|
@@ -38,12 +43,14 @@ function run() { | |
TOPIC_NAME="ticdc-common-1-test-$RANDOM" | ||
case $SINK_TYPE in | ||
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; | ||
*) SINK_URI="mysql://[email protected]:3306/" ;; | ||
esac | ||
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" | ||
if [ "$SINK_TYPE" == "kafka" ]; then | ||
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" | ||
fi | ||
case $SINK_TYPE in | ||
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; | ||
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; | ||
esac | ||
|
||
run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} | ||
# TODO: refine the release detection after 5.0 tag of TiDB is ready | ||
|
Oops, something went wrong.