Skip to content

Commit

Permalink
kvserver: use EncodedError in SnapshotResponse
Browse files Browse the repository at this point in the history
We were previously using a "Message" string to indicate details about an
error. We can do so much better now and actually encode the error. This
wasn't possible when this field was first added, but it is now, so let's
use it. As always, there's a migration concern, which means the old
field stays around & is populated as well as interpreted for one
release.

We then use this new-found freedom to improve which errors were marked
as "failed snapshot" errors. Previously, any error coming in on a
`SnapshotResponse` was considered a snapshot error and was therefore
treated as retriable. This was causing `TestLearnerSnapshotFailsRollback` to run
for 90s, as `TestCluster`'s replication changes use a [SucceedsSoon] to
retry snapshot errors - but that test actually injects an error that it
wants to fail fast on. Now, since snapshot error marks propagate over the
wire, we can do the marking on the *sender* of the SnapshotResponse, and
we can mark only those errors that correspond to an actual failure to apply
the snapshot (as opposed to an injected error, or a hard error due to a
malformed request). The test now takes around one second, for a rare 90x
speed-up.

As a drive-by, we're also removing `errMalformedSnapshot`, which became
unused when we stopped sending the raft log in raft snaps a few releases
back, and which had managed to hide from the `unused` lint.

[SucceedsSoon]: https://github.com/cockroachdb/cockroach/blob/37175f77bf374d1bcb76bc39a65149788be06134/pkg/testutils/testcluster/testcluster.go#L628-L631

Fixes cockroachdb#74621.

Release note: None
  • Loading branch information
tbg committed Sep 28, 2022
1 parent 455369c commit 61d8135
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 48 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/kvserverpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ proto_library(
"//pkg/storage/enginepb:enginepb_proto",
"//pkg/util/hlc:hlc_proto",
"//pkg/util/tracing/tracingpb:tracingpb_proto",
"@com_github_cockroachdb_errors//errorspb:errorspb_proto",
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
"@com_google_protobuf//:timestamp_proto",
"@io_etcd_go_etcd_raft_v3//raftpb:raftpb_proto",
Expand All @@ -59,6 +60,7 @@ go_proto_library(
"//pkg/util/hlc",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid", # keep
"@com_github_cockroachdb_errors//errorspb",
"@com_github_gogo_protobuf//gogoproto",
"@io_etcd_go_etcd_raft_v3//raftpb",
],
Expand Down
14 changes: 13 additions & 1 deletion pkg/kv/kvserver/kvserverpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ syntax = "proto3";
package cockroach.kv.kvserver.kvserverpb;
option go_package = "kvserverpb";

import "errorspb/errors.proto";
import "roachpb/errors.proto";
import "roachpb/internal_raft.proto";
import "roachpb/metadata.proto";
Expand Down Expand Up @@ -237,11 +238,22 @@ message SnapshotResponse {
reserved 4;
}
Status status = 1;
string message = 2;
// Message is a message explaining an ERROR return value. It is not set for any
// other status.
//
// DEPRECATED: this field can be removed in 23.2. As of 23.1, the encoded_error
// field is always used instead. (23.1 itself needs to populate both due to
// needing to be wire-compatible with 22.2).
string deprecated_message = 2;
reserved 3;

// Traces from snapshot processing, returned on status APPLIED or ERROR.
repeated util.tracing.tracingpb.RecordedSpan collected_spans = 4 [(gogoproto.nullable) = false];

// encoded_error encodes the error when the status is ERROR.
//
// MIGRATION: only guaranteed to be set once the deprecated_message field has been removed.
errorspb.EncodedError encoded_error = 5 [(gogoproto.nullable) = false];
}

// DelegateSnapshotRequest is the request used to delegate send snapshot requests.
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (
"github.com/cockroachdb/errors"
)

// errMarkSnapshotError is used as an error mark for errors that get returned
// to the initiator of a snapshot. This generally classifies errors as transient,
// i.e. communicates an intention for the caller to retry.
//
// NB: don't change the string here; this will cause cross-version issues
// since this singleton is used as a marker.
var errMarkSnapshotError = errors.New("snapshot failed")
Expand Down
12 changes: 8 additions & 4 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapsh
return stream.Send(
&kvserverpb.DelegateSnapshotResponse{
SnapResponse: &kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_ERROR,
Message: "client error: no message in first delegated snapshot request",
Status: kvserverpb.SnapshotResponse_ERROR,
DeprecatedMessage: "client error: no message in first delegated snapshot request",
},
},
)
Expand Down Expand Up @@ -364,9 +364,12 @@ func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error
return err
}
if req.Header == nil {
err := errors.New("client error: no header in first snapshot request message")
return stream.Send(&kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_ERROR,
Message: "client error: no header in first snapshot request message"})
Status: kvserverpb.SnapshotResponse_ERROR,
DeprecatedMessage: err.Error(),
EncodedError: errors.EncodeError(ctx, err),
})
}
rmr := req.Header.RaftMessageRequest
handler, ok := t.getHandler(rmr.ToReplica.StoreID)
Expand All @@ -376,6 +379,7 @@ func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error
return roachpb.NewStoreNotFoundError(rmr.ToReplica.StoreID)
}
return handler.HandleSnapshot(ctx, req.Header, stream)

}

