Skip to content

Commit

Permalink
Address my own nits.
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Oct 24, 2023
1 parent 50c8e92 commit a00d764
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 25 deletions.
2 changes: 1 addition & 1 deletion go/vt/sidecardb/schema/vdiff/vdiff.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS vdiff
`started_at` timestamp NULL DEFAULT NULL,
`liveness_timestamp` timestamp NULL DEFAULT NULL,
`completed_at` timestamp NULL DEFAULT NULL,
`last_error` varbinary(512) DEFAULT NULL,
`last_error` varbinary(1024) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uuid_idx` (`vdiff_uuid`),
KEY `state` (`state`),
Expand Down
13 changes: 3 additions & 10 deletions go/vt/vttablet/tabletmanager/vdiff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package vdiff

import (
"context"
"errors"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -161,21 +162,13 @@ func (ct *controller) updateState(dbClient binlogplayer.DBClient, state VDiffSta
extraCols = ", completed_at = utc_timestamp()"
default:
}
var errorString string
if err == nil {
// Clear out any previous error for the vdiff on this shard
errorString = ""
} else {
// limit the error string to be within column length of `last_error`
const MaxErrorLength = 500
errorString = err.Error()
if len(errorString) > MaxErrorLength {
errorString = errorString[:MaxErrorLength]
}
err = errors.New("")
}
query := sqlparser.BuildParsedQuery(sqlUpdateVDiffState,
encodeString(string(state)),
encodeString(errorString),
encodeString(err.Error()),
extraCols,
ct.id,
)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
from _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
where vd.id = %a`
// sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1`
sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d"
sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = left(%s, 1024) %s where id = %d"
sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = ''
where vd.id = vdt.vdiff_id and vd.id = %a and vd.state != 'completed'`
sqlGetVReplicationEntry = "select * from _vt.vreplication %s"
Expand Down
23 changes: 11 additions & 12 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,19 @@ import (
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// how long to wait for background operations to complete
Expand Down Expand Up @@ -127,23 +128,21 @@ func (td *tableDiffer) initialize(ctx context.Context) error {
}
}()

shardStreamsCtx, shardStreamsCancel := context.WithCancel(ctx)
td.shardStreamsCtx = shardStreamsCtx
td.shardStreamsCancel = shardStreamsCancel
td.shardStreamsCtx, td.shardStreamsCancel = context.WithCancel(ctx)

if err := td.selectTablets(ctx); err != nil {
return err
}
if err := td.syncSourceStreams(ctx); err != nil {
return err
}
if err := td.startSourceDataStreams(shardStreamsCtx); err != nil {
if err := td.startSourceDataStreams(td.shardStreamsCtx); err != nil {
return err
}
if err := td.syncTargetStreams(ctx); err != nil {
return err
}
if err := td.startTargetDataStream(shardStreamsCtx); err != nil {
if err := td.startTargetDataStream(td.shardStreamsCtx); err != nil {
return err
}
td.setupRowSorters()
Expand Down Expand Up @@ -213,7 +212,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
var (
wg sync.WaitGroup
sourceErr, targetErr error
targetTablet *topodata.Tablet
targetTablet *topodatapb.Tablet
)

// The cells from the vdiff record are a comma separated list.
Expand Down Expand Up @@ -264,7 +263,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
return targetErr
}

func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) {
func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodatapb.Tablet, error) {
tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{})
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompa

func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.DBClient, td *tableDiffer) error {
defer func() {
td.shardStreamsCancel()

// Wait for all the shard streams to finish before returning.
td.wgShardStreamers.Wait()
}()
Expand All @@ -160,7 +162,6 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D
log.Errorf("Encountered an error diffing table %s for vdiff %s: %v", td.table.Name, wd.ct.uuid, err)
return err
}
td.shardStreamsCancel()
log.Infof("Table diff done on table %s for vdiff %s with report: %+v", td.table.Name, wd.ct.uuid, dr)
if dr.ExtraRowsSource > 0 || dr.ExtraRowsTarget > 0 {
if err := wd.reconcileExtraRows(dr, wd.opts.CoreOptions.MaxExtraRowsToCompare); err != nil {
Expand Down

0 comments on commit a00d764

Please sign in to comment.