Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests(ticdc): add big txn integration test #5848

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestParseCfg(t *testing.T) {
KeyPath: "cc",
CertAllowedCN: []string{"dd", "ee"},
},
PerTableMemoryQuota: 10 * 1024 * 1024, // 10M
PerTableMemoryQuota: config.DefaultTableMemoryQuota,
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
Expand Down Expand Up @@ -314,7 +314,7 @@ server-worker-pool-size = 16
SortDir: config.DefaultSortDir,
},
Security: &config.SecurityConfig{},
PerTableMemoryQuota: 10 * 1024 * 1024, // 10M
PerTableMemoryQuota: config.DefaultTableMemoryQuota,
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
Expand Down Expand Up @@ -455,7 +455,7 @@ cert-allowed-cn = ["dd","ee"]
KeyPath: "cc",
CertAllowedCN: []string{"dd", "ee"},
},
PerTableMemoryQuota: 10 * 1024 * 1024, // 10M
PerTableMemoryQuota: config.DefaultTableMemoryQuota,
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
Expand Down
5 changes: 4 additions & 1 deletion pkg/config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ const (

// DebugConfigurationItem is the name of debug configurations
DebugConfigurationItem = "debug"

// DefaultTableMemoryQuota is the default memory quota for each table.
DefaultTableMemoryQuota = 10 * 1024 * 1024 // 10 MB
)

func init() {
Expand Down Expand Up @@ -91,7 +94,7 @@ var defaultServerConfig = &ServerConfig{
SortDir: DefaultSortDir,
},
Security: &SecurityConfig{},
PerTableMemoryQuota: 10 * 1024 * 1024, // 10MB
PerTableMemoryQuota: DefaultTableMemoryQuota,
KVClient: &KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0, // 0 will use NumCPU() * 2
Expand Down
29 changes: 29 additions & 0 deletions tests/integration_tests/big_txn/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/big_txn/sync_diff/output"

source-instances = ["tidb"]

target-instance = "mysql"

target-check-tables = ["big_txn.*"]

[data-sources]
[data-sources.tidb]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.mysql]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
14 changes: 14 additions & 0 deletions tests/integration_tests/big_txn/conf/workload
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
threadcount=1
recordcount=10000
operationcount=0
workload=core
fieldcount=100

readallfields=true

readproportion=0
updateproportion=0
scanproportion=0
insertproportion=0

requestdistribution=uniform
55 changes: 55 additions & 0 deletions tests/integration_tests/big_txn/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

CDC_COUNT=3
DB_COUNT=4

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
TOPIC_NAME="ticdc-big-txn-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:[email protected]:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"

run_sql "CREATE DATABASE big_txn;"
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=big_txn
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

check_table_exists "big_txn.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql "CREATE TABLE big_txn.USERTABLE1 LIKE big_txn.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO big_txn.USERTABLE1 SELECT * FROM big_txn.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
sleep 60
check_table_exists "big_txn.USERTABLE1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_sql "CREATE TABLE big_txn.finish_mark_1 (a int primary key);"
sleep 120
check_table_exists "big_txn.finish_mark_1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"