Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #5265 ; add a --skip-verify option to CopySchemaShard. #5964

Merged
merged 3 commits into from
Apr 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ var commands = []commandGroup{
"[-allow_long_unavailability] [-wait_slave_timeout=10s] {-sql=<sql> || -sql-file=<filename>} <keyspace>",
"Applies the schema change to the specified keyspace on every master, running in parallel on all shards. The changes are then propagated to slaves via replication. If -allow_long_unavailability is set, schema changes affecting a large number of rows (and possibly incurring a longer period of unavailability) will not be rejected."},
{"CopySchemaShard", commandCopySchemaShard,
"[-tables=<table1>,<table2>,...] [-exclude_tables=<table1>,<table2>,...] [-include-views] [-wait_slave_timeout=10s] {<source keyspace/shard> || <source tablet alias>} <destination keyspace/shard>",
"[-tables=<table1>,<table2>,...] [-exclude_tables=<table1>,<table2>,...] [-include-views] [-skip-verify] [-wait_slave_timeout=10s] {<source keyspace/shard> || <source tablet alias>} <destination keyspace/shard>",
"Copies the schema from a source shard's master (or a specific tablet) to a destination shard. The schema is applied directly on the master of the destination shard, and it is propagated to the replicas through binlogs."},

{"ValidateVersionShard", commandValidateVersionShard,
Expand Down Expand Up @@ -2305,6 +2305,7 @@ func commandCopySchemaShard(ctx context.Context, wr *wrangler.Wrangler, subFlags
tables := subFlags.String("tables", "", "Specifies a comma-separated list of tables to copy. Each is either an exact match, or a regular expression of the form /regexp/")
excludeTables := subFlags.String("exclude_tables", "", "Specifies a comma-separated list of tables to exclude. Each is either an exact match, or a regular expression of the form /regexp/")
includeViews := subFlags.Bool("include-views", true, "Includes views in the output")
skipVerify := subFlags.Bool("skip-verify", false, "Skip verification of source and target schema after copy")
waitSlaveTimeout := subFlags.Duration("wait_slave_timeout", 10*time.Second, "The amount of time to wait for slaves to receive the schema change via replication.")
if err := subFlags.Parse(args); err != nil {
return err
Expand All @@ -2328,11 +2329,11 @@ func commandCopySchemaShard(ctx context.Context, wr *wrangler.Wrangler, subFlags

sourceKeyspace, sourceShard, err := topoproto.ParseKeyspaceShard(subFlags.Arg(0))
if err == nil {
return wr.CopySchemaShardFromShard(ctx, tableArray, excludeTableArray, *includeViews, sourceKeyspace, sourceShard, destKeyspace, destShard, *waitSlaveTimeout)
return wr.CopySchemaShardFromShard(ctx, tableArray, excludeTableArray, *includeViews, sourceKeyspace, sourceShard, destKeyspace, destShard, *waitSlaveTimeout, *skipVerify)
}
sourceTabletAlias, err := topoproto.ParseTabletAlias(subFlags.Arg(0))
if err == nil {
return wr.CopySchemaShard(ctx, sourceTabletAlias, tableArray, excludeTableArray, *includeViews, destKeyspace, destShard, *waitSlaveTimeout)
return wr.CopySchemaShard(ctx, sourceTabletAlias, tableArray, excludeTableArray, *includeViews, destKeyspace, destShard, *waitSlaveTimeout, *skipVerify)
}
return err
}
Expand Down
8 changes: 4 additions & 4 deletions go/vt/workflow/resharding/mock_resharding_wrangler_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/vt/workflow/resharding/resharding_wrangler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

// ReshardingWrangler is the interface to be used in creating mock interface for wrangler, which is used for unit test. It includes a subset of the methods in go/vt/Wrangler.
type ReshardingWrangler interface {
CopySchemaShardFromShard(ctx context.Context, tables, excludeTables []string, includeViews bool, sourceKeyspace, sourceShard, destKeyspace, destShard string, waitSlaveTimeout time.Duration) error
CopySchemaShardFromShard(ctx context.Context, tables, excludeTables []string, includeViews bool, sourceKeyspace, sourceShard, destKeyspace, destShard string, waitSlaveTimeout time.Duration, skipVerify bool) error

WaitForFilteredReplication(ctx context.Context, keyspace, shard string, maxDelay time.Duration) error

Expand Down
2 changes: 1 addition & 1 deletion go/vt/workflow/resharding/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (hw *horizontalReshardingWorkflow) runCopySchema(ctx context.Context, t *wo
destShard := t.Attributes["destination_shard"]
excludeTables := strings.Split(t.Attributes["exclude_tables"], ",")
return hw.wr.CopySchemaShardFromShard(ctx, nil /* tableArray*/, excludeTables /* excludeTableArray */, true, /*includeViews*/
keyspace, sourceShard, keyspace, destShard, wrangler.DefaultWaitSlaveTimeout)
keyspace, sourceShard, keyspace, destShard, wrangler.DefaultWaitSlaveTimeout, false)
}

func (hw *horizontalReshardingWorkflow) runSplitClone(ctx context.Context, t *workflowpb.Task) error {
Expand Down
10 changes: 5 additions & 5 deletions go/vt/workflow/resharding/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func splitCloneCommand(keyspace string, useConsistentSnapshot bool, excludeTable
return args
}

func splitDiffCommand(keyspace string, shardId string, useConsistentSnapshot bool, excludeTables, splitDiffCommand string) []string {
func splitDiffCommand(keyspace string, shardID string, useConsistentSnapshot bool, excludeTables, splitDiffCommand string) []string {
args := []string{splitDiffCommand}
if useConsistentSnapshot {
args = append(args, "--use_consistent_snapshot")
Expand All @@ -208,9 +208,9 @@ func splitDiffCommand(keyspace string, shardId string, useConsistentSnapshot boo

switch splitDiffCommand {
case "SplitDiff":
args = append(args, "--min_healthy_rdonly_tablets=1", "--dest_tablet_type=RDONLY", keyspace+"/"+shardId)
args = append(args, "--min_healthy_rdonly_tablets=1", "--dest_tablet_type=RDONLY", keyspace+"/"+shardID)
case "MultiSplitDiff":
args = append(args, "--min_healthy_tablets=1", "--tablet_type=RDONLY", keyspace+"/"+shardId)
args = append(args, "--min_healthy_tablets=1", "--tablet_type=RDONLY", keyspace+"/"+shardID)
}

return args
Expand All @@ -219,8 +219,8 @@ func splitDiffCommand(keyspace string, shardId string, useConsistentSnapshot boo
func setupMockWrangler(ctrl *gomock.Controller, keyspace string) *MockReshardingWrangler {
mockWranglerInterface := NewMockReshardingWrangler(ctrl)
// Set the expected behaviors for mock wrangler.
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, gomock.Any() /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "-80", wrangler.DefaultWaitSlaveTimeout).Return(nil)
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, gomock.Any() /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "80-", wrangler.DefaultWaitSlaveTimeout).Return(nil)
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, gomock.Any() /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "-80", wrangler.DefaultWaitSlaveTimeout, false).Return(nil)
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, gomock.Any() /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "80-", wrangler.DefaultWaitSlaveTimeout, false).Return(nil)

mockWranglerInterface.EXPECT().WaitForFilteredReplication(gomock.Any(), keyspace, "-80", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil)
mockWranglerInterface.EXPECT().WaitForFilteredReplication(gomock.Any(), keyspace, "80-", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (rs *resharder) identifyRuleType(rule *binlogdatapb.Rule) (int, error) {
func (rs *resharder) copySchema(ctx context.Context) error {
oneSource := rs.sourceShards[0].MasterAlias
err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error {
return rs.wr.CopySchemaShard(ctx, oneSource, []string{"/.*"}, nil, false, rs.keyspace, target.ShardName(), 1*time.Second)
return rs.wr.CopySchemaShard(ctx, oneSource, []string{"/.*"}, nil, false, rs.keyspace, target.ShardName(), 1*time.Second, false)
})
return err
}
Expand Down
20 changes: 11 additions & 9 deletions go/vt/wrangler/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (wr *Wrangler) PreflightSchema(ctx context.Context, tabletAlias *topodatapb

// CopySchemaShardFromShard copies the schema from a source shard to the specified destination shard.
// For both source and destination it picks the master tablet. See also CopySchemaShard.
func (wr *Wrangler) CopySchemaShardFromShard(ctx context.Context, tables, excludeTables []string, includeViews bool, sourceKeyspace, sourceShard, destKeyspace, destShard string, waitSlaveTimeout time.Duration) error {
func (wr *Wrangler) CopySchemaShardFromShard(ctx context.Context, tables, excludeTables []string, includeViews bool, sourceKeyspace, sourceShard, destKeyspace, destShard string, waitSlaveTimeout time.Duration, skipVerify bool) error {
sourceShardInfo, err := wr.ts.GetShard(ctx, sourceKeyspace, sourceShard)
if err != nil {
return fmt.Errorf("GetShard(%v, %v) failed: %v", sourceKeyspace, sourceShard, err)
Expand All @@ -287,14 +287,14 @@ func (wr *Wrangler) CopySchemaShardFromShard(ctx context.Context, tables, exclud
return fmt.Errorf("no master in shard record %v/%v. Consider running 'vtctl InitShardMaster' in case of a new shard or to reparent the shard to fix the topology data, or providing a non-master tablet alias", sourceKeyspace, sourceShard)
}

return wr.CopySchemaShard(ctx, sourceShardInfo.MasterAlias, tables, excludeTables, includeViews, destKeyspace, destShard, waitSlaveTimeout)
return wr.CopySchemaShard(ctx, sourceShardInfo.MasterAlias, tables, excludeTables, includeViews, destKeyspace, destShard, waitSlaveTimeout, skipVerify)
}

// CopySchemaShard copies the schema from a source tablet to the
// specified shard. The schema is applied directly on the master of
// the destination shard, and is propogated to the replicas through
// binlogs.
func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodatapb.TabletAlias, tables, excludeTables []string, includeViews bool, destKeyspace, destShard string, waitSlaveTimeout time.Duration) error {
func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodatapb.TabletAlias, tables, excludeTables []string, includeViews bool, destKeyspace, destShard string, waitSlaveTimeout time.Duration, skipVerify bool) error {
destShardInfo, err := wr.ts.GetShard(ctx, destKeyspace, destShard)
if err != nil {
return fmt.Errorf("GetShard(%v, %v) failed: %v", destKeyspace, destShard, err)
Expand Down Expand Up @@ -349,12 +349,14 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo
// In that case, MySQL would have skipped our CREATE DATABASE IF NOT EXISTS
// statement. We want to fail early in this case because vtworker SplitDiff
// fails in case of such an inconsistency as well.
diffs, err = wr.compareSchemas(ctx, sourceTabletAlias, destShardInfo.MasterAlias, tables, excludeTables, includeViews)
if err != nil {
return fmt.Errorf("CopySchemaShard failed because schemas could not be compared finally: %v", err)
}
if diffs != nil {
return fmt.Errorf("CopySchemaShard was not successful because the schemas between the two tablets %v and %v differ: %v", sourceTabletAlias, destShardInfo.MasterAlias, diffs)
if !skipVerify {
diffs, err = wr.compareSchemas(ctx, sourceTabletAlias, destShardInfo.MasterAlias, tables, excludeTables, includeViews)
if err != nil {
return fmt.Errorf("CopySchemaShard failed because schemas could not be compared finally: %v", err)
}
if diffs != nil {
return fmt.Errorf("CopySchemaShard was not successful because the schemas between the two tablets %v and %v differ: %v", sourceTabletAlias, destShardInfo.MasterAlias, diffs)
}
}

// Notify slaves to reload schema. This is best-effort.
Expand Down