Online DDL: better error messages in cut-over phase #17052

Open · wants to merge 2 commits into base: main
74 changes: 40 additions & 34 deletions go/vt/vttablet/onlineddl/executor.go
@@ -785,7 +785,7 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
}
rs, err := conn.Conn.ExecuteFetch(query, -1, true)
if err != nil {
- return err
+ return vterrors.Wrapf(err, "finding queries potentially operating on table")

Contributor:

Would it not make more sense to say ~ "failed to find queries ..." and similar for some others like the one just below?
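
To illustrate the difference (a stdlib-only sketch, not part of this PR; vterrors.Wrapf joins message and cause with ": " much like fmt.Errorf with %w):

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	cause := errors.New("context deadline exceeded")
	// Current phrasing: reads like a progress note once the cause is appended.
	fmt.Println(fmt.Errorf("finding queries potentially operating on table: %w", cause))
	// Suggested phrasing: reads unambiguously as a failure report.
	fmt.Println(fmt.Errorf("failed to find queries potentially operating on table: %w", cause))
}
```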

}

log.Infof("killTableLockHoldersAndAccessors: found %v potential queries", len(rs.Rows))
@@ -841,7 +841,7 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
}
rs, err := conn.Conn.ExecuteFetch(query, -1, true)
if err != nil {
- return err
+ return vterrors.Wrapf(err, "finding transactions locking table")
}
log.Infof("killTableLockHoldersAndAccessors: found %v locking transactions", len(rs.Rows))
for _, row := range rs.Named().Rows {
@@ -861,7 +861,7 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
// cutOverVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration
func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, shouldForceCutOver bool) error {
if err := e.incrementCutoverAttempts(ctx, s.workflow); err != nil {
- return err
+ return vterrors.Wrapf(err, "cutover: failed incrementing cutover attempts")
}

tmClient := e.tabletManagerClient()
@@ -870,19 +870,19 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// sanity checks:
vreplTable, err := getVreplTable(s)
if err != nil {
- return err
+ return vterrors.Wrapf(err, "cutover: failed getting vreplication table")
}

// get topology client & entities:
tablet, err := e.ts.GetTablet(ctx, e.tabletAlias)
if err != nil {
- return err
+ return vterrors.Wrapf(err, "cutover: failed getting tablet")
}

// information about source tablet
onlineDDL, _, err := e.readMigration(ctx, s.workflow)
if err != nil {
- return err
+ return vterrors.Wrapf(err, "cutover: failed reading migration")
}
isVreplicationTestSuite := onlineDDL.StrategySetting().IsVreplicationTestSuite()
e.updateMigrationStage(ctx, onlineDDL.UUID, "starting cut-over")
@@ -896,7 +896,10 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
defer cancel()
// Wait for target to reach the up-to-date pos
if err := tmClient.VReplicationWaitForPos(ctx, tablet.Tablet, s.id, replication.EncodePosition(pos)); err != nil {
- return err
+ if s, _ := e.readVReplStream(ctx, s.workflow, true); s != nil {

Contributor (author):

As an added bonus, why not also report what position we did land at?

Contributor:

This is also in the error message from the VReplication Engine FWIW.

+ err = vterrors.Wrapf(err, "read vrepl position %v", s.pos)
+ }
+ return vterrors.Wrapf(err, "failed waiting for position %v", replication.EncodePosition(pos))
}
// Target is now in sync with source!
return nil
@@ -910,14 +913,14 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// in that place as possible.
sentryTableName, err = schema.GenerateGCTableName(schema.HoldTableGCState, newGCTableRetainTime())
if err != nil {
- return nil
+ return vterrors.Wrapf(err, "failed creating sentry table name")
}

// We create the sentry table before toggling writes, because this involves a WaitForPos, which takes some time. We
// don't want to overload the buffering time with this excessive wait.

if err := e.updateArtifacts(ctx, onlineDDL.UUID, sentryTableName); err != nil {
- return err
+ return vterrors.Wrapf(err, "failed updating artifacts with sentry table name")
}

dropSentryTableQuery := sqlparser.BuildParsedQuery(sqlDropTableIfExists, sentryTableName)
@@ -941,30 +944,30 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
}()
parsed := sqlparser.BuildParsedQuery(sqlCreateSentryTable, sentryTableName)
if _, err := e.execQuery(ctx, parsed.Query); err != nil {
- return err
+ return vterrors.Wrapf(err, "failed creating sentry table")
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "sentry table created: %s", sentryTableName)

postSentryPos, err := e.primaryPosition(ctx)
if err != nil {
- return err
+ return vterrors.Wrapf(err, "failed getting primary pos after sentry creation")
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-sentry pos: %v", replication.EncodePosition(postSentryPos))
if err := waitForPos(s, postSentryPos); err != nil {
- return err
+ return vterrors.Wrapf(err, "failed waiting for pos after sentry creation")
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "post-sentry pos reached")
}

lockConn, err := e.pool.Get(ctx, nil)
if err != nil {
- return err
+ return vterrors.Wrapf(err, "failed getting locking connection")
}
// Set large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation.
// The code will ensure everything that needs to be terminated by `migrationCutOverThreshold` will be terminated.
lockConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, lockConn.Conn, 5*migrationCutOverThreshold)
if err != nil {
- return err
+ return vterrors.Wrapf(err, "failed setting lock_wait_timeout on locking connection")
}
defer lockConn.Recycle()
defer lockConnRestoreLockWaitTimeout()
@@ -974,13 +977,13 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
renameWasSuccessful := false
renameConn, err := e.pool.Get(ctx, nil)
if err != nil {
- return err
+ return vterrors.Wrapf(err, "failed getting rename connection")
}
// Set large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation.
// The code will ensure everything that needs to be terminated by `migrationCutOverThreshold` will be terminated.
renameConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, renameConn.Conn, 5*migrationCutOverThreshold*4)
if err != nil {
- return err
+ return vterrors.Wrapf(err, "failed setting lock_wait_timeout on rename connection")
}
defer renameConn.Recycle()
defer func() {
@@ -996,7 +999,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// See if backend MySQL server supports 'rename_table_preserve_foreign_key' variable
preserveFKSupported, err := e.isPreserveForeignKeySupported(ctx)
if err != nil {
- return err
+ return vterrors.Wrapf(err, "failed checking for 'rename_table_preserve_foreign_key' support")
}
if preserveFKSupported {
// This code is only applicable when MySQL supports the 'rename_table_preserve_foreign_key' variable. This variable
@@ -1019,7 +1022,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
for {
renameProcessFound, err := e.doesConnectionInfoMatch(renameWaitCtx, renameConn.Conn.ID(), "rename")
if err != nil {
- return err
+ return vterrors.Wrapf(err, "searching for rename process")
}
if renameProcessFound {
return nil
@@ -1053,7 +1056,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
bufferingContextCancel()
// force re-read of tables
if err := tmClient.RefreshState(grpcCtx, tablet.Tablet); err != nil {
- return err
+ return vterrors.Wrapf(err, "refreshing table state")
}
}
log.Infof("toggled buffering: %t in migration %v", bufferQueries, onlineDDL.UUID)
@@ -1073,7 +1076,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
err = toggleBuffering(true)
defer reenableWritesOnce()
if err != nil {
- return err
+ return vterrors.Wrapf(err, "failed enabling buffering")
}
// Give a fraction of a second for a scenario where a query is in
// query executor, it passed the ACLs and is _about to_ execute. This will be nicer to those queries:
@@ -1086,10 +1089,10 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// We should only proceed with forceful cut over if there is no pending atomic transaction for the table.
// This will help in keeping the atomicity guarantee of a prepared transaction.
if err := e.checkOnPreparedPool(ctx, onlineDDL.Table, 100*time.Millisecond); err != nil {
- return err
+ return vterrors.Wrapf(err, "checking prepared pool for table")
}
if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.Table); err != nil {
- return err
+ return vterrors.Wrapf(err, "failed killing table lock holders and accessors")
}
}

@@ -1112,7 +1115,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
defer cancel()
lockTableQuery := sqlparser.BuildParsedQuery(sqlLockTwoTablesWrite, sentryTableName, onlineDDL.Table)
if _, err := lockConn.Conn.Exec(lockCtx, lockTableQuery.Query, 1, false); err != nil {
- return err
+ return vterrors.Wrapf(err, "failed locking tables")
}

e.updateMigrationStage(ctx, onlineDDL.UUID, "renaming tables")
@@ -1124,15 +1127,15 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// the rename should block, because of the LOCK. Wait for it to show up.
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for RENAME to block")
if err := waitForRenameProcess(); err != nil {
- return err
+ return vterrors.Wrapf(err, "failed waiting for rename process")
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "RENAME found")
}

e.updateMigrationStage(ctx, onlineDDL.UUID, "reading post-lock pos")
postWritesPos, err := e.primaryPosition(ctx)
if err != nil {
- return err
+ return vterrors.Wrapf(err, "failed reading pos after locking")
}

// Right now: new queries are buffered, any existing query will have executed, and worst case scenario is
@@ -1144,19 +1147,19 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// Writes are now disabled on table. Read up-to-date vreplication info, specifically to get latest (and fixed) pos:
s, err = e.readVReplStream(ctx, s.workflow, false)
if err != nil {
- return err
+ return vterrors.Wrapf(err, "failed reading vreplication table after locking")
}

e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-lock pos: %v", replication.EncodePosition(postWritesPos))
if err := waitForPos(s, postWritesPos); err != nil {
e.updateMigrationStage(ctx, onlineDDL.UUID, "timeout while waiting for post-lock pos: %v", err)
- return err
+ return vterrors.Wrapf(err, "failed waiting for pos after locking")
}
go log.Infof("cutOverVReplMigration %v: done waiting for position %v", s.workflow, replication.EncodePosition(postWritesPos))
// Stop vreplication
e.updateMigrationStage(ctx, onlineDDL.UUID, "stopping vreplication")
if _, err := e.vreplicationExec(ctx, tablet.Tablet, binlogplayer.StopVReplication(s.id, "stopped for online DDL cutover")); err != nil {
- return err
+ return vterrors.Wrapf(err, "failed stopping vreplication")
}
go log.Infof("cutOverVReplMigration %v: stopped vreplication", s.workflow)

@@ -1173,7 +1176,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
} else {
e.updateMigrationStage(ctx, onlineDDL.UUID, "validating rename is still in place")
if err := waitForRenameProcess(); err != nil {
- return err
+ return vterrors.Wrapf(err, "failed waiting for rename process before dropping sentry table")
}

// Normal (non-testing) alter table
@@ -1184,23 +1187,23 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
lockCtx, cancel := context.WithTimeout(ctx, migrationCutOverThreshold)
defer cancel()
if _, err := lockConn.Conn.Exec(lockCtx, dropTableQuery.Query, 1, false); err != nil {
- return err
+ return vterrors.Wrapf(err, "failed dropping sentry table")
}
}
{
lockCtx, cancel := context.WithTimeout(ctx, migrationCutOverThreshold)
defer cancel()
e.updateMigrationStage(ctx, onlineDDL.UUID, "unlocking tables")
if _, err := lockConn.Conn.Exec(lockCtx, sqlUnlockTables, 1, false); err != nil {
- return err
+ return vterrors.Wrapf(err, "failed unlocking tables")
}
}
{
lockCtx, cancel := context.WithTimeout(ctx, migrationCutOverThreshold)
defer cancel()
e.updateMigrationStage(lockCtx, onlineDDL.UUID, "waiting for RENAME to complete")
if err := <-renameCompleteChan; err != nil {
- return err
+ return vterrors.Wrapf(err, "failed waiting for rename to complete")
}
renameWasSuccessful = true
}
@@ -3784,8 +3787,10 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
if err := e.cutOverVReplMigration(ctx, s, shouldForceCutOver); err != nil {
_ = e.updateMigrationMessage(ctx, uuid, err.Error())
log.Errorf("cutOverVReplMigration failed: err=%v", err)
- if merr, ok := err.(*sqlerror.SQLError); ok {
- switch merr.Num {
+
+ if sqlErr, isSQLErr := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError); isSQLErr && sqlErr != nil {

Contributor (author):

This change is required because the underlying SQL error is now likely to be wrapped in a Vitess error.
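
To see why (a stdlib-only sketch with a stand-in type, not Vitess code): a plain type assertion inspects only the outermost error, so it stops matching as soon as the SQL error gets wrapped:

```go
package main

import "fmt"

// SQLError stands in for sqlerror.SQLError.
type SQLError struct{ Num int }

func (e *SQLError) Error() string { return fmt.Sprintf("error %d", e.Num) }

func main() {
	var err error = &SQLError{Num: 1059}
	_, ok := err.(*SQLError)
	fmt.Println(ok) // true: the SQL error is the outermost error

	wrapped := fmt.Errorf("cutover: failed renaming table: %w", err)
	_, ok = wrapped.(*SQLError)
	fmt.Println(ok) // false: the assertion only sees the wrapper, not the chain
}
```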

Contributor:

Not sure if we have any wrapping, but I think places where we cast errors directly probably should use https://pkg.go.dev/errors#example-As these days. So if we're changing this, maybe also switch to that?
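
For reference, a minimal standalone sketch of that errors.As pattern (stdlib only, same stand-in type as above):

```go
package main

import (
	"errors"
	"fmt"
)

type SQLError struct{ Num int } // stand-in for sqlerror.SQLError

func (e *SQLError) Error() string { return fmt.Sprintf("error %d", e.Num) }

func main() {
	wrapped := fmt.Errorf("cutover: failed renaming table: %w", &SQLError{Num: 1059})
	var sqlErr *SQLError
	// errors.As walks the Unwrap chain that a direct assertion cannot see.
	if errors.As(wrapped, &sqlErr) {
		fmt.Println(sqlErr.Num) // 1059
	}
}
```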

shlomi-noach (author), Oct 23, 2024:

sqlerror.NewSQLErrorFromError is even a bit more elaborate and can parse a MySQL error out of a string. We use that pretty consistently across the repo.
I'd say if we wanted to change that, it would be in a dedicated PR in one giant sweep.
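
A rough, unverified sketch of how that reads at a call site like the one below (assumes the vitess module; NewSQLError's signature is approximated from its usage elsewhere in the repo):

```go
// Build a MySQL-style error, then bury it inside a vterrors wrap chain.
base := sqlerror.NewSQLError(sqlerror.ERTooLongIdent, "42000", "identifier is too long")
wrapped := vterrors.Wrapf(base, "cutover: failed renaming table")

// NewSQLErrorFromError recovers the MySQL error, parsing it out of the message
// text if needed. Per the discussion above, its result is either a fully
// unwrapped *sqlerror.SQLError or nil, so the plain cast plus nil check suffices.
if sqlErr, ok := sqlerror.NewSQLErrorFromError(wrapped).(*sqlerror.SQLError); ok && sqlErr != nil {
	fmt.Printf("recovered error number: %d\n", sqlErr.Num)
}
```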

Contributor:

> sqlerror.NewSQLErrorFromError

It's not about this function, though? It's about the cast after it?

shlomi-noach (author), Oct 23, 2024:

Ah, the cast on that same line... Rich story. We tried to get rid of it altogether, but hit the (weird, IMO) Go design decision where an interface value (here, SQLError) can be non-nil while the concrete pointer it holds is nil. So we kind of have to use the cast. What we know, though, is that the result of sqlerror.NewSQLErrorFromError is either a fully unwrapped sqlerror.SQLError or nil, so there is no need to use As on the result of this function.

Reference: #12574
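
The gotcha, distilled into a runnable stdlib-only example (hypothetical types, not Vitess code):

```go
package main

import "fmt"

type SQLError struct{ Num int }

func (e *SQLError) Error() string { return fmt.Sprintf("error %d", e.Num) }

// mayFail returns a typed nil pointer through the error interface.
func mayFail() error {
	var e *SQLError // nil pointer
	return e        // the interface now holds (type=*SQLError, value=nil)
}

func main() {
	err := mayFail()
	fmt.Println(err == nil) // false: the interface itself is non-nil
	sqlErr, _ := err.(*SQLError)
	fmt.Println(sqlErr == nil) // true: the concrete pointer inside is nil
	// Hence the cast plus the explicit nil check on the concrete type above.
}
```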

+ // let's see if this error is actually acceptable
+ switch sqlErr.Num {
case sqlerror.ERTooLongIdent:
go e.CancelMigration(ctx, uuid, err.Error(), false)
}
@@ -5160,6 +5165,7 @@ func (e *Executor) OnSchemaMigrationStatus(ctx context.Context,
return e.onSchemaMigrationStatus(ctx, uuidParam, status, dryRun, progressPct, etaSeconds, rowsCopied, hint)
}

+ // checkOnPreparedPool checks if there are any cross-shard prepared transactions on the given table
func (e *Executor) checkOnPreparedPool(ctx context.Context, table string, waitTime time.Duration) error {
if e.isPreparedPoolEmpty(table) {
return nil