Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: use EncodedError in SnapshotResponse #75248

Merged
merged 1 commit into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/kvserverpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
deps = [
"//pkg/roachpb",
"//pkg/util/hlc",
"@com_github_cockroachdb_errors//:errors",
],
)

Expand All @@ -38,6 +39,7 @@ proto_library(
"//pkg/storage/enginepb:enginepb_proto",
"//pkg/util/hlc:hlc_proto",
"//pkg/util/tracing/tracingpb:tracingpb_proto",
"@com_github_cockroachdb_errors//errorspb:errorspb_proto",
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
"@com_google_protobuf//:timestamp_proto",
"@io_etcd_go_etcd_raft_v3//raftpb:raftpb_proto",
Expand All @@ -59,6 +61,7 @@ go_proto_library(
"//pkg/util/hlc",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid", # keep
"@com_github_cockroachdb_errors//errorspb",
"@com_github_gogo_protobuf//gogoproto",
"@io_etcd_go_etcd_raft_v3//raftpb",
],
Expand Down
28 changes: 28 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,33 @@

package kvserverpb

import (
"context"

"github.com/cockroachdb/errors"
)

// SafeValue implements the redact.SafeValue interface.
func (SnapshotRequest_Type) SafeValue() {}

// Error returns the error contained in the snapshot response, if any.
//
// The bool indicates whether this message uses the deprecated behavior of
// encoding an error as a string.
func (m *DelegateSnapshotResponse) Error() (deprecated bool, _ error) {
return m.SnapResponse.Error()
}

// Error returns the error contained in the snapshot response, if any.
//
// The bool indicates whether this message uses the deprecated behavior of
// encoding an error as a string.
func (m *SnapshotResponse) Error() (deprecated bool, _ error) {
if m.Status != SnapshotResponse_ERROR {
return false, nil
}
if m.EncodedError.IsSet() {
return false, errors.DecodeError(context.Background(), m.EncodedError)
}
return true, errors.Newf("%s", m.DeprecatedMessage)
}
14 changes: 13 additions & 1 deletion pkg/kv/kvserver/kvserverpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ syntax = "proto3";
package cockroach.kv.kvserver.kvserverpb;
option go_package = "kvserverpb";

import "errorspb/errors.proto";
import "roachpb/errors.proto";
import "roachpb/internal_raft.proto";
import "roachpb/metadata.proto";
Expand Down Expand Up @@ -237,11 +238,22 @@ message SnapshotResponse {
reserved 4;
}
Status status = 1;
string message = 2;
// Message is a message explaining an ERROR return value. It is not set for any
// other status.
//
// As of 23.1, the encoded_error field is always used instead. 23.1 itself
// needs to populate both due to needing to be compatible with 22.2. Once
// the MinSupportedVersion is 23.1, this can be removed.
string deprecated_message = 2;
reserved 3;

// Traces from snapshot processing, returned on status APPLIED or ERROR.
repeated util.tracing.tracingpb.RecordedSpan collected_spans = 4 [(gogoproto.nullable) = false];

// encoded_error encodes the error when the status is ERROR.
//
// MIGRATION: only guaranteed to be set when the message field is no longer there.
errorspb.EncodedError encoded_error = 5 [(gogoproto.nullable) = false];
}

// DelegateSnapshotRequest is the request used to delegate send snapshot requests.
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (
"github.com/cockroachdb/errors"
)

// errMarkSnapshotError is used as an error mark for errors that get returned
// to the initiator of a snapshot. This generally classifies errors as transient,
// i.e. communicates an intention for the caller to retry.
//
// NB: don't change the string here; this will cause cross-version issues
// since this singleton is used as a marker.
var errMarkSnapshotError = errors.New("snapshot failed")
Expand Down
15 changes: 6 additions & 9 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,10 @@ func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapsh
}
// Check to ensure the header is valid.
if req == nil {
err := errors.New("client error: no message in first delegated snapshot request")
return stream.Send(
&kvserverpb.DelegateSnapshotResponse{
SnapResponse: &kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_ERROR,
Message: "client error: no message in first delegated snapshot request",
},
SnapResponse: snapRespErr(err),
},
)
}
Expand Down Expand Up @@ -364,9 +362,8 @@ func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error
return err
}
if req.Header == nil {
return stream.Send(&kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_ERROR,
Message: "client error: no header in first snapshot request message"})
err := errors.New("client error: no header in first snapshot request message")
return stream.Send(snapRespErr(err))
}
rmr := req.Header.RaftMessageRequest
handler, ok := t.getHandler(rmr.ToReplica.StoreID)
Expand Down Expand Up @@ -651,14 +648,14 @@ func (t *RaftTransport) DelegateSnapshot(
nodeID := req.DelegatedSender.NodeID
conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
if err != nil {
return err
return errors.Mark(err, errMarkSnapshotError)
}
client := NewMultiRaftClient(conn)

// Creates a rpc stream between the leaseholder and sender.
stream, err := client.DelegateRaftSnapshot(ctx)
if err != nil {
return err
return errors.Mark(err, errMarkSnapshotError)
}
defer func() {
if err := stream.CloseSend(); err != nil {
Expand Down
25 changes: 6 additions & 19 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2723,11 +2723,12 @@ func (r *Replica) sendSnapshot(
)
},
)

if err != nil {
return errors.Mark(err, errMarkSnapshotError)
// Only mark explicitly as snapshot error (which is retriable) if we timed out.
// Otherwise, it's up to the remote to add this mark where appropriate.
if errors.HasType(err, (*contextutil.TimeoutError)(nil)) {
err = errors.Mark(err, errMarkSnapshotError)
}
return nil
return err
}

// followerSnapshotsEnabled is used to enable or disable follower snapshots.
Expand Down Expand Up @@ -2882,7 +2883,7 @@ func (r *Replica) followerSendSnapshot(
}
}

err = contextutil.RunWithTimeout(
return contextutil.RunWithTimeout(
ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
return r.store.cfg.Transport.SendSnapshot(
ctx,
Expand All @@ -2895,20 +2896,6 @@ func (r *Replica) followerSendSnapshot(
)
},
)
if err != nil {
if errors.Is(err, errMalformedSnapshot) {
tag := fmt.Sprintf("r%d_%s", r.RangeID, snap.SnapUUID.Short())
if dir, err := r.store.checkpoint(ctx, tag); err != nil {
log.Warningf(ctx, "unable to create checkpoint %s: %+v", dir, err)
} else {
log.Warningf(ctx, "created checkpoint %s", dir)
}

log.Fatal(ctx, "malformed snapshot generated")
}
return errors.Mark(err, errMarkSnapshotError)
}
return nil
}

// replicasCollocated is used in AdminMerge to ensure that the ranges are
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,11 +430,11 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) {
skip.UnderRace(t)

runTest := func(t *testing.T, replicaType roachpb.ReplicaType) {
var rejectSnapshots int64
var rejectSnapshotErr atomic.Value // error
knobs, ltk := makeReplicationTestKnobs()
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
if atomic.LoadInt64(&rejectSnapshots) > 0 {
return errors.New(`nope`)
if err := rejectSnapshotErr.Load().(error); err != nil {
return err
}
return nil
}
Expand All @@ -446,7 +446,7 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) {
defer tc.Stopper().Stop(ctx)

scratchStartKey := tc.ScratchRange(t)
atomic.StoreInt64(&rejectSnapshots, 1)
rejectSnapshotErr.Store(errors.New("boom"))
var err error
switch replicaType {
case roachpb.LEARNER:
Expand Down
9 changes: 3 additions & 6 deletions pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,19 +180,16 @@ func (s *Store) HandleDelegatedSnapshot(
if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req, stream); err != nil {
return stream.Send(
&kvserverpb.DelegateSnapshotResponse{
SnapResponse: &kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_ERROR,
Message: err.Error(),
},
SnapResponse: snapRespErr(err),
CollectedSpans: sp.GetConfiguredRecording(),
},
)
}

resp := &kvserverpb.DelegateSnapshotResponse{
SnapResponse: &kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_APPLIED,
Message: "Snapshot successfully applied by recipient",
Status: kvserverpb.SnapshotResponse_APPLIED,
DeprecatedMessage: "Snapshot successfully applied by recipient",
},
CollectedSpans: sp.GetConfiguredRecording(),
}
Expand Down
Loading