Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MoveTables Cancel: drop denied tables on target when dropping source/target tables #14008

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,17 @@ func checkIfTableExists(t *testing.T, vc *VitessCluster, tabletAlias string, tab
return found, nil
}

func checkIfDenyListExists(t *testing.T, vc *VitessCluster, ksShard string, table string) (bool, error) {
func validateTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table string, mustExist bool) {
found, err := isTableInDenyList(t, vc, ksShard, table)
require.NoError(t, err)
if mustExist {
require.True(t, found, "Table %s not found in deny list", table)
} else {
require.False(t, found, "Table %s found in deny list", table)
}
}

func isTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table string) (bool, error) {
var output string
var err error
found := false
Expand Down
53 changes: 52 additions & 1 deletion go/test/endtoend/vreplication/partial_movetables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,61 @@ import (
"strings"
"testing"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"

"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"
)

// testCancel() starts and cancels a partial MoveTables for one of the shards which will be actually moved later on.
// Before canceling, we first switch traffic to the target keyspace and then reverse it back to the source keyspace.
// This tests that artifacts are being properly cleaned up when a MoveTables ia canceled.
func testCancel(t *testing.T) {
targetKeyspace := "customer2"
sourceKeyspace := "customer"
workflowName := "partial80DashForCancel"
ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
// We use a different table in this MoveTables than the subsequent one, so that setting up of the artifacts
// while creating MoveTables do not paper over any issues with cleaning up artifacts when MoveTables is canceled.
// Ref: https://github.com/vitessio/vitess/issues/13998
table := "customer2"
shard := "80-"
// start the partial movetables for 80-
mt := newMoveTables(vc, &moveTables{
workflowName: workflowName,
targetKeyspace: targetKeyspace,
sourceKeyspace: sourceKeyspace,
tables: table,
sourceShards: shard,
}, moveTablesFlavorRandom)
Comment on lines +47 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 🙂

mt.Create()

checkDenyList := func(keyspace string, expected bool) {
validateTableInDenyList(t, vc, fmt.Sprintf("%s:%s", keyspace, shard), table, expected)
}

waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())

checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, false)

mt.SwitchReadsAndWrites()
checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, true)

mt.ReverseReadsAndWrites()
checkDenyList(targetKeyspace, true)
checkDenyList(sourceKeyspace, false)

mt.Cancel()
checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, false)

}

