Skip to content

Commit

Permalink
Merge pull request #6121 from planetscale/rn-dropsources-delete-rever…
Browse files Browse the repository at this point in the history
…se-streams

DropSources: delete reverse streams on source
  • Loading branch information
sougou authored Apr 27, 2020
2 parents 402ae0a + 89a58c7 commit 0f1551a
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 7 deletions.
4 changes: 3 additions & 1 deletion go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,11 @@ func shardCustomer(t *testing.T, testReverse bool) {
assert.False(t, exists)

for _, shard := range strings.Split("-80,80-", ",") {
expectNumberOfStreams(t, vtgateConn, "shardCustomer", "p2c", "customer:"+shard, 0)
expectNumberOfStreams(t, vtgateConn, "shardCustomerTargetStreams", "p2c", "customer:"+shard, 0)
}

expectNumberOfStreams(t, vtgateConn, "shardCustomerReverseStreams", "p2c_reverse", "product:0", 0)

var found bool
found, err = checkIfTableExists(t, vc, "zone1-100", "customer")
assert.NoError(t, err, "Customer table not deleted from zone1-100")
Expand Down
4 changes: 3 additions & 1 deletion go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ var dryRunResultsDropSourcesCustomerShard = []string{
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer",
"Blacklisted tables customer will be removed from:",
" Keyspace product Shard 0 Tablet 100",
"Delete vreplication streams on:",
"Delete reverse vreplication streams on source:",
" Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100",
"Delete vreplication streams on target:",
" Keyspace customer Shard -80 Workflow p2c DbName vt_customer Tablet 200",
" Keyspace customer Shard 80- Workflow p2c DbName vt_customer Tablet 300",
"Unlock keyspace customer",
Expand Down
3 changes: 2 additions & 1 deletion go/vt/wrangler/stream_migrater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ func TestStreamMigrateMainflow(t *testing.T) {
tme.expectCreateReverseVReplication()
tme.expectStartReverseVReplication()
tme.expectFrozenTargetVReplication()
tme.expectDeleteTargetVReplication()
if _, _, err := tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, true, false); err != nil {
t.Fatal(err)
}
Expand All @@ -177,6 +176,8 @@ func TestStreamMigrateMainflow(t *testing.T) {
checkIsMasterServing(t, tme.ts, "ks:-80", true)
checkIsMasterServing(t, tme.ts, "ks:80-", true)

tme.expectDeleteReverseVReplication()
tme.expectDeleteTargetVReplication()
if _, err := tme.wr.DropSources(ctx, tme.targetKeyspace, "test", false); err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/wrangler/switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func (r *switcher) dropTargetVReplicationStreams(ctx context.Context) error {
return r.ts.dropTargetVReplicationStreams(ctx)
}

func (r *switcher) dropSourceReverseVReplicationStreams(ctx context.Context) error {
return r.ts.dropSourceReverseVReplicationStreams(ctx)
}

func (r *switcher) logs() *[]string {
return nil
}
13 changes: 12 additions & 1 deletion go/vt/wrangler/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (dr *switcherDryRun) validateWorkflowHasCompleted(ctx context.Context) erro
}

func (dr *switcherDryRun) dropTargetVReplicationStreams(ctx context.Context) error {
dr.drLog.Log("Delete vreplication streams on:")
dr.drLog.Log("Delete vreplication streams on target:")
logs := make([]string, 0)
for _, t := range dr.ts.targets {
logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s Workflow %s DbName %s Tablet %d",
Expand All @@ -279,6 +279,17 @@ func (dr *switcherDryRun) dropTargetVReplicationStreams(ctx context.Context) err
return nil
}

func (dr *switcherDryRun) dropSourceReverseVReplicationStreams(ctx context.Context) error {
dr.drLog.Log("Delete reverse vreplication streams on source:")
logs := make([]string, 0)
for _, t := range dr.ts.sources {
logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s Workflow %s DbName %s Tablet %d",
t.si.Keyspace(), t.si.ShardName(), reverseName(dr.ts.workflow), t.master.DbName(), t.master.Alias.Uid))
}
dr.drLog.LogSlice(logs)
return nil
}

func (dr *switcherDryRun) freezeTargetVReplication(ctx context.Context) error {
logs := make([]string, 0)
for _, target := range dr.ts.targets {
Expand Down
1 change: 1 addition & 0 deletions go/vt/wrangler/switcher_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type iswitcher interface {
dropSourceShards(ctx context.Context) error
dropSourceBlacklistedTables(ctx context.Context) error
freezeTargetVReplication(ctx context.Context) error
dropSourceReverseVReplicationStreams(ctx context.Context) error
dropTargetVReplicationStreams(ctx context.Context) error
logs() *[]string
}
15 changes: 13 additions & 2 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflow st
return nil, err
}
}
if err := sw.dropSourceReverseVReplicationStreams(ctx); err != nil {
return nil, err
}
if err := sw.dropTargetVReplicationStreams(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1167,14 +1170,22 @@ func (ts *trafficSwitcher) freezeTargetVReplication(ctx context.Context) error {
func (ts *trafficSwitcher) dropTargetVReplicationStreams(ctx context.Context) error {
return ts.forAllTargets(func(target *tsTarget) error {
ts.wr.Logger().Infof("Deleting target streams for workflow %s db_name %s", ts.workflow, target.master.DbName())
fmt.Printf("Delete target streams for workflow %s db_name %s tablet %d\n", ts.workflow, target.master.DbName(), target.master.Alias.Uid)
query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow=%s", encodeString(target.master.DbName()), encodeString(ts.workflow))
fmt.Println(query)
_, err := ts.wr.tmc.VReplicationExec(ctx, target.master.Tablet, query)
return err
})
}

func (ts *trafficSwitcher) dropSourceReverseVReplicationStreams(ctx context.Context) error {
return ts.forAllSources(func(source *tsSource) error {
ts.wr.Logger().Infof("Deleting reverse streams for workflow %s db_name %s", ts.workflow, source.master.DbName())
query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow=%s",
encodeString(source.master.DbName()), encodeString(reverseName(ts.workflow)))
_, err := ts.wr.tmc.VReplicationExec(ctx, source.master.Tablet, query)
return err
})
}

func (wr *Wrangler) getRoutingRules(ctx context.Context) (map[string][]string, error) {
rrs, err := wr.ts.GetRoutingRules(ctx)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,9 @@ func TestTableMigrateOneToMany(t *testing.T) {
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2",
"Blacklisted tables t1,t2 will be removed from:",
" Keyspace ks1 Shard 0 Tablet 10",
"Delete vreplication streams on:",
"Delete reverse vreplication streams on source:",
" Keyspace ks1 Shard 0 Workflow test_reverse DbName vt_ks1 Tablet 10",
"Delete vreplication streams on target:",
" Keyspace ks2 Shard -80 Workflow test DbName vt_ks2 Tablet 20",
" Keyspace ks2 Shard 80- Workflow test DbName vt_ks2 Tablet 30",
"Unlock keyspace ks2",
Expand All @@ -838,13 +840,15 @@ func TestTableMigrateOneToMany(t *testing.T) {
dropSources := func() {
tme.dbTargetClients[0].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil)
tme.dbTargetClients[1].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil)
tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", &sqltypes.Result{}, nil)
tme.tmeDB.AddQuery("drop table vt_ks1.t1", &sqltypes.Result{})
tme.tmeDB.AddQuery("drop table vt_ks1.t2", &sqltypes.Result{})
tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", &sqltypes.Result{}, nil) //
tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", &sqltypes.Result{}, nil)
//TODO, why are the delete queries not required?!
//tme.dbTargetClients[0].addQuery("delete from _vt.vreplication where db_name='vt_ks2' and workflow='test'", &sqltypes.Result{}, nil)
//tme.dbTargetClients[1].addQuery("delete from _vt.vreplication where db_name='vt_ks2' and workflow='test'", &sqltypes.Result{}, nil)

}
dropSources()

Expand Down

0 comments on commit 0f1551a

Please sign in to comment.