Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

roachpb: improve AmbiguousResultError #65714

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) {
if r < 0 {
return nil
}
return roachpb.NewError(roachpb.NewAmbiguousResultError(strconv.Itoa(int(r))))
return roachpb.NewError(roachpb.NewAmbiguousResultErrorf(strconv.Itoa(int(r))))
},
},
}}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ func (ds *DistSender) detectIntentMissingDueToIntentResolution(
// We weren't able to determine whether the intent missing error is
// due to intent resolution or not, so it is still ambiguous whether
// the commit succeeded.
return false, roachpb.NewAmbiguousResultErrorf("error=%s [intent missing]", pErr)
return false, roachpb.NewAmbiguousResultError(errors.Wrap(pErr.GoError(), "intent missing"))
}
resp := br.Responses[0].GetQueryTxn()
respTxn := &resp.QueriedTxn
Expand Down Expand Up @@ -1704,7 +1704,7 @@ func fillSkippedResponses(
// the error that the last attempt to execute the request returned.
func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error {
if ambiguousErr != nil {
return roachpb.NewAmbiguousResultErrorf("error=%s [exhausted]", ambiguousErr)
return roachpb.NewAmbiguousResultError(errors.Wrap(ambiguousErr, "exhausted"))
}

// TODO(bdarnell): The error from the last attempt is not necessarily the best
Expand Down Expand Up @@ -2029,7 +2029,7 @@ func (ds *DistSender) sendToReplicas(
}
default:
if ambiguousError != nil {
return nil, roachpb.NewAmbiguousResultErrorf("error=%s [propagate]", ambiguousError)
return nil, roachpb.NewAmbiguousResultError(errors.Wrap(ambiguousError, "propagate"))
}

// The error received is likely not specific to this
Expand All @@ -2046,7 +2046,7 @@ func (ds *DistSender) sendToReplicas(
reportedErr := errors.Wrap(ctx.Err(), "context done during DistSender.Send")
log.Eventf(ctx, "%v", reportedErr)
if ambiguousError != nil {
return nil, roachpb.NewAmbiguousResultErrorf(reportedErr.Error())
return nil, roachpb.NewAmbiguousResultError(reportedErr)
}
// Don't consider this a sendError, because sendErrors indicate that we
// were unable to reach a replica that could serve the request, and they
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func TestTxnCoordSenderGCWithAmbiguousResultErr(t *testing.T) {

testutils.RunTrueAndFalse(t, "errOnFirst", func(t *testing.T, errOnFirst bool) {
key := roachpb.Key("a")
are := roachpb.NewAmbiguousResultError("very ambiguous")
are := roachpb.NewAmbiguousResultErrorf("very ambiguous")
knobs := &kvserver.StoreTestingKnobs{
TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
for _, req := range ba.Requests {
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvnemesis/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ func TestValidate(t *testing.T) {
},
{
name: "one ambiguous put with successful write",
steps: []Step{step(withResult(put(`a`, `v1`), roachpb.NewAmbiguousResultError(``)))},
steps: []Step{step(withResult(put(`a`, `v1`), roachpb.NewAmbiguousResultErrorf(`foo`)))},
kvs: kvs(kv(`a`, 1, `v1`)),
expected: nil,
},
{
name: "one ambiguous put with failed write",
steps: []Step{step(withResult(put(`a`, `v1`), roachpb.NewAmbiguousResultError(``)))},
steps: []Step{step(withResult(put(`a`, `v1`), roachpb.NewAmbiguousResultErrorf(`foo`)))},
kvs: nil,
expected: nil,
},
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestValidate(t *testing.T) {
step(withResult(closureTxn(ClosureTxnType_Commit,
withResult(put(`a`, `v1`), nil),
withResult(put(`b`, `v2`), nil),
), roachpb.NewAmbiguousResultError(``))),
), roachpb.NewAmbiguousResultErrorf(`foo`))),
},
kvs: kvs(kv(`a`, 1, `v1`), kv(`b`, 1, `v2`)),
expected: nil,
Expand All @@ -262,7 +262,7 @@ func TestValidate(t *testing.T) {
step(withResult(closureTxn(ClosureTxnType_Commit,
withResult(put(`a`, `v1`), nil),
withResult(put(`b`, `v2`), nil),
), roachpb.NewAmbiguousResultError(``))),
), roachpb.NewAmbiguousResultErrorf(`foo`))),
},
kvs: nil,
expected: nil,
Expand All @@ -273,7 +273,7 @@ func TestValidate(t *testing.T) {
step(withResult(closureTxn(ClosureTxnType_Commit,
withResult(put(`a`, `v1`), nil),
withResult(put(`b`, `v2`), nil),
), roachpb.NewAmbiguousResultError(``))),
), roachpb.NewAmbiguousResultErrorf(`foo`))),
},
kvs: kvs(kv(`a`, 1, `v1`), kv(`b`, 2, `v2`)),
expected: []string{
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/node_liveness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ func TestNodeLivenessRetryAmbiguousResultError(t *testing.T) {
if val := injectError.Load(); val != nil && val.(bool) {
atomic.AddInt32(&injectedErrorCount, 1)
injectError.Store(false)
return roachpb.NewError(roachpb.NewAmbiguousResultError("test"))
return roachpb.NewError(roachpb.NewAmbiguousResultErrorf("test"))
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *replicatedCmd) AckErrAndFinish(ctx context.Context, err error) error {
if c.IsLocal() {
c.response.Err = roachpb.NewError(
roachpb.NewAmbiguousResultError(
err.Error()))
err))
}
return c.AckOutcomeAndFinish(ctx)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) {
p.finishApplication(ctx, proposalResult{
Err: roachpb.NewError(
roachpb.NewAmbiguousResultError(
apply.ErrRemoved.Error())),
apply.ErrRemoved)),
})
}
r.mu.internalRaftGroup = nil
Expand Down
11 changes: 5 additions & 6 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package kvserver

