From 71dfc0926917cd1dd835073fdaae85dd9b343e2b Mon Sep 17 00:00:00 2001 From: Rui Hu Date: Thu, 20 Oct 2022 13:07:28 -0400 Subject: [PATCH 1/2] roachtest: configure ignore and block lists for 23.1 in roachtests Configure ignore and block lists for 23.1 in roachtests. Release note: None --- pkg/cmd/roachtest/tests/activerecord_blocklist.go | 5 +++++ pkg/cmd/roachtest/tests/gopg_blocklist.go | 5 +++++ pkg/cmd/roachtest/tests/hibernate_blocklist.go | 8 ++++++++ pkg/cmd/roachtest/tests/jasyncsql_blocklist.go | 5 +++++ pkg/cmd/roachtest/tests/libpq_blocklist.go | 5 +++++ pkg/cmd/roachtest/tests/liquibase_blocklist.go | 5 +++++ pkg/cmd/roachtest/tests/pgx_blocklist.go | 5 +++++ pkg/cmd/roachtest/tests/psycopg_blocklist.go | 5 +++++ 8 files changed, 43 insertions(+) diff --git a/pkg/cmd/roachtest/tests/activerecord_blocklist.go b/pkg/cmd/roachtest/tests/activerecord_blocklist.go index 2ccd204cb657..0885c4fc46d1 100644 --- a/pkg/cmd/roachtest/tests/activerecord_blocklist.go +++ b/pkg/cmd/roachtest/tests/activerecord_blocklist.go @@ -16,6 +16,7 @@ var activeRecordBlocklists = blocklistsForVersion{ {"v21.2", "activeRecordBlockList21_2", activeRecordBlockList21_2, "activeRecordIgnoreList21_2", activeRecordIgnoreList21_2}, {"v22.1", "activeRecordBlockList22_1", activeRecordBlockList22_1, "activeRecordIgnoreList22_1", activeRecordIgnoreList22_1}, {"v22.2", "activeRecordBlockList22_2", activeRecordBlockList22_2, "activeRecordIgnoreList22_2", activeRecordIgnoreList22_2}, + {"v23.1", "activeRecordBlockList23_1", activeRecordBlockList23_1, "activeRecordIgnoreList23_1", activeRecordIgnoreList23_1}, } // These are lists of known activerecord test errors and failures. @@ -29,6 +30,8 @@ var activeRecordBlocklists = blocklistsForVersion{ // Please keep these lists alphabetized for easy diffing. // After a failed run, an updated version of this blocklist should be available // in the test log. 
+var activeRecordBlockList23_1 = blocklist{} + var activeRecordBlockList22_2 = blocklist{} var activeRecordBlockList22_1 = blocklist{} @@ -39,6 +42,8 @@ var activeRecordBlockList21_1 = blocklist{} var activeRecordBlockList20_2 = blocklist{} +var activeRecordIgnoreList23_1 = activeRecordIgnoreList22_2 + var activeRecordIgnoreList22_2 = activeRecordIgnoreList22_1 var activeRecordIgnoreList22_1 = blocklist{ diff --git a/pkg/cmd/roachtest/tests/gopg_blocklist.go b/pkg/cmd/roachtest/tests/gopg_blocklist.go index a9fa4f9794b4..b8bd82eb92e0 100644 --- a/pkg/cmd/roachtest/tests/gopg_blocklist.go +++ b/pkg/cmd/roachtest/tests/gopg_blocklist.go @@ -16,6 +16,7 @@ var gopgBlocklists = blocklistsForVersion{ {"v21.2", "gopgBlockList21_2", gopgBlockList21_2, "gopgIgnoreList21_2", gopgIgnoreList21_2}, {"v22.1", "gopgBlockList22_1", gopgBlockList22_1, "gopgIgnoreList22_1", gopgIgnoreList22_1}, {"v22.2", "gopgBlockList22_2", gopgBlockList22_2, "gopgIgnoreList22_2", gopgIgnoreList22_2}, + {"v23.1", "gopgBlockList23_1", gopgBlockList23_1, "gopgIgnoreList23_1", gopgIgnoreList23_1}, } // These are lists of known gopg test errors and failures. @@ -27,6 +28,8 @@ var gopgBlocklists = blocklistsForVersion{ // After a failed run, an updated version of this blocklist should be available // in the test log. 
+var gopgBlockList23_1 = gopgBlockList22_2 + var gopgBlockList22_2 = gopgBlockList22_1 var gopgBlockList22_1 = gopgBlockList21_2 @@ -81,6 +84,8 @@ var gopgBlockList20_2 = blocklist{ "v10.TestUnixSocket": "31113", } +var gopgIgnoreList23_1 = gopgIgnoreList22_2 + var gopgIgnoreList22_2 = gopgIgnoreList22_1 var gopgIgnoreList22_1 = gopgIgnoreList21_2 diff --git a/pkg/cmd/roachtest/tests/hibernate_blocklist.go b/pkg/cmd/roachtest/tests/hibernate_blocklist.go index fc6fe45e326f..1856d32f47fb 100644 --- a/pkg/cmd/roachtest/tests/hibernate_blocklist.go +++ b/pkg/cmd/roachtest/tests/hibernate_blocklist.go @@ -16,6 +16,7 @@ var hibernateBlocklists = blocklistsForVersion{ {"v21.2", "hibernateBlockList21_2", hibernateBlockList21_2, "hibernateIgnoreList21_2", hibernateIgnoreList21_2}, {"v22.1", "hibernateBlockList22_1", hibernateBlockList22_1, "hibernateIgnoreList22_1", hibernateIgnoreList22_1}, {"v22.2", "hibernateBlockList22_2", hibernateBlockList22_2, "hibernateIgnoreList22_2", hibernateIgnoreList22_2}, + {"v23.1", "hibernateBlockList23_1", hibernateBlockList23_1, "hibernateIgnoreList23_1", hibernateIgnoreList23_1}, } var hibernateSpatialBlocklists = blocklistsForVersion{ @@ -23,11 +24,14 @@ var hibernateSpatialBlocklists = blocklistsForVersion{ {"v21.2", "hibernateSpatialBlockList21_2", hibernateSpatialBlockList21_2, "", nil}, {"v22.1", "hibernateSpatialBlockList22_1", hibernateSpatialBlockList22_1, "", nil}, {"v22.2", "hibernateSpatialBlockList22_2", hibernateSpatialBlockList22_2, "", nil}, + {"v23.1", "hibernateSpatialBlockList23_1", hibernateSpatialBlockList23_1, "", nil}, } // Please keep these lists alphabetized for easy diffing. // After a failed run, an updated version of this blocklist should be available // in the test log. 
+var hibernateSpatialBlockList23_1 = blocklist{} + var hibernateSpatialBlockList22_2 = blocklist{} var hibernateSpatialBlockList22_1 = blocklist{} @@ -36,6 +40,8 @@ var hibernateSpatialBlockList21_2 = blocklist{} var hibernateSpatialBlockList21_1 = blocklist{} +var hibernateBlockList23_1 = hibernateBlockList22_2 + var hibernateBlockList22_2 = hibernateBlockList22_1 var hibernateBlockList22_1 = blocklist{ @@ -238,6 +244,8 @@ var hibernateBlockList20_2 = blocklist{ "org.hibernate.test.where.annotations.EagerManyToOneFetchModeSelectWhereTest.testAssociatedWhereClause": "unknown", } +var hibernateIgnoreList23_1 = hibernateIgnoreList22_2 + var hibernateIgnoreList22_2 = hibernateIgnoreList22_1 var hibernateIgnoreList22_1 = hibernateIgnoreList21_2 diff --git a/pkg/cmd/roachtest/tests/jasyncsql_blocklist.go b/pkg/cmd/roachtest/tests/jasyncsql_blocklist.go index 4d1eef5c91ca..5dbcf44ad338 100644 --- a/pkg/cmd/roachtest/tests/jasyncsql_blocklist.go +++ b/pkg/cmd/roachtest/tests/jasyncsql_blocklist.go @@ -13,8 +13,11 @@ package tests var jasyncsqlBlocklists = blocklistsForVersion{ {"v22.1", "jasyncsqlBlocklist22_1", jasyncBlocklist22_1, "jasyncsqlIgnoreList22_1", jasyncsqlIgnoreList22_1}, {"v22.2", "jasyncsqlBlocklist22_2", jasyncBlocklist22_2, "jasyncsqlIgnoreList22_2", jasyncsqlIgnoreList22_2}, + {"v23.1", "jasyncsqlBlocklist23_1", jasyncBlocklist23_1, "jasyncsqlIgnoreList23_1", jasyncsqlIgnoreList23_1}, } +var jasyncBlocklist23_1 = jasyncBlocklist22_2 + var jasyncBlocklist22_2 = jasyncBlocklist22_1 var jasyncBlocklist22_1 = blocklist{ @@ -75,6 +78,8 @@ var jasyncBlocklist22_1 = blocklist{ "com.github.aysnc.sql.db.integration.pool.SuspendingPoolSpec.transactions with pool should commit simple inserts , prepared statements": "unknown", } +var jasyncsqlIgnoreList23_1 = jasyncsqlIgnoreList22_2 + var jasyncsqlIgnoreList22_2 = jasyncsqlIgnoreList22_1 var jasyncsqlIgnoreList22_1 = blocklist{} diff --git a/pkg/cmd/roachtest/tests/libpq_blocklist.go 
b/pkg/cmd/roachtest/tests/libpq_blocklist.go index cbbea227bde0..9b56a2d2e9a7 100644 --- a/pkg/cmd/roachtest/tests/libpq_blocklist.go +++ b/pkg/cmd/roachtest/tests/libpq_blocklist.go @@ -16,8 +16,11 @@ var libPQBlocklists = blocklistsForVersion{ {"v21.2", "libPQBlocklist21_2", libPQBlocklist21_2, "libPQIgnorelist21_2", libPQIgnorelist21_2}, {"v22.1", "libPQBlocklist22_1", libPQBlocklist22_1, "libPQIgnorelist22_1", libPQIgnorelist22_1}, {"v22.2", "libPQBlocklist22_2", libPQBlocklist22_2, "libPQIgnorelist22_2", libPQIgnorelist22_2}, + {"v23.1", "libPQBlocklist23_1", libPQBlocklist23_1, "libPQIgnorelist23_1", libPQIgnorelist23_1}, } +var libPQBlocklist23_1 = libPQBlocklist22_2 + var libPQBlocklist22_2 = libPQBlocklist22_1 var libPQBlocklist22_1 = blocklist{ @@ -126,6 +129,8 @@ var libPQBlocklist20_2 = blocklist{ "pq.TestStringWithNul": "26366", } +var libPQIgnorelist23_1 = libPQIgnorelist22_2 + var libPQIgnorelist22_2 = libPQIgnorelist22_1 var libPQIgnorelist22_1 = libPQIgnorelist21_2 diff --git a/pkg/cmd/roachtest/tests/liquibase_blocklist.go b/pkg/cmd/roachtest/tests/liquibase_blocklist.go index f45980358adf..bb922bc7b774 100644 --- a/pkg/cmd/roachtest/tests/liquibase_blocklist.go +++ b/pkg/cmd/roachtest/tests/liquibase_blocklist.go @@ -16,8 +16,11 @@ var liquibaseBlocklists = blocklistsForVersion{ {"v21.2", "liquibaseBlocklist21_2", liquibaseBlocklist21_2, "liquibaseIgnorelist21_2", liquibaseIgnorelist21_2}, {"v22.1", "liquibaseBlocklist22_1", liquibaseBlocklist22_1, "liquibaseIgnorelist21_2", liquibaseIgnorelist22_1}, {"v22.2", "liquibaseBlocklist22_2", liquibaseBlocklist22_2, "liquibaseIgnorelist21_2", liquibaseIgnorelist22_2}, + {"v23.1", "liquibaseBlocklist23_1", liquibaseBlocklist23_1, "liquibaseIgnorelist23_1", liquibaseIgnorelist23_1}, } +var liquibaseBlocklist23_1 = liquibaseBlocklist22_2 + var liquibaseBlocklist22_2 = liquibaseBlocklist22_1 var liquibaseBlocklist22_1 = blocklist{ @@ -34,6 +37,8 @@ var liquibaseBlocklist21_1 = liquibaseBlocklist20_2 var 
liquibaseBlocklist20_2 = blocklist{} +var liquibaseIgnorelist23_1 = liquibaseIgnorelist22_2 + var liquibaseIgnorelist22_2 = liquibaseIgnorelist22_1 var liquibaseIgnorelist22_1 = liquibaseIgnorelist21_2 diff --git a/pkg/cmd/roachtest/tests/pgx_blocklist.go b/pkg/cmd/roachtest/tests/pgx_blocklist.go index 00d0f25f9336..2ecf6ec10c65 100644 --- a/pkg/cmd/roachtest/tests/pgx_blocklist.go +++ b/pkg/cmd/roachtest/tests/pgx_blocklist.go @@ -16,11 +16,14 @@ var pgxBlocklists = blocklistsForVersion{ {"v21.2", "pgxBlocklist21_2", pgxBlocklist21_2, "pgxIgnorelist21_2", pgxIgnorelist21_2}, {"v22.1", "pgxBlocklist22_1", pgxBlocklist22_1, "pgxIgnorelist22_1", pgxIgnorelist22_1}, {"v22.2", "pgxBlocklist22_2", pgxBlocklist22_2, "pgxIgnorelist22_2", pgxIgnorelist22_2}, + {"v23.1", "pgxBlocklist23_1", pgxBlocklist23_1, "pgxIgnorelist23_1", pgxIgnorelist23_1}, } // Please keep these lists alphabetized for easy diffing. // After a failed run, an updated version of this blocklist should be available // in the test log. 
+var pgxBlocklist23_1 = blocklist{} + var pgxBlocklist22_2 = blocklist{} var pgxBlocklist22_1 = blocklist{} @@ -31,6 +34,8 @@ var pgxBlocklist21_1 = blocklist{} var pgxBlocklist20_2 = blocklist{} +var pgxIgnorelist23_1 = pgxIgnorelist22_2 + var pgxIgnorelist22_2 = pgxIgnorelist22_1 var pgxIgnorelist22_1 = pgxIgnorelist21_2 diff --git a/pkg/cmd/roachtest/tests/psycopg_blocklist.go b/pkg/cmd/roachtest/tests/psycopg_blocklist.go index 520158bf343a..85e1b5b18ffb 100644 --- a/pkg/cmd/roachtest/tests/psycopg_blocklist.go +++ b/pkg/cmd/roachtest/tests/psycopg_blocklist.go @@ -16,6 +16,7 @@ var psycopgBlocklists = blocklistsForVersion{ {"v21.2", "psycopgBlockList21_2", psycopgBlockList21_2, "psycopgIgnoreList21_2", psycopgIgnoreList21_2}, {"v22.1", "psycopgBlockList22_1", psycopgBlockList22_1, "psycopgIgnoreList22_1", psycopgIgnoreList22_1}, {"v22.2", "psycopgBlockList22_2", psycopgBlockList22_2, "psycopgIgnoreList22_2", psycopgIgnoreList22_2}, + {"v23.1", "psycopgBlockList23_1", psycopgBlockList23_1, "psycopgIgnoreList23_1", psycopgIgnoreList23_1}, } // These are lists of known psycopg test errors and failures. @@ -29,6 +30,8 @@ var psycopgBlocklists = blocklistsForVersion{ // Please keep these lists alphabetized for easy diffing. // After a failed run, an updated version of this blocklist should be available // in the test log. 
+var psycopgBlockList23_1 = blocklist{} + var psycopgBlockList22_2 = blocklist{} var psycopgBlockList22_1 = blocklist{} @@ -50,6 +53,8 @@ var psycopgBlockList20_2 = blocklist{ "tests.test_async_keyword.CancelTests.test_async_cancel": "41335", } +var psycopgIgnoreList23_1 = psycopgIgnoreList22_2 + var psycopgIgnoreList22_2 = psycopgIgnoreList22_1 var psycopgIgnoreList22_1 = psycopgIgnoreList21_2 From 4490ef18fbbba2158619acd1062e234dd8447536 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 20 Jan 2022 22:38:51 +0100 Subject: [PATCH 2/2] kvserver: use EncodedError in SnapshotResponse We were previously using a "Message" string to indicate details about an error. We can do so much better now and actually encode the error. This wasn't possible when this field was first added, but it is now, so let's use it. As always, there's a migration concern, which means the old field stays around & is populated as well as interpreted for one release. We then use this new-found freedom to improve which errors are marked as "failed snapshot" errors. Previously, any error coming in on a `SnapshotResponse` was considered a snapshot error and thus retriable. This was causing `TestLearnerSnapshotFailsRollback` to run for 90s, as `TestCluster`'s replication changes use a [SucceedsSoon] to retry snapshot errors - but that test actually injects an error that it wants to fail fast. Now, since snapshot error marks propagate over the wire, we can do the marking on the *sender* of the SnapshotResponse, and we can only mark messages that correspond to an actual failure to apply the snapshot (as opposed to an injected error, or a hard error due to a malformed request). The test now takes around one second, for a rare 90x speed-up. As a drive-by, we're also removing `errMalformedSnapshot`, which became unused when we stopped sending the raft log in raft snaps a few releases back, and which had managed to hide from the `unused` lint. 
[SucceedsSoon]: https://github.com/cockroachdb/cockroach/blob/37175f77bf374d1bcb76bc39a65149788be06134/pkg/testutils/testcluster/testcluster.go#L628-L631 Fixes #74621. Release note: None --- pkg/kv/kvserver/kvserverpb/BUILD.bazel | 3 + pkg/kv/kvserver/kvserverpb/raft.go | 28 +++++++++ pkg/kv/kvserver/kvserverpb/raft.proto | 14 ++++- pkg/kv/kvserver/markers.go | 4 ++ pkg/kv/kvserver/raft_transport.go | 15 ++--- pkg/kv/kvserver/replica_command.go | 25 ++------ pkg/kv/kvserver/replica_learner_test.go | 8 +-- pkg/kv/kvserver/store_raft.go | 9 +-- pkg/kv/kvserver/store_snapshot.go | 84 +++++++++++++++---------- pkg/kv/kvserver/store_test.go | 3 +- 10 files changed, 121 insertions(+), 72 deletions(-) diff --git a/pkg/kv/kvserver/kvserverpb/BUILD.bazel b/pkg/kv/kvserver/kvserverpb/BUILD.bazel index 8396a6b0884d..c64a0cd2f7e4 100644 --- a/pkg/kv/kvserver/kvserverpb/BUILD.bazel +++ b/pkg/kv/kvserver/kvserverpb/BUILD.bazel @@ -17,6 +17,7 @@ go_library( deps = [ "//pkg/roachpb", "//pkg/util/hlc", + "@com_github_cockroachdb_errors//:errors", ], ) @@ -38,6 +39,7 @@ proto_library( "//pkg/storage/enginepb:enginepb_proto", "//pkg/util/hlc:hlc_proto", "//pkg/util/tracing/tracingpb:tracingpb_proto", + "@com_github_cockroachdb_errors//errorspb:errorspb_proto", "@com_github_gogo_protobuf//gogoproto:gogo_proto", "@com_google_protobuf//:timestamp_proto", "@io_etcd_go_etcd_raft_v3//raftpb:raftpb_proto", @@ -59,6 +61,7 @@ go_proto_library( "//pkg/util/hlc", "//pkg/util/tracing/tracingpb", "//pkg/util/uuid", # keep + "@com_github_cockroachdb_errors//errorspb", "@com_github_gogo_protobuf//gogoproto", "@io_etcd_go_etcd_raft_v3//raftpb", ], diff --git a/pkg/kv/kvserver/kvserverpb/raft.go b/pkg/kv/kvserver/kvserverpb/raft.go index 3940a7a1c8ca..9612ce2e9d45 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.go +++ b/pkg/kv/kvserver/kvserverpb/raft.go @@ -10,5 +10,33 @@ package kvserverpb +import ( + "context" + + "github.com/cockroachdb/errors" +) + // SafeValue implements the redact.SafeValue 
interface. func (SnapshotRequest_Type) SafeValue() {} + +// Error returns the error contained in the snapshot response, if any. +// +// The bool indicates whether this message uses the deprecated behavior of +// encoding an error as a string. +func (m *DelegateSnapshotResponse) Error() (deprecated bool, _ error) { + return m.SnapResponse.Error() +} + +// Error returns the error contained in the snapshot response, if any. +// +// The bool indicates whether this message uses the deprecated behavior of +// encoding an error as a string. +func (m *SnapshotResponse) Error() (deprecated bool, _ error) { + if m.Status != SnapshotResponse_ERROR { + return false, nil + } + if m.EncodedError.IsSet() { + return false, errors.DecodeError(context.Background(), m.EncodedError) + } + return true, errors.Newf("%s", m.DeprecatedMessage) +} diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 4923d96ffecd..0f8276cb13b1 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -12,6 +12,7 @@ syntax = "proto3"; package cockroach.kv.kvserver.kvserverpb; option go_package = "kvserverpb"; +import "errorspb/errors.proto"; import "roachpb/errors.proto"; import "roachpb/internal_raft.proto"; import "roachpb/metadata.proto"; @@ -237,11 +238,22 @@ message SnapshotResponse { reserved 4; } Status status = 1; - string message = 2; + // Message is a message explaining an ERROR return value. It is not set for any + // other status. + // + // As of 23.1, the encoded_error field is always used instead. 23.1 itself + // needs to populate both due to needing to be compatible with 22.2. Once + // the MinSupportedVersion is 23.1, this can be removed. + string deprecated_message = 2; reserved 3; // Traces from snapshot processing, returned on status APPLIED or ERROR. 
repeated util.tracing.tracingpb.RecordedSpan collected_spans = 4 [(gogoproto.nullable) = false]; + + // encoded_error encodes the error when the status is ERROR. + // + // MIGRATION: only guaranteed to be set when the message field is no longer there. + errorspb.EncodedError encoded_error = 5 [(gogoproto.nullable) = false]; } // DelegateSnapshotRequest is the request used to delegate send snapshot requests. diff --git a/pkg/kv/kvserver/markers.go b/pkg/kv/kvserver/markers.go index 92597731397e..4f95d04d377c 100644 --- a/pkg/kv/kvserver/markers.go +++ b/pkg/kv/kvserver/markers.go @@ -15,6 +15,10 @@ import ( "github.com/cockroachdb/errors" ) +// errMarkSnapshotError is used as an error mark for errors that get returned +// to the initiator of a snapshot. This generally classifies errors as transient, +// i.e. communicates an intention for the caller to retry. +// // NB: don't change the string here; this will cause cross-version issues // since this singleton is used as a marker. var errMarkSnapshotError = errors.New("snapshot failed") diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 33cfb4524bc3..ab71cb548711 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -329,12 +329,10 @@ func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapsh } // Check to ensure the header is valid. 
if req == nil { + err := errors.New("client error: no message in first delegated snapshot request") return stream.Send( &kvserverpb.DelegateSnapshotResponse{ - SnapResponse: &kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_ERROR, - Message: "client error: no message in first delegated snapshot request", - }, + SnapResponse: snapRespErr(err), }, ) } @@ -364,9 +362,8 @@ func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error return err } if req.Header == nil { - return stream.Send(&kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_ERROR, - Message: "client error: no header in first snapshot request message"}) + err := errors.New("client error: no header in first snapshot request message") + return stream.Send(snapRespErr(err)) } rmr := req.Header.RaftMessageRequest handler, ok := t.getHandler(rmr.ToReplica.StoreID) @@ -651,14 +648,14 @@ func (t *RaftTransport) DelegateSnapshot( nodeID := req.DelegatedSender.NodeID conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass) if err != nil { - return err + return errors.Mark(err, errMarkSnapshotError) } client := NewMultiRaftClient(conn) // Creates a rpc stream between the leaseholder and sender. stream, err := client.DelegateRaftSnapshot(ctx) if err != nil { - return err + return errors.Mark(err, errMarkSnapshotError) } defer func() { if err := stream.CloseSend(); err != nil { diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index dd30a423ff59..73e9e4e4f76d 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2723,11 +2723,12 @@ func (r *Replica) sendSnapshot( ) }, ) - - if err != nil { - return errors.Mark(err, errMarkSnapshotError) + // Only mark explicitly as snapshot error (which is retriable) if we timed out. + // Otherwise, it's up to the remote to add this mark where appropriate. 
+ if errors.HasType(err, (*contextutil.TimeoutError)(nil)) { + err = errors.Mark(err, errMarkSnapshotError) } - return nil + return err } // followerSnapshotsEnabled is used to enable or disable follower snapshots. @@ -2882,7 +2883,7 @@ func (r *Replica) followerSendSnapshot( } } - err = contextutil.RunWithTimeout( + return contextutil.RunWithTimeout( ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { return r.store.cfg.Transport.SendSnapshot( ctx, @@ -2895,20 +2896,6 @@ func (r *Replica) followerSendSnapshot( ) }, ) - if err != nil { - if errors.Is(err, errMalformedSnapshot) { - tag := fmt.Sprintf("r%d_%s", r.RangeID, snap.SnapUUID.Short()) - if dir, err := r.store.checkpoint(ctx, tag); err != nil { - log.Warningf(ctx, "unable to create checkpoint %s: %+v", dir, err) - } else { - log.Warningf(ctx, "created checkpoint %s", dir) - } - - log.Fatal(ctx, "malformed snapshot generated") - } - return errors.Mark(err, errMarkSnapshotError) - } - return nil } // replicasCollocated is used in AdminMerge to ensure that the ranges are diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 06933e69aeec..073078ab9828 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -430,11 +430,11 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) { skip.UnderRace(t) runTest := func(t *testing.T, replicaType roachpb.ReplicaType) { - var rejectSnapshots int64 + var rejectSnapshotErr atomic.Value // error knobs, ltk := makeReplicationTestKnobs() ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error { - if atomic.LoadInt64(&rejectSnapshots) > 0 { - return errors.New(`nope`) + if err := rejectSnapshotErr.Load().(error); err != nil { + return err } return nil } @@ -446,7 +446,7 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) { defer tc.Stopper().Stop(ctx) scratchStartKey := tc.ScratchRange(t) - atomic.StoreInt64(&rejectSnapshots, 1) + 
rejectSnapshotErr.Store(errors.New("boom")) var err error switch replicaType { case roachpb.LEARNER: diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 257ab1f8acce..10aeab123f5d 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -180,10 +180,7 @@ func (s *Store) HandleDelegatedSnapshot( if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req, stream); err != nil { return stream.Send( &kvserverpb.DelegateSnapshotResponse{ - SnapResponse: &kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_ERROR, - Message: err.Error(), - }, + SnapResponse: snapRespErr(err), CollectedSpans: sp.GetConfiguredRecording(), }, ) @@ -191,8 +188,8 @@ func (s *Store) HandleDelegatedSnapshot( resp := &kvserverpb.DelegateSnapshotResponse{ SnapResponse: &kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_APPLIED, - Message: "Snapshot successfully applied by recipient", + Status: kvserverpb.SnapshotResponse_APPLIED, + DeprecatedMessage: "Snapshot successfully applied by recipient", }, CollectedSpans: sp.GetConfiguredRecording(), } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 07fff97b5b0a..3c5e2acf1552 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -504,10 +504,6 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( } } -// errMalformedSnapshot indicates that the snapshot in question is malformed, -// for e.g. missing raft log entries. -var errMalformedSnapshot = errors.New("malformed snapshot generated") - // Send implements the snapshotStrategy interface. func (kvSS *kvBatchSnapshotStrategy) Send( ctx context.Context, @@ -885,7 +881,10 @@ func (s *Store) checkSnapshotOverlapLocked( // NB: this check seems redundant since placeholders are also represented in // replicasByKey (and thus returned in getOverlappingKeyRangeLocked). 
if exRng, ok := s.mu.replicaPlaceholders[desc.RangeID]; ok { - return errors.Errorf("%s: canAcceptSnapshotLocked: cannot add placeholder, have an existing placeholder %s %v", s, exRng, snapHeader.RaftMessageRequest.FromReplica) + return errors.Mark(errors.Errorf( + "%s: canAcceptSnapshotLocked: cannot add placeholder, have an existing placeholder %s %v", + s, exRng, snapHeader.RaftMessageRequest.FromReplica), + errMarkSnapshotError) } // TODO(benesch): consider discovering and GC'ing *all* overlapping ranges, @@ -930,7 +929,10 @@ func (s *Store) checkSnapshotOverlapLocked( msg += "; initiated GC:" s.replicaGCQueue.AddAsync(ctx, exReplica, gcPriority) } - return errors.Errorf("%s %v (incoming %v)", msg, exReplica, snapHeader.State.Desc.RSpan()) // exReplica can be nil + return errors.Mark( + errors.Errorf("%s %v (incoming %v)", msg, exReplica, snapHeader.State.Desc.RSpan()), // exReplica can be nil + errMarkSnapshotError, + ) } return nil } @@ -964,6 +966,8 @@ func (s *Store) receiveSnapshot( if fn := s.cfg.TestingKnobs.ReceiveSnapshot; fn != nil { if err := fn(header); err != nil { + // NB: we intentionally don't mark this error as errMarkSnapshotError so + // that we don't end up retrying injected errors in tests. return sendSnapshotError(stream, err) } } @@ -1004,7 +1008,7 @@ func (s *Store) receiveSnapshot( return nil }); pErr != nil { log.Infof(ctx, "cannot accept snapshot: %s", pErr) - return pErr.GoError() + return sendSnapshotError(stream, pErr.GoError()) } defer func() { @@ -1078,10 +1082,13 @@ func (s *Store) receiveSnapshot( // already received the entire snapshot here, so there's no point in // abandoning application half-way through if the caller goes away. 
applyCtx := s.AnnotateCtx(context.Background()) - if err := s.processRaftSnapshotRequest(applyCtx, header, inSnap); err != nil { - return sendSnapshotErrorWithTrace(stream, - errors.Wrap(err.GoError(), "failed to apply snapshot"), rec, - ) + if pErr := s.processRaftSnapshotRequest(applyCtx, header, inSnap); pErr != nil { + err := pErr.GoError() + // We mark this error as a snapshot error which will be interpreted by the + // sender as this being a retriable error, see isSnapshotError(). + err = errors.Mark(err, errMarkSnapshotError) + err = errors.Wrap(err, "failed to apply snapshot") + return sendSnapshotErrorWithTrace(stream, err, rec) } return stream.Send(&kvserverpb.SnapshotResponse{ Status: kvserverpb.SnapshotResponse_APPLIED, @@ -1096,11 +1103,24 @@ func sendSnapshotError(stream incomingSnapshotStream, err error) error { func sendSnapshotErrorWithTrace( stream incomingSnapshotStream, err error, trace tracingpb.Recording, ) error { - return stream.Send(&kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_ERROR, - Message: err.Error(), - CollectedSpans: trace, - }) + resp := snapRespErr(err) + resp.CollectedSpans = trace + return stream.Send(resp) +} + +func snapRespErr(err error) *kvserverpb.SnapshotResponse { + return &kvserverpb.SnapshotResponse{ + Status: kvserverpb.SnapshotResponse_ERROR, + EncodedError: errors.EncodeError(context.Background(), err), + DeprecatedMessage: err.Error(), + } +} + +func maybeHandleDeprecatedSnapErr(deprecated bool, err error) error { + if !deprecated { + return err + } + return errors.Mark(err, errMarkSnapshotError) } // SnapshotStorePool narrows StorePool to make sendSnapshot easier to test. 
@@ -1505,9 +1525,8 @@ func sendSnapshot( switch resp.Status { case kvserverpb.SnapshotResponse_ERROR: sp.ImportRemoteRecording(resp.CollectedSpans) - storePool.Throttle(storepool.ThrottleFailed, resp.Message, to.StoreID) - return errors.Errorf("%s: remote couldn't accept %s with error: %s", - to, snap, resp.Message) + storePool.Throttle(storepool.ThrottleFailed, resp.DeprecatedMessage, to.StoreID) + return errors.Wrapf(maybeHandleDeprecatedSnapErr(resp.Error()), "%s: remote couldn't accept %s", to, snap) case kvserverpb.SnapshotResponse_ACCEPTED: // This is the response we're expecting. Continue with snapshot sending. log.Event(ctx, "received SnapshotResponse_ACCEPTED message from server") @@ -1594,7 +1613,9 @@ func sendSnapshot( } switch resp.Status { case kvserverpb.SnapshotResponse_ERROR: - return errors.Errorf("%s: remote failed to apply snapshot for reason %s", to, resp.Message) + return errors.Wrapf( + maybeHandleDeprecatedSnapErr(resp.Error()), "%s: remote failed to apply snapshot", to, + ) case kvserverpb.SnapshotResponse_APPLIED: return nil default: @@ -1622,10 +1643,9 @@ func delegateSnapshot( } switch resp.SnapResponse.Status { case kvserverpb.SnapshotResponse_ERROR: - return errors.Errorf( - "%s: sender couldn't accept %s with error: %s", delegatedSender, - req, resp.SnapResponse.Message, - ) + return errors.Wrapf( + maybeHandleDeprecatedSnapErr(resp.Error()), + "%s: sender couldn't accept %s", delegatedSender, req) case kvserverpb.SnapshotResponse_ACCEPTED: // The sender accepted the request, it will continue with sending. log.VEventf( @@ -1643,20 +1663,20 @@ func delegateSnapshot( // Wait for response to see if the receiver successfully applied the snapshot. 
resp, err = stream.Recv() if err != nil { - return errors.Wrapf(err, "%s: remote failed to send snapshot", delegatedSender) + return errors.Mark( + errors.Wrapf(err, "%s: remote failed to send snapshot", delegatedSender), errMarkSnapshotError, + ) } // Wait for EOF to ensure server side processing is complete. if unexpectedResp, err := stream.Recv(); err != io.EOF { if err != nil { - return errors.Wrapf( + return errors.Mark(errors.Wrapf( err, "%s: expected EOF, got resp=%v with error", - delegatedSender.StoreID, unexpectedResp, - ) + delegatedSender.StoreID, unexpectedResp), errMarkSnapshotError) } - return errors.Newf( + return errors.Mark(errors.Newf( "%s: expected EOF, got resp=%v", delegatedSender.StoreID, - unexpectedResp, - ) + unexpectedResp), errMarkSnapshotError) } sp := tracing.SpanFromContext(ctx) if sp != nil { @@ -1664,7 +1684,7 @@ func delegateSnapshot( } switch resp.SnapResponse.Status { case kvserverpb.SnapshotResponse_ERROR: - return errors.Newf("%s", resp.SnapResponse.Message) + return maybeHandleDeprecatedSnapErr(resp.Error()) case kvserverpb.SnapshotResponse_APPLIED: // This is the response we're expecting. Snapshot successfully applied. log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", delegatedSender) diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 098bbfd6ccd2..b410a9d68554 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -2911,7 +2911,8 @@ func TestSendSnapshotThrottling(t *testing.T) { { sp := &fakeStorePool{} resp := &kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_ERROR, + Status: kvserverpb.SnapshotResponse_ERROR, + EncodedError: errors.EncodeError(ctx, errors.New("boom")), } c := fakeSnapshotStream{resp, nil} err := sendSnapshot(