diff --git a/go/vt/sidecardb/schema/vdiff/vdiff.sql b/go/vt/sidecardb/schema/vdiff/vdiff.sql index 5eae9270460..52392bde427 100644 --- a/go/vt/sidecardb/schema/vdiff/vdiff.sql +++ b/go/vt/sidecardb/schema/vdiff/vdiff.sql @@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS vdiff `started_at` timestamp NULL DEFAULT NULL, `liveness_timestamp` timestamp NULL DEFAULT NULL, `completed_at` timestamp NULL DEFAULT NULL, - `last_error` varbinary(512) DEFAULT NULL, + `last_error` varbinary(1024) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `uuid_idx` (`vdiff_uuid`), KEY `state` (`state`), diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index 1ba6e264b8c..de93895a4eb 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -18,6 +18,7 @@ package vdiff import ( "context" + "errors" "fmt" "strings" "time" @@ -161,21 +162,13 @@ func (ct *controller) updateState(dbClient binlogplayer.DBClient, state VDiffSta extraCols = ", completed_at = utc_timestamp()" default: } - var errorString string if err == nil { // Clear out any previous error for the vdiff on this shard - errorString = "" - } else { - // limit the error string to be within column length of `last_error` - const MaxErrorLength = 500 - errorString = err.Error() - if len(errorString) > MaxErrorLength { - errorString = errorString[:MaxErrorLength] - } + err = errors.New("") } query := sqlparser.BuildParsedQuery(sqlUpdateVDiffState, encodeString(string(state)), - encodeString(errorString), + encodeString(err.Error()), extraCols, ct.id, ) diff --git a/go/vt/vttablet/tabletmanager/vdiff/schema.go b/go/vt/vttablet/tabletmanager/vdiff/schema.go index f9f48cc72e9..4a00b2194ba 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/schema.go +++ b/go/vt/vttablet/tabletmanager/vdiff/schema.go @@ -39,7 +39,7 @@ const ( from _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) where vd.id = %a` // sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1` - sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d" + sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = left(%s, 1024) %s where id = %d" sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = '' where vd.id = vdt.vdiff_id and vd.id = %a and vd.state != 'completed'` sqlGetVReplicationEntry = "select * from _vt.vreplication %s" diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index dd1b5f07e7d..de035d0df70 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -34,11 +34,6 @@ import ( "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - querypb "vitess.io/vitess/go/vt/proto/query" - tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - "vitess.io/vitess/go/vt/proto/topodata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -46,6 +41,12 @@ import ( "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vttablet/tabletconn" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // how long to wait for background operations to complete @@ -127,9 +128,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error { } }() - shardStreamsCtx, shardStreamsCancel := context.WithCancel(ctx) - td.shardStreamsCtx = shardStreamsCtx - td.shardStreamsCancel = shardStreamsCancel + td.shardStreamsCtx, td.shardStreamsCancel = context.WithCancel(ctx) if err := td.selectTablets(ctx); err != nil { return err @@ -137,13 +136,13 @@ func (td *tableDiffer) initialize(ctx context.Context) error { if err := td.syncSourceStreams(ctx); err != nil { return err } - if err := td.startSourceDataStreams(shardStreamsCtx); err != nil { + if err := td.startSourceDataStreams(td.shardStreamsCtx); err != nil { return err } if err := td.syncTargetStreams(ctx); err != nil { return err } - if err := td.startTargetDataStream(shardStreamsCtx); err != nil { + if err := td.startTargetDataStream(td.shardStreamsCtx); err != nil { return err } td.setupRowSorters() @@ -213,7 +212,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { var ( wg sync.WaitGroup sourceErr, targetErr error - targetTablet *topodata.Tablet + targetTablet *topodatapb.Tablet ) // The cells from the vdiff record are a comma separated list. @@ -264,7 +263,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { return targetErr } -func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { +func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodatapb.Tablet, error) { tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}) if err != nil { return nil, err diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index 10cfcdda17f..adaf87d6ee4 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -135,6 +135,8 @@ func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompa func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.DBClient, td *tableDiffer) error { defer func() { + td.shardStreamsCancel() + // Wait for all the shard streams to finish before returning. td.wgShardStreamers.Wait() }() @@ -160,7 +162,6 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D log.Errorf("Encountered an error diffing table %s for vdiff %s: %v", td.table.Name, wd.ct.uuid, err) return err } - td.shardStreamsCancel() log.Infof("Table diff done on table %s for vdiff %s with report: %+v", td.table.Name, wd.ct.uuid, dr) if dr.ExtraRowsSource > 0 || dr.ExtraRowsTarget > 0 { if err := wd.reconcileExtraRows(dr, wd.opts.CoreOptions.MaxExtraRowsToCompare); err != nil {