Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prefer using REPLICA tablets when selecting vreplication sources #10040

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,7 @@ type TabletPicker struct {

// NewTabletPicker returns a TabletPicker.
func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTypesStr string) (*TabletPicker, error) {
inOrder := false
if strings.HasPrefix(tabletTypesStr, inOrderHint) {
inOrder = true
tabletTypesStr = tabletTypesStr[len(inOrderHint):]
}
tabletTypes, err := topoproto.ParseTabletTypes(tabletTypesStr)
tabletTypes, inOrder, err := ParseTabletTypesAndOrder(tabletTypesStr)
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to parse list of tablet types: %v", tabletTypesStr)
}
Expand Down
18 changes: 18 additions & 0 deletions go/vt/discovery/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ limitations under the License.

package discovery

import (
"strings"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo/topoproto"
)

// This file contains helper filter methods to process the unfiltered list of
// tablets returned by LegacyHealthCheck.GetTabletStatsFrom*.
// See also legacy_replicationlag.go for a more sophisicated filter used by vtgate.
Expand All @@ -37,3 +44,14 @@ func RemoveUnhealthyTablets(tabletStatsList []LegacyTabletStats) []LegacyTabletS
}
return result
}

func ParseTabletTypesAndOrder(tabletTypesStr string) ([]topodatapb.TabletType, bool, error) {
inOrder := false
if strings.HasPrefix(tabletTypesStr, inOrderHint) {
inOrder = true
tabletTypesStr = tabletTypesStr[len(inOrderHint):]
}
tabletTypes, err := topoproto.ParseTabletTypes(tabletTypesStr)

return tabletTypes, inOrder, err
}
14 changes: 7 additions & 7 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ var commands = []commandGroup{
name: "Reshard",
method: commandReshard,
params: "[-source_shards=<source_shards>] [-target_shards=<target_shards>] [-cells=<cells>] [-tablet_types=<source_tablet_types>] [-skip_schema_copy] <action> 'action must be one of the following: Create, Complete, Cancel, SwitchTraffic, ReverseTrafffic, Show, or Progress' <keyspace.workflow>",
help: "Start a Resharding process. Example: Reshard -cells='zone1,alias1' -tablet_types='primary,replica,rdonly' ks.workflow001 '0' '-80,80-'",
help: "Start a Resharding process. Example: Reshard -cells='zone1,alias1' -tablet_types='PRIMARY,REPLICA,RDONLY' ks.workflow001 '0' '-80,80-'",
},
{
name: "MoveTables",
Expand Down Expand Up @@ -510,7 +510,7 @@ var commands = []commandGroup{
{
name: "VDiff",
method: commandVDiff,
params: "[-source_cell=<cell>] [-target_cell=<cell>] [-tablet_types=primary,replica,rdonly] [-filtered_replication_wait_time=30s] [-max_extra_rows_to_compare=1000] <keyspace.workflow>",
params: "[-source_cell=<cell>] [-target_cell=<cell>] [-tablet_types=PRIMARY,REPLICA,RDONLY] [-filtered_replication_wait_time=30s] [-max_extra_rows_to_compare=1000] <keyspace.workflow>",
help: "Perform a diff of all tables in the workflow",
},
{
Expand Down Expand Up @@ -2257,7 +2257,7 @@ func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla

workflow := subFlags.String("workflow", "", "Workflow name. Can be any descriptive string. Will be used to later migrate traffic via SwitchReads/SwitchWrites.")
cells := subFlags.String("cells", "", "Cell(s) or CellAlias(es) (comma-separated) to replicate from.")
tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from (e.g. PRIMARY, REPLICA, RDONLY). Defaults to -vreplication_tablet_type parameter value for the tablet, which has the default value of PRIMARY,REPLICA.")
mattlord marked this conversation as resolved.
Show resolved Hide resolved
tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from (e.g. PRIMARY, REPLICA, RDONLY). Defaults to --vreplication_tablet_type parameter value for the tablet, which has the default value of in_order:REPLICA,PRIMARY.")
allTables := subFlags.Bool("all", false, "Move all tables from the source keyspace")
excludes := subFlags.String("exclude", "", "Tables to exclude (comma-separated) if -all is specified")

Expand Down Expand Up @@ -2330,7 +2330,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
const defaultMaxReplicationLagAllowed = defaultWaitTime

cells := subFlags.String("cells", "", "Cell(s) or CellAlias(es) (comma-separated) to replicate from.")
tabletTypes := subFlags.String("tablet_types", "primary,replica,rdonly", "Source tablet types to replicate from (e.g. primary, replica, rdonly). Defaults to -vreplication_tablet_type parameter value for the tablet, which has the default value of replica.")
tabletTypes := subFlags.String("tablet_types", "in_order:REPLICA,PRIMARY", "Source tablet types to replicate from (e.g. PRIMARY, REPLICA, RDONLY). Defaults to --vreplication_tablet_type parameter value for the tablet, which has the default value of in_order:REPLICA,PRIMARY.")
dryRun := subFlags.Bool("dry_run", false, "Does a dry run of SwitchReads and only reports the actions to be taken. -dry_run is only supported for SwitchTraffic, ReverseTraffic and Complete.")
timeout := subFlags.Duration("timeout", defaultWaitTime, "Specifies the maximum time to wait, in seconds, for vreplication to catch up on primary migrations. The migration will be cancelled on a timeout. -timeout is only supported for SwitchTraffic and ReverseTraffic.")
reverseReplication := subFlags.Bool("reverse_replication", true, "Also reverse the replication (default true). -reverse_replication is only supported for SwitchTraffic.")
Expand Down Expand Up @@ -2486,7 +2486,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
vrwp.Cells = *cells
vrwp.TabletTypes = *tabletTypes
if vrwp.TabletTypes == "" {
vrwp.TabletTypes = "primary,replica,rdonly"
vrwp.TabletTypes = "in_order:REPLICA,PRIMARY"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was rdonly removed on purpose?

Copy link
Contributor Author

@mattlord mattlord Apr 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, according to our docs and --help output we use the default of the vttablet --vreplication_tablet_type flag if you don't specify and that's what we're actually now doing here. After this PR, the only place RDONLY is potentially used by default is for VDiff (maybe rdonly had ended up here due to a cut and paste from the VDiff code?).

Let me know if not using the vttablet --vreplication_tablet_type default was intentional here before and perhaps the docs were incorrect.

Copy link
Contributor Author

@mattlord mattlord Apr 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to this, I'm not sure why we have this if vrwp.TabletTypes == "" { code block at all since we are specifying a default value here: https://github.com/vitessio/vitess/pull/10040/files#diff-ad37010accc5a29c6751d18124328bb882457e8b2527db688b854c81fe23861cR2333

Maybe that wasn't always working as I'd expect.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to this, I'm not sure why we have this if vrwp.TabletTypes == "" { code block at all since we are specifying a

This might be defensive coding for when an empty string is specified for tablet types on the command line or in tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rohit-nayak-ps turns out that you were right! I shouldn't have removed that. Re-adding it fix this: #10421

🤦

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, after further testing... this didn't actually change any behavior. We were never switching RDONLY traffic by default -- as the tablet_types flag variable was never an empty string as it has a default value (which is what we changed) -- and we weren't testing for this either. Will correct both in #10421.

}
vrwp.Timeout = *timeout
vrwp.EnableReverseReplication = *reverseReplication
Expand Down Expand Up @@ -2742,7 +2742,7 @@ func commandVerticalSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFl
func commandVDiff(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
sourceCell := subFlags.String("source_cell", "", "The source cell to compare from; default is any available cell")
targetCell := subFlags.String("target_cell", "", "The target cell to compare with; default is any available cell")
tabletTypes := subFlags.String("tablet_types", "primary,replica,rdonly", "Tablet types for source and target")
tabletTypes := subFlags.String("tablet_types", "in_order:RDONLY,REPLICA,PRIMARY", "Tablet types for source and target")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on primary migrations. The migration will be cancelled on a timeout.")
maxRows := subFlags.Int64("limit", math.MaxInt64, "Max rows to stop comparing after")
debugQuery := subFlags.Bool("debug_query", false, "Adds a mysql query to the report that can be used for further debugging")
Expand Down Expand Up @@ -2786,7 +2786,7 @@ func splitKeyspaceWorkflow(in string) (keyspace, workflow string, err error) {
func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update")
reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward.")
skipReFreshState := subFlags.Bool("skip-refresh-state", false, "Skips refreshing the state of the source tablets after the migration, meaning that the refresh will need to be done manually, replica and rdonly only)")
skipReFreshState := subFlags.Bool("skip-refresh-state", false, "Skips refreshing the state of the source tablets after the migration, meaning that the refresh will need to be done manually, REPLICA and RDONLY only)")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on primary migrations. The migration will be cancelled on a timeout.")
reverseReplication := subFlags.Bool("reverse_replication", false, "For primary migration, enabling this flag reverses replication which allows you to rollback")
if err := subFlags.Parse(args); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func init() {

// this are the default tablet_types that will be used by the tablet picker to find sources for a vreplication stream
// it can be overridden by passing a different list to the MoveTables or Reshard commands
var tabletTypesStr = flag.String("vreplication_tablet_type", "PRIMARY,REPLICA", "comma separated list of tablet types used as a source")
var tabletTypesStr = flag.String("vreplication_tablet_type", "in_order:REPLICA,PRIMARY", "comma separated list of tablet types used as a source")

// waitRetryTime can be changed to a smaller value for tests.
// A VReplication stream can be created by sending an insert statement
Expand Down
40 changes: 18 additions & 22 deletions go/vt/wrangler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"time"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtctl/workflow"
"vitess.io/vitess/go/vt/vtgate/evalengine"

Expand Down Expand Up @@ -375,25 +375,18 @@ func (vrw *VReplicationWorkflow) getCellsAsArray() []string {
return nil
}

func (vrw *VReplicationWorkflow) getTabletTypes() []topodatapb.TabletType {
tabletTypesArr := strings.Split(vrw.params.TabletTypes, ",")
var tabletTypes []topodatapb.TabletType
for _, tabletType := range tabletTypesArr {
servedType, _ := topoproto.ParseTabletType(tabletType)
tabletTypes = append(tabletTypes, servedType)
}
return tabletTypes
}

func (vrw *VReplicationWorkflow) parseTabletTypes() (hasReplica, hasRdonly, hasPrimary bool, err error) {
tabletTypesArr := strings.Split(vrw.params.TabletTypes, ",")
for _, tabletType := range tabletTypesArr {
switch strings.ToLower(tabletType) {
case "replica":
tabletTypes, _, err := discovery.ParseTabletTypesAndOrder(vrw.params.TabletTypes)
if err != nil {
return false, false, false, err
}
for _, tabletType := range tabletTypes {
switch tabletType {
case topodatapb.TabletType_REPLICA:
hasReplica = true
case "rdonly":
case topodatapb.TabletType_RDONLY:
hasRdonly = true
case "primary", "master":
case topodatapb.TabletType_PRIMARY:
hasPrimary = true
default:
return false, false, false, fmt.Errorf("invalid tablet type passed %s", tabletType)
Expand Down Expand Up @@ -421,15 +414,18 @@ func (vrw *VReplicationWorkflow) initReshard() error {

func (vrw *VReplicationWorkflow) switchReads() (*[]string, error) {
log.Infof("In VReplicationWorkflow.switchReads() for %+v", vrw)
var tabletTypes []topodatapb.TabletType
for _, tt := range vrw.getTabletTypes() {
fullTabletTypes, _, err := discovery.ParseTabletTypesAndOrder(vrw.params.TabletTypes)
if err != nil {
return nil, err
}
var nonPrimaryTabletTypes []topodatapb.TabletType
for _, tt := range fullTabletTypes {
if tt != topodatapb.TabletType_PRIMARY {
tabletTypes = append(tabletTypes, tt)
nonPrimaryTabletTypes = append(nonPrimaryTabletTypes, tt)
}
}
var dryRunResults *[]string
var err error
dryRunResults, err = vrw.wr.SwitchReads(vrw.ctx, vrw.params.TargetKeyspace, vrw.params.Workflow, tabletTypes,
dryRunResults, err = vrw.wr.SwitchReads(vrw.ctx, vrw.params.TargetKeyspace, vrw.params.Workflow, nonPrimaryTabletTypes,
vrw.getCellsAsArray(), vrw.params.Direction, vrw.params.DryRun)
if err != nil {
return nil, err
Expand Down
10 changes: 8 additions & 2 deletions go/vt/wrangler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -72,16 +73,21 @@ func TestReshardingWorkflowErrorsAndMisc(t *testing.T) {
mtwf.ws.WritesSwitched = true
require.Errorf(t, mtwf.Cancel(), ErrWorkflowPartiallySwitched)

tabletTypes, _, err := discovery.ParseTabletTypesAndOrder(mtwf.params.TabletTypes)
require.NoError(t, err)

require.ElementsMatch(t, mtwf.getCellsAsArray(), []string{"cell1", "cell2"})
require.ElementsMatch(t, mtwf.getTabletTypes(), []topodata.TabletType{topodata.TabletType_REPLICA, topodata.TabletType_RDONLY})
require.ElementsMatch(t, tabletTypes, []topodata.TabletType{topodata.TabletType_REPLICA, topodata.TabletType_RDONLY})
hasReplica, hasRdonly, hasPrimary, err := mtwf.parseTabletTypes()
require.NoError(t, err)
require.True(t, hasReplica)
require.True(t, hasRdonly)
require.False(t, hasPrimary)

mtwf.params.TabletTypes = "replica,rdonly,primary"
require.ElementsMatch(t, mtwf.getTabletTypes(),
tabletTypes, _, err = discovery.ParseTabletTypesAndOrder(mtwf.params.TabletTypes)
require.NoError(t, err)
require.ElementsMatch(t, tabletTypes,
[]topodata.TabletType{topodata.TabletType_REPLICA, topodata.TabletType_RDONLY, topodata.TabletType_PRIMARY})

hasReplica, hasRdonly, hasPrimary, err = mtwf.parseTabletTypes()
Expand Down