diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index cb174368921c..0de463cc3fa2 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -442,6 +442,7 @@ proto_library( "//pkg/kv/kvserver/liveness/livenesspb:livenesspb_proto", "//pkg/roachpb:roachpb_proto", "//pkg/storage/enginepb:enginepb_proto", + "@com_github_cockroachdb_errors//errorspb:errorspb_proto", "@com_github_gogo_protobuf//gogoproto:gogo_proto", "@io_etcd_go_etcd_raft_v3//raftpb:raftpb_proto", ], @@ -458,6 +459,7 @@ go_proto_library( "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb:with-mocks", "//pkg/storage/enginepb", + "@com_github_cockroachdb_errors//errorspb", "@com_github_gogo_protobuf//gogoproto", "@io_etcd_go_etcd_raft_v3//raftpb", ], diff --git a/pkg/kv/kvserver/markers.go b/pkg/kv/kvserver/markers.go index 29b7269e4469..9bef525358a4 100644 --- a/pkg/kv/kvserver/markers.go +++ b/pkg/kv/kvserver/markers.go @@ -15,6 +15,10 @@ import ( "github.com/cockroachdb/errors" ) +// errMarkSnapshotError is used as an error mark for errors that get returned +// to the initiator of a snapshot. This generally classifies errors as transient, +// i.e. communicates an intention for the caller to retry. +// // NB: don't change the string here; this will cause cross-version issues // since this singleton is used as a marker. var errMarkSnapshotError = errors.New("snapshot failed") diff --git a/pkg/kv/kvserver/raft.proto b/pkg/kv/kvserver/raft.proto index a11ac8fda205..2a63c5a48893 100644 --- a/pkg/kv/kvserver/raft.proto +++ b/pkg/kv/kvserver/raft.proto @@ -12,6 +12,7 @@ syntax = "proto3"; package cockroach.kv.kvserver; option go_package = "kvserver"; +import "errorspb/errors.proto"; import "roachpb/errors.proto"; import "roachpb/metadata.proto"; import "kv/kvserver/liveness/livenesspb/liveness.proto"; @@ -210,8 +211,18 @@ message SnapshotResponse { reserved 4; } Status status = 1; + // Message is a message explaining an ERROR return value. It is not set for any + // other status. + // + // DEPRECATED: this field can be removed in 22.2. As of 22.1, the encoded_error + // field is always used instead. (22.1 itself needs to populate both due to + // needing to be wire-compatible with 21.2). string message = 2; reserved 3; + // encoded_error encodes the error when the status is ERROR. + // + // MIGRATION: only guaranteed to be set when the message field is no longer there. + errorspb.EncodedError encoded_error = 4 [(gogoproto.nullable) = false]; } // ConfChangeContext is encoded in the raftpb.ConfChange.Context field. diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 44a6ebe6f41a..7275d612308c 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -420,9 +420,12 @@ func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error return err } if req.Header == nil { + err := errors.New("client error: no header in first snapshot request message") return stream.Send(&SnapshotResponse{ - Status: SnapshotResponse_ERROR, - Message: "client error: no header in first snapshot request message"}) + Status: SnapshotResponse_ERROR, + Message: err.Error(), + EncodedError: errors.EncodeError(ctx, err), + }) } rmr := req.Header.RaftMessageRequest handler, ok := t.getHandler(rmr.ToReplica.StoreID) diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 3d9834956bb6..c43886ffe0a2 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2499,27 +2499,14 @@ func (r *Replica) sendSnapshot( sent := func() { r.store.metrics.RangeSnapshotsGenerated.Inc(1) } - if err := r.store.cfg.Transport.SendSnapshot( + return r.store.cfg.Transport.SendSnapshot( ctx, r.store.allocator.storePool, req, snap, newBatchFn, sent, - ); err != nil { - if errors.Is(err, errMalformedSnapshot) { - tag := fmt.Sprintf("r%d_%s", r.RangeID, snap.SnapUUID.Short()) - if dir, err := r.store.checkpoint(ctx, tag); err != nil { - log.Warningf(ctx, "unable to create checkpoint %s: %+v", dir, err) - } else { - log.Warningf(ctx, "created checkpoint %s", dir) - } - - log.Fatal(ctx, "malformed snapshot generated") - } - return errors.Mark(err, errMarkSnapshotError) - } - return nil + ) } // replicasCollocated is used in AdminMerge to ensure that the ranges are diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 2700c7fc2749..2f1c2c52a551 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -313,11 +313,11 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) { skip.UnderShort(t) // Takes 90s. runTest := func(t *testing.T, replicaType roachpb.ReplicaType) { - var rejectSnapshots int64 + var rejectSnapshotErr atomic.Value // error knobs, ltk := makeReplicationTestKnobs() ltk.storeKnobs.ReceiveSnapshot = func(h *kvserver.SnapshotRequest_Header) error { - if atomic.LoadInt64(&rejectSnapshots) > 0 { - return errors.New(`nope`) + if err := rejectSnapshotErr.Load().(error); err != nil { + return err } return nil } @@ -329,7 +329,7 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) { defer tc.Stopper().Stop(ctx) scratchStartKey := tc.ScratchRange(t) - atomic.StoreInt64(&rejectSnapshots, 1) + rejectSnapshotErr.Store(errors.New("boom")) var err error switch replicaType { case roachpb.LEARNER: diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index e7cc24dba5f1..521255020e61 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -74,8 +74,9 @@ func (s *Store) HandleSnapshot( if s.IsDraining() { return stream.Send(&SnapshotResponse{ - Status: SnapshotResponse_ERROR, - Message: storeDrainingMsg, + Status: SnapshotResponse_ERROR, + Message: storeDrainingMsg, + EncodedError: errors.EncodeError(ctx, errors.Errorf("%s", storeDrainingMsg)), }) } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index e5ed6dd91677..1d484d2cb3f0 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -306,10 +306,6 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( } } -// errMalformedSnapshot indicates that the snapshot in question is malformed, -// for e.g. missing raft log entries. -var errMalformedSnapshot = errors.New("malformed snapshot generated") - // Send implements the snapshotStrategy interface. func (kvSS *kvBatchSnapshotStrategy) Send( ctx context.Context, @@ -536,7 +532,10 @@ func (s *Store) checkSnapshotOverlapLocked( // NB: this check seems redundant since placeholders are also represented in // replicasByKey (and thus returned in getOverlappingKeyRangeLocked). if exRng, ok := s.mu.replicaPlaceholders[desc.RangeID]; ok { - return errors.Errorf("%s: canAcceptSnapshotLocked: cannot add placeholder, have an existing placeholder %s %v", s, exRng, snapHeader.RaftMessageRequest.FromReplica) + return errors.Mark(errors.Errorf( + "%s: canAcceptSnapshotLocked: cannot add placeholder, have an existing placeholder %s %v", + s, exRng, snapHeader.RaftMessageRequest.FromReplica), + errMarkSnapshotError) } // TODO(benesch): consider discovering and GC'ing *all* overlapping ranges, @@ -581,7 +580,10 @@ func (s *Store) checkSnapshotOverlapLocked( msg += "; initiated GC:" s.replicaGCQueue.AddAsync(ctx, exReplica, gcPriority) } - return errors.Errorf("%s %v (incoming %v)", msg, exReplica, snapHeader.State.Desc.RSpan()) // exReplica can be nil + return errors.Mark( + errors.Errorf("%s %v (incoming %v)", msg, exReplica, snapHeader.State.Desc.RSpan()), // exReplica can be nil + errMarkSnapshotError, + ) } return nil } @@ -592,6 +594,8 @@ func (s *Store) receiveSnapshot( ) error { if fn := s.cfg.TestingKnobs.ReceiveSnapshot; fn != nil { if err := fn(header); err != nil { + // NB: we intentionally don't mark this error as errMarkSnapshotError so + // that we don't end up retrying injected errors in tests. return sendSnapshotError(stream, err) } } @@ -632,7 +636,7 @@ func (s *Store) receiveSnapshot( return nil }); pErr != nil { log.Infof(ctx, "cannot accept snapshot: %s", pErr) - return pErr.GoError() + return sendSnapshotError(stream, pErr.GoError()) } defer func() { @@ -687,16 +691,22 @@ func (s *Store) receiveSnapshot( // already received the entire snapshot here, so there's no point in // abandoning application half-way through if the caller goes away. applyCtx := s.AnnotateCtx(context.Background()) - if err := s.processRaftSnapshotRequest(applyCtx, header, inSnap); err != nil { - return sendSnapshotError(stream, errors.Wrap(err.GoError(), "failed to apply snapshot")) + if pErr := s.processRaftSnapshotRequest(applyCtx, header, inSnap); pErr != nil { + err := pErr.GoError() + // We mark this error as a snapshot error which will be interpreted by the + // sender as this being a retriable error, see isSnapshotError(). + err = errors.Mark(err, errMarkSnapshotError) + err = errors.Wrap(err, "failed to apply snapshot") + return sendSnapshotError(stream, err) } return stream.Send(&SnapshotResponse{Status: SnapshotResponse_APPLIED}) } func sendSnapshotError(stream incomingSnapshotStream, err error) error { return stream.Send(&SnapshotResponse{ - Status: SnapshotResponse_ERROR, - Message: err.Error(), + Status: SnapshotResponse_ERROR, + Message: err.Error(), + EncodedError: errors.EncodeError(context.Background(), err), }) } @@ -1041,9 +1051,18 @@ func sendSnapshot( } switch resp.Status { case SnapshotResponse_ERROR: - storePool.throttle(throttleFailed, resp.Message, to.StoreID) - return errors.Errorf("%s: remote couldn't accept %s with error: %s", - to, snap, resp.Message) + var err error + if resp.EncodedError.Error != nil { + // NB: errMarkSnapshotError must be set on the other end, if it is + // set. We're not going to add it here. + err = errors.DecodeError(ctx, resp.EncodedError) + } else { + // Deprecated path. + err = errors.Errorf("%s", resp.Message) + err = errors.Mark(err, errMarkSnapshotError) + } + storePool.throttle(throttleFailed, err.Error(), to.StoreID) + return errors.Wrapf(err, "%s: remote couldn't accept %s", to, snap) case SnapshotResponse_ACCEPTED: // This is the response we're expecting. Continue with snapshot sending. default: