diff --git a/pkg/kv/kvserver/kvserverpb/BUILD.bazel b/pkg/kv/kvserver/kvserverpb/BUILD.bazel index 8396a6b0884d..81dce9c5c2e6 100644 --- a/pkg/kv/kvserver/kvserverpb/BUILD.bazel +++ b/pkg/kv/kvserver/kvserverpb/BUILD.bazel @@ -38,6 +38,7 @@ proto_library( "//pkg/storage/enginepb:enginepb_proto", "//pkg/util/hlc:hlc_proto", "//pkg/util/tracing/tracingpb:tracingpb_proto", + "@com_github_cockroachdb_errors//errorspb:errorspb_proto", "@com_github_gogo_protobuf//gogoproto:gogo_proto", "@com_google_protobuf//:timestamp_proto", "@io_etcd_go_etcd_raft_v3//raftpb:raftpb_proto", @@ -59,6 +60,7 @@ go_proto_library( "//pkg/util/hlc", "//pkg/util/tracing/tracingpb", "//pkg/util/uuid", # keep + "@com_github_cockroachdb_errors//errorspb", "@com_github_gogo_protobuf//gogoproto", "@io_etcd_go_etcd_raft_v3//raftpb", ], diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 4923d96ffecd..0f8276cb13b1 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -12,6 +12,7 @@ syntax = "proto3"; package cockroach.kv.kvserver.kvserverpb; option go_package = "kvserverpb"; +import "errorspb/errors.proto"; import "roachpb/errors.proto"; import "roachpb/internal_raft.proto"; import "roachpb/metadata.proto"; @@ -237,11 +238,22 @@ message SnapshotResponse { reserved 4; } Status status = 1; - string message = 2; + // Message is a message explaining an ERROR return value. It is not set for any + // other status. + // + // As of 23.1, the encoded_error field is always used instead. 23.1 itself + // needs to populate both due to needing to be compatible with 22.2. Once + // the MinSupportedVersion is 23.1, this can be removed. + string deprecated_message = 2; reserved 3; // Traces from snapshot processing, returned on status APPLIED or ERROR. repeated util.tracing.tracingpb.RecordedSpan collected_spans = 4 [(gogoproto.nullable) = false]; + + // encoded_error encodes the error when the status is ERROR. 
+ // + // MIGRATION: only guaranteed to be set when the message field is no longer there. + errorspb.EncodedError encoded_error = 5 [(gogoproto.nullable) = false]; } // DelegateSnapshotRequest is the request used to delegate send snapshot requests. diff --git a/pkg/kv/kvserver/markers.go b/pkg/kv/kvserver/markers.go index 92597731397e..4f95d04d377c 100644 --- a/pkg/kv/kvserver/markers.go +++ b/pkg/kv/kvserver/markers.go @@ -15,6 +15,10 @@ import ( "github.com/cockroachdb/errors" ) +// errMarkSnapshotError is used as an error mark for errors that get returned +// to the initiator of a snapshot. This generally classifies errors as transient, +// i.e. communicates an intention for the caller to retry. +// // NB: don't change the string here; this will cause cross-version issues // since this singleton is used as a marker. var errMarkSnapshotError = errors.New("snapshot failed") diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 33cfb4524bc3..d8b4b463fbba 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -332,8 +332,8 @@ func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapsh return stream.Send( &kvserverpb.DelegateSnapshotResponse{ SnapResponse: &kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_ERROR, - Message: "client error: no message in first delegated snapshot request", + Status: kvserverpb.SnapshotResponse_ERROR, + DeprecatedMessage: "client error: no message in first delegated snapshot request", }, }, ) @@ -364,9 +364,12 @@ func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error return err } if req.Header == nil { + err := errors.New("client error: no header in first snapshot request message") return stream.Send(&kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_ERROR, - Message: "client error: no header in first snapshot request message"}) + Status: kvserverpb.SnapshotResponse_ERROR, + 
DeprecatedMessage: err.Error(), + EncodedError: errors.EncodeError(ctx, err), + }) } rmr := req.Header.RaftMessageRequest handler, ok := t.getHandler(rmr.ToReplica.StoreID) @@ -651,14 +654,14 @@ func (t *RaftTransport) DelegateSnapshot( nodeID := req.DelegatedSender.NodeID conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass) if err != nil { - return err + return errors.Mark(err, errMarkSnapshotError) } client := NewMultiRaftClient(conn) // Creates a rpc stream between the leaseholder and sender. stream, err := client.DelegateRaftSnapshot(ctx) if err != nil { - return err + return errors.Mark(err, errMarkSnapshotError) } defer func() { if err := stream.CloseSend(); err != nil { diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index dd30a423ff59..73e9e4e4f76d 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2723,11 +2723,12 @@ func (r *Replica) sendSnapshot( ) }, ) - - if err != nil { - return errors.Mark(err, errMarkSnapshotError) + // Only mark explicitly as snapshot error (which is retriable) if we timed out. + // Otherwise, it's up to the remote to add this mark where appropriate. + if errors.HasType(err, (*contextutil.TimeoutError)(nil)) { + err = errors.Mark(err, errMarkSnapshotError) } - return nil + return err } // followerSnapshotsEnabled is used to enable or disable follower snapshots. 
@@ -2882,7 +2883,7 @@ func (r *Replica) followerSendSnapshot(
 		}
 	}
 
-	err = contextutil.RunWithTimeout(
+	return contextutil.RunWithTimeout(
 		ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
 			return r.store.cfg.Transport.SendSnapshot(
 				ctx,
@@ -2895,20 +2896,6 @@ func (r *Replica) followerSendSnapshot(
 			)
 		},
 	)
-	if err != nil {
-		if errors.Is(err, errMalformedSnapshot) {
-			tag := fmt.Sprintf("r%d_%s", r.RangeID, snap.SnapUUID.Short())
-			if dir, err := r.store.checkpoint(ctx, tag); err != nil {
-				log.Warningf(ctx, "unable to create checkpoint %s: %+v", dir, err)
-			} else {
-				log.Warningf(ctx, "created checkpoint %s", dir)
-			}
-
-			log.Fatal(ctx, "malformed snapshot generated")
-		}
-		return errors.Mark(err, errMarkSnapshotError)
-	}
-	return nil
 }
 
 // replicasCollocated is used in AdminMerge to ensure that the ranges are
diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go
index 06933e69aeec..073078ab9828 100644
--- a/pkg/kv/kvserver/replica_learner_test.go
+++ b/pkg/kv/kvserver/replica_learner_test.go
@@ -430,11 +430,13 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) {
 	skip.UnderRace(t)
 
 	runTest := func(t *testing.T, replicaType roachpb.ReplicaType) {
-		var rejectSnapshots int64
+		var rejectSnapshotErr atomic.Value // error
 		knobs, ltk := makeReplicationTestKnobs()
 		ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
-			if atomic.LoadInt64(&rejectSnapshots) > 0 {
-				return errors.New(`nope`)
+			// Load returns nil until the first Store; the two-value assertion
+			// keeps a snapshot arriving before then from panicking the test.
+			if err, _ := rejectSnapshotErr.Load().(error); err != nil {
+				return err
 			}
 			return nil
 		}
@@ -446,7 +448,7 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) {
 		defer tc.Stopper().Stop(ctx)
 
 		scratchStartKey := tc.ScratchRange(t)
-		atomic.StoreInt64(&rejectSnapshots, 1)
+		rejectSnapshotErr.Store(errors.New("boom"))
 		var err error
 		switch replicaType {
 		case roachpb.LEARNER:
diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go
index 257ab1f8acce..0519655ee9b9 100644
--- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -181,8 +181,8 @@ func (s *Store) HandleDelegatedSnapshot( return stream.Send( &kvserverpb.DelegateSnapshotResponse{ SnapResponse: &kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_ERROR, - Message: err.Error(), + Status: kvserverpb.SnapshotResponse_ERROR, + DeprecatedMessage: err.Error(), }, CollectedSpans: sp.GetConfiguredRecording(), }, @@ -191,8 +191,8 @@ func (s *Store) HandleDelegatedSnapshot( resp := &kvserverpb.DelegateSnapshotResponse{ SnapResponse: &kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_APPLIED, - Message: "Snapshot successfully applied by recipient", + Status: kvserverpb.SnapshotResponse_APPLIED, + DeprecatedMessage: "Snapshot successfully applied by recipient", }, CollectedSpans: sp.GetConfiguredRecording(), } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 07fff97b5b0a..5cebd8e4b067 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -504,10 +504,6 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( } } -// errMalformedSnapshot indicates that the snapshot in question is malformed, -// for e.g. missing raft log entries. -var errMalformedSnapshot = errors.New("malformed snapshot generated") - // Send implements the snapshotStrategy interface. func (kvSS *kvBatchSnapshotStrategy) Send( ctx context.Context, @@ -885,7 +881,10 @@ func (s *Store) checkSnapshotOverlapLocked( // NB: this check seems redundant since placeholders are also represented in // replicasByKey (and thus returned in getOverlappingKeyRangeLocked). 
if exRng, ok := s.mu.replicaPlaceholders[desc.RangeID]; ok { - return errors.Errorf("%s: canAcceptSnapshotLocked: cannot add placeholder, have an existing placeholder %s %v", s, exRng, snapHeader.RaftMessageRequest.FromReplica) + return errors.Mark(errors.Errorf( + "%s: canAcceptSnapshotLocked: cannot add placeholder, have an existing placeholder %s %v", + s, exRng, snapHeader.RaftMessageRequest.FromReplica), + errMarkSnapshotError) } // TODO(benesch): consider discovering and GC'ing *all* overlapping ranges, @@ -930,7 +929,10 @@ func (s *Store) checkSnapshotOverlapLocked( msg += "; initiated GC:" s.replicaGCQueue.AddAsync(ctx, exReplica, gcPriority) } - return errors.Errorf("%s %v (incoming %v)", msg, exReplica, snapHeader.State.Desc.RSpan()) // exReplica can be nil + return errors.Mark( + errors.Errorf("%s %v (incoming %v)", msg, exReplica, snapHeader.State.Desc.RSpan()), // exReplica can be nil + errMarkSnapshotError, + ) } return nil } @@ -964,6 +966,8 @@ func (s *Store) receiveSnapshot( if fn := s.cfg.TestingKnobs.ReceiveSnapshot; fn != nil { if err := fn(header); err != nil { + // NB: we intentionally don't mark this error as errMarkSnapshotError so + // that we don't end up retrying injected errors in tests. return sendSnapshotError(stream, err) } } @@ -1004,7 +1008,7 @@ func (s *Store) receiveSnapshot( return nil }); pErr != nil { log.Infof(ctx, "cannot accept snapshot: %s", pErr) - return pErr.GoError() + return sendSnapshotError(stream, pErr.GoError()) } defer func() { @@ -1078,10 +1082,13 @@ func (s *Store) receiveSnapshot( // already received the entire snapshot here, so there's no point in // abandoning application half-way through if the caller goes away. 
applyCtx := s.AnnotateCtx(context.Background()) - if err := s.processRaftSnapshotRequest(applyCtx, header, inSnap); err != nil { - return sendSnapshotErrorWithTrace(stream, - errors.Wrap(err.GoError(), "failed to apply snapshot"), rec, - ) + if pErr := s.processRaftSnapshotRequest(applyCtx, header, inSnap); pErr != nil { + err := pErr.GoError() + // We mark this error as a snapshot error which will be interpreted by the + // sender as this being a retriable error, see isSnapshotError(). + err = errors.Mark(err, errMarkSnapshotError) + err = errors.Wrap(err, "failed to apply snapshot") + return sendSnapshotErrorWithTrace(stream, err, rec) } return stream.Send(&kvserverpb.SnapshotResponse{ Status: kvserverpb.SnapshotResponse_APPLIED, @@ -1097,9 +1104,10 @@ func sendSnapshotErrorWithTrace( stream incomingSnapshotStream, err error, trace tracingpb.Recording, ) error { return stream.Send(&kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_ERROR, - Message: err.Error(), - CollectedSpans: trace, + Status: kvserverpb.SnapshotResponse_ERROR, + DeprecatedMessage: err.Error(), + EncodedError: errors.EncodeError(context.Background(), err), + CollectedSpans: trace, }) } @@ -1505,9 +1513,19 @@ func sendSnapshot( switch resp.Status { case kvserverpb.SnapshotResponse_ERROR: sp.ImportRemoteRecording(resp.CollectedSpans) - storePool.Throttle(storepool.ThrottleFailed, resp.Message, to.StoreID) - return errors.Errorf("%s: remote couldn't accept %s with error: %s", - to, snap, resp.Message) + storePool.Throttle(storepool.ThrottleFailed, resp.DeprecatedMessage, to.StoreID) + + var err error + if resp.EncodedError.Error != nil { + // NB: errMarkSnapshotError must be set on the other end, if it is + // set. We're not going to add it here. + err = errors.DecodeError(ctx, resp.EncodedError) + } else { + // Deprecated path. 
+			err = errors.Errorf("%s", resp.DeprecatedMessage)
+			err = errors.Mark(err, errMarkSnapshotError)
+		}
+		return errors.Wrapf(err, "%s: remote couldn't accept %s with error", to, snap)
 	case kvserverpb.SnapshotResponse_ACCEPTED:
 		// This is the response we're expecting. Continue with snapshot sending.
 		log.Event(ctx, "received SnapshotResponse_ACCEPTED message from server")
@@ -1594,7 +1612,17 @@ func sendSnapshot(
 	}
 	switch resp.Status {
 	case kvserverpb.SnapshotResponse_ERROR:
-		return errors.Errorf("%s: remote failed to apply snapshot for reason %s", to, resp.Message)
+		var err error
+		if resp.EncodedError.Error != nil {
+			// Decode instead of re-creating the error from its message so that
+			// marks set by the remote (errMarkSnapshotError) are preserved.
+			err = errors.DecodeError(ctx, resp.EncodedError)
+		} else {
+			// Deprecated path.
+			err = errors.Errorf("%s", resp.DeprecatedMessage)
+			err = errors.Mark(err, errMarkSnapshotError)
+		}
+		return errors.Wrapf(err, "%s: remote failed to apply snapshot", to)
 	case kvserverpb.SnapshotResponse_APPLIED:
 		return nil
 	default:
@@ -1624,7 +1652,7 @@ func delegateSnapshot(
 	case kvserverpb.SnapshotResponse_ERROR:
 		return errors.Errorf(
 			"%s: sender couldn't accept %s with error: %s", delegatedSender,
-			req, resp.SnapResponse.Message,
+			req, resp.SnapResponse.DeprecatedMessage,
 		)
 	case kvserverpb.SnapshotResponse_ACCEPTED:
 		// The sender accepted the request, it will continue with sending.
@@ -1643,20 +1671,20 @@ func delegateSnapshot(
 	// Wait for response to see if the receiver successfully applied the snapshot.
 	resp, err = stream.Recv()
 	if err != nil {
-		return errors.Wrapf(err, "%s: remote failed to send snapshot", delegatedSender)
+		return errors.Mark(
+			errors.Wrapf(err, "%s: remote failed to send snapshot", delegatedSender), errMarkSnapshotError,
+		)
 	}
 	// Wait for EOF to ensure server side processing is complete.
 	if unexpectedResp, err := stream.Recv(); err != io.EOF {
 		if err != nil {
-			return errors.Wrapf(
+			return errors.Mark(errors.Wrapf(
 				err, "%s: expected EOF, got resp=%v with error",
-				delegatedSender.StoreID, unexpectedResp,
-			)
+				delegatedSender.StoreID, unexpectedResp), errMarkSnapshotError)
 		}
-		return errors.Newf(
+		return errors.Mark(errors.Newf(
 			"%s: expected EOF, got resp=%v", delegatedSender.StoreID,
-			unexpectedResp,
-		)
+			unexpectedResp), errMarkSnapshotError)
 	}
 	sp := tracing.SpanFromContext(ctx)
 	if sp != nil {
@@ -1664,7 +1692,10 @@ func delegateSnapshot(
 	}
 	switch resp.SnapResponse.Status {
 	case kvserverpb.SnapshotResponse_ERROR:
-		return errors.Newf("%s", resp.SnapResponse.Message)
+		if resp.SnapResponse.EncodedError.IsSet() {
+			return errors.DecodeError(ctx, resp.SnapResponse.EncodedError)
+		}
+		return errors.Newf("%s", resp.SnapResponse.DeprecatedMessage)
 	case kvserverpb.SnapshotResponse_APPLIED:
 		// This is the response we're expecting. Snapshot successfully applied.
 		log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", delegatedSender)