Skip to content

Commit

Permalink
kvserver: self delegated snapshots
Browse files Browse the repository at this point in the history
This commit adds a new rpc stream for sending raft message requests between
replicas which allows for delegating snapshots. Currently this patch implements
the leaseholder delegating to itself, but in future patches the leaseholder
will be able to delegate snapshot sending to follower replicas. A new raft
message request type of `DelegateSnapshotRequest` includes a header of
necessary fields from a `SnapshotRequest` and the replica descriptor of the new
sender replica. This allows the leaseholder to fill in snapshot metadata before
delegating to the new sender store to generate the snapshot and transmit it to
the recipient.

Related to #42491
Release note: None
  • Loading branch information
amygao9 committed Apr 11, 2022
1 parent 3d6f41b commit 92572ba
Show file tree
Hide file tree
Showing 14 changed files with 630 additions and 44 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
<tr><td><code>kv.replica_circuit_breaker.slow_replication_threshold</code></td><td>duration</td><td><code>1m0s</code></td><td>duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers)</td></tr>
<tr><td><code>kv.replica_stats.addsst_request_size_factor</code></td><td>integer</td><td><code>50000</code></td><td>the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1</td></tr>
<tr><td><code>kv.replication_reports.interval</code></td><td>duration</td><td><code>1m0s</code></td><td>the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)</td></tr>
<tr><td><code>kv.snapshot_delegation.enabled</code></td><td>boolean</td><td><code>false</code></td><td>set to true to allow snapshots from follower replicas</td></tr>
<tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track locks in transactions</td></tr>
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/client_raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3"
)
Expand Down Expand Up @@ -180,6 +181,19 @@ func (h *testClusterStoreRaftMessageHandler) HandleSnapshot(
return store.HandleSnapshot(ctx, header, respStream)
}

// HandleDelegatedSnapshot looks up the store backing this handler via getStore
// and forwards the delegated snapshot request, response stream and tracing
// span to that store's HandleDelegatedSnapshot. If the store cannot be
// resolved, the lookup error is returned and nothing is forwarded.
func (h *testClusterStoreRaftMessageHandler) HandleDelegatedSnapshot(
	ctx context.Context,
	req *kvserverpb.DelegateSnapshotRequest,
	stream kvserver.DelegateSnapshotResponseStream,
	span *tracing.Span,
) error {
	target, lookupErr := h.getStore()
	if lookupErr != nil {
		return lookupErr
	}
	// The store does the actual work; this type only routes to it.
	return target.HandleDelegatedSnapshot(ctx, req, stream, span)
}

// testClusterPartitionedRange is a convenient abstraction to create a range on a node
// in a multiTestContext which can be partitioned and unpartitioned.
type testClusterPartitionedRange struct {
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -3452,6 +3453,15 @@ func (errorChannelTestHandler) HandleSnapshot(
panic("unimplemented")
}

// HandleDelegatedSnapshot is a stub: errorChannelTestHandler does not
// implement delegated snapshots, so any call panics with "unimplemented".
// All arguments are ignored.
func (errorChannelTestHandler) HandleDelegatedSnapshot(
ctx context.Context,
req *kvserverpb.DelegateSnapshotRequest,
stream kvserver.DelegateSnapshotResponseStream,
span *tracing.Span,
) error {
panic("unimplemented")
}

// This test simulates a scenario where one replica has been removed from the
// range's Raft group but it is unaware of the fact. We check that this replica
// coming back from the dead cannot cause elections.
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/kvserverpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ proto_library(
"//pkg/roachpb:roachpb_proto",
"//pkg/storage/enginepb:enginepb_proto",
"//pkg/util/hlc:hlc_proto",
"//pkg/util/tracing/tracingpb:tracingpb_proto",
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
"@com_google_protobuf//:timestamp_proto",
"@io_etcd_go_etcd_raft_v3//raftpb:raftpb_proto",
Expand All @@ -55,6 +56,7 @@ go_proto_library(
"//pkg/roachpb",
"//pkg/storage/enginepb",
"//pkg/util/hlc",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid", # keep
"@com_github_gogo_protobuf//gogoproto",
"@io_etcd_go_etcd_raft_v3//raftpb",
Expand Down
43 changes: 43 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ package cockroach.kv.kvserver.kvserverpb;
option go_package = "kvserverpb";

import "roachpb/errors.proto";
import "roachpb/internal_raft.proto";
import "roachpb/metadata.proto";
import "kv/kvserver/liveness/livenesspb/liveness.proto";
import "kv/kvserver/kvserverpb/state.proto";
import "etcd/raft/v3/raftpb/raft.proto";
import "gogoproto/gogo.proto";
import "util/tracing/tracingpb/recorded_span.proto";

// RaftHeartbeat is a request that contains the barebones information for a
// raftpb.MsgHeartbeat raftpb.Message. RaftHeartbeats are coalesced and sent
Expand Down Expand Up @@ -214,6 +216,47 @@ message SnapshotResponse {
reserved 3;
}

// DelegateSnapshotRequest is the request a coordinator replica uses to
// delegate the sending of a snapshot: it names the replica selected to send
// the snapshot (delegated_sender) and, in its header, the replica that should
// receive it.
message DelegateSnapshotRequest {
// Header carries the snapshot metadata filled in by the coordinator before
// delegating.
// NOTE(review): field numbers 5 and 7 are unused here; presumably they are
// kept aligned with SnapshotRequest.Header - confirm before reusing them.
message Header {

// The ID of the range the snapshot is for; exposed in the generated Go code
// as RangeID with type roachpb.RangeID.
uint64 range_id = 1 [(gogoproto.customname) = "RangeID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"];

// The replica that delegates the snapshot request, in most cases the leaseholder.
// The snapshot request should originate from the coordinator.
roachpb.ReplicaDescriptor coordinator_replica = 2 [(gogoproto.nullable) = false];

// The replica receiving the snapshot.
roachpb.ReplicaDescriptor recipient_replica = 3 [(gogoproto.nullable) = false];

// The priority of the snapshot.
SnapshotRequest.Priority priority = 4;

// The type of the snapshot.
SnapshotRequest.Type type = 6;

// The Raft term of the coordinator (in most cases the leaseholder) replica.
// The term is used during snapshot receiving to reject messages from an older term.
uint64 term = 8;
}

// The header describing the snapshot to send. This field is nullable in the
// generated code, so receivers must check for a missing header.
Header header = 1;

// The replica selected to act as the snapshot sender.
roachpb.ReplicaDescriptor delegated_sender = 2 [(gogoproto.nullable) = false];

// The truncated state of the Raft log on the coordinator replica.
roachpb.RaftTruncatedState truncated_state = 3;
}

// DelegateSnapshotResponse is the response sent back on a delegated snapshot
// stream.
message DelegateSnapshotResponse {
  // snap_response carries the status and message describing the outcome of
  // the delegated snapshot. The field is named in lower_snake_case like the
  // rest of this file; the field number, the JSON name ("snapResponse") and
  // the generated Go identifier (SnapResponse) are unchanged by the rename.
  SnapshotResponse snap_response = 1;
  // collected_spans stores trace spans recorded during the execution of this
  // request.
  repeated util.tracing.tracingpb.RecordedSpan collected_spans = 2 [(gogoproto.nullable) = false];
}

// ConfChangeContext is encoded in the raftpb.ConfChange.Context field.
message ConfChangeContext {
string command_id = 1 [(gogoproto.customname) = "CommandID"];
Expand Down
115 changes: 113 additions & 2 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ type SnapshotResponseStream interface {
Recv() (*kvserverpb.SnapshotRequest, error)
}

// DelegateSnapshotResponseStream is the subset of the
// MultiRaft_DelegateRaftSnapshotServer interface that is needed for receiving
// delegated snapshot requests and sending back delegated snapshot responses.
type DelegateSnapshotResponseStream interface {
	// Send writes a DelegateSnapshotResponse to the stream.
	Send(response *kvserverpb.DelegateSnapshotResponse) error
	// Recv reads the next DelegateSnapshotRequest from the stream.
	Recv() (*kvserverpb.DelegateSnapshotRequest, error)
}

// RaftMessageHandler is the interface that must be implemented by
// arguments to RaftTransport.Listen.
type RaftMessageHandler interface {
Expand All @@ -122,7 +129,20 @@ type RaftMessageHandler interface {

// HandleSnapshot is called for each new incoming snapshot stream, after
// parsing the initial SnapshotRequest_Header on the stream.
HandleSnapshot(ctx context.Context, header *kvserverpb.SnapshotRequest_Header, respStream SnapshotResponseStream) error
HandleSnapshot(
ctx context.Context,
header *kvserverpb.SnapshotRequest_Header,
respStream SnapshotResponseStream,
) error

// HandleDelegatedSnapshot is called for each incoming delegated snapshot
// request.
HandleDelegatedSnapshot(
ctx context.Context,
req *kvserverpb.DelegateSnapshotRequest,
stream DelegateSnapshotResponseStream,
span *tracing.Span,
) error
}

type raftTransportStats struct {
Expand Down Expand Up @@ -407,6 +427,72 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer
}
}

// DelegateRaftSnapshot handles an incoming delegated snapshot stream. It reads
// the first DelegateSnapshotRequest from the stream, checks that it carries a
// header, and passes the request off to the handler registered for the
// delegated sender store.
//
// A request without a header is answered with an ERROR SnapshotResponse on the
// stream. A failure to receive the request, or a missing handler for the
// sender store, is returned as the RPC error. If the stopper starts quiescing
// before the work completes, nil is returned.
//
// The server-side tracing span started here is handed to the handler together
// with the request. On every path that returns before the handler is reached,
// the span is finished here so that it is not leaked.
func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapshotServer) error {
	// Buffered so the async task can always deliver its result, even if this
	// function has already returned because the stopper is quiescing.
	errCh := make(chan error, 1)
	taskCtx, cancel := t.stopper.WithCancelOnQuiesce(stream.Context())
	// Register the cancel immediately so no later code can skip it.
	defer cancel()

	remoteParent, err := tracing.ExtractSpanMetaFromGRPCCtx(taskCtx, t.Tracer)
	if err != nil {
		// Tracing metadata is best-effort; continue without a remote parent.
		log.Warningf(taskCtx, "error extracting tracing info from gRPC: %s", err)
	}
	taskCtx, span := t.Tracer.StartSpanCtx(
		taskCtx, tracing.BatchMethodName,
		tracing.WithRemoteParentFromSpanMeta(remoteParent),
		tracing.WithServerSpanKind,
	)
	if err := t.stopper.RunAsyncTaskEx(
		taskCtx,
		stop.TaskOpts{
			TaskName: "storage.RaftTransport: processing snapshot delegation",
			SpanOpt:  stop.ChildSpan,
		}, func(ctx context.Context) {
			errCh <- func() error {
				req, err := stream.Recv()
				if err != nil {
					// The handler is never reached; release the span here.
					span.Finish()
					return err
				}
				// Check to ensure the header is valid.
				if req.Header == nil {
					// The handler is never reached; release the span here.
					span.Finish()
					return stream.Send(
						&kvserverpb.DelegateSnapshotResponse{
							SnapResponse: &kvserverpb.SnapshotResponse{
								Status:  kvserverpb.SnapshotResponse_ERROR,
								Message: "client error: no header in first delegated snapshot request message",
							},
						},
					)
				}
				// Get the handler of the sender store.
				handler, ok := t.getHandler(req.DelegatedSender.StoreID)
				if !ok {
					// The handler is never reached; release the span here.
					span.Finish()
					log.Warningf(
						ctx,
						"unable to accept delegated snapshot request from coordinator store %+v:"+
							" no handler registered for the sender store %+v",
						req.Header.CoordinatorReplica.StoreID,
						req.DelegatedSender.StoreID,
					)
					return roachpb.NewStoreNotFoundError(req.DelegatedSender.StoreID)
				}

				// Pass off the snapshot request to the sender store.
				// NOTE(review): the handler is assumed to take ownership of the
				// span and finish it - confirm against the store implementation.
				return handler.HandleDelegatedSnapshot(ctx, req, stream, span)
			}()
		},
	); err != nil {
		// The task never started, so nothing else will finish the span.
		span.Finish()
		return err
	}
	select {
	case <-t.stopper.ShouldQuiesce():
		return nil
	case err := <-errCh:
		return err
	}
}

// RaftSnapshot handles incoming streaming snapshot requests.
func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error {
errCh := make(chan error, 1)
Expand All @@ -415,7 +501,7 @@ func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error
if err := t.stopper.RunAsyncTaskEx(
taskCtx,
stop.TaskOpts{
TaskName: "storage.RaftTransport: processing snapshot",
TaskName: "storage.RaftTransport: processing snapshot reception",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
errCh <- func() error {
Expand Down Expand Up @@ -709,3 +795,28 @@ func (t *RaftTransport) SendSnapshot(
}()
return sendSnapshot(ctx, t.st, stream, storePool, header, snap, newBatch, sent, bytesSentCounter)
}

// DelegateSnapshot dials the node that hosts the delegated sender replica
// named in req, opens a DelegateRaftSnapshot rpc stream to it, and hands the
// stream and request to delegateSnapshot. The send side of the stream is
// closed when this method returns; a failure to close it is only logged.
// Dial and stream-creation errors are returned as is. storePool is accepted
// but not used by this method.
func (t *RaftTransport) DelegateSnapshot(
	ctx context.Context, storePool *StorePool, req *kvserverpb.DelegateSnapshotRequest,
) error {
	senderConn, err := t.dialer.Dial(ctx, req.DelegatedSender.NodeID, rpc.DefaultClass)
	if err != nil {
		return err
	}

	// Open the rpc stream between the leaseholder and the delegated sender.
	delegateStream, err := NewMultiRaftClient(senderConn).DelegateRaftSnapshot(ctx)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := delegateStream.CloseSend(); closeErr != nil {
			log.Warningf(ctx, "failed to close snapshot stream: %+v", closeErr)
		}
	}()
	return delegateSnapshot(ctx, delegateStream, req)
}
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/raft_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ func (s channelServer) HandleSnapshot(
panic("unexpected HandleSnapshot")
}

// HandleDelegatedSnapshot is a stub: channelServer does not implement
// delegated snapshots, so any call panics with "unimplemented". All arguments
// are ignored.
func (s channelServer) HandleDelegatedSnapshot(
ctx context.Context,
req *kvserverpb.DelegateSnapshotRequest,
stream kvserver.DelegateSnapshotResponseStream,
span *tracing.Span,
) error {
panic("unimplemented")
}

// raftTransportTestContext contains objects needed to test RaftTransport.
// Typical usage will add multiple nodes with AddNode, attach channels
// to at least one store with ListenStore, and send messages with Send.
Expand Down
Loading

0 comments on commit 92572ba

Please sign in to comment.