From 040fab4eb2da60d930cb563457e975c47afc7386 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 20 Jan 2022 22:38:51 +0100 Subject: [PATCH] kvserver: use EncodedError in SnapshotResponse We were previously using a "Message" string to indicate details about an error. We can do so much better now and actually encode the error. This wasn't possible when this field was first added, but it is now, so let's use it. As always, there's a migration concern, which means the old field stays around & is populated as well as interpreted for one release. We then use this new-found freedom to improve which errors were marked as "failed snapshot" errors. Previously, any error coming in on a `SnapshotResponse` were considered snapshot errors and were considered retriable. This was causing `TestLearnerSnapshotFailsRollback` to run for 90s, as `TestCluster`'s replication changes use a [SucceedsSoon] to retry snapshot errors - but that test actually injects an error that it wants to fail-fast. Now, since snapshot error marks propagate over the wire, we can do the marking on the *sender* of the SnapshotResponse, and we can only mark messages that correspond to an actual failure to apply the snapshot (as opposed to an injected error, or a hard error due to a malformed request). The test now takes around one second, for a rare 90x speed-up. As a drive-by, we're also removing `errMalformedSnapshot`, which became unused when we stopped sending the raft log in raft snaps a few releases back, and which had managed to hide from the `unused` lint. [SucceedsSoon]: https://github.com/cockroachdb/cockroach/blob/37175f77bf374d1bcb76bc39a65149788be06134/pkg/testutils/testcluster/testcluster.go#L628-L631 Fixes #74621. Release note: None --- pkg/kv/kvserver/BUILD.bazel | 2 ++ pkg/kv/kvserver/markers.go | 4 +++ pkg/kv/kvserver/raft.proto | 11 ++++++ pkg/kv/kvserver/raft_transport.go | 7 ++-- pkg/kv/kvserver/replica_command.go | 17 ++------- pkg/kv/kvserver/replica_learner_test.go | 8 ++--- pkg/kv/kvserver/store_raft.go | 5 +-- pkg/kv/kvserver/store_snapshot.go | 47 +++++++++++++++++-------- 8 files changed, 64 insertions(+), 37 deletions(-) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index cb174368921c..0de463cc3fa2 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -442,6 +442,7 @@ proto_library( "//pkg/kv/kvserver/liveness/livenesspb:livenesspb_proto", "//pkg/roachpb:roachpb_proto", "//pkg/storage/enginepb:enginepb_proto", + "@com_github_cockroachdb_errors//errorspb:errorspb_proto", "@com_github_gogo_protobuf//gogoproto:gogo_proto", "@io_etcd_go_etcd_raft_v3//raftpb:raftpb_proto", ], @@ -458,6 +459,7 @@ go_proto_library( "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb:with-mocks", "//pkg/storage/enginepb", + "@com_github_cockroachdb_errors//errorspb", "@com_github_gogo_protobuf//gogoproto", "@io_etcd_go_etcd_raft_v3//raftpb", ], diff --git a/pkg/kv/kvserver/markers.go b/pkg/kv/kvserver/markers.go index 29b7269e4469..9bef525358a4 100644 --- a/pkg/kv/kvserver/markers.go +++ b/pkg/kv/kvserver/markers.go @@ -15,6 +15,10 @@ import ( "github.com/cockroachdb/errors" ) +// errMarkSnapshotError is used as an error mark for errors that get returned +// to the initiator of a snapshot. This generally classifies errors as transient, +// i.e. communicates an intention for the caller to retry. +// // NB: don't change the string here; this will cause cross-version issues // since this singleton is used as a marker. var errMarkSnapshotError = errors.New("snapshot failed") diff --git a/pkg/kv/kvserver/raft.proto b/pkg/kv/kvserver/raft.proto index a11ac8fda205..2a63c5a48893 100644 --- a/pkg/kv/kvserver/raft.proto +++ b/pkg/kv/kvserver/raft.proto @@ -12,6 +12,7 @@ syntax = "proto3"; package cockroach.kv.kvserver; option go_package = "kvserver"; +import "errorspb/errors.proto"; import "roachpb/errors.proto"; import "roachpb/metadata.proto"; import "kv/kvserver/liveness/livenesspb/liveness.proto"; @@ -210,8 +211,18 @@ message SnapshotResponse { reserved 4; } Status status = 1; + // Message is a message explaining an ERROR return value. It is not set for any + // other status. + // + // DEPRECATED: this field can be removed in 22.2. As of 22.1, the encoded_error + // field is always used instead. (22.1 itself needs to populate both due to + // needing to be wire-compatible with 21.2). string message = 2; reserved 3; + // encoded_error encodes the error when the status is ERROR. + // + // MIGRATION: only guaranteed to be set when the message field is no longer there. + errorspb.EncodedError encoded_error = 4 [(gogoproto.nullable) = false]; } // ConfChangeContext is encoded in the raftpb.ConfChange.Context field. diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 44a6ebe6f41a..7275d612308c 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -420,9 +420,12 @@ func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error return err } if req.Header == nil { + err := errors.New("client error: no header in first snapshot request message") return stream.Send(&SnapshotResponse{ - Status: SnapshotResponse_ERROR, - Message: "client error: no header in first snapshot request message"}) + Status: SnapshotResponse_ERROR, + Message: err.Error(), + EncodedError: errors.EncodeError(ctx, err), + }) } rmr := req.Header.RaftMessageRequest handler, ok := t.getHandler(rmr.ToReplica.StoreID) diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 3d9834956bb6..c43886ffe0a2 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2499,27 +2499,14 @@ func (r *Replica) sendSnapshot( sent := func() { r.store.metrics.RangeSnapshotsGenerated.Inc(1) } - if err := r.store.cfg.Transport.SendSnapshot( + return r.store.cfg.Transport.SendSnapshot( ctx, r.store.allocator.storePool, req, snap, newBatchFn, sent, - ); err != nil { - if errors.Is(err, errMalformedSnapshot) { - tag := fmt.Sprintf("r%d_%s", r.RangeID, snap.SnapUUID.Short()) - if dir, err := r.store.checkpoint(ctx, tag); err != nil { - log.Warningf(ctx, "unable to create checkpoint %s: %+v", dir, err) - } else { - log.Warningf(ctx, "created checkpoint %s", dir) - } - - log.Fatal(ctx, "malformed snapshot generated") - } - return errors.Mark(err, errMarkSnapshotError) - } - return nil + ) } // replicasCollocated is used in AdminMerge to ensure that the ranges are diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 2700c7fc2749..2f1c2c52a551 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -313,11 +313,11 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) { skip.UnderShort(t) // Takes 90s. runTest := func(t *testing.T, replicaType roachpb.ReplicaType) { - var rejectSnapshots int64 + var rejectSnapshotErr atomic.Value // error knobs, ltk := makeReplicationTestKnobs() ltk.storeKnobs.ReceiveSnapshot = func(h *kvserver.SnapshotRequest_Header) error { - if atomic.LoadInt64(&rejectSnapshots) > 0 { - return errors.New(`nope`) + if err := rejectSnapshotErr.Load().(error); err != nil { + return err } return nil } @@ -329,7 +329,7 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) { defer tc.Stopper().Stop(ctx) scratchStartKey := tc.ScratchRange(t) - atomic.StoreInt64(&rejectSnapshots, 1) + rejectSnapshotErr.Store(errors.New("boom")) var err error switch replicaType { case roachpb.LEARNER: diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index e7cc24dba5f1..521255020e61 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -74,8 +74,9 @@ func (s *Store) HandleSnapshot( if s.IsDraining() { return stream.Send(&SnapshotResponse{ - Status: SnapshotResponse_ERROR, - Message: storeDrainingMsg, + Status: SnapshotResponse_ERROR, + Message: storeDrainingMsg, + EncodedError: errors.EncodeError(ctx, errors.Errorf("%s", storeDrainingMsg)), }) } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index e5ed6dd91677..1d484d2cb3f0 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -306,10 +306,6 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( } } -// errMalformedSnapshot indicates that the snapshot in question is malformed, -// for e.g. missing raft log entries. -var errMalformedSnapshot = errors.New("malformed snapshot generated") - // Send implements the snapshotStrategy interface. func (kvSS *kvBatchSnapshotStrategy) Send( ctx context.Context, @@ -536,7 +532,10 @@ func (s *Store) checkSnapshotOverlapLocked( // NB: this check seems redundant since placeholders are also represented in // replicasByKey (and thus returned in getOverlappingKeyRangeLocked). if exRng, ok := s.mu.replicaPlaceholders[desc.RangeID]; ok { - return errors.Errorf("%s: canAcceptSnapshotLocked: cannot add placeholder, have an existing placeholder %s %v", s, exRng, snapHeader.RaftMessageRequest.FromReplica) + return errors.Mark(errors.Errorf( + "%s: canAcceptSnapshotLocked: cannot add placeholder, have an existing placeholder %s %v", + s, exRng, snapHeader.RaftMessageRequest.FromReplica), + errMarkSnapshotError) } // TODO(benesch): consider discovering and GC'ing *all* overlapping ranges, @@ -581,7 +580,10 @@ func (s *Store) checkSnapshotOverlapLocked( msg += "; initiated GC:" s.replicaGCQueue.AddAsync(ctx, exReplica, gcPriority) } - return errors.Errorf("%s %v (incoming %v)", msg, exReplica, snapHeader.State.Desc.RSpan()) // exReplica can be nil + return errors.Mark( + errors.Errorf("%s %v (incoming %v)", msg, exReplica, snapHeader.State.Desc.RSpan()), // exReplica can be nil + errMarkSnapshotError, + ) } return nil } @@ -592,6 +594,8 @@ func (s *Store) receiveSnapshot( ) error { if fn := s.cfg.TestingKnobs.ReceiveSnapshot; fn != nil { if err := fn(header); err != nil { + // NB: we intentionally don't mark this error as errMarkSnapshotError so + // that we don't end up retrying injected errors in tests. return sendSnapshotError(stream, err) } } @@ -632,7 +636,7 @@ func (s *Store) receiveSnapshot( return nil }); pErr != nil { log.Infof(ctx, "cannot accept snapshot: %s", pErr) - return pErr.GoError() + return sendSnapshotError(stream, pErr.GoError()) } defer func() { @@ -687,16 +691,22 @@ func (s *Store) receiveSnapshot( // already received the entire snapshot here, so there's no point in // abandoning application half-way through if the caller goes away. applyCtx := s.AnnotateCtx(context.Background()) - if err := s.processRaftSnapshotRequest(applyCtx, header, inSnap); err != nil { - return sendSnapshotError(stream, errors.Wrap(err.GoError(), "failed to apply snapshot")) + if pErr := s.processRaftSnapshotRequest(applyCtx, header, inSnap); pErr != nil { + err := pErr.GoError() + // We mark this error as a snapshot error which will be interpreted by the + // sender as this being a retriable error, see isSnapshotError(). + err = errors.Mark(err, errMarkSnapshotError) + err = errors.Wrap(err, "failed to apply snapshot") + return sendSnapshotError(stream, err) } return stream.Send(&SnapshotResponse{Status: SnapshotResponse_APPLIED}) } func sendSnapshotError(stream incomingSnapshotStream, err error) error { return stream.Send(&SnapshotResponse{ - Status: SnapshotResponse_ERROR, - Message: err.Error(), + Status: SnapshotResponse_ERROR, + Message: err.Error(), + EncodedError: errors.EncodeError(context.Background(), err), }) } @@ -1041,9 +1051,18 @@ func sendSnapshot( } switch resp.Status { case SnapshotResponse_ERROR: - storePool.throttle(throttleFailed, resp.Message, to.StoreID) - return errors.Errorf("%s: remote couldn't accept %s with error: %s", - to, snap, resp.Message) + var err error + if resp.EncodedError.Error != nil { + // NB: errMarkSnapshotError must be set on the other end, if it is + // set. We're not going to add it here. + err = errors.DecodeError(ctx, resp.EncodedError) + } else { + // Deprecated path. + err = errors.Errorf("%s", resp.Message) + err = errors.Mark(err, errMarkSnapshotError) + } + storePool.throttle(throttleFailed, err.Error(), to.StoreID) + return errors.Wrapf(err, "%s: remote couldn't accept %s", to, snap) case SnapshotResponse_ACCEPTED: // This is the response we're expecting. Continue with snapshot sending. default: