diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 00931d4ec5f..c7a9a6a843e 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -230,9 +230,11 @@ func shardCustomer(t *testing.T, testReverse bool) { assert.False(t, exists) for _, shard := range strings.Split("-80,80-", ",") { - expectNumberOfStreams(t, vtgateConn, "shardCustomer", "p2c", "customer:"+shard, 0) + expectNumberOfStreams(t, vtgateConn, "shardCustomerTargetStreams", "p2c", "customer:"+shard, 0) } + expectNumberOfStreams(t, vtgateConn, "shardCustomerReverseStreams", "p2c_reverse", "product:0", 0) + var found bool found, err = checkIfTableExists(t, vc, "zone1-100", "customer") assert.NoError(t, err, "Customer table not deleted from zone1-100") diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index a7039c4ff38..dc8dfac8d53 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -101,7 +101,9 @@ var dryRunResultsDropSourcesCustomerShard = []string{ " Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer", "Blacklisted tables customer will be removed from:", " Keyspace product Shard 0 Tablet 100", - "Delete vreplication streams on:", + "Delete reverse vreplication streams on source:", + " Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100", + "Delete vreplication streams on target:", " Keyspace customer Shard -80 Workflow p2c DbName vt_customer Tablet 200", " Keyspace customer Shard 80- Workflow p2c DbName vt_customer Tablet 300", "Unlock keyspace customer", diff --git a/go/vt/wrangler/stream_migrater_test.go b/go/vt/wrangler/stream_migrater_test.go index ce971a02bf7..1ca5b4950d2 100644 --- a/go/vt/wrangler/stream_migrater_test.go +++ b/go/vt/wrangler/stream_migrater_test.go @@ -162,7 +162,6 @@ func TestStreamMigrateMainflow(t *testing.T) { tme.expectCreateReverseVReplication() tme.expectStartReverseVReplication() tme.expectFrozenTargetVReplication() - tme.expectDeleteTargetVReplication() if _, _, err := tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, true, false); err != nil { t.Fatal(err) } @@ -177,6 +176,8 @@ func TestStreamMigrateMainflow(t *testing.T) { checkIsMasterServing(t, tme.ts, "ks:-80", true) checkIsMasterServing(t, tme.ts, "ks:80-", true) + tme.expectDeleteReverseVReplication() + tme.expectDeleteTargetVReplication() if _, err := tme.wr.DropSources(ctx, tme.targetKeyspace, "test", false); err != nil { t.Fatal(err) } diff --git a/go/vt/wrangler/switcher.go b/go/vt/wrangler/switcher.go index 6f755370f0e..a8fb24e89a3 100644 --- a/go/vt/wrangler/switcher.go +++ b/go/vt/wrangler/switcher.go @@ -114,6 +114,10 @@ func (r *switcher) dropTargetVReplicationStreams(ctx context.Context) error { return r.ts.dropTargetVReplicationStreams(ctx) } +func (r *switcher) dropSourceReverseVReplicationStreams(ctx context.Context) error { + return r.ts.dropSourceReverseVReplicationStreams(ctx) +} + func (r *switcher) logs() *[]string { return nil } diff --git a/go/vt/wrangler/switcher_dry_run.go b/go/vt/wrangler/switcher_dry_run.go index 3c8aa1af9b3..5cb6a33eb65 100644 --- a/go/vt/wrangler/switcher_dry_run.go +++ b/go/vt/wrangler/switcher_dry_run.go @@ -269,7 +269,7 @@ func (dr *switcherDryRun) validateWorkflowHasCompleted(ctx context.Context) erro } func (dr *switcherDryRun) dropTargetVReplicationStreams(ctx context.Context) error { - dr.drLog.Log("Delete vreplication streams on:") + dr.drLog.Log("Delete vreplication streams on target:") logs := make([]string, 0) for _, t := range dr.ts.targets { logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s Workflow %s DbName %s Tablet %d", @@ -279,6 +279,17 @@ func (dr *switcherDryRun) dropTargetVReplicationStreams(ctx context.Context) err return nil } +func (dr *switcherDryRun) dropSourceReverseVReplicationStreams(ctx context.Context) error { + dr.drLog.Log("Delete reverse vreplication streams on source:") + logs := make([]string, 0) + for _, t := range dr.ts.sources { + logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s Workflow %s DbName %s Tablet %d", + t.si.Keyspace(), t.si.ShardName(), reverseName(dr.ts.workflow), t.master.DbName(), t.master.Alias.Uid)) + } + dr.drLog.LogSlice(logs) + return nil +} + func (dr *switcherDryRun) freezeTargetVReplication(ctx context.Context) error { logs := make([]string, 0) for _, target := range dr.ts.targets { diff --git a/go/vt/wrangler/switcher_interface.go b/go/vt/wrangler/switcher_interface.go index a9fcb2d457d..49f84859cf0 100644 --- a/go/vt/wrangler/switcher_interface.go +++ b/go/vt/wrangler/switcher_interface.go @@ -44,6 +44,7 @@ type iswitcher interface { dropSourceShards(ctx context.Context) error dropSourceBlacklistedTables(ctx context.Context) error freezeTargetVReplication(ctx context.Context) error + dropSourceReverseVReplicationStreams(ctx context.Context) error dropTargetVReplicationStreams(ctx context.Context) error logs() *[]string } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 5055c914127..d9d8ca1501d 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -339,6 +339,9 @@ func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflow st return nil, err } } + if err := sw.dropSourceReverseVReplicationStreams(ctx); err != nil { + return nil, err + } if err := sw.dropTargetVReplicationStreams(ctx); err != nil { return nil, err } @@ -1167,14 +1170,22 @@ func (ts *trafficSwitcher) freezeTargetVReplication(ctx context.Context) error { func (ts *trafficSwitcher) dropTargetVReplicationStreams(ctx context.Context) error { return ts.forAllTargets(func(target *tsTarget) error { ts.wr.Logger().Infof("Deleting target streams for workflow %s db_name %s", ts.workflow, target.master.DbName()) - fmt.Printf("Delete target streams for workflow %s db_name %s tablet %d\n", ts.workflow, target.master.DbName(), target.master.Alias.Uid) query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow=%s", encodeString(target.master.DbName()), encodeString(ts.workflow)) - fmt.Println(query) _, err := ts.wr.tmc.VReplicationExec(ctx, target.master.Tablet, query) return err }) } +func (ts *trafficSwitcher) dropSourceReverseVReplicationStreams(ctx context.Context) error { + return ts.forAllSources(func(source *tsSource) error { + ts.wr.Logger().Infof("Deleting reverse streams for workflow %s db_name %s", ts.workflow, source.master.DbName()) + query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow=%s", + encodeString(source.master.DbName()), encodeString(reverseName(ts.workflow))) + _, err := ts.wr.tmc.VReplicationExec(ctx, source.master.Tablet, query) + return err + }) +} + func (wr *Wrangler) getRoutingRules(ctx context.Context) (map[string][]string, error) { rrs, err := wr.ts.GetRoutingRules(ctx) if err != nil { diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index 474d833cd06..a3d3d8da3e1 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -824,7 +824,9 @@ func TestTableMigrateOneToMany(t *testing.T) { " Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2", "Blacklisted tables t1,t2 will be removed from:", " Keyspace ks1 Shard 0 Tablet 10", - "Delete vreplication streams on:", + "Delete reverse vreplication streams on source:", + " Keyspace ks1 Shard 0 Workflow test_reverse DbName vt_ks1 Tablet 10", + "Delete vreplication streams on target:", " Keyspace ks2 Shard -80 Workflow test DbName vt_ks2 Tablet 20", " Keyspace ks2 Shard 80- Workflow test DbName vt_ks2 Tablet 30", "Unlock keyspace ks2", @@ -838,6 +840,7 @@ func TestTableMigrateOneToMany(t *testing.T) { dropSources := func() { tme.dbTargetClients[0].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil) tme.dbTargetClients[1].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", &sqltypes.Result{}, nil) tme.tmeDB.AddQuery("drop table vt_ks1.t1", &sqltypes.Result{}) tme.tmeDB.AddQuery("drop table vt_ks1.t2", &sqltypes.Result{}) tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", &sqltypes.Result{}, nil) // @@ -845,6 +848,7 @@ func TestTableMigrateOneToMany(t *testing.T) { //TODO, why are the delete queries not required?! //tme.dbTargetClients[0].addQuery("delete from _vt.vreplication where db_name='vt_ks2' and workflow='test'", &sqltypes.Result{}, nil) //tme.dbTargetClients[1].addQuery("delete from _vt.vreplication where db_name='vt_ks2' and workflow='test'", &sqltypes.Result{}, nil) + } dropSources()