import (
"context"
"fmt"
"math/rand"
"sort"
"strings"
Expand Down Expand Up @@ -1056,9 +1055,9 @@ func (r *Replica) refreshProposalsLocked(
log.VEventf(p.ctx, 2, "refresh (reason: %s) returning AmbiguousResultError for command "+
"without MaxLeaseIndex: %v", reason, p.command)
p.finishApplication(ctx, proposalResult{Err: roachpb.NewError(
roachpb.NewAmbiguousResultError(
fmt.Sprintf("unknown status for command without MaxLeaseIndex "+
"at refreshProposalsLocked time (refresh reason: %s)", reason)))})
roachpb.NewAmbiguousResultErrorf(
"unknown status for command without MaxLeaseIndex "+
"at refreshProposalsLocked time (refresh reason: %s)", reason))})
continue
}
switch reason {
Expand All @@ -1073,7 +1072,7 @@ func (r *Replica) refreshProposalsLocked(
log.Eventf(p.ctx, "retry proposal %x: %s", p.idKey, reason)
p.finishApplication(ctx, proposalResult{
Err: roachpb.NewError(
roachpb.NewAmbiguousResultError(
roachpb.NewAmbiguousResultErrorf(
"unable to determine whether command was applied via snapshot",
),
),
Expand Down Expand Up @@ -1113,7 +1112,7 @@ func (r *Replica) refreshProposalsLocked(
if err := r.mu.proposalBuf.ReinsertLocked(ctx, p); err != nil {
r.cleanupFailedProposalLocked(p)
p.finishApplication(ctx, proposalResult{
Err: roachpb.NewError(roachpb.NewAmbiguousResultError(err.Error())),
Err: roachpb.NewError(roachpb.NewAmbiguousResultError(err)),
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,15 +325,15 @@ func (r *Replica) executeWriteBatch(
abandon()
log.VEventf(ctx, 2, "context cancellation after %0.1fs of attempting command %s",
timeutil.Since(startTime).Seconds(), ba)
return nil, nil, roachpb.NewError(roachpb.NewAmbiguousResultError(ctx.Err().Error()))
return nil, nil, roachpb.NewError(roachpb.NewAmbiguousResultError(errors.Wrap(ctx.Err(), "while waiting for proposal")))

case <-shouldQuiesce:
// If shutting down, return an AmbiguousResultError, which indicates
// to the caller that the command may have executed.
abandon()
log.VEventf(ctx, 2, "shutdown cancellation after %0.1fs of attempting command %s",
timeutil.Since(startTime).Seconds(), ba)
return nil, nil, roachpb.NewError(roachpb.NewAmbiguousResultError("server shutdown"))
return nil, nil, roachpb.NewError(roachpb.NewAmbiguousResultErrorf("server shutdown"))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ func (s *Store) processRaft(ctx context.Context) {
prop.finishApplication(
context.Background(),
proposalResult{
Err: roachpb.NewError(roachpb.NewAmbiguousResultError("store is stopping")),
Err: roachpb.NewError(roachpb.NewAmbiguousResultErrorf("store is stopping")),
},
)
}
Expand Down
28 changes: 23 additions & 5 deletions pkg/roachpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,16 +633,34 @@ func (e *RangeKeyMismatchError) AppendRangeInfo(

var _ ErrorDetailInterface = &RangeKeyMismatchError{}

// NewAmbiguousResultError initializes a new AmbiguousResultError with
// an explanatory message.
func NewAmbiguousResultError(msg string) *AmbiguousResultError {
return &AmbiguousResultError{Message: msg}
// NewAmbiguousResultError initializes a new AmbiguousResultError wrapping
// (opaquely) a causing error.
func NewAmbiguousResultError(err error) *AmbiguousResultError {
return &AmbiguousResultError{
Message: err.Error(),
EncodedErr: errors.EncodeError(context.Background(), err),
}
}

// NewAmbiguousResultErrorf initializes a new AmbiguousResultError with
// an explanatory format and set of arguments.
func NewAmbiguousResultErrorf(format string, args ...interface{}) *AmbiguousResultError {
return NewAmbiguousResultError(fmt.Sprintf(format, args...))
return NewAmbiguousResultError(errors.Errorf(format, args...))
}

// SafeFormatError implements errors.SafeFormatter.
func (e *AmbiguousResultError) SafeFormatError(p errors.Printer) (next error) {
p.Printf("result is ambiguous")
cause := errors.DecodeError(context.Background(), e.EncodedErr)
if f, l, _, ok := errors.GetOneLineSource(cause); ok {
p.Printf(" (originated at %s:%d)", redact.SafeString(f), redact.Safe(l))
}
return cause
}

// Format implements fmt.Formatter.
func (e *AmbiguousResultError) Format(s fmt.State, v rune) {
errors.FormatError(e, s, v)
}

func (e *AmbiguousResultError) Error() string {
Expand Down
Loading