diff --git a/pkg/kv/kvserver/kvserverpb/BUILD.bazel b/pkg/kv/kvserver/kvserverpb/BUILD.bazel
index 8396a6b0884d..81dce9c5c2e6 100644
--- a/pkg/kv/kvserver/kvserverpb/BUILD.bazel
+++ b/pkg/kv/kvserver/kvserverpb/BUILD.bazel
@@ -38,6 +38,7 @@ proto_library(
         "//pkg/storage/enginepb:enginepb_proto",
         "//pkg/util/hlc:hlc_proto",
         "//pkg/util/tracing/tracingpb:tracingpb_proto",
+        "@com_github_cockroachdb_errors//errorspb:errorspb_proto",
         "@com_github_gogo_protobuf//gogoproto:gogo_proto",
         "@com_google_protobuf//:timestamp_proto",
         "@io_etcd_go_etcd_raft_v3//raftpb:raftpb_proto",
@@ -59,6 +60,7 @@ go_proto_library(
         "//pkg/util/hlc",
         "//pkg/util/tracing/tracingpb",
         "//pkg/util/uuid",  # keep
+        "@com_github_cockroachdb_errors//errorspb",
         "@com_github_gogo_protobuf//gogoproto",
         "@io_etcd_go_etcd_raft_v3//raftpb",
     ],
diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto
index 4923d96ffecd..c7833eba46f5 100644
--- a/pkg/kv/kvserver/kvserverpb/raft.proto
+++ b/pkg/kv/kvserver/kvserverpb/raft.proto
@@ -12,6 +12,7 @@ syntax = "proto3";
 package cockroach.kv.kvserver.kvserverpb;
 option go_package = "kvserverpb";
 
+import "errorspb/errors.proto";
 import "roachpb/errors.proto";
 import "roachpb/internal_raft.proto";
 import "roachpb/metadata.proto";
@@ -237,11 +238,22 @@ message SnapshotResponse {
     reserved 4;
   }
   Status status = 1;
-  string message = 2;
+  // Message is a message explaining an ERROR return value. It is not set for any
+  // other status.
+  //
+  // DEPRECATED: this field can be removed in 23.2. As of 23.1, the encoded_error
+  // field is always used instead. (23.1 itself needs to populate both due to
+  // needing to be wire-compatible with 22.2).
+  string deprecated_message = 2;
   reserved 3;
 
   // Traces from snapshot processing, returned on status APPLIED or ERROR.
   repeated util.tracing.tracingpb.RecordedSpan collected_spans = 4 [(gogoproto.nullable) = false];
+
+  // encoded_error encodes the error when the status is ERROR.
+  //
+  // MIGRATION: only guaranteed to be set once deprecated_message has been removed.
+  errorspb.EncodedError encoded_error = 5 [(gogoproto.nullable) = false];
 }
 
 // DelegateSnapshotRequest is the request used to delegate send snapshot requests.
diff --git a/pkg/kv/kvserver/markers.go b/pkg/kv/kvserver/markers.go
index 92597731397e..4f95d04d377c 100644
--- a/pkg/kv/kvserver/markers.go
+++ b/pkg/kv/kvserver/markers.go
@@ -15,6 +15,10 @@ import (
 	"github.com/cockroachdb/errors"
 )
 
+// errMarkSnapshotError is used as an error mark for errors that get returned
+// to the initiator of a snapshot. This generally classifies errors as transient,
+// i.e. communicates an intention for the caller to retry.
+//
 // NB: don't change the string here; this will cause cross-version issues
 // since this singleton is used as a marker.
 var errMarkSnapshotError = errors.New("snapshot failed")
diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go
index 33cfb4524bc3..fa124d4bb487 100644
--- a/pkg/kv/kvserver/raft_transport.go
+++ b/pkg/kv/kvserver/raft_transport.go
@@ -332,8 +332,8 @@ func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapsh
 		return stream.Send(
 			&kvserverpb.DelegateSnapshotResponse{
 				SnapResponse: &kvserverpb.SnapshotResponse{
-					Status:  kvserverpb.SnapshotResponse_ERROR,
-					Message: "client error: no message in first delegated snapshot request",
+					Status:            kvserverpb.SnapshotResponse_ERROR,
+					DeprecatedMessage: "client error: no message in first delegated snapshot request",
 				},
 			},
 		)