// Listen registers a raftMessageHandler to receive proxied messages.
Expand Down
16 changes: 1 addition & 15 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2868,7 +2868,7 @@ func (r *Replica) followerSendSnapshot(
}
}

err = contextutil.RunWithTimeout(
return contextutil.RunWithTimeout(
ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
return r.store.cfg.Transport.SendSnapshot(
ctx,
Expand All @@ -2881,20 +2881,6 @@ func (r *Replica) followerSendSnapshot(
)
},
)
if err != nil {
if errors.Is(err, errMalformedSnapshot) {
tag := fmt.Sprintf("r%d_%s", r.RangeID, snap.SnapUUID.Short())
if dir, err := r.store.checkpoint(ctx, tag); err != nil {
log.Warningf(ctx, "unable to create checkpoint %s: %+v", dir, err)
} else {
log.Warningf(ctx, "created checkpoint %s", dir)
}

log.Fatal(ctx, "malformed snapshot generated")
}
return errors.Mark(err, errMarkSnapshotError)
}
return nil
}

// replicasCollocated is used in AdminMerge to ensure that the ranges are
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,11 +430,11 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) {
skip.UnderRace(t)

runTest := func(t *testing.T, replicaType roachpb.ReplicaType) {
var rejectSnapshots int64
var rejectSnapshotErr atomic.Value // error
knobs, ltk := makeReplicationTestKnobs()
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
if atomic.LoadInt64(&rejectSnapshots) > 0 {
return errors.New(`nope`)
if err := rejectSnapshotErr.Load().(error); err != nil {
return err
}
return nil
}
Expand All @@ -446,7 +446,7 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) {
defer tc.Stopper().Stop(ctx)

scratchStartKey := tc.ScratchRange(t)
atomic.StoreInt64(&rejectSnapshots, 1)
rejectSnapshotErr.Store(errors.New("boom"))
var err error
switch replicaType {
case roachpb.LEARNER:
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ func (s *Store) HandleDelegatedSnapshot(
return stream.Send(
&kvserverpb.DelegateSnapshotResponse{
SnapResponse: &kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_ERROR,
Message: err.Error(),
Status: kvserverpb.SnapshotResponse_ERROR,
DeprecatedMessage: err.Error(),
},
CollectedSpans: sp.GetConfiguredRecording(),
},
Expand All @@ -191,8 +191,8 @@ func (s *Store) HandleDelegatedSnapshot(

resp := &kvserverpb.DelegateSnapshotResponse{
SnapResponse: &kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_APPLIED,
Message: "Snapshot successfully applied by recipient",
Status: kvserverpb.SnapshotResponse_APPLIED,
DeprecatedMessage: "Snapshot successfully applied by recipient",
},
CollectedSpans: sp.GetConfiguredRecording(),
}
Expand Down
58 changes: 38 additions & 20 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,6 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
}
}

// errMalformedSnapshot indicates that the snapshot in question is malformed,
// for e.g. missing raft log entries.
var errMalformedSnapshot = errors.New("malformed snapshot generated")

// Send implements the snapshotStrategy interface.
func (kvSS *kvBatchSnapshotStrategy) Send(
ctx context.Context,
Expand Down Expand Up @@ -885,7 +881,10 @@ func (s *Store) checkSnapshotOverlapLocked(
// NB: this check seems redundant since placeholders are also represented in
// replicasByKey (and thus returned in getOverlappingKeyRangeLocked).
if exRng, ok := s.mu.replicaPlaceholders[desc.RangeID]; ok {
return errors.Errorf("%s: canAcceptSnapshotLocked: cannot add placeholder, have an existing placeholder %s %v", s, exRng, snapHeader.RaftMessageRequest.FromReplica)
return errors.Mark(errors.Errorf(
"%s: canAcceptSnapshotLocked: cannot add placeholder, have an existing placeholder %s %v",
s, exRng, snapHeader.RaftMessageRequest.FromReplica),
errMarkSnapshotError)
}

// TODO(benesch): consider discovering and GC'ing *all* overlapping ranges,
Expand Down Expand Up @@ -930,7 +929,10 @@ func (s *Store) checkSnapshotOverlapLocked(
msg += "; initiated GC:"
s.replicaGCQueue.AddAsync(ctx, exReplica, gcPriority)
}
return errors.Errorf("%s %v (incoming %v)", msg, exReplica, snapHeader.State.Desc.RSpan()) // exReplica can be nil
return errors.Mark(
errors.Errorf("%s %v (incoming %v)", msg, exReplica, snapHeader.State.Desc.RSpan()), // exReplica can be nil
errMarkSnapshotError,
)
}
return nil
}
Expand Down Expand Up @@ -964,6 +966,8 @@ func (s *Store) receiveSnapshot(

if fn := s.cfg.TestingKnobs.ReceiveSnapshot; fn != nil {
if err := fn(header); err != nil {
// NB: we intentionally don't mark this error as errMarkSnapshotError so
// that we don't end up retrying injected errors in tests.
return sendSnapshotError(stream, err)
}
}
Expand Down Expand Up @@ -1004,7 +1008,7 @@ func (s *Store) receiveSnapshot(
return nil
}); pErr != nil {
log.Infof(ctx, "cannot accept snapshot: %s", pErr)
return pErr.GoError()
return sendSnapshotError(stream, pErr.GoError())
}

defer func() {
Expand Down Expand Up @@ -1078,10 +1082,13 @@ func (s *Store) receiveSnapshot(
// already received the entire snapshot here, so there's no point in
// abandoning application half-way through if the caller goes away.
applyCtx := s.AnnotateCtx(context.Background())
if err := s.processRaftSnapshotRequest(applyCtx, header, inSnap); err != nil {
return sendSnapshotErrorWithTrace(stream,
errors.Wrap(err.GoError(), "failed to apply snapshot"), rec,
)
if pErr := s.processRaftSnapshotRequest(applyCtx, header, inSnap); pErr != nil {
err := pErr.GoError()
// We mark this error as a snapshot error which will be interpreted by the
// sender as this being a retriable error, see isSnapshotError().
err = errors.Mark(err, errMarkSnapshotError)
err = errors.Wrap(err, "failed to apply snapshot")
return sendSnapshotErrorWithTrace(stream, err, rec)
}
return stream.Send(&kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_APPLIED,
Expand All @@ -1097,9 +1104,10 @@ func sendSnapshotErrorWithTrace(
stream incomingSnapshotStream, err error, trace tracingpb.Recording,
) error {
return stream.Send(&kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_ERROR,
Message: err.Error(),
CollectedSpans: trace,
Status: kvserverpb.SnapshotResponse_ERROR,
DeprecatedMessage: err.Error(),
EncodedError: errors.EncodeError(context.Background(), err),
CollectedSpans: trace,
})
}

Expand Down Expand Up @@ -1506,9 +1514,19 @@ func sendSnapshot(
switch resp.Status {
case kvserverpb.SnapshotResponse_ERROR:
sp.ImportRemoteRecording(resp.CollectedSpans)
storePool.Throttle(storepool.ThrottleFailed, resp.Message, to.StoreID)
return errors.Errorf("%s: remote couldn't accept %s with error: %s",
to, snap, resp.Message)
storePool.Throttle(storepool.ThrottleFailed, resp.DeprecatedMessage, to.StoreID)

var err error
if resp.EncodedError.Error != nil {
// NB: errMarkSnapshotError must be set on the other end, if it is
// set. We're not going to add it here.
err = errors.DecodeError(ctx, resp.EncodedError)
} else {
// Deprecated path.
err = errors.Errorf("%s", resp.DeprecatedMessage)
err = errors.Mark(err, errMarkSnapshotError)
}
return errors.Wrapf(err, "%s: remote couldn't accept %s with error", to, snap)
case kvserverpb.SnapshotResponse_ACCEPTED:
// This is the response we're expecting. Continue with snapshot sending.
log.Event(ctx, "received SnapshotResponse_ACCEPTED message from server")
Expand Down Expand Up @@ -1595,7 +1613,7 @@ func sendSnapshot(
}
switch resp.Status {
case kvserverpb.SnapshotResponse_ERROR:
return errors.Errorf("%s: remote failed to apply snapshot for reason %s", to, resp.Message)
return errors.Errorf("%s: remote failed to apply snapshot for reason %s", to, resp.DeprecatedMessage)
case kvserverpb.SnapshotResponse_APPLIED:
return nil
default:
Expand Down Expand Up @@ -1625,7 +1643,7 @@ func delegateSnapshot(
case kvserverpb.SnapshotResponse_ERROR:
return errors.Errorf(
"%s: sender couldn't accept %s with error: %s", delegatedSender,
req, resp.SnapResponse.Message,
req, resp.SnapResponse.DeprecatedMessage,
)
case kvserverpb.SnapshotResponse_ACCEPTED:
// The sender accepted the request, it will continue with sending.
Expand Down Expand Up @@ -1665,7 +1683,7 @@ func delegateSnapshot(
}
switch resp.SnapResponse.Status {
case kvserverpb.SnapshotResponse_ERROR:
return errors.Newf("%s", resp.SnapResponse.Message)
return errors.Newf("%s", resp.SnapResponse.DeprecatedMessage)
case kvserverpb.SnapshotResponse_APPLIED:
// This is the response we're expecting. Snapshot successfully applied.
log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", delegatedSender)
Expand Down

0 comments on commit 61d8135

Please sign in to comment.