From 8bc817e4faa3c2b2b72b2b1ac6378bf0b2c4fb64 Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Sun, 16 Jul 2023 17:20:55 -0400
Subject: [PATCH 1/4] Ignore unrelated shards in partial movetables workflow
 status

Signed-off-by: Matt Lord
---
 .../tabletmanager/rpc_vreplication_test.go  |   4 +-
 go/vt/wrangler/materializer_test.go         |   4 +-
 go/vt/wrangler/traffic_switcher.go          |   7 +-
 go/vt/wrangler/traffic_switcher_env_test.go | 158 ++++++++++++++++++
 go/vt/wrangler/workflow_test.go             |  64 ++++++-
 5 files changed, 228 insertions(+), 9 deletions(-)

diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
index f7f680f10ad..343f6de73b7 100644
--- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
+++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
@@ -150,7 +150,7 @@ func TestUpdateVRWorkflow(t *testing.T) {
 				OnDdl: binlogdatapb.OnDDLAction_EXEC,
 			},
 			query: fmt.Sprintf(`update _vt.vreplication set source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`,
-				keyspace, shard, binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_EXEC)], vreplID),
+				keyspace, shard, binlogdatapb.OnDDLAction_EXEC.String(), vreplID),
 		},
 		{
 			name: "update cell,tablet_types,on_ddl",
@@ -161,7 +161,7 @@ func TestUpdateVRWorkflow(t *testing.T) {
 				OnDdl: binlogdatapb.OnDDLAction_EXEC_IGNORE,
 			},
 			query: fmt.Sprintf(`update _vt.vreplication set source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`,
-				keyspace, shard, binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_EXEC_IGNORE)], "zone1,zone2,zone3", "rdonly,replica,primary", vreplID),
+				keyspace, shard, binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), "zone1,zone2,zone3", "rdonly,replica,primary", vreplID),
 		},
 	}
diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go
index b2f90303802..bd3e23fce7f 100644
--- a/go/vt/wrangler/materializer_test.go
+++ b/go/vt/wrangler/materializer_test.go
@@ -45,7 +45,7 @@ const mzSelectIDQuery = "select id from _vt.vreplication where db_name='vt_targe
 const mzSelectFrozenQuery = "select 1 from _vt.vreplication where db_name='vt_targetks' and message='FROZEN' and workflow_sub_type != 1"
 const mzCheckJournal = "/select val from _vt.resharding_journal where id="
 
-var defaultOnDDL = binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_IGNORE)]
+var defaultOnDDL = binlogdatapb.OnDDLAction_IGNORE.String()
 
 func TestMigrateTables(t *testing.T) {
 	ms := &vtctldatapb.MaterializeSettings{
@@ -2825,7 +2825,7 @@ func TestMoveTablesDDLFlag(t *testing.T) {
 			env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
 			env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{})
 
-			if onDDLAction == binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_IGNORE)] {
+			if onDDLAction == binlogdatapb.OnDDLAction_IGNORE.String() {
 				// This is the default and go does not marshal defaults
 				// for prototext fields so we use the default insert stmt.
 				env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{})
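A recurring cleanup in this patch swaps the generated value-to-name map lookup for the enum's String() method. A minimal standalone sketch of why the two are interchangeable for defined protobuf enum values (this snippet is illustrative, not part of the patch):

```go
package main

import (
	"fmt"

	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

func main() {
	// OnDDLAction_name is the generated value->name map; String()
	// performs the same lookup and also falls back to a numeric
	// representation for values missing from the map.
	byMap := binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_EXEC_IGNORE)]
	byMethod := binlogdatapb.OnDDLAction_EXEC_IGNORE.String()
	fmt.Println(byMap, byMethod, byMap == byMethod) // EXEC_IGNORE EXEC_IGNORE true
}
```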
diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go
index 43ab57b8ca8..86968e3bb9e 100644
--- a/go/vt/wrangler/traffic_switcher.go
+++ b/go/vt/wrangler/traffic_switcher.go
@@ -252,10 +252,13 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
 			rules := shardRoutingRules.Rules
 			for _, rule := range rules {
-				if rule.ToKeyspace == ts.SourceKeyspaceName() {
+				switch rule.ToKeyspace {
+				case ts.SourceKeyspaceName():
 					state.ShardsNotYetSwitched = append(state.ShardsNotYetSwitched, rule.Shard)
-				} else {
+				case ts.TargetKeyspaceName():
 					state.ShardsAlreadySwitched = append(state.ShardsAlreadySwitched, rule.Shard)
+				default:
+					// Not a relevant rule.
 				}
 			}
 		} else {
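The switch above is the heart of the fix: shard routing rules are now partitioned three ways instead of two. Rules still pointing at the source keyspace are "not yet switched", rules pointing at the target are "already switched", and anything else (rules belonging to unrelated keyspaces) is ignored rather than miscounted as switched. A self-contained sketch of that classification, using a local stand-in for vschema.ShardRoutingRule and rule values drawn from the tests below:

```go
package main

import "fmt"

// shardRoutingRule mirrors only the fields the wrangler code reads;
// it is defined locally so the sketch compiles on its own.
type shardRoutingRule struct {
	FromKeyspace, Shard, ToKeyspace string
}

func main() {
	const sourceKs, targetKs = "ks1", "ks2"
	rules := []shardRoutingRule{
		{FromKeyspace: "ks2", Shard: "-80", ToKeyspace: "ks1"},     // not yet switched
		{FromKeyspace: "ks2", Shard: "80-", ToKeyspace: "ks2"},     // already switched
		{FromKeyspace: "wut", Shard: "40-80", ToKeyspace: "bloop"}, // unrelated: now ignored
	}
	var notYet, already []string
	for _, rule := range rules {
		switch rule.ToKeyspace {
		case sourceKs:
			notYet = append(notYet, rule.Shard)
		case targetKs:
			already = append(already, rule.Shard)
		default:
			// Before this patch the old if/else wrongly counted
			// unrelated rules as "already switched".
		}
	}
	fmt.Println(notYet, already) // [-80] [80-]
}
```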
Name: "hash", + }}, + }, + }, + } + err := tme.ts.SaveVSchema(ctx, "ks1", vs) + require.NoError(t, err) + err = tme.ts.SaveVSchema(ctx, "ks2", vs) + require.NoError(t, err) + err = tme.ts.RebuildSrvVSchema(ctx, nil) + require.NoError(t, err) + err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks1", []string{"cell1"}, false) + require.NoError(t, err) + err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks2", []string{"cell1"}, false) + require.NoError(t, err) + + tme.startTablets(t) + tme.createDBClients(ctx, t) + tme.setPrimaryPositions() + now := time.Now().Unix() + + for i, shard := range shards { + var streamInfoRows []string + var streamExtInfoRows []string + for _, shardToMove := range shardsToMove { + if shardToMove == shard { + bls := &binlogdatapb.BinlogSource{ + Keyspace: "ks1", + Shard: shard, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t1 where in_keyrange('%s')", shard)), + }, { + Match: "t2", + Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t2 where in_keyrange('%s')", shard)), + }}, + }, + } + streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls)) + streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks1|%d|%d|0|0||||0", i+1, now, now)) + } + } + tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult) + tme.dbTargetClients[i].addInvariant(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message|cell|tablet_types", + "int64|varchar|varchar|varchar|varchar"), + streamInfoRows...)) + tme.dbTargetClients[i].addInvariant(streamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys", + "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"), + streamExtInfoRows...)) + tme.dbTargetClients[i].addInvariant(reverseStreamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys", + "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"), + streamExtInfoRows...)) + } + + for i, shard := range shards { + var streamInfoRows []string + for _, shardToMove := range shardsToMove { + if shardToMove == shard { + bls := &binlogdatapb.BinlogSource{ + Keyspace: "ks2", + Shard: shard, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t1 where in_keyrange('%s')", shard)), + }, { + Match: "t2", + Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t2 where in_keyrange('%s')", shard)), + }}, + }, + } + streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls)) + } + } + tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult) + tme.dbSourceClients[i].addInvariant(reverseStreamInfoKs1, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message|cell|tablet_types", + "int64|varchar|varchar|varchar|varchar"), + streamInfoRows...), + ) + } + + srr := make(map[string]string, len(shards)) + for _, shard := range 
+
+	srr := make(map[string]string, len(shards))
+	for _, shard := range shards {
+		srr[fmt.Sprintf("ks2.%s", shard)] = "ks1"
+	}
+	err = topotools.SaveShardRoutingRules(ctx, tme.ts, srr)
+	require.NoError(t, err)
+
+	tme.targetKeyspace = "ks2"
+	return tme
+}
+
 func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targetShards []string) *testShardMigraterEnv {
 	tme := &testShardMigraterEnv{}
 	tme.ts = memorytopo.NewServer("cell1", "cell2")
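The helper finishes by saving shard routing rules that keep every target shard pointed back at the source keyspace, i.e. a starting state in which nothing has been switched yet. A small runnable sketch of what that map encodes (a reading of the code above, not an excerpt from the patch):

```go
package main

import "fmt"

func main() {
	shards := []string{"-80", "80-"}
	srr := make(map[string]string, len(shards))
	for _, shard := range shards {
		// The key is "<from keyspace>.<shard>" and the value is the
		// keyspace that actually serves it. Pointing ks2's shards at
		// ks1 means no traffic has been switched yet.
		srr[fmt.Sprintf("ks2.%s", shard)] = "ks1"
	}
	fmt.Println(srr) // map[ks2.-80:ks1 ks2.80-:ks1]
}
```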
diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go
index e6ac0d472e5..d5b155d73d8 100644
--- a/go/vt/wrangler/workflow_test.go
+++ b/go/vt/wrangler/workflow_test.go
@@ -31,6 +31,7 @@ import (
 	"vitess.io/vitess/go/vt/discovery"
 	"vitess.io/vitess/go/vt/log"
 	"vitess.io/vitess/go/vt/proto/topodata"
+	"vitess.io/vitess/go/vt/proto/vschema"
 	"vitess.io/vitess/go/vt/topo"
 	"vitess.io/vitess/go/vt/vtctl/workflow"
 
@@ -49,7 +50,7 @@ func getMoveTablesWorkflow(t *testing.T, cells, tabletTypes string) *VReplicatio
 		Cells:                           cells,
 		TabletTypes:                     tabletTypes,
 		MaxAllowedTransactionLagSeconds: defaultMaxAllowedTransactionLagSeconds,
-		OnDDL:                           binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_EXEC)],
+		OnDDL:                           binlogdatapb.OnDDLAction_EXEC.String(),
 	}
 	mtwf := &VReplicationWorkflow{
 		workflowType: MoveTablesWorkflow,
@@ -280,7 +281,7 @@ func TestMoveTablesV2(t *testing.T) {
 		TabletTypes:                     "REPLICA,RDONLY,PRIMARY",
 		Timeout:                         DefaultActionTimeout,
 		MaxAllowedTransactionLagSeconds: defaultMaxAllowedTransactionLagSeconds,
-		OnDDL:                           binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_STOP)],
+		OnDDL:                           binlogdatapb.OnDDLAction_STOP.String(),
 	}
 	tme := newTestTableMigrater(ctx, t)
 	defer tme.stopTablets(t)
@@ -300,6 +301,63 @@ func TestMoveTablesV2(t *testing.T) {
 	require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
 }
 
+// TestMoveTablesShardByShard ensures that shard by shard
+// migrations work as expected.
+func TestMoveTablesShardByShard(t *testing.T) {
+	ctx := context.Background()
+	shards := []string{"-80", "80-"}
+	shardsToMove := shards[0:1]
+	p := &VReplicationWorkflowParams{
+		Workflow:                        "test",
+		WorkflowType:                    MoveTablesWorkflow,
+		SourceKeyspace:                  "ks1",
+		SourceShards:                    shardsToMove, // shard by shard
+		TargetShards:                    shardsToMove, // shard by shard
+		TargetKeyspace:                  "ks2",
+		Tables:                          "t1,t2",
+		Cells:                           "cell1,cell2",
+		TabletTypes:                     "REPLICA,RDONLY,PRIMARY",
+		Timeout:                         DefaultActionTimeout,
+		MaxAllowedTransactionLagSeconds: defaultMaxAllowedTransactionLagSeconds,
+		OnDDL:                           binlogdatapb.OnDDLAction_STOP.String(),
+	}
+	tme := newTestTablePartialMigrater(ctx, t, shards, shards[0:1], "select * %s")
+	defer tme.stopTablets(t)
+
+	// Save some unrelated shard routing rules to be sure that
+	// they don't interfere in any way.
+	srr, err := tme.ts.GetShardRoutingRules(ctx)
+	require.NoError(t, err)
+	srr.Rules = append(srr.Rules, &vschema.ShardRoutingRule{
+		FromKeyspace: "wut",
+		Shard:        "40-80",
+		ToKeyspace:   "bloop",
+	})
+	err = tme.ts.SaveShardRoutingRules(ctx, srr)
+	require.NoError(t, err)
+
+	wf, err := tme.wr.NewVReplicationWorkflow(ctx, MoveTablesWorkflow, p)
+	require.NoError(t, err)
+	require.NotNil(t, wf)
+	require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
+	require.True(t, wf.ts.isPartialMigration, "expected partial shard migration")
+
+	trafficSwitchResults := fmt.Sprintf("Reads partially switched, for shards: %s. Writes partially switched, for shards: %s",
+		strings.Join(shardsToMove, ","), strings.Join(shardsToMove, ","))
+	tme.expectNoPreviousJournals()
+	expectMoveTablesQueries(t, tme, p)
+	tme.expectNoPreviousJournals()
+	require.NoError(t, testSwitchForward(t, wf))
+	require.Equal(t, trafficSwitchResults, wf.CurrentState())
+
+	/* TODO: Figure out why this isn't working...
+	tme.expectNoPreviousJournals()
+	tme.expectNoPreviousReverseJournals()
+	require.NoError(t, testReverse(t, wf))
+	require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
+	*/
+}
+
 func validateRoutingRuleCount(ctx context.Context, t *testing.T, ts *topo.Server, cnt int) {
 	rr, err := ts.GetRoutingRules(ctx)
 	require.NoError(t, err)
@@ -485,7 +543,7 @@ func TestReshardV2(t *testing.T) {
 		TabletTypes:                     "replica,rdonly,primary",
 		Timeout:                         DefaultActionTimeout,
 		MaxAllowedTransactionLagSeconds: defaultMaxAllowedTransactionLagSeconds,
-		OnDDL:                           binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_EXEC_IGNORE)],
+		OnDDL:                           binlogdatapb.OnDDLAction_EXEC_IGNORE.String(),
 	}
 	tme := newTestShardMigrater(ctx, t, sourceShards, targetShards)
 	defer tme.stopTablets(t)

From 12d42611ae4de642fc87c58e0f98f569f0594490 Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Sun, 16 Jul 2023 18:51:15 -0400
Subject: [PATCH 2/4] More fixes related to partial traffic handling

Signed-off-by: Matt Lord
---
 go/vt/wrangler/traffic_switcher.go          | 38 ++++++++-------
 go/vt/wrangler/traffic_switcher_env_test.go | 51 +++++++++------------
 go/vt/wrangler/workflow_test.go             | 26 +++++++----
 3 files changed, 59 insertions(+), 56 deletions(-)

diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go
index 86968e3bb9e..b2f77a2cf01 100644
--- a/go/vt/wrangler/traffic_switcher.go
+++ b/go/vt/wrangler/traffic_switcher.go
@@ -218,8 +218,8 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
 	}
 
 	var (
-		reverse  bool
-		keyspace string
+		reverse        bool
+		sourceKeyspace string
 	)
 
 	// We reverse writes by using the source_keyspace.workflowname_reverse workflow
@@ -229,17 +229,19 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
 	// source to check if writes have been switched.
 	if strings.HasSuffix(workflowName, "_reverse") {
 		reverse = true
-		keyspace = state.SourceKeyspace
+		// Flip the source and target keyspaces.
+		sourceKeyspace = state.TargetKeyspace
+		targetKeyspace = state.SourceKeyspace
 		workflowName = workflow.ReverseWorkflowName(workflowName)
 	} else {
-		keyspace = targetKeyspace
+		sourceKeyspace = state.SourceKeyspace
 	}
 
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
 		state.WorkflowType = workflow.TypeMoveTables
 		// We assume a consistent state, so only choose routing rule for one table.
 		if len(ts.Tables()) == 0 {
-			return nil, nil, fmt.Errorf("no tables in workflow %s.%s", keyspace, workflowName)
+			return nil, nil, fmt.Errorf("no tables in workflow %s.%s", targetKeyspace, workflowName)
 		}
 		table := ts.Tables()[0]
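For "_reverse" workflows the patch now flips both keyspaces instead of just substituting the source, so every later comparison runs against the correct side. A standalone sketch of the naming convention and the flip, using hypothetical state values (reverseWorkflowName below is a local stand-in for workflow.ReverseWorkflowName, written from the convention the comment in the diff describes):

```go
package main

import (
	"fmt"
	"strings"
)

// reverseWorkflowName mimics the convention the wrangler relies on:
// the reverse of workflow "wf" in the target keyspace is "wf_reverse"
// in the source keyspace.
func reverseWorkflowName(name string) string {
	if strings.HasSuffix(name, "_reverse") {
		return strings.TrimSuffix(name, "_reverse")
	}
	return name + "_reverse"
}

func main() {
	workflowName := "test_reverse"
	sourceKeyspace, targetKeyspace := "ks1", "ks2" // as recorded in state

	if strings.HasSuffix(workflowName, "_reverse") {
		// Flip the source and target keyspaces, as the patch does,
		// so reads/writes are checked from the right side.
		sourceKeyspace, targetKeyspace = targetKeyspace, sourceKeyspace
		workflowName = reverseWorkflowName(workflowName)
	}
	fmt.Println(workflowName, sourceKeyspace, targetKeyspace) // test ks2 ks1
}
```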
@@ -253,21 +255,21 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
 			rules := shardRoutingRules.Rules
 			for _, rule := range rules {
 				switch rule.ToKeyspace {
-				case ts.SourceKeyspaceName():
+				case sourceKeyspace:
 					state.ShardsNotYetSwitched = append(state.ShardsNotYetSwitched, rule.Shard)
-				case ts.TargetKeyspaceName():
+				case targetKeyspace:
 					state.ShardsAlreadySwitched = append(state.ShardsAlreadySwitched, rule.Shard)
 				default:
 					// Not a relevant rule.
 				}
 			}
 		} else {
-			state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, keyspace, table, topodatapb.TabletType_RDONLY)
+			state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_RDONLY)
 			if err != nil {
 				return nil, nil, err
 			}
 
-			state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, keyspace, table, topodatapb.TabletType_REPLICA)
+			state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_REPLICA)
 			if err != nil {
 				return nil, nil, err
 			}
@@ -278,7 +280,7 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
 		for _, table := range ts.Tables() {
 			rr := globalRules[table]
 			// if a rule exists for the table and points to the target keyspace, writes have been switched
-			if len(rr) > 0 && rr[0] == fmt.Sprintf("%s.%s", keyspace, table) {
+			if len(rr) > 0 && rr[0] == fmt.Sprintf("%s.%s", targetKeyspace, table) {
 				state.WritesSwitched = true
 				break
 			}
@@ -295,12 +297,12 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
 			shard = ts.SourceShards()[0]
 		}
 
-		state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, keyspace, shard, topodatapb.TabletType_RDONLY)
+		state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, targetKeyspace, shard, topodatapb.TabletType_RDONLY)
 		if err != nil {
 			return nil, nil, err
 		}
 
-		state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, keyspace, shard, topodatapb.TabletType_REPLICA)
+		state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, targetKeyspace, shard, topodatapb.TabletType_REPLICA)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -333,11 +335,13 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflowNam
 	if servedType != topodatapb.TabletType_REPLICA && servedType != topodatapb.TabletType_RDONLY {
 		return nil, fmt.Errorf("tablet type must be REPLICA or RDONLY: %v", servedType)
 	}
-	if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_REPLICA && len(ws.ReplicaCellsSwitched) == 0 {
-		return nil, fmt.Errorf("requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched")
-	}
-	if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_RDONLY && len(ws.RdonlyCellsSwitched) == 0 {
-		return nil, fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")
+	if !ts.isPartialMigration { // shard level traffic switching is all or nothing
+		if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_REPLICA && len(ws.ReplicaCellsSwitched) == 0 {
+			return nil, fmt.Errorf("requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched")
+		}
+		if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_RDONLY && len(ws.RdonlyCellsSwitched) == 0 {
+			return nil, fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")
+		}
 	}
 	switch servedType {
 	case topodatapb.TabletType_REPLICA:
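The new guard only enforces the "reads must already be switched before they can be reversed" checks for full migrations; as the inline comment says, shard level traffic switching is all or nothing, so cell-level switch state is not meaningful for partial migrations. A reduced sketch of that control flow, with stand-in parameters for the workflow state:

```go
package main

import (
	"errors"
	"fmt"
)

type direction int

const (
	directionForward direction = iota
	directionBackward
)

// checkReversal condenses the guard from the patch: only full
// migrations require that replica reads were already switched
// before a backward (reverse) read switch is allowed.
func checkReversal(isPartialMigration bool, dir direction, replicaCellsSwitched []string) error {
	if !isPartialMigration { // shard level traffic switching is all or nothing
		if dir == directionBackward && len(replicaCellsSwitched) == 0 {
			return errors.New("requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched")
		}
	}
	return nil
}

func main() {
	// Partial migration: reversal is allowed even with no cells switched.
	fmt.Println(checkReversal(true, directionBackward, nil)) // <nil>
	// Full migration: the old check still applies.
	fmt.Println(checkReversal(false, directionBackward, nil)) // error
}
```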
diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go
index 2242db16e66..f493e7b5939 100644
--- a/go/vt/wrangler/traffic_switcher_env_test.go
+++ b/go/vt/wrangler/traffic_switcher_env_test.go
@@ -346,9 +346,9 @@ func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shar
 	now := time.Now().Unix()
 
 	for i, shard := range shards {
-		var streamInfoRows []string
-		var streamExtInfoRows []string
 		for _, shardToMove := range shardsToMove {
+			var streamInfoRows []string
+			var streamExtInfoRows []string
 			if shardToMove == shard {
 				bls := &binlogdatapb.BinlogSource{
 					Keyspace: "ks1",
@@ -366,25 +366,25 @@ func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shar
 				streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls))
 				streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks1|%d|%d|0|0||||0", i+1, now, now))
 			}
+			tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult)
+			tme.dbTargetClients[i].addInvariant(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
+				"id|source|message|cell|tablet_types",
+				"int64|varchar|varchar|varchar|varchar"),
+				streamInfoRows...))
+			tme.dbTargetClients[i].addInvariant(streamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
+				"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys",
+				"int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"),
+				streamExtInfoRows...))
+			tme.dbTargetClients[i].addInvariant(reverseStreamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
+				"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys",
+				"int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"),
+				streamExtInfoRows...))
 		}
-		tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult)
-		tme.dbTargetClients[i].addInvariant(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
-			"id|source|message|cell|tablet_types",
-			"int64|varchar|varchar|varchar|varchar"),
-			streamInfoRows...))
-		tme.dbTargetClients[i].addInvariant(streamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
-			"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys",
-			"int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"),
-			streamExtInfoRows...))
-		tme.dbTargetClients[i].addInvariant(reverseStreamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
-			"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys",
-			"int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"),
-			streamExtInfoRows...))
 	}
 
 	for i, shard := range shards {
-		var streamInfoRows []string
 		for _, shardToMove := range shardsToMove {
+			var streamInfoRows []string
 			if shardToMove == shard {
 				bls := &binlogdatapb.BinlogSource{
 					Keyspace: "ks2",
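These fixtures lean on the sqltypes test helpers: a pipe-delimited string names the columns, a second pipe-delimited string gives their types, and each subsequent string is one row parsed the same way. A minimal sketch of that pairing, assuming vitess's go/sqltypes package:

```go
package main

import (
	"fmt"

	"vitess.io/vitess/go/sqltypes"
)

func main() {
	// MakeTestFields pairs names and types positionally;
	// MakeTestResult then parses each row string against them.
	result := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
		"id|source|message|cell|tablet_types",
		"int64|varchar|varchar|varchar|varchar"),
		"1|keyspace:\"ks1\" shard:\"-80\"|||", // one row; empty trailing columns
	)
	fmt.Println(len(result.Rows)) // 1
}
```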
@@ -400,23 +400,16 @@ func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shar
 					},
 				}
 				streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls))
+				tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult)
 			}
+			tme.dbSourceClients[i].addInvariant(reverseStreamInfoKs1, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
+				"id|source|message|cell|tablet_types",
+				"int64|varchar|varchar|varchar|varchar"),
+				streamInfoRows...),
+			)
 		}
-		tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult)
-		tme.dbSourceClients[i].addInvariant(reverseStreamInfoKs1, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
-			"id|source|message|cell|tablet_types",
-			"int64|varchar|varchar|varchar|varchar"),
-			streamInfoRows...),
-		)
 	}
 
-	srr := make(map[string]string, len(shards))
-	for _, shard := range shards {
-		srr[fmt.Sprintf("ks2.%s", shard)] = "ks1"
-	}
-	err = topotools.SaveShardRoutingRules(ctx, tme.ts, srr)
-	require.NoError(t, err)
-
 	tme.targetKeyspace = "ks2"
 	return tme
 }
diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go
index d5b155d73d8..3c4294a32c8 100644
--- a/go/vt/wrangler/workflow_test.go
+++ b/go/vt/wrangler/workflow_test.go
@@ -303,6 +303,8 @@ func TestMoveTablesV2(t *testing.T) {
 
 // TestMoveTablesShardByShard ensures that shard by shard
 // migrations work as expected.
+// This test moves tables from one sharded keyspace (ks1) to
+// another sharded keyspace (ks2), but only for the -80 shard.
 func TestMoveTablesShardByShard(t *testing.T) {
 	ctx := context.Background()
 	shards := []string{"-80", "80-"}
@@ -328,11 +330,18 @@ func TestMoveTablesShardByShard(t *testing.T) {
 	// they don't interfere in any way.
 	srr, err := tme.ts.GetShardRoutingRules(ctx)
 	require.NoError(t, err)
-	srr.Rules = append(srr.Rules, &vschema.ShardRoutingRule{
-		FromKeyspace: "wut",
-		Shard:        "40-80",
-		ToKeyspace:   "bloop",
-	})
+	srr.Rules = append(srr.Rules, []*vschema.ShardRoutingRule{
+		{
+			FromKeyspace: "wut",
+			Shard:        "40-80",
+			ToKeyspace:   "bloop",
+		},
+		{
+			FromKeyspace: "haylo",
+			Shard:        "-80",
+			ToKeyspace:   "blarg",
+		},
+	}...)
 	err = tme.ts.SaveShardRoutingRules(ctx, srr)
 	require.NoError(t, err)
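The test now injects two unrelated rules instead of one, and the second deliberately reuses the shard name "-80" under a different keyspace, exercising the classifier's reliance on the keyspace rather than the shard name. A tiny sketch of the variadic append pattern the hunk switches to, with local stand-in types:

```go
package main

import "fmt"

func main() {
	type rule struct{ From, Shard, To string }
	base := []rule{{"ks2", "-80", "ks1"}}
	extra := []rule{
		{"wut", "40-80", "bloop"},
		{"haylo", "-80", "blarg"}, // same shard name as a moved shard, different keyspace
	}
	// append(slice, literal...) spreads the literal's elements,
	// equivalent to appending each rule individually.
	base = append(base, extra...)
	fmt.Println(len(base)) // 3
}
```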
@@ -342,20 +351,17 @@ func TestMoveTablesShardByShard(t *testing.T) {
 	require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
 	require.True(t, wf.ts.isPartialMigration, "expected partial shard migration")
 
-	trafficSwitchResults := fmt.Sprintf("Reads partially switched, for shards: %s. Writes partially switched, for shards: %s",
-		strings.Join(shardsToMove, ","), strings.Join(shardsToMove, ","))
 	tme.expectNoPreviousJournals()
 	expectMoveTablesQueries(t, tme, p)
 	tme.expectNoPreviousJournals()
 	require.NoError(t, testSwitchForward(t, wf))
-	require.Equal(t, trafficSwitchResults, wf.CurrentState())
+	require.Equal(t, "All Reads Switched. All Writes Switched", wf.CurrentState())
+	require.NoError(t, err)
 
-	/* TODO: Figure out why this isn't working...
 	tme.expectNoPreviousJournals()
 	tme.expectNoPreviousReverseJournals()
 	require.NoError(t, testReverse(t, wf))
 	require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
-	*/
 }
 
 func validateRoutingRuleCount(ctx context.Context, t *testing.T, ts *topo.Server, cnt int) {

From e56f3a8d3eec236355f4ed7926a945d310c26909 Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Sun, 16 Jul 2023 21:14:58 -0400
Subject: [PATCH 3/4] Adjust function names for improved clarity

Signed-off-by: Matt Lord
---
 go/vt/wrangler/workflow_test.go | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go
index 3c4294a32c8..ba73db9a2ea 100644
--- a/go/vt/wrangler/workflow_test.go
+++ b/go/vt/wrangler/workflow_test.go
@@ -301,11 +301,10 @@ func TestMoveTablesV2(t *testing.T) {
 	require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
 }
 
-// TestMoveTablesShardByShard ensures that shard by shard
-// migrations work as expected.
-// This test moves tables from one sharded keyspace (ks1) to
-// another sharded keyspace (ks2), but only for the -80 shard.
-func TestMoveTablesShardByShard(t *testing.T) {
+// TestPartialMoveTables ensures that shard by shard migrations work
+// as expected. This test moves tables from one sharded keyspace (ks1)
+// to another sharded keyspace (ks2), but only for the -80 shard.
+func TestPartialMoveTables(t *testing.T) {
 	ctx := context.Background()
 	shards := []string{"-80", "80-"}
 	shardsToMove := shards[0:1]
@@ -429,7 +428,7 @@ func testReverse(t *testing.T, wf *VReplicationWorkflow) error {
 	return err
 }
 
-func TestMoveTablesV2Partial(t *testing.T) {
+func TestMoveTablesV2SwitchTraffic(t *testing.T) {
 	ctx := context.Background()
 	p := &VReplicationWorkflowParams{
 		Workflow: "test",

From 902a91ce7afb2673502683363be60d886bea169d Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Mon, 17 Jul 2023 10:44:24 -0400
Subject: [PATCH 4/4] Correct unit test

Signed-off-by: Matt Lord
---
 go/vt/wrangler/workflow_test.go | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go
index ba73db9a2ea..73a720296a4 100644
--- a/go/vt/wrangler/workflow_test.go
+++ b/go/vt/wrangler/workflow_test.go
@@ -350,11 +350,26 @@ func TestPartialMoveTables(t *testing.T) {
 	require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
 	require.True(t, wf.ts.isPartialMigration, "expected partial shard migration")
 
+	// The default shard routing rule for the keyspace's other shard would
+	// normally be put in place, but the unit test does not execute the
+	// wrangler.MoveTables function which adds all of the default shard
+	// routing rules in the topo for the keyspace when the first workflow
+	// is run against it. So we simulate it here.
+	srr, err = tme.ts.GetShardRoutingRules(ctx)
+	require.NoError(t, err)
+	srr.Rules = append(srr.Rules, &vschema.ShardRoutingRule{
+		FromKeyspace: "ks2",
+		Shard:        "80-",
+		ToKeyspace:   "ks1",
+	})
+	err = tme.ts.SaveShardRoutingRules(ctx, srr)
+	require.NoError(t, err)
+
 	tme.expectNoPreviousJournals()
 	expectMoveTablesQueries(t, tme, p)
 	tme.expectNoPreviousJournals()
 	require.NoError(t, testSwitchForward(t, wf))
-	require.Equal(t, "All Reads Switched. All Writes Switched", wf.CurrentState())
+	require.Equal(t, "Reads partially switched, for shards: -80. Writes partially switched, for shards: -80", wf.CurrentState())
 	require.NoError(t, err)
 
 	tme.expectNoPreviousJournals()
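The corrected assertion checks the state string that getWorkflowState produces for a partial migration instead of the all-or-nothing "All Reads Switched. All Writes Switched" summary. A sketch of how that string is assembled from the switched-shard lists, mirroring the Sprintf the earlier revision of the test used:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// For a partial migration, the state reports per-shard progress.
	shardsSwitched := []string{"-80"}
	state := fmt.Sprintf("Reads partially switched, for shards: %s. Writes partially switched, for shards: %s",
		strings.Join(shardsSwitched, ","), strings.Join(shardsSwitched, ","))
	fmt.Println(state)
	// Reads partially switched, for shards: -80. Writes partially switched, for shards: -80
}
```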