// TestPartialMoveTablesBasic tests partial move tables by moving each
// customer shard -- -80,80- -- once a a time to customer2.
func TestPartialMoveTablesBasic(t *testing.T) {
Expand Down Expand Up @@ -58,7 +106,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {

// Move customer table from unsharded product keyspace to
// sharded customer keyspace.
createMoveTablesWorkflow(t, "customer,loadtest")
createMoveTablesWorkflow(t, "customer,loadtest,customer2")
tstWorkflowSwitchReadsAndWrites(t)
tstWorkflowComplete(t)

Expand All @@ -81,6 +129,9 @@ func TestPartialMoveTablesBasic(t *testing.T) {
// move tables for one of the two shards: 80-.
defaultRdonly = 0
setupCustomer2Keyspace(t)

testCancel(t)

currentWorkflowType = wrangler.MoveTablesWorkflow
wfName := "partial80Dash"
sourceKs := "customer"
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func TestMultiCellVreplicationWorkflow(t *testing.T) {
verifyClusterHealth(t, vc)
insertInitialData(t)
shardCustomer(t, true, []*Cell{cell1, cell2}, cell2.Name, true)
checkIfDenyListExists(t, vc, "product:0", "customer")
isTableInDenyList(t, vc, "product:0", "customer")
// we tag along this test so as not to create the overhead of creating another cluster
testVStreamCellFlag(t)
}
Expand Down Expand Up @@ -876,13 +876,13 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
switchWrites(t, workflowType, ksWorkflow, false)

var exists bool
exists, err = checkIfDenyListExists(t, vc, "product:0", "customer")
exists, err = isTableInDenyList(t, vc, "product:0", "customer")
require.NoError(t, err, "Error getting denylist for customer:0")
require.True(t, exists)

moveTablesAction(t, "Complete", allCellNames, workflow, sourceKs, targetKs, tables)

exists, err = checkIfDenyListExists(t, vc, "product:0", "customer")
exists, err = isTableInDenyList(t, vc, "product:0", "customer")
require.NoError(t, err, "Error getting denylist for customer:0")
require.False(t, exists)

Expand Down
25 changes: 20 additions & 5 deletions go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type moveTables struct {
sourceKeyspace string
tables string
atomicCopy bool
sourceShards string
}

type iMoveTables interface {
Expand All @@ -53,6 +54,7 @@ type iMoveTables interface {
SwitchReads()
SwitchWrites()
SwitchReadsAndWrites()
ReverseReadsAndWrites()
Cancel()
Complete()
Flavor() string
Expand Down Expand Up @@ -91,7 +93,7 @@ func newVtctlMoveTables(mt *moveTables) *VtctlMoveTables {
func (vmt *VtctlMoveTables) Create() {
log.Infof("vmt is %+v", vmt.vc, vmt.tables)
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionCreate, "", "", "", vmt.atomicCopy)
vmt.tables, workflowActionCreate, "", vmt.sourceShards, "", vmt.atomicCopy)
require.NoError(vmt.vc.t, err)
}

Expand All @@ -101,6 +103,12 @@ func (vmt *VtctlMoveTables) SwitchReadsAndWrites() {
require.NoError(vmt.vc.t, err)
}

func (vmt *VtctlMoveTables) ReverseReadsAndWrites() {
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionReverseTraffic, "", "", "", vmt.atomicCopy)
require.NoError(vmt.vc.t, err)
}

func (vmt *VtctlMoveTables) Show() {
//TODO implement me
panic("implement me")
Expand All @@ -117,8 +125,9 @@ func (vmt *VtctlMoveTables) SwitchWrites() {
}

func (vmt *VtctlMoveTables) Cancel() {
//TODO implement me
panic("implement me")
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionCancel, "", "", "", vmt.atomicCopy)
require.NoError(vmt.vc.t, err)
}

func (vmt *VtctlMoveTables) Complete() {
Expand Down Expand Up @@ -158,13 +167,20 @@ func (v VtctldMoveTables) Create() {
if v.atomicCopy {
args = append(args, "--atomic-copy="+strconv.FormatBool(v.atomicCopy))
}
if v.sourceShards != "" {
args = append(args, "--source-shards="+v.sourceShards)
}
v.exec(args...)
}

func (v VtctldMoveTables) SwitchReadsAndWrites() {
v.exec("SwitchTraffic")
}

func (v VtctldMoveTables) ReverseReadsAndWrites() {
v.exec("ReverseTraffic")
}

func (v VtctldMoveTables) Show() {
//TODO implement me
panic("implement me")
Expand All @@ -181,8 +197,7 @@ func (v VtctldMoveTables) SwitchWrites() {
}

func (v VtctldMoveTables) Cancel() {
//TODO implement me
panic("implement me")
v.exec("Cancel")
}

func (v VtctldMoveTables) Complete() {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (si *ShardInfo) GetTabletControl(tabletType topodatapb.TabletType) *topodat
return nil
}

// UpdateSourceDeniedTables will add or remove the listed tables
// UpdateDeniedTables will add or remove the listed tables
// in the shard record's TabletControl structures. Note we don't
// support a lot of the corner cases:
// - only support one table list per shard. If we encounter a different
Expand All @@ -419,7 +419,7 @@ func (si *ShardInfo) GetTabletControl(tabletType topodatapb.TabletType) *topodat
// because it's not used in the same context (vertical vs horizontal sharding)
//
// This function should be called while holding the keyspace lock.
func (si *ShardInfo) UpdateSourceDeniedTables(ctx context.Context, tabletType topodatapb.TabletType, cells []string, remove bool, tables []string) error {
func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodatapb.TabletType, cells []string, remove bool, tables []string) error {
if err := CheckKeyspaceLocked(ctx, si.keyspace); err != nil {
return err
}
Expand Down
20 changes: 10 additions & 10 deletions go/vt/topo/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ func lockedKeyspaceContext(keyspace string) context.Context {
}

func addToDenyList(ctx context.Context, si *ShardInfo, tabletType topodatapb.TabletType, cells, tables []string) error {
if err := si.UpdateSourceDeniedTables(ctx, tabletType, cells, false, tables); err != nil {
if err := si.UpdateDeniedTables(ctx, tabletType, cells, false, tables); err != nil {
return err
}
return nil
}

func removeFromDenyList(ctx context.Context, si *ShardInfo, tabletType topodatapb.TabletType, cells, tables []string) error {
if err := si.UpdateSourceDeniedTables(ctx, tabletType, cells, true, tables); err != nil {
if err := si.UpdateDeniedTables(ctx, tabletType, cells, true, tables); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -161,13 +161,13 @@ func TestUpdateSourceDeniedTables(t *testing.T) {

// check we enforce the keyspace lock
ctx := context.Background()
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, nil, false, nil); err == nil || err.Error() != "keyspace ks is not locked (no locksInfo)" {
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, nil, false, nil); err == nil || err.Error() != "keyspace ks is not locked (no locksInfo)" {
t.Fatalf("unlocked keyspace produced wrong error: %v", err)
}
ctx = lockedKeyspaceContext("ks")

// add one cell
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
{
TabletType: topodatapb.TabletType_RDONLY,
Cells: []string{"first"},
Expand All @@ -178,20 +178,20 @@ func TestUpdateSourceDeniedTables(t *testing.T) {
}

// remove that cell, going back
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, true, nil); err != nil || len(si.TabletControls) != 0 {
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, true, nil); err != nil || len(si.TabletControls) != 0 {
t.Fatalf("going back should have remove the record: %v", si)
}

// re-add a cell, then another with different table list to
// make sure it fails
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil {
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil {
t.Fatalf("one cell add failed: %v", si)
}
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t2", "t3"}); err == nil || err.Error() != "trying to use two different sets of denied tables for shard ks/sh: [t1 t2] and [t2 t3]" {
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t2", "t3"}); err == nil || err.Error() != "trying to use two different sets of denied tables for shard ks/sh: [t1 t2] and [t2 t3]" {
t.Fatalf("different table list should fail: %v", err)
}
// add another cell, see the list grow
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
{
TabletType: topodatapb.TabletType_RDONLY,
Cells: []string{"first", "second"},
Expand All @@ -202,7 +202,7 @@ func TestUpdateSourceDeniedTables(t *testing.T) {
}

// add all cells, see the list grow to all
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first", "second", "third"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first", "second", "third"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
{
TabletType: topodatapb.TabletType_RDONLY,
Cells: []string{"first", "second", "third"},
Expand All @@ -213,7 +213,7 @@ func TestUpdateSourceDeniedTables(t *testing.T) {
}

// remove one cell from the full list
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, true, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, true, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
{
TabletType: topodatapb.TabletType_RDONLY,
Cells: []string{"first", "third"},
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3286,7 +3286,7 @@ func (s *VtctldServer) SetShardTabletControl(ctx context.Context, req *vtctldata
defer unlock(&err)

si, err := s.ts.UpdateShardFields(ctx, req.Keyspace, req.Shard, func(si *topo.ShardInfo) error {
return si.UpdateSourceDeniedTables(ctx, req.TabletType, req.Cells, req.Remove, req.DeniedTables)
return si.UpdateDeniedTables(ctx, req.TabletType, req.Cells, req.Remove, req.DeniedTables)
})

switch {
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1883,6 +1883,9 @@ func (s *Server) DropTargets(ctx context.Context, targetKeyspace, workflow strin
if err := sw.dropSourceDeniedTables(ctx); err != nil {
return nil, err
}
if err := sw.dropTargetDeniedTables(ctx); err != nil {
return nil, err
}
case binlogdatapb.MigrationType_SHARDS:
if err := sw.dropTargetShards(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -2074,6 +2077,9 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
if err := sw.dropSourceDeniedTables(ctx); err != nil {
return nil, err
}
if err := sw.dropTargetDeniedTables(ctx); err != nil {
return nil, err
}

case binlogdatapb.MigrationType_SHARDS:
log.Infof("Removing shards")
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtctl/workflow/switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (r *switcher) dropSourceDeniedTables(ctx context.Context) error {
return r.ts.dropSourceDeniedTables(ctx)
}

func (r *switcher) dropTargetDeniedTables(ctx context.Context) error {
return r.ts.dropTargetDeniedTables(ctx)
}

func (r *switcher) validateWorkflowHasCompleted(ctx context.Context) error {
return r.ts.validateWorkflowHasCompleted(ctx)
}
Expand Down
11 changes: 11 additions & 0 deletions go/vt/vtctl/workflow/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,17 @@ func (dr *switcherDryRun) dropSourceDeniedTables(ctx context.Context) error {
return nil
}

func (dr *switcherDryRun) dropTargetDeniedTables(ctx context.Context) error {
logs := make([]string, 0)
for _, si := range dr.ts.TargetShards() {
logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;tablet:%d", si.Keyspace(), si.ShardName(), si.PrimaryAlias.Uid))
}
if len(logs) > 0 {
dr.drLog.Logf("Denied tables records on [%s] will be removed from: [%s]", strings.Join(dr.ts.Tables(), ","), strings.Join(logs, ","))
}
return nil
}

func (dr *switcherDryRun) logs() *[]string {
return &dr.drLog.logs
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtctl/workflow/switcher_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type iswitcher interface {
removeSourceTables(ctx context.Context, removalType TableRemovalType) error
dropSourceShards(ctx context.Context) error
dropSourceDeniedTables(ctx context.Context) error
dropTargetDeniedTables(ctx context.Context) error
freezeTargetVReplication(ctx context.Context) error
dropSourceReverseVReplicationStreams(ctx context.Context) error
dropTargetVReplicationStreams(ctx context.Context) error
Expand Down
Loading
Loading