Skip to content

Commit

Permalink
Wait for each table diff's shard streamers to cleanup before starting…
Browse files Browse the repository at this point in the history
… the next table or exiting. Also limit the size of the error being written to the vdiff record since we have noticed some errors, which contain gtids can be arbitrarily large.

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Oct 23, 2023
1 parent b5cbd36 commit 50c8e92
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 5 deletions.
13 changes: 10 additions & 3 deletions go/vt/vttablet/tabletmanager/vdiff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package vdiff

import (
"context"
"errors"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -162,13 +161,21 @@ func (ct *controller) updateState(dbClient binlogplayer.DBClient, state VDiffSta
extraCols = ", completed_at = utc_timestamp()"
default:
}
var errorString string
if err == nil {
// Clear out any previous error for the vdiff on this shard
err = errors.New("")
errorString = ""
} else {
// limit the error string to be within column length of `last_error`
const MaxErrorLength = 500
errorString = err.Error()
if len(errorString) > MaxErrorLength {
errorString = errorString[:MaxErrorLength]
}
}
query := sqlparser.BuildParsedQuery(sqlUpdateVDiffState,
encodeString(string(state)),
encodeString(err.Error()),
encodeString(errorString),
extraCols,
ct.id,
)
Expand Down
16 changes: 14 additions & 2 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ type tableDiffer struct {
sourceQuery string
table *tabletmanagerdatapb.TableDefinition
lastPK *querypb.QueryResult

// wgShardStreamers is used, with a cancellable context, to wait for all shard streamers
// to finish after each diff is complete.
wgShardStreamers sync.WaitGroup
shardStreamsCtx context.Context
shardStreamsCancel context.CancelFunc
}

func newTableDiffer(wd *workflowDiffer, table *tabletmanagerdatapb.TableDefinition, sourceQuery string) *tableDiffer {
Expand Down Expand Up @@ -121,19 +127,23 @@ func (td *tableDiffer) initialize(ctx context.Context) error {
}
}()

shardStreamsCtx, shardStreamsCancel := context.WithCancel(ctx)
td.shardStreamsCtx = shardStreamsCtx
td.shardStreamsCancel = shardStreamsCancel

if err := td.selectTablets(ctx); err != nil {
return err
}
if err := td.syncSourceStreams(ctx); err != nil {
return err
}
if err := td.startSourceDataStreams(ctx); err != nil {
if err := td.startSourceDataStreams(shardStreamsCtx); err != nil {
return err
}
if err := td.syncTargetStreams(ctx); err != nil {
return err
}
if err := td.startTargetDataStream(ctx); err != nil {
if err := td.startTargetDataStream(shardStreamsCtx); err != nil {
return err
}
td.setupRowSorters()
Expand Down Expand Up @@ -353,11 +363,13 @@ func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) err
}

func (td *tableDiffer) streamOneShard(ctx context.Context, participant *shardStreamer, query string, lastPK *querypb.QueryResult, gtidch chan string) {
td.wgShardStreamers.Add(1)
log.Infof("streamOneShard Start on %s using query: %s", participant.tablet.Alias.String(), query)
defer func() {
log.Infof("streamOneShard End on %s", participant.tablet.Alias.String())
close(participant.result)
close(gtidch)
td.wgShardStreamers.Done()
}()
participant.err = func() error {
conn, err := tabletconn.GetDialer()(participant.tablet, false)
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompa
}

func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.DBClient, td *tableDiffer) error {
defer func() {
// Wait for all the shard streams to finish before returning.
td.wgShardStreamers.Wait()
}()

select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
Expand All @@ -155,6 +160,7 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D
log.Errorf("Encountered an error diffing table %s for vdiff %s: %v", td.table.Name, wd.ct.uuid, err)
return err
}
td.shardStreamsCancel()
log.Infof("Table diff done on table %s for vdiff %s with report: %+v", td.table.Name, wd.ct.uuid, dr)
if dr.ExtraRowsSource > 0 || dr.ExtraRowsTarget > 0 {
if err := wd.reconcileExtraRows(dr, wd.opts.CoreOptions.MaxExtraRowsToCompare); err != nil {
Expand Down

0 comments on commit 50c8e92

Please sign in to comment.