Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Ignore unrelated shards in partial MoveTables traffic state #13515

Merged
merged 4 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestUpdateVRWorkflow(t *testing.T) {
OnDdl: binlogdatapb.OnDDLAction_EXEC,
},
query: fmt.Sprintf(`update _vt.vreplication set source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`,
keyspace, shard, binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_EXEC)], vreplID),
keyspace, shard, binlogdatapb.OnDDLAction_EXEC.String(), vreplID),
},
{
name: "update cell,tablet_types,on_ddl",
Expand All @@ -161,7 +161,7 @@ func TestUpdateVRWorkflow(t *testing.T) {
OnDdl: binlogdatapb.OnDDLAction_EXEC_IGNORE,
},
query: fmt.Sprintf(`update _vt.vreplication set source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_EXEC_IGNORE)], "zone1,zone2,zone3", "rdonly,replica,primary", vreplID),
keyspace, shard, binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), "zone1,zone2,zone3", "rdonly,replica,primary", vreplID),
},
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const mzSelectIDQuery = "select id from _vt.vreplication where db_name='vt_targe
const mzSelectFrozenQuery = "select 1 from _vt.vreplication where db_name='vt_targetks' and message='FROZEN' and workflow_sub_type != 1"
const mzCheckJournal = "/select val from _vt.resharding_journal where id="

var defaultOnDDL = binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_IGNORE)]
var defaultOnDDL = binlogdatapb.OnDDLAction_IGNORE.String()

func TestMigrateTables(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Expand Down Expand Up @@ -2825,7 +2825,7 @@ func TestMoveTablesDDLFlag(t *testing.T) {

env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{})
if onDDLAction == binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_IGNORE)] {
if onDDLAction == binlogdatapb.OnDDLAction_IGNORE.String() {
// This is the default and go does not marshal defaults
// for prototext fields so we use the default insert stmt.
env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{})
Expand Down
41 changes: 24 additions & 17 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
}

var (
reverse bool
keyspace string
reverse bool
sourceKeyspace string
)

// We reverse writes by using the source_keyspace.workflowname_reverse workflow
Expand All @@ -229,17 +229,19 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
// source to check if writes have been switched.
if strings.HasSuffix(workflowName, "_reverse") {
reverse = true
keyspace = state.SourceKeyspace
// Flip the source and target keyspaces.
sourceKeyspace = state.TargetKeyspace
targetKeyspace = state.SourceKeyspace
workflowName = workflow.ReverseWorkflowName(workflowName)
} else {
keyspace = targetKeyspace
sourceKeyspace = state.SourceKeyspace
}
if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
state.WorkflowType = workflow.TypeMoveTables

// We assume a consistent state, so only choose routing rule for one table.
if len(ts.Tables()) == 0 {
return nil, nil, fmt.Errorf("no tables in workflow %s.%s", keyspace, workflowName)
return nil, nil, fmt.Errorf("no tables in workflow %s.%s", targetKeyspace, workflowName)

}
table := ts.Tables()[0]
Expand All @@ -252,19 +254,22 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl

rules := shardRoutingRules.Rules
for _, rule := range rules {
if rule.ToKeyspace == ts.SourceKeyspaceName() {
switch rule.ToKeyspace {
case sourceKeyspace:
state.ShardsNotYetSwitched = append(state.ShardsNotYetSwitched, rule.Shard)
} else {
case targetKeyspace:
state.ShardsAlreadySwitched = append(state.ShardsAlreadySwitched, rule.Shard)
default:
// Not a relevant rule.
}
}
} else {
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, keyspace, table, topodatapb.TabletType_RDONLY)
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_RDONLY)
if err != nil {
return nil, nil, err
}

state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, keyspace, table, topodatapb.TabletType_REPLICA)
state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_REPLICA)
if err != nil {
return nil, nil, err
}
Expand All @@ -275,7 +280,7 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
for _, table := range ts.Tables() {
rr := globalRules[table]
// if a rule exists for the table and points to the target keyspace, writes have been switched
if len(rr) > 0 && rr[0] == fmt.Sprintf("%s.%s", keyspace, table) {
if len(rr) > 0 && rr[0] == fmt.Sprintf("%s.%s", targetKeyspace, table) {
state.WritesSwitched = true
break
}
Expand All @@ -292,12 +297,12 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
shard = ts.SourceShards()[0]
}

state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, keyspace, shard, topodatapb.TabletType_RDONLY)
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, targetKeyspace, shard, topodatapb.TabletType_RDONLY)
if err != nil {
return nil, nil, err
}

state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, keyspace, shard, topodatapb.TabletType_REPLICA)
state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, targetKeyspace, shard, topodatapb.TabletType_REPLICA)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -330,11 +335,13 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflowNam
if servedType != topodatapb.TabletType_REPLICA && servedType != topodatapb.TabletType_RDONLY {
return nil, fmt.Errorf("tablet type must be REPLICA or RDONLY: %v", servedType)
}
if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_REPLICA && len(ws.ReplicaCellsSwitched) == 0 {
return nil, fmt.Errorf("requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched")
}
if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_RDONLY && len(ws.RdonlyCellsSwitched) == 0 {
return nil, fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")
if !ts.isPartialMigration { // shard level traffic switching is all or nothing
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes an additional bug:

❯ vtctlclient MoveTables -- --source xtra --source_shards='80-' --tables 'customer' ReverseTraffic xtra2.80dash
E0717 10:08:06.507644   59806 main.go:96] E0717 14:08:06.506797 vtctl.go:2188]
requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched
I0717 10:08:06.514211   59806 main.go:96] I0717 14:08:06.514090 vtctl.go:2190] Workflow Status: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-

The following vreplication streams exist for workflow xtra2.80dash:

id=1 on 80-/zone1-0000000330: Status: Stopped. VStream Lag: 0s.

MoveTables Error: rpc error: code = Unknown desc = requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched'

I had seen that in the unit test. Confirmed with a local test. The problem is that we were processing those values even for partial workflows, when they are not ever set for partial workflows:

if ts.isPartialMigration { // shard level traffic switching is all or nothing
shardRoutingRules, err := wr.ts.GetShardRoutingRules(ctx)
if err != nil {
return nil, nil, err
}
rules := shardRoutingRules.Rules
for _, rule := range rules {
if rule.ToKeyspace == ts.SourceKeyspaceName() {
state.ShardsNotYetSwitched = append(state.ShardsNotYetSwitched, rule.Shard)
} else {
state.ShardsAlreadySwitched = append(state.ShardsAlreadySwitched, rule.Shard)
}
}
} else {
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, keyspace, table, topodatapb.TabletType_RDONLY)
if err != nil {
return nil, nil, err
}
state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, keyspace, table, topodatapb.TabletType_REPLICA)

if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_REPLICA && len(ws.ReplicaCellsSwitched) == 0 {
return nil, fmt.Errorf("requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched")
}
if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_RDONLY && len(ws.RdonlyCellsSwitched) == 0 {
return nil, fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")
}
}
switch servedType {
case topodatapb.TabletType_REPLICA:
Expand Down
151 changes: 151 additions & 0 deletions go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -263,6 +264,156 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards,
return tme
}

// newTestTablePartialMigrater creates a test tablet migrater
// specifically for partial or shard by shard migrations.
// The shards must be the same on the source and target, and we
// must be moving a subset of them.
// fmtQuery should be of the form: 'select a, b %s group by a'.
// The test will Sprintf a from clause and where clause as needed.
func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shardsToMove []string, fmtQuery string) *testMigraterEnv {
	// Partial (shard-by-shard) migrations only make sense on sharded keyspaces.
	require.Greater(t, len(shards), 1, "shard by shard migrations can only be done on sharded keyspaces")
	tme := &testMigraterEnv{}
	tme.ts = memorytopo.NewServer("cell1", "cell2")
	tme.wr = New(logutil.NewConsoleLogger(), tme.ts, tmclient.NewTabletManagerClient())
	tme.wr.sem = semaphore.NewWeighted(1)
	// Source and target keyspaces deliberately share the same shard list:
	// a partial MoveTables moves tables between identically sharded keyspaces.
	tme.sourceShards = shards
	tme.targetShards = shards
	tme.tmeDB = fakesqldb.New(t)
	expectVDiffQueries(tme.tmeDB)
	// Tablet IDs are allocated in steps of 10: sources first, then targets.
	tabletID := 10
	for _, shard := range tme.sourceShards {
		tme.sourcePrimaries = append(tme.sourcePrimaries, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_PRIMARY, tme.tmeDB, TabletKeyspaceShard(t, "ks1", shard)))
		tabletID += 10

		_, sourceKeyRange, err := topo.ValidateShardName(shard)
		if err != nil {
			t.Fatal(err)
		}
		tme.sourceKeyRanges = append(tme.sourceKeyRanges, sourceKeyRange)
	}
	// Pin the tablet picker to the first source primary so stream
	// placement is deterministic across test runs.
	tpChoiceTablet := tme.sourcePrimaries[0].Tablet
	tpChoice = &testTabletPickerChoice{
		keyspace: tpChoiceTablet.Keyspace,
		shard:    tpChoiceTablet.Shard,
	}
	for _, shard := range tme.targetShards {
		tme.targetPrimaries = append(tme.targetPrimaries, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_PRIMARY, tme.tmeDB, TabletKeyspaceShard(t, "ks2", shard)))
		tabletID += 10

		_, targetKeyRange, err := topo.ValidateShardName(shard)
		if err != nil {
			t.Fatal(err)
		}
		tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange)
	}

	// Identical vschema on both keyspaces: two tables, both hash-sharded on c1.
	vs := &vschemapb.Keyspace{
		Sharded: true,
		Vindexes: map[string]*vschemapb.Vindex{
			"hash": {
				Type: "hash",
			},
		},
		Tables: map[string]*vschemapb.Table{
			"t1": {
				ColumnVindexes: []*vschemapb.ColumnVindex{{
					Column: "c1",
					Name:   "hash",
				}},
			},
			"t2": {
				ColumnVindexes: []*vschemapb.ColumnVindex{{
					Column: "c1",
					Name:   "hash",
				}},
			},
		},
	}
	err := tme.ts.SaveVSchema(ctx, "ks1", vs)
	require.NoError(t, err)
	err = tme.ts.SaveVSchema(ctx, "ks2", vs)
	require.NoError(t, err)
	err = tme.ts.RebuildSrvVSchema(ctx, nil)
	require.NoError(t, err)
	err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks1", []string{"cell1"}, false)
	require.NoError(t, err)
	err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks2", []string{"cell1"}, false)
	require.NoError(t, err)

	tme.startTablets(t)
	tme.createDBClients(ctx, t)
	tme.setPrimaryPositions()
	now := time.Now().Unix()

	// Register the forward (ks1 -> ks2) vreplication stream results on the
	// target db clients. Only shards listed in shardsToMove get stream rows;
	// other shards get empty results, mimicking a partial migration.
	// NOTE(review): addInvariant is called once per shardToMove iteration, so
	// with len(shardsToMove) > 1 each call overwrites the previous one and the
	// last iteration wins — if the matching shardToMove is not last, the rows
	// built for it appear to be discarded. Confirm this is intended.
	for i, shard := range shards {
		for _, shardToMove := range shardsToMove {
			var streamInfoRows []string
			var streamExtInfoRows []string
			if shardToMove == shard {
				// Forward stream source: ks1, filtered to this shard's keyrange.
				bls := &binlogdatapb.BinlogSource{
					Keyspace: "ks1",
					Shard:    shard,
					Filter: &binlogdatapb.Filter{
						Rules: []*binlogdatapb.Rule{{
							Match:  "t1",
							Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t1 where in_keyrange('%s')", shard)),
						}, {
							Match:  "t2",
							Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t2 where in_keyrange('%s')", shard)),
						}},
					},
				}
				streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls))
				// "Running" ext-info row; timestamps use the same 'now' so lag is 0.
				streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks1|%d|%d|0|0||||0", i+1, now, now))
			}
			tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult)
			tme.dbTargetClients[i].addInvariant(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
				"id|source|message|cell|tablet_types",
				"int64|varchar|varchar|varchar|varchar"),
				streamInfoRows...))
			tme.dbTargetClients[i].addInvariant(streamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
				"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys",
				"int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"),
				streamExtInfoRows...))
			tme.dbTargetClients[i].addInvariant(reverseStreamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
				"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys",
				"int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"),
				streamExtInfoRows...))
		}
	}

	// Register the reverse (ks2 -> ks1) stream results on the source db
	// clients, again only for the shards being moved.
	// NOTE(review): same per-shardToMove overwrite pattern as the loop above.
	for i, shard := range shards {
		for _, shardToMove := range shardsToMove {
			var streamInfoRows []string
			if shardToMove == shard {
				// Reverse stream source: ks2, same keyrange-filtered rules.
				bls := &binlogdatapb.BinlogSource{
					Keyspace: "ks2",
					Shard:    shard,
					Filter: &binlogdatapb.Filter{
						Rules: []*binlogdatapb.Rule{{
							Match:  "t1",
							Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t1 where in_keyrange('%s')", shard)),
						}, {
							Match:  "t2",
							Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t2 where in_keyrange('%s')", shard)),
						}},
					},
				}
				streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls))
				tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult)
			}
			tme.dbSourceClients[i].addInvariant(reverseStreamInfoKs1, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
				"id|source|message|cell|tablet_types",
				"int64|varchar|varchar|varchar|varchar"),
				streamInfoRows...),
			)
		}
	}

	tme.targetKeyspace = "ks2"
	return tme
}

func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targetShards []string) *testShardMigraterEnv {
tme := &testShardMigraterEnv{}
tme.ts = memorytopo.NewServer("cell1", "cell2")
Expand Down
Loading
Loading