Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flakes: Remove CI endtoend test for VReplication Copy Phase Throttling #13343

Merged
merged 1 commit into from
Jun 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,13 @@ func TestVReplicationDDLHandling(t *testing.T) {
moveTablesAction(t, "Cancel", defaultCellName, workflow, sourceKs, targetKs, table)
}

// TestVreplicationCopyThrottling tests the logic that is used
// by vstreamer when starting a copy phase cycle.
// This logic today supports waiting for MySQL replication lag
// and/or InnoDB MVCC history to be below a certain threshold
// before starting the next copy phase. This test focuses on
// the innodb history list length check.
// NOTE: this is a manual test. It is not executed in the CI.
func TestVreplicationCopyThrottling(t *testing.T) {
workflow := "copy-throttling"
cell := "zone1"
Expand Down
64 changes: 39 additions & 25 deletions go/vt/vttablet/tabletserver/vstreamer/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,30 @@ func expectUpdateCount(t *testing.T, wantCount int64) int64 {
panic("unreachable")
}

// TestVStreamerWaitForMySQL tests the wait for MySQL to catch-up
// logic that is used by vstreamer when starting a copy phase cycle.
// This logic today supports waiting for MySQL replication lag
// and/or InnoDB MVCC history to be below a certain threshold before
// starting the next copy phase.
func TestVStreamerWaitForMySQL(t *testing.T) {
tableName := "test"
expectedWaits := int64(0)
testDB := fakesqldb.New(t)
hostres := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"hostname|port",
"varchar|int64"),
"localhost|3306",
)
thlres := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"history_len",
"int64"),
"1000",
)
sbmres := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"Seconds_Behind_Master",
"int64"),
"10",
)
type fields struct {
vse *Engine
cp dbconfigs.Connector
Expand All @@ -175,29 +197,32 @@ func TestVStreamerWaitForMySQL(t *testing.T) {
maxMySQLReplLagSecs int64
}
tests := []struct {
name string
fields fields
wantErr bool
name string
fields fields
shouldWait bool
wantErr bool
}{
{
name: "Small InnoDB MVCC impact limit",
fields: fields{
vse: engine,
se: engine.se,
maxInnoDBTrxHistLen: 100,
maxInnoDBTrxHistLen: 100, // Should wait on this
maxMySQLReplLagSecs: 5000,
},
wantErr: true,
shouldWait: true,
wantErr: true,
},
{
name: "Small Repl Lag impact limit",
fields: fields{
vse: engine,
se: engine.se,
maxInnoDBTrxHistLen: 10000,
maxMySQLReplLagSecs: 5,
maxMySQLReplLagSecs: 5, // Should wait on this
},
wantErr: true,
shouldWait: true,
wantErr: true,
},
{
name: "Large impact limits",
Expand All @@ -210,25 +235,11 @@ func TestVStreamerWaitForMySQL(t *testing.T) {
wantErr: false,
},
}
testDB := fakesqldb.New(t)
hostres := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"hostname|port",
"varchar|int64"),
"localhost|3306",
)
thlres := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"history_len",
"int64"),
"1000",
)
sbmres := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"Seconds_Behind_Master",
"int64"),
"10",
)

testDB.AddQuery(hostQuery, hostres)
testDB.AddQuery(trxHistoryLenQuery, thlres)
testDB.AddQuery(replicaLagQuery, sbmres)

for _, tt := range tests {
tt.fields.cp = testDB.ConnParams()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand All @@ -247,9 +258,12 @@ func TestVStreamerWaitForMySQL(t *testing.T) {
if err := uvs.vse.waitForMySQL(ctx, uvs.cp, tableName); (err != nil) != tt.wantErr {
t.Errorf("vstreamer.waitForMySQL() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.shouldWait {
expectedWaits++
}
})
}

require.Equal(t, engine.rowStreamerWaits.Counts()["VStreamerTest.waitForMySQL"], int64(2))
require.Equal(t, engine.vstreamerPhaseTimings.Counts()["VStreamerTest."+tableName+":waitForMySQL"], int64(2))
require.Equal(t, engine.rowStreamerWaits.Counts()["VStreamerTest.waitForMySQL"], expectedWaits)
require.Equal(t, engine.vstreamerPhaseTimings.Counts()["VStreamerTest."+tableName+":waitForMySQL"], expectedWaits)
}
9 changes: 0 additions & 9 deletions test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1035,15 +1035,6 @@
"RetryMax": 1,
"Tags": []
},
"vreplication_copy_throttling": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVreplicationCopyThrottling"],
"Command": [],
"Manual": false,
"Shard": "vreplication_basic",
"RetryMax": 1,
"Tags": []
},
"vreplication_copy_parallel": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVreplicationCopyParallel"],
Expand Down