Skip to content

Commit

Permalink
sql,backupccl: Use Wrapf instead of Assertions in txn callbacks
Browse files Browse the repository at this point in the history
The AssertionError wrappers hid the retryable nature of the errors,
causing failures in cases that should have been retried.

Most of the affected wrappers were changed from Wrapf to
NewAssertionError in cockroachdb#35821. I looked at all uses of
NewAssertionErrorWithWrappedErrf and changed all the ones where a
client.Txn was in scope unless they were clearly not going to be
txn-related (for example, proto unmarshaling errors).

Release note (bug fix): Transaction retries in schema changes are
again handled correctly.
  • Loading branch information
bdarnell committed Jul 9, 2019
1 parent ea7ebb9 commit 32f3de3
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func ensureInterleavesIncluded(tables []*sqlbase.TableDescriptor) error {
func allRangeDescriptors(ctx context.Context, txn *client.Txn) ([]roachpb.RangeDescriptor, error) {
rows, err := txn.Scan(ctx, keys.Meta2Prefix, keys.MetaMax, 0)
if err != nil {
return nil, errors.NewAssertionErrorWithWrappedErrf(err,
return nil, errors.Wrapf(err,
"unable to scan range descriptors")
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func allocateTableRewrites(
{
parentDB, err := sqlbase.GetDatabaseDescFromID(ctx, txn, parentID)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
return errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(parentID))
}

Expand Down Expand Up @@ -958,7 +958,7 @@ func WriteTableDescs(
} else {
parentDB, err := sqlbase.GetDatabaseDescFromID(ctx, txn, tables[i].ParentID)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
return errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(tables[i].ParentID))
}
// TODO(mberhault): CheckPrivilege wants a planner.
Expand All @@ -984,7 +984,7 @@ func WriteTableDescs(

for _, table := range tables {
if err := table.Validate(ctx, txn, settings); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
return errors.Wrapf(err,
"validate table %d", errors.Safe(table.ID))
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,8 +819,7 @@ func (sc *SchemaChanger) updateJobRunningStatus(
ctx context.Context, details jobspb.Details) (jobs.RunningStatus, error) {
return status, nil
}); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
"failed to update running status of job %d", errors.Safe(*sc.job.ID()))
return errors.Wrapf(err, "failed to update running status of job %d", errors.Safe(*sc.job.ID()))
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func (p *planner) initiateDropTable(
}

if err := job.WithTxn(p.txn).Succeeded(ctx, jobs.NoopFn); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
return errors.Wrapf(err,
"failed to mark job %d as as successful", errors.Safe(jobID))
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func (sc *SchemaChanger) DropTableDesc(
b.DelRange(dbZoneKeyPrefix, dbZoneKeyPrefix.PrefixEnd(), false /* returnKeys */)
return nil
}); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
return errors.Wrapf(err,
"failed to update job %d", errors.Safe(tableDesc.GetDropJobID()))
}
}
Expand Down Expand Up @@ -1067,7 +1067,7 @@ func (sc *SchemaChanger) initJobRunningStatus(ctx context.Context) error {
ctx, func(ctx context.Context, details jobspb.Details) (jobs.RunningStatus, error) {
return runStatus, nil
}); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
return errors.Wrapf(err,
"failed to update running status of job %d", errors.Safe(*sc.job.ID()))
}
}
Expand Down Expand Up @@ -1170,7 +1170,7 @@ func (sc *SchemaChanger) RunStateMachineBeforeBackfill(ctx context.Context) erro
if err := sc.job.WithTxn(txn).RunningStatus(ctx, func(ctx context.Context, details jobspb.Details) (jobs.RunningStatus, error) {
return runStatus, nil
}); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
return errors.Wrapf(err,
"failed to update running status of job %d", errors.Safe(*sc.job.ID()))
}
}
Expand Down Expand Up @@ -1261,14 +1261,14 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
}, func(txn *client.Txn) error {
if jobSucceeded {
if err := sc.job.WithTxn(txn).Succeeded(ctx, jobs.NoopFn); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
return errors.Wrapf(err,
"failed to mark job %d as successful", errors.Safe(*sc.job.ID()))
}
} else {
if err := sc.job.WithTxn(txn).RunningStatus(ctx, func(ctx context.Context, details jobspb.Details) (jobs.RunningStatus, error) {
return RunningStatusWaitingGC, nil
}); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
return errors.Wrapf(err,
"failed to update running status of job %d", errors.Safe(*sc.job.ID()))
}
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/sql/sqlbase/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,12 +1195,12 @@ func (desc *TableDescriptor) validateCrossReferences(ctx context.Context, txn *c
findTargetIndex := func(tableID ID, indexID IndexID) (*TableDescriptor, *IndexDescriptor, error) {
targetTable, err := getTable(tableID)
if err != nil {
return nil, nil, errors.NewAssertionErrorWithWrappedErrf(err,
return nil, nil, errors.Wrapf(err,
"missing table=%d index=%d", errors.Safe(tableID), errors.Safe(indexID))
}
targetIndex, err := targetTable.FindIndexByID(indexID)
if err != nil {
return nil, nil, errors.NewAssertionErrorWithWrappedErrf(err,
return nil, nil, errors.Wrapf(err,
"missing table=%s index=%d", targetTable.Name, errors.Safe(indexID))
}
return targetTable, targetIndex, nil
Expand All @@ -1212,7 +1212,7 @@ func (desc *TableDescriptor) validateCrossReferences(ctx context.Context, txn *c
targetTable, targetIndex, err := findTargetIndex(
index.ForeignKey.Table, index.ForeignKey.Index)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err, "invalid foreign key")
return errors.Wrapf(err, "invalid foreign key")
}
found := false
for _, backref := range targetIndex.ReferencedBy {
Expand All @@ -1234,12 +1234,12 @@ func (desc *TableDescriptor) validateCrossReferences(ctx context.Context, txn *c
fkBackrefs[backref] = struct{}{}
targetTable, err := getTable(backref.Table)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err, "invalid fk backreference table=%d index=%d",
return errors.Wrapf(err, "invalid fk backreference table=%d index=%d",
backref.Table, errors.Safe(backref.Index))
}
targetIndex, err := targetTable.FindIndexByID(backref.Index)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err, "invalid fk backreference table=%s index=%d",
return errors.Wrapf(err, "invalid fk backreference table=%s index=%d",
targetTable.Name, errors.Safe(backref.Index))
}
if fk := targetIndex.ForeignKey; fk.Table != desc.ID || fk.Index != index.ID {
Expand All @@ -1255,7 +1255,7 @@ func (desc *TableDescriptor) validateCrossReferences(ctx context.Context, txn *c
ancestor := index.Interleave.Ancestors[len(index.Interleave.Ancestors)-1]
targetTable, targetIndex, err := findTargetIndex(ancestor.TableID, ancestor.IndexID)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err, "invalid interleave")
return errors.Wrapf(err, "invalid interleave")
}
found := false
for _, backref := range targetIndex.InterleavedBy {
Expand All @@ -1278,13 +1278,13 @@ func (desc *TableDescriptor) validateCrossReferences(ctx context.Context, txn *c
interleaveBackrefs[backref] = struct{}{}
targetTable, err := getTable(backref.Table)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
return errors.Wrapf(err,
"invalid interleave backreference table=%d index=%d",
backref.Table, backref.Index)
}
targetIndex, err := targetTable.FindIndexByID(backref.Index)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
return errors.Wrapf(err,
"invalid interleave backreference table=%s index=%d",
targetTable.Name, backref.Index)
}
Expand Down

0 comments on commit 32f3de3

Please sign in to comment.