@@ -364,9 +364,12 @@ func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error
 		return err
 	}
 	if req.Header == nil {
+		err := errors.New("client error: no header in first snapshot request message")
 		return stream.Send(&kvserverpb.SnapshotResponse{
-			Status:  kvserverpb.SnapshotResponse_ERROR,
-			Message: "client error: no header in first snapshot request message"})
+			Status:            kvserverpb.SnapshotResponse_ERROR,
+			DeprecatedMessage: err.Error(),
+			EncodedError:      errors.EncodeError(ctx, err),
+		})
 	}
 	rmr := req.Header.RaftMessageRequest
 	handler, ok := t.getHandler(rmr.ToReplica.StoreID)
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index a1de1382349f..2e3b6e83b847 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -2868,7 +2868,7 @@ func (r *Replica) followerSendSnapshot(
 		}
 	}
 
-	err = contextutil.RunWithTimeout(
+	return contextutil.RunWithTimeout(
 		ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
 			return r.store.cfg.Transport.SendSnapshot(
 				ctx,
@@ -2881,20 +2881,6 @@ func (r *Replica) followerSendSnapshot(
 			)
 		},
 	)
-	if err != nil {
-		if errors.Is(err, errMalformedSnapshot) {
-			tag := fmt.Sprintf("r%d_%s", r.RangeID, snap.SnapUUID.Short())
-			if dir, err := r.store.checkpoint(ctx, tag); err != nil {
-				log.Warningf(ctx, "unable to create checkpoint %s: %+v", dir, err)
-			} else {
-				log.Warningf(ctx, "created checkpoint %s", dir)
-			}
-
-			log.Fatal(ctx, "malformed snapshot generated")
-		}
-		return errors.Mark(err, errMarkSnapshotError)
-	}
-	return nil
 }
 
 // replicasCollocated is used in AdminMerge to ensure that the ranges are
diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go
index 06933e69aeec..073078ab9828 100644
--- a/pkg/kv/kvserver/replica_learner_test.go
+++ b/pkg/kv/kvserver/replica_learner_test.go
@@ -430,11 +430,11 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) {
 	skip.UnderRace(t)
 
 	runTest := func(t *testing.T, replicaType roachpb.ReplicaType) {
-		var rejectSnapshots int64
+		var rejectSnapshotErr atomic.Value // error; Load() yields nil until the test stores one
 		knobs, ltk := makeReplicationTestKnobs()
 		ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
-			if atomic.LoadInt64(&rejectSnapshots) > 0 {
-				return errors.New(`nope`)
+			if err, ok := rejectSnapshotErr.Load().(error); ok {
+				return err
 			}
 			return nil
 		}
@@ -446,7 +446,7 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) {
 		defer tc.Stopper().Stop(ctx)
 
 		scratchStartKey := tc.ScratchRange(t)
-		atomic.StoreInt64(&rejectSnapshots, 1)
+		rejectSnapshotErr.Store(errors.New("boom"))
 		var err error
 		switch replicaType {
 		case roachpb.LEARNER:
diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go
index 257ab1f8acce..0519655ee9b9 100644
--- a/pkg/kv/kvserver/store_raft.go
+++ b/pkg/kv/kvserver/store_raft.go
@@ -181,8 +181,8 @@ func (s *Store) HandleDelegatedSnapshot(
 		return stream.Send(
 			&kvserverpb.DelegateSnapshotResponse{
 				SnapResponse: &kvserverpb.SnapshotResponse{
-					Status:  kvserverpb.SnapshotResponse_ERROR,
-					Message: err.Error(),
+					Status:            kvserverpb.SnapshotResponse_ERROR,
+					DeprecatedMessage: err.Error(),
 				},
 				CollectedSpans: sp.GetConfiguredRecording(),
 			},
@@ -191,8 +191,8 @@ func (s *Store) HandleDelegatedSnapshot(
 
 	resp := &kvserverpb.DelegateSnapshotResponse{
 		SnapResponse: &kvserverpb.SnapshotResponse{
-			Status:  kvserverpb.SnapshotResponse_APPLIED,
-			Message: "Snapshot successfully applied by recipient",
+			Status:            kvserverpb.SnapshotResponse_APPLIED,
+			DeprecatedMessage: "Snapshot successfully applied by recipient",
 		},
 		CollectedSpans: sp.GetConfiguredRecording(),
 	}
diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go
index 6e7e48ec9c36..83fc67fa288f 100644
--- a/pkg/kv/kvserver/store_snapshot.go
+++ b/pkg/kv/kvserver/store_snapshot.go
@@ -504,10 +504,6 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
 	}
 }
 
-// errMalformedSnapshot indicates that the snapshot in question is malformed,
-// for e.g. missing raft log entries.
-var errMalformedSnapshot = errors.New("malformed snapshot generated")
-
 // Send implements the snapshotStrategy interface.
 func (kvSS *kvBatchSnapshotStrategy) Send(
 	ctx context.Context,
@@ -885,7 +881,10 @@ func (s *Store) checkSnapshotOverlapLocked(
 	// NB: this check seems redundant since placeholders are also represented in
 	// replicasByKey (and thus returned in getOverlappingKeyRangeLocked).
 	if exRng, ok := s.mu.replicaPlaceholders[desc.RangeID]; ok {
-		return errors.Errorf("%s: canAcceptSnapshotLocked: cannot add placeholder, have an existing placeholder %s %v", s, exRng, snapHeader.RaftMessageRequest.FromReplica)
+		return errors.Mark(errors.Errorf(
+			"%s: canAcceptSnapshotLocked: cannot add placeholder, have an existing placeholder %s %v",
+			s, exRng, snapHeader.RaftMessageRequest.FromReplica),
+			errMarkSnapshotError)
 	}
 
 	// TODO(benesch): consider discovering and GC'ing *all* overlapping ranges,
@@ -930,7 +929,10 @@ func (s *Store) checkSnapshotOverlapLocked(
 			msg += "; initiated GC:"
 			s.replicaGCQueue.AddAsync(ctx, exReplica, gcPriority)
 		}
-		return errors.Errorf("%s %v (incoming %v)", msg, exReplica, snapHeader.State.Desc.RSpan()) // exReplica can be nil
+		return errors.Mark(
+			errors.Errorf("%s %v (incoming %v)", msg, exReplica, snapHeader.State.Desc.RSpan()), // exReplica can be nil
+			errMarkSnapshotError,
+		)
 	}
 	return nil
 }
@@ -964,6 +966,8 @@ func (s *Store) receiveSnapshot(
 
 	if fn := s.cfg.TestingKnobs.ReceiveSnapshot; fn != nil {
 		if err := fn(header); err != nil {
+			// NB: we intentionally don't mark this error as errMarkSnapshotError so
+			// that we don't end up retrying injected errors in tests.
 			return sendSnapshotError(stream, err)
 		}
 	}
@@ -1004,7 +1008,7 @@ func (s *Store) receiveSnapshot(
 		return nil
 	}); pErr != nil {
 		log.Infof(ctx, "cannot accept snapshot: %s", pErr)
-		return pErr.GoError()
+		return sendSnapshotError(stream, pErr.GoError())
 	}
 
 	defer func() {
@@ -1078,10 +1082,13 @@ func (s *Store) receiveSnapshot(
 	// already received the entire snapshot here, so there's no point in
 	// abandoning application half-way through if the caller goes away.
 	applyCtx := s.AnnotateCtx(context.Background())
-	if err := s.processRaftSnapshotRequest(applyCtx, header, inSnap); err != nil {
-		return sendSnapshotErrorWithTrace(stream,
-			errors.Wrap(err.GoError(), "failed to apply snapshot"), rec,
-		)
+	if pErr := s.processRaftSnapshotRequest(applyCtx, header, inSnap); pErr != nil {
+		err := pErr.GoError()
+		// We mark this error as a snapshot error which will be interpreted by the
+		// sender as this being a retriable error, see isSnapshotError().
+		err = errors.Mark(err, errMarkSnapshotError)
+		err = errors.Wrap(err, "failed to apply snapshot")
+		return sendSnapshotErrorWithTrace(stream, err, rec)
 	}
 	return stream.Send(&kvserverpb.SnapshotResponse{
 		Status: kvserverpb.SnapshotResponse_APPLIED,
@@ -1097,9 +1104,10 @@ func sendSnapshotErrorWithTrace(
 	stream incomingSnapshotStream, err error, trace tracingpb.Recording,
 ) error {
 	return stream.Send(&kvserverpb.SnapshotResponse{
-		Status:         kvserverpb.SnapshotResponse_ERROR,
-		Message:        err.Error(),
-		CollectedSpans: trace,
+		Status:            kvserverpb.SnapshotResponse_ERROR,
+		DeprecatedMessage: err.Error(),
+		EncodedError:      errors.EncodeError(context.Background(), err),
+		CollectedSpans:    trace,
 	})
 }
 
@@ -1506,9 +1514,19 @@ func sendSnapshot(
 	switch resp.Status {
 	case kvserverpb.SnapshotResponse_ERROR:
 		sp.ImportRemoteRecording(resp.CollectedSpans)
-		storePool.Throttle(storepool.ThrottleFailed, resp.Message, to.StoreID)
-		return errors.Errorf("%s: remote couldn't accept %s with error: %s",
-			to, snap, resp.Message)
+		storePool.Throttle(storepool.ThrottleFailed, resp.DeprecatedMessage, to.StoreID)
+
+		var err error
+		if resp.EncodedError.Error != nil {
+			// NB: errMarkSnapshotError must be set on the other end, if it is
+			// set. We're not going to add it here.
+			err = errors.DecodeError(ctx, resp.EncodedError)
+		} else {
+			// Deprecated path.
+			err = errors.Errorf("%s", resp.DeprecatedMessage)
+			err = errors.Mark(err, errMarkSnapshotError)
+		}
+		return errors.Wrapf(err, "%s: remote couldn't accept %s with error", to, snap)
 	case kvserverpb.SnapshotResponse_ACCEPTED:
 		// This is the response we're expecting. Continue with snapshot sending.
 		log.Event(ctx, "received SnapshotResponse_ACCEPTED message from server")
@@ -1595,7 +1613,17 @@ func sendSnapshot(
 	}
 	switch resp.Status {
 	case kvserverpb.SnapshotResponse_ERROR:
-		return errors.Errorf("%s: remote failed to apply snapshot for reason %s", to, resp.Message)
+		var err error
+		if resp.EncodedError.Error != nil {
+			// Decode rather than re-create the error so that marks set by the
+			// recipient (notably errMarkSnapshotError) survive the round trip.
+			err = errors.DecodeError(ctx, resp.EncodedError)
+		} else {
+			// Deprecated path: the recipient only populated the message.
+			err = errors.Errorf("%s", resp.DeprecatedMessage)
+			err = errors.Mark(err, errMarkSnapshotError)
+		}
+		return errors.Wrapf(err, "%s: remote failed to apply snapshot", to)
 	case kvserverpb.SnapshotResponse_APPLIED:
 		return nil
 	default:
@@ -1625,7 +1653,7 @@ func delegateSnapshot(
 	case kvserverpb.SnapshotResponse_ERROR:
 		return errors.Errorf(
 			"%s: sender couldn't accept %s with error: %s", delegatedSender,
-			req, resp.SnapResponse.Message,
+			req, resp.SnapResponse.DeprecatedMessage,
 		)
 	case kvserverpb.SnapshotResponse_ACCEPTED:
 		// The sender accepted the request, it will continue with sending.
@@ -1665,7 +1693,7 @@ func delegateSnapshot(
 	}
 	switch resp.SnapResponse.Status {
 	case kvserverpb.SnapshotResponse_ERROR:
-		return errors.Newf("%s", resp.SnapResponse.Message)
+		return errors.Newf("%s", resp.SnapResponse.DeprecatedMessage)
 	case kvserverpb.SnapshotResponse_APPLIED:
 		// This is the response we're expecting. Snapshot successfully applied.
 		log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", delegatedSender)