diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index f65dd9a9c596..2225938169b8 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -12,6 +12,8 @@ go_library( "consistency_queue.go", "debug_print.go", "doc.go", + "flow_control_raft_transport.go", + "flow_control_stores.go", "lease_history.go", "markers.go", "merge_queue.go", @@ -136,6 +138,9 @@ go_library( "//pkg/kv/kvserver/idalloc", "//pkg/kv/kvserver/intentresolver", "//pkg/kv/kvserver/kvadmission", + "//pkg/kv/kvserver/kvflowcontrol", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", "//pkg/kv/kvserver/kvstorage", @@ -264,6 +269,7 @@ go_test( "consistency_queue_test.go", "debug_print_test.go", "errors_test.go", + "flow_control_raft_transport_test.go", "gossip_test.go", "helpers_test.go", "intent_resolver_integration_test.go", @@ -378,6 +384,10 @@ go_test( "//pkg/kv/kvserver/concurrency/lock", "//pkg/kv/kvserver/gc", "//pkg/kv/kvserver/intentresolver", + "//pkg/kv/kvserver/kvadmission", + "//pkg/kv/kvserver/kvflowcontrol", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", "//pkg/kv/kvserver/kvstorage", diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index cd74c2fcc228..b8d70ba85d1d 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" @@ -2474,6 +2475,10 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) { nodedialer.New(tc.Servers[0].RPCContext(), gossip.AddressResolver(tc.Servers[0].Gossip())), nil, /* grpcServer */ tc.Servers[0].Stopper(), + kvflowdispatch.NewDummyDispatch(), + kvserver.NoopStoresFlowControlIntegration{}, + kvserver.NoopRaftTransportDisconnectListener{}, + nil, /* knobs */ ) errChan := errorChannelTestHandler(make(chan *kvpb.Error, 1)) transport.Listen(store0.StoreID(), errChan) diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 2e8b1aba3595..72fdbe5b30b1 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -3199,6 +3200,10 @@ func TestReplicaGCRace(t *testing.T) { nodedialer.New(tc.Servers[0].RPCContext(), gossip.AddressResolver(fromStore.Gossip())), nil, /* grpcServer */ tc.Servers[0].Stopper(), + kvflowdispatch.NewDummyDispatch(), + kvserver.NoopStoresFlowControlIntegration{}, + kvserver.NoopRaftTransportDisconnectListener{}, + nil, /* knobs */ ) errChan := 
errorChannelTestHandler(make(chan *kvpb.Error, 1)) fromTransport.Listen(fromStore.StoreID(), errChan) @@ -3698,6 +3703,10 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) { gossip.AddressResolver(tc.GetFirstStoreFromServer(t, 0).Gossip())), nil, /* grpcServer */ tc.Servers[0].Stopper(), + kvflowdispatch.NewDummyDispatch(), + kvserver.NoopStoresFlowControlIntegration{}, + kvserver.NoopRaftTransportDisconnectListener{}, + nil, /* knobs */ ) errChan := errorChannelTestHandler(make(chan *kvpb.Error, 1)) transport0.Listen(target0.StoreID, errChan) diff --git a/pkg/kv/kvserver/flow_control_raft_transport.go b/pkg/kv/kvserver/flow_control_raft_transport.go new file mode 100644 index 000000000000..f4535aa10f66 --- /dev/null +++ b/pkg/kv/kvserver/flow_control_raft_transport.go @@ -0,0 +1,146 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + "fmt" + "sort" + "strings" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" +) + +// raftTransportForFlowControl abstracts the node-level raft transport, and is +// used by the canonical replicaFlowControlIntegration implementation. It +// exposes the set of (remote) stores the raft transport is connected to. If the +// underlying gRPC streams break and don't reconnect, this indicates as much. +// Ditto if they're reconnected to. Also see RaftTransportDisconnectListener, +// which is used to observe every instance of gRPC streams breaking. +type raftTransportForFlowControl interface { + isConnectedTo(storeID roachpb.StoreID) bool +} + +var _ raftTransportForFlowControl = &RaftTransport{} + +// isConnectedTo implements the raftTransportForFlowControl interface. +func (r *RaftTransport) isConnectedTo(storeID roachpb.StoreID) bool { + r.kvflowControl.mu.RLock() + defer r.kvflowControl.mu.RUnlock() + return r.kvflowControl.mu.connectionTracker.isStoreConnected(storeID) +} + +// RaftTransportDisconnectListener observes every instance of the raft +// transport disconnecting replication traffic to the given (remote) stores. +type RaftTransportDisconnectListener interface { + OnRaftTransportDisconnected(context.Context, ...roachpb.StoreID) +} + +// connectionTrackerForFlowControl tracks the set of client-side stores and +// server-side nodes the raft transport is connected to. The "client" and +// "server" refer to the client and server side of the RaftTransport stream (see +// flow_control_integration.go). Client-side stores return tokens, it's where +// work gets admitted. Server-side nodes are where tokens were originally +// deducted. +type connectionTrackerForFlowControl struct { + stores map[roachpb.StoreID]struct{} + nodes map[roachpb.NodeID]map[rpc.ConnectionClass]struct{} +} + +func newConnectionTrackerForFlowControl() *connectionTrackerForFlowControl { + return &connectionTrackerForFlowControl{ + stores: make(map[roachpb.StoreID]struct{}), + nodes: make(map[roachpb.NodeID]map[rpc.ConnectionClass]struct{}), + } +} + +// isStoreConnected returns whether we're connected to the given (client-side) +// store. 
+func (c *connectionTrackerForFlowControl) isStoreConnected(storeID roachpb.StoreID) bool { + _, found := c.stores[storeID] + return found +} + +// markStoresConnected is used to inform the tracker that we've received +// raft messages from nodes with the given set of stores. +func (c *connectionTrackerForFlowControl) markStoresConnected(storeIDs []roachpb.StoreID) { + for _, storeID := range storeIDs { + c.stores[storeID] = struct{}{} + } +} + +// markStoresDisconnected marks the given set of stores as disconnected. +func (c *connectionTrackerForFlowControl) markStoresDisconnected(storeIDs []roachpb.StoreID) { + for _, storeID := range storeIDs { + delete(c.stores, storeID) + } +} + +// isNodeConnected returns whether we're connected to the given (server-side) +// node, independent of the specific RPC connection class. +func (q *connectionTrackerForFlowControl) isNodeConnected(nodeID roachpb.NodeID) bool { + _, found := q.nodes[nodeID] + return found +} + +// markNodeConnected informs the tracker that we're connected to the given +// node using the given RPC connection class. +func (q *connectionTrackerForFlowControl) markNodeConnected( + nodeID roachpb.NodeID, class rpc.ConnectionClass, +) { + if len(q.nodes[nodeID]) == 0 { + q.nodes[nodeID] = map[rpc.ConnectionClass]struct{}{} + } + q.nodes[nodeID][class] = struct{}{} +} + +// markNodeDisconnected informs the tracker that a previous connection to +// the given node along the given RPC connection class is now broken. +func (q *connectionTrackerForFlowControl) markNodeDisconnected( + nodeID roachpb.NodeID, class rpc.ConnectionClass, +) { + delete(q.nodes[nodeID], class) + if len(q.nodes[nodeID]) == 0 { + delete(q.nodes, nodeID) + } +} + +func (c *connectionTrackerForFlowControl) testingPrint() string { + storeIDs := make([]roachpb.StoreID, 0, len(c.stores)) + nodeIDs := make([]roachpb.NodeID, 0, len(c.nodes)) + for storeID := range c.stores { + storeIDs = append(storeIDs, storeID) + } + for nodeID := range c.nodes { + nodeIDs = append(nodeIDs, nodeID) + } + + var buf strings.Builder + sort.Sort(roachpb.StoreIDSlice(storeIDs)) + sort.Sort(roachpb.NodeIDSlice(nodeIDs)) + buf.WriteString(fmt.Sprintf("connected-stores (server POV): %s\n", roachpb.StoreIDSlice(storeIDs))) + buf.WriteString(fmt.Sprintf("connected-nodes (client POV): %s\n", roachpb.NodeIDSlice(nodeIDs))) + return buf.String() +} + +// NoopRaftTransportDisconnectListener is a no-op implementation of the +// RaftTransportDisconnectListener interface. +type NoopRaftTransportDisconnectListener struct{} + +var _ RaftTransportDisconnectListener = NoopRaftTransportDisconnectListener{} + +// OnRaftTransportDisconnected implements the RaftTransportDisconnectListener +// interface. +func (n NoopRaftTransportDisconnectListener) OnRaftTransportDisconnected( + ctx context.Context, storeIDs ...roachpb.StoreID, +) { +} diff --git a/pkg/kv/kvserver/flow_control_raft_transport_test.go b/pkg/kv/kvserver/flow_control_raft_transport_test.go new file mode 100644 index 000000000000..c06720d372ca --- /dev/null +++ b/pkg/kv/kvserver/flow_control_raft_transport_test.go @@ -0,0 +1,508 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package kvserver_test + +import ( + "context" + "fmt" + "sort" + "strconv" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" + "go.etcd.io/raft/v3/raftpb" +) + +// TestFlowControlRaftTransport tests the integration of flow tokens and the +// RaftTransport. It offers the following commands: +// +// - "init" +// Initializes the raft transport test harness. +// +// - "add" node=n store=s +// Add a transport link on the given node, and register a given store on +// said node. +// +// - "send" range=r from=n/s/ to=n/s/ commit= +// Send a raft message with the given commit index for the named range, from +// the given node+store+replica to the given node+store+replica. +// +// - "dispatch" from=n +// node=n store=s range=r pri= up-to-log-position=/ +// ... +// Dispatch flow tokens (identified by a log prefix) from a given node to +// the named node+store, for the specific range and priority. +// +// - "client-mark-idle" from=n to=n +// Mark the transport connection between two nodes as idle. This is +// equivalent to the underlying RaftTransport stream being torn down. +// Specifically, the client-initiated stream from the first node towards the +// second. So this is a unidirectional breakage. Of course the server side +// can observe it, and various tests showcase this fact. +// +// - "fallback-dispatch" from=n +// Dispatch all pending flow tokens from the named node. This is equivalent +// to the periodic, fallback dispatch the transport does to guarantee +// delivery even if streams are idle or no messages are being sent to +// piggyback on top of. +// +// - "drop-disconnected-tokens" from=n +// Drop tokens that are pending delivery to nodes we're disconnected from. +// This is equivalent to the periodic memory reclamation the transport does +// to ensure we don't accumulate dispatches unboundedly. +// +// - "set-initial-store-ids" from=n stores=s[,s]* +// Inform the raft transport on the given node of its initial set of store +// IDs. This is unrelated to the "add" command above - it's only used to +// test whether we ship the right set of store IDs over the raft +// transport, as needed by the flow token protocol. +// +// - "set-additional-store-ids" from=n stores=s[,s]* +// Inform the raft transport on the given node of its additional set of +// store IDs. This is unrelated to the "add" command above - it's only used +// to test whether we ship the right set of store IDs over the raft +// transport, as needed by the flow token protocol.
+// +// - "connection-tracker" from=n +// Print out the exact nodes and stores the RaftTransport on the given node +// is connected to, as a server and as a client. +// +// - "disconnect-listener" from=n +// Print out the exact points at which the RaftTransportDisconnectListener +// was invoked by the RaftTransport on the given node. +// +// - "pending-dispatch" from=n to=n +// List the pending dispatches from one node to another. +// +// - "metrics" +// Print out transport metrics. +func TestFlowControlRaftTransport(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + kvadmission.FlowTokenDispatchInterval.Override(ctx, &st.SV, time.Hour) // we control this manually below + + datadriven.Walk(t, datapathutils.TestDataPath(t, "flow_control_raft_transport"), + func(t *testing.T, path string) { + var rttc *raftTransportTestContext + defer func() { + if rttc != nil { + rttc.Stop() + } + }() + + controlM := make(map[roachpb.NodeID]*transportControl) + datadriven.RunTest(t, path, + func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + if rttc != nil { + rttc.Stop() + } + rttc = newRaftTransportTestContext(t, st) + return "" + + case "add": + nodeID := parseNodeID(t, d, "node") + storeID := parseStoreID(t, d, "store") + workerTeardownCh := make(chan roachpb.NodeID, 1) + controlM[nodeID] = &transportControl{ + dispatch: kvflowdispatch.New(metric.NewRegistry(), nil, &base.NodeIDContainer{}), + disconnectListener: &mockRaftTransportDisconnectListener{buf: &builderWithMu{}}, + workerTeardownCh: workerTeardownCh, + } + controlM[nodeID].knobs = &kvserver.RaftTransportTestingKnobs{ + TriggerFallbackDispatchCh: make(chan time.Time), + OnFallbackDispatch: func() { + controlM[nodeID].triggeredFallbackDispatch.Set(true) + }, + MarkSendQueueAsIdleCh: make(chan roachpb.NodeID), + OnWorkerTeardown: func(nodeID roachpb.NodeID) { + workerTeardownCh <- nodeID + }, + OnServerStreamDisconnected: func() { + controlM[nodeID].serverStreamDisconnected.Set(true) + }, + } + + transport, addr := rttc.AddNodeWithoutGossip( + nodeID, + util.TestAddr, + rttc.stopper, + controlM[nodeID].dispatch, + kvserver.NoopStoresFlowControlIntegration{}, + controlM[nodeID].disconnectListener, + controlM[nodeID].knobs, + ) + rttc.GossipNode(nodeID, addr) + controlM[nodeID].chanServer = rttc.ListenStore(nodeID, storeID) + require.NoError(t, transport.Start(ctx)) + return "" + + case "send": + // Parse range=r. + var arg string + d.ScanArgs(t, "range", &arg) + ri, err := strconv.Atoi(strings.TrimPrefix(arg, "r")) + require.NoError(t, err) + rangeID := roachpb.RangeID(ri) + + from := parseReplicaDescriptor(t, d, "from") // parse from=n/s/ + to := parseReplicaDescriptor(t, d, "to") // parse to=n/s/ + + // Parse commit=. + d.ScanArgs(t, "commit", &arg) + c, err := strconv.Atoi(arg) + require.NoError(t, err) + + testutils.SucceedsSoon(t, func() error { + if !rttc.Send(from, to, rangeID, raftpb.Message{Commit: uint64(c)}) { + breaker, ok := rttc.transports[from.NodeID].GetCircuitBreaker(to.NodeID, rpc.DefaultClass) + require.True(t, ok) + breaker.Reset() + } + select { + case req := <-controlM[to.NodeID].chanServer.ch: + if req.Message.Commit == uint64(c) { + return nil + } + case <-time.After(time.Second): + } + return errors.Errorf("expected message commit=%d", c) + }) + return "" + + case "dispatch": // cargo-culted from kvflowdispatch.TestDispatch + // Parse node=n. 
+ fromNodeID := parseNodeID(t, d, "from") + control, found := controlM[fromNodeID] + require.True(t, found, "uninitialized node, did you use 'add node=n%s'?", fromNodeID) + + for _, line := range strings.Split(d.Input, "\n") { + parts := strings.Fields(line) + require.Len(t, parts, 5, "expected form 'node=n store=s range=r pri= up-to-log-position=/'") + var ( + entries kvflowcontrolpb.AdmittedRaftLogEntries + nodeID roachpb.NodeID + ) + for i := range parts { + parts[i] = strings.TrimSpace(parts[i]) + inner := strings.Split(parts[i], "=") + require.Len(t, inner, 2) + arg := strings.TrimSpace(inner[1]) + + switch { + case strings.HasPrefix(parts[i], "node="): + // Parse node=n. + ni, err := strconv.Atoi(strings.TrimPrefix(arg, "n")) + require.NoError(t, err) + nodeID = roachpb.NodeID(ni) + + case strings.HasPrefix(parts[i], "store="): + // Parse store=s. + si, err := strconv.Atoi(strings.TrimPrefix(arg, "s")) + require.NoError(t, err) + entries.StoreID = roachpb.StoreID(si) + + case strings.HasPrefix(parts[i], "range="): + // Parse range=r. + ri, err := strconv.Atoi(strings.TrimPrefix(arg, "r")) + require.NoError(t, err) + entries.RangeID = roachpb.RangeID(ri) + + case strings.HasPrefix(parts[i], "pri="): + // Parse pri=. + pri, found := admissionpb.TestingReverseWorkPriorityDict[arg] + require.True(t, found) + entries.AdmissionPriority = int32(pri) + + case strings.HasPrefix(parts[i], "up-to-log-position="): + // Parse up-to-log-position=/. + entries.UpToRaftLogPosition = parseLogPosition(t, arg) + + default: + t.Fatalf("unrecognized prefix: %s", parts[i]) + } + } + control.dispatch.Dispatch(ctx, nodeID, entries) + } + return "" + + case "client-mark-idle": + fromNodeID := parseNodeID(t, d, "from") + toNodeID := parseNodeID(t, d, "to") + + control, found := controlM[fromNodeID] + require.True(t, found, "uninitialized node, did you use 'add node=n%s'?", fromNodeID) + select { + case control.knobs.MarkSendQueueAsIdleCh <- toNodeID: + case <-time.After(time.Second): + return "timed out" + } + select { + case gotNodeID := <-control.workerTeardownCh: + require.Equal(t, gotNodeID, toNodeID) + case <-time.After(time.Second): + return "timed out" + } + + toControl, found := controlM[toNodeID] + require.True(t, found, "uninitialized node, did you use 'add node=n%s'?", toNodeID) + testutils.SucceedsSoon(t, func() error { + if toControl.serverStreamDisconnected.Get() { + return nil + } + return errors.Errorf("waiting for server-side stream to disconnect") + }) + toControl.serverStreamDisconnected.Set(false) // reset + return "" + + case "drop-disconnected-tokens": + nodeID := parseNodeID(t, d, "from") + transport, found := rttc.transports[nodeID] + require.True(t, found, "uninitialized node, did you use 'add node=n%s'?", nodeID) + transport.TestingDropFlowTokensForDisconnectedNodes() + return "" + + case "set-initial-store-ids": + nodeID := parseNodeID(t, d, "from") + transport, found := rttc.transports[nodeID] + require.True(t, found, "uninitialized node, did you use 'add node=n%s'?", nodeID) + transport.SetInitialStoreIDs(parseStoreIDs(t, d, "stores")) + return "" + + case "set-additional-store-ids": + nodeID := parseNodeID(t, d, "from") + transport, found := rttc.transports[nodeID] + require.True(t, found, "uninitialized node, did you use 'add node=n%s'?", nodeID) + transport.SetAdditionalStoreIDs(parseStoreIDs(t, d, "stores")) + return "" + + case "connection-tracker": + fromNodeID := parseNodeID(t, d, "from") + transport, found := rttc.transports[fromNodeID] + require.True(t, found, 
"uninitialized node, did you use 'add node=n%s'?", fromNodeID) + return transport.TestingPrintFlowControlConnectionTracker() + + case "disconnect-listener": + fromNodeID := parseNodeID(t, d, "from") + control, found := controlM[fromNodeID] + require.True(t, found, "uninitialized node, did you use 'add node=n%s'?", fromNodeID) + return control.disconnectListener.buf.stringAndReset() + + case "fallback-dispatch": + fromNodeID := parseNodeID(t, d, "from") + control, found := controlM[fromNodeID] + require.True(t, found, "uninitialized node, did you use 'add node=n%s'?", fromNodeID) + select { + case control.knobs.TriggerFallbackDispatchCh <- time.Time{}: + case <-time.After(time.Second): + return "timed out" + } + testutils.SucceedsSoon(t, func() error { + if control.triggeredFallbackDispatch.Get() { + return nil + } + return errors.Errorf("waiting for fallback mechanism to activate") + }) + control.triggeredFallbackDispatch.Set(false) // reset + return "" + + case "pending-dispatch": // cargo-culted from kvflowdispatch.TestDispatch + fromNodeID := parseNodeID(t, d, "from") + toNodeID := parseNodeID(t, d, "to") + + control, found := controlM[fromNodeID] + require.True(t, found, "uninitialized node, did you use 'add node=n%s'?", fromNodeID) + + var buf strings.Builder + es := control.dispatch.PendingDispatchFor(toNodeID) + sort.Slice(es, func(i, j int) bool { // for determinism + if es[i].RangeID != es[j].RangeID { + return es[i].RangeID < es[j].RangeID + } + if es[i].StoreID != es[j].StoreID { + return es[i].StoreID < es[j].StoreID + } + if es[i].AdmissionPriority != es[j].AdmissionPriority { + return es[i].AdmissionPriority < es[j].AdmissionPriority + } + return es[i].UpToRaftLogPosition.Less(es[j].UpToRaftLogPosition) + }) + for i, entries := range es { + if i != 0 { + buf.WriteString("\n") + } + buf.WriteString( + fmt.Sprintf("range=r%d pri=%s store=s%d up-to-log-position=%s", + entries.RangeID, + admissionpb.WorkPriority(entries.AdmissionPriority), + entries.StoreID, + entries.UpToRaftLogPosition, + ), + ) + control.dispatch.Dispatch(ctx, toNodeID, entries) // re-add to dispatch + } + return buf.String() + + case "metrics": + var buf strings.Builder + var nodeIDs roachpb.NodeIDSlice + for nodeID := range rttc.transports { + nodeIDs = append(nodeIDs, nodeID) + } + sort.Sort(nodeIDs) + + for _, nodeID := range nodeIDs { + transport := rttc.transports[nodeID] + buf.WriteString(fmt.Sprintf("node=n%s: dispatches-dropped=%d\n", + nodeID, + transport.Metrics().FlowTokenDispatchesDropped.Count(), + )) + } + return buf.String() + + default: + return "unknown command" + } + }) + }, + ) +} + +type transportControl struct { + dispatch *kvflowdispatch.Dispatch + disconnectListener *mockRaftTransportDisconnectListener + knobs *kvserver.RaftTransportTestingKnobs + serverStreamDisconnected syncutil.AtomicBool + triggeredFallbackDispatch syncutil.AtomicBool + workerTeardownCh chan roachpb.NodeID + chanServer channelServer +} + +func parseLogPosition(t *testing.T, input string) kvflowcontrolpb.RaftLogPosition { + inner := strings.Split(input, "/") + require.Len(t, inner, 2) + term, err := strconv.Atoi(inner[0]) + require.NoError(t, err) + index, err := strconv.Atoi(inner[1]) + require.NoError(t, err) + return kvflowcontrolpb.RaftLogPosition{ + Term: uint64(term), + Index: uint64(index), + } +} + +func parseNodeID(t *testing.T, d *datadriven.TestData, key string) roachpb.NodeID { + var arg string + d.ScanArgs(t, key, &arg) + ni, err := strconv.Atoi(strings.TrimPrefix(arg, "n")) + require.NoError(t, err) + 
return roachpb.NodeID(ni) +} + +func parseStoreID(t *testing.T, d *datadriven.TestData, key string) roachpb.StoreID { + var arg string + d.ScanArgs(t, key, &arg) + si, err := strconv.Atoi(strings.TrimPrefix(arg, "s")) + require.NoError(t, err) + return roachpb.StoreID(si) +} + +func parseStoreIDs(t *testing.T, d *datadriven.TestData, key string) []roachpb.StoreID { + var arg string + d.ScanArgs(t, key, &arg) + var storeIDs []roachpb.StoreID + for _, part := range strings.Split(arg, ",") { + si, err := strconv.Atoi(strings.TrimPrefix(part, "s")) + require.NoError(t, err) + storeIDs = append(storeIDs, roachpb.StoreID(si)) + } + return storeIDs +} + +func parseReplicaDescriptor( + t *testing.T, d *datadriven.TestData, key string, +) roachpb.ReplicaDescriptor { + var arg string + var desc roachpb.ReplicaDescriptor + d.ScanArgs(t, key, &arg) + parts := strings.Split(arg, "/") + require.Len(t, parts, 3) + ni, err := strconv.Atoi(strings.TrimPrefix(parts[0], "n")) + require.NoError(t, err) + store, err := strconv.Atoi(strings.TrimPrefix(parts[1], "s")) + require.NoError(t, err) + repl, err := strconv.Atoi(parts[2]) + require.NoError(t, err) + + desc.NodeID = roachpb.NodeID(ni) + desc.StoreID = roachpb.StoreID(store) + desc.ReplicaID = roachpb.ReplicaID(repl) + return desc +} + +type builderWithMu struct { + mu syncutil.Mutex + buf strings.Builder +} + +func (b *builderWithMu) printf(format string, a ...interface{}) { + b.mu.Lock() + defer b.mu.Unlock() + if b.buf.Len() > 0 { + fmt.Fprintf(&b.buf, "\n") + } + fmt.Fprintf(&b.buf, format, a...) +} + +func (b *builderWithMu) stringAndReset() string { + b.mu.Lock() + defer b.mu.Unlock() + str := b.buf.String() + b.buf.Reset() + return str +} + +type mockRaftTransportDisconnectListener struct { + buf *builderWithMu +} + +var _ kvserver.RaftTransportDisconnectListener = &mockRaftTransportDisconnectListener{} + +// OnRaftTransportDisconnected implements the kvserver.StoresForFlowControl interface. +func (m *mockRaftTransportDisconnectListener) OnRaftTransportDisconnected( + ctx context.Context, storeIDs ...roachpb.StoreID, +) { + m.buf.printf("disconnected-from: %s", roachpb.StoreIDSlice(storeIDs)) +} diff --git a/pkg/kv/kvserver/flow_control_stores.go b/pkg/kv/kvserver/flow_control_stores.go new file mode 100644 index 000000000000..fe0d14ad69ac --- /dev/null +++ b/pkg/kv/kvserver/flow_control_stores.go @@ -0,0 +1,155 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// StoresForFlowControl is used to integrate with replication flow control. It +// exposes the underlying kvflowcontrol.Handles and is informed of (remote) +// stores we're no longer connected via the raft transport. +type StoresForFlowControl interface { + kvflowcontrol.Handles + RaftTransportDisconnectListener +} + +// storesForFlowControl is a concrete implementation of the +// StoresForFlowControl interface, backed by a set of Stores. 
+type storesForFlowControl Stores + +var _ StoresForFlowControl = &storesForFlowControl{} + +// MakeStoresForFlowControl returns the canonical StoresForFlowControl +// implementation. +func MakeStoresForFlowControl(stores *Stores) StoresForFlowControl { + return (*storesForFlowControl)(stores) +} + +// Lookup is part of the StoresForFlowControl interface. +func (sh *storesForFlowControl) Lookup( + rangeID roachpb.RangeID, +) (handle kvflowcontrol.Handle, found bool) { + ls := (*Stores)(sh) + if err := ls.VisitStores(func(s *Store) error { + if h, ok := makeStoreForFlowControl(s).Lookup(rangeID); ok { + handle = h + found = true + } + return nil + }); err != nil { + ctx := ls.AnnotateCtx(context.Background()) + log.Errorf(ctx, "unexpected error: %s", err) + return nil, false + } + return handle, found +} + +// ResetStreams is part of the StoresForFlowControl interface. +func (sh *storesForFlowControl) ResetStreams(ctx context.Context) { + ls := (*Stores)(sh) + if err := ls.VisitStores(func(s *Store) error { + makeStoreForFlowControl(s).ResetStreams(ctx) + return nil + }); err != nil { + ctx = ls.AnnotateCtx(ctx) + log.Errorf(ctx, "unexpected error: %s", err) + } +} + +// OnRaftTransportDisconnected is part of the StoresForFlowControl +// interface. +func (sh *storesForFlowControl) OnRaftTransportDisconnected( + ctx context.Context, storeIDs ...roachpb.StoreID, +) { + ls := (*Stores)(sh) + if err := ls.VisitStores(func(s *Store) error { + makeStoreForFlowControl(s).OnRaftTransportDisconnected(ctx, storeIDs...) + return nil + }); err != nil { + ctx := ls.AnnotateCtx(context.Background()) + log.Errorf(ctx, "unexpected error: %s", err) + } +} + +// storeForFlowControl is a concrete implementation of the +// StoresForFlowControl interface, backed by a single Store. +type storeForFlowControl Store + +var _ StoresForFlowControl = &storeForFlowControl{} + +// makeStoreForFlowControl returns a new storeForFlowControl instance. +func makeStoreForFlowControl(store *Store) *storeForFlowControl { + return (*storeForFlowControl)(store) +} + +// Lookup is part of the StoresForFlowControl interface. +func (sh *storeForFlowControl) Lookup( + rangeID roachpb.RangeID, +) (_ kvflowcontrol.Handle, found bool) { + s := (*Store)(sh) + repl := s.GetReplicaIfExists(rangeID) + if repl == nil { + return nil, false + } + return nil, false // TODO(irfansharif): Fill this in. +} + +// ResetStreams is part of the StoresForFlowControl interface. +func (sh *storeForFlowControl) ResetStreams(ctx context.Context) { + s := (*Store)(sh) + s.VisitReplicas(func(r *Replica) (wantMore bool) { + // TODO(irfansharif): Fill this in. + return true + }) +} + +// OnRaftTransportDisconnected is part of the StoresForFlowControl +// interface. +func (sh *storeForFlowControl) OnRaftTransportDisconnected( + ctx context.Context, storeIDs ...roachpb.StoreID, +) { + s := (*Store)(sh) + s.mu.replicasByRangeID.Range(func(replica *Replica) { + // TODO(irfansharif): Fill this in. + }) +} + +// NoopStoresFlowControlIntegration is a no-op implementation of the +// StoresForFlowControl interface. +type NoopStoresFlowControlIntegration struct{} + +var _ StoresForFlowControl = NoopStoresFlowControlIntegration{} + +// Lookup is part of the StoresForFlowControl interface. +func (l NoopStoresFlowControlIntegration) Lookup(roachpb.RangeID) (kvflowcontrol.Handle, bool) { + return nil, false +} + +// ResetStreams is part of the StoresForFlowControl interface. 
+func (l NoopStoresFlowControlIntegration) ResetStreams(context.Context) { +} + +// Inspect is part of the StoresForFlowControl interface. +func (l NoopStoresFlowControlIntegration) Inspect() []roachpb.RangeID { + return nil +} + +// OnRaftTransportDisconnected is part of the RaftTransportDisconnectListener +// interface. +func (NoopStoresFlowControlIntegration) OnRaftTransportDisconnected( + context.Context, ...roachpb.StoreID, +) { +} diff --git a/pkg/kv/kvserver/kvadmission/BUILD.bazel b/pkg/kv/kvserver/kvadmission/BUILD.bazel index c0446fd040ed..175b864bfe01 100644 --- a/pkg/kv/kvserver/kvadmission/BUILD.bazel +++ b/pkg/kv/kvserver/kvadmission/BUILD.bazel @@ -8,6 +8,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/kv/kvpb", + "//pkg/kv/kvserver/kvflowcontrol", "//pkg/kv/kvserver/raftlog", "//pkg/roachpb", "//pkg/settings", diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index def2d71c9507..0ebb62537f20 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -19,6 +19,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -92,6 +93,40 @@ var ProvisionedBandwidth = settings.RegisterByteSizeSetting( "for each store. It can be over-ridden on a per-store basis using the --store flag", 0).WithPublic() +// FlowTokenDropInterval determines the frequency at which we check for pending +// flow token dispatches to nodes we're no longer connected to, in order to drop +// them. +var FlowTokenDropInterval = settings.RegisterDurationSetting( + settings.SystemOnly, + "kvadmission.flow_token.drop_interval", + "the interval at which the raft transport checks for pending flow token dispatches "+ + "to nodes we're no longer connected to, in order to drop them; set to 0 to disable the mechanism", + 30*time.Second, + settings.NonNegativeDuration, +) + +// FlowTokenDispatchInterval determines the frequency at which we check for +// pending flow token dispatches from idle connections, in order to deliver +// them. +var FlowTokenDispatchInterval = settings.RegisterDurationSetting( + settings.SystemOnly, + "kvadmission.flow_token.dispatch_interval", + "the interval at which the raft transport checks for pending flow token dispatches "+ + "from idle connections and delivers them", + time.Second, + settings.PositiveDuration, settings.NonNegativeDurationWithMaximum(time.Minute), +) + +// ConnectedStoreExpiration controls how long the RaftTransport layers considers +// a stream connected without it having observed any messages from it. +var ConnectedStoreExpiration = settings.RegisterDurationSetting( + settings.SystemOnly, + "kvadmission.raft_transport.connected_store_expiration", + "the interval at which the raft transport prunes its set of connected stores; set to 0 to disable the mechanism", + 5*time.Minute, + settings.NonNegativeDuration, +) + // Controller provides admission control for the KV layer. type Controller interface { // AdmitKVWork must be called before performing KV work. 
@@ -430,6 +465,20 @@ func (n *controllerImpl) AdmitRaftEntry( return } + if log.V(1) { + log.Infof(ctx, "decoded raft admission meta below-raft: pri=%s create-time=%d proposer=n%s receiver=[n?,s%s] tenant=t%d tokensā‰ˆ%d sideloaded=%t raft-entry=%d/%d", + admissionpb.WorkPriority(meta.AdmissionPriority), + meta.AdmissionCreateTime, + meta.AdmissionOriginNode, + storeID, + tenantID.ToUint64(), + kvflowcontrol.Tokens(len(entry.Data)), + typ.IsSideloaded(), + entry.Term, + entry.Index, + ) + } + storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID)) if storeAdmissionQ == nil { log.Errorf(ctx, "unable to find queue for store: %s", storeID) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index de6641205a58..1e95f1277c8a 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -144,6 +144,13 @@ type Handle interface { Close(context.Context) } +// Handles represent a set of flow control handles. Note that handles are +// typically held on replicas initiating replication traffic, so on a given node +// they're uniquely identified by their range ID. +type Handles interface { + Lookup(roachpb.RangeID) (Handle, bool) +} + // Dispatch is used (i) to dispatch information about admitted raft log entries // to specific nodes, and (ii) to read pending dispatches. type Dispatch interface { @@ -155,7 +162,7 @@ type Dispatch interface { // entries to specific nodes (typically where said entries originated, where // flow tokens were deducted and waiting to be returned). type DispatchWriter interface { - Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries) + Dispatch(context.Context, roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries) } // DispatchReader is used to read pending dispatches. It's used in the raft diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/BUILD.bazel index a286451bb7e5..3d45b91914a8 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/BUILD.bazel @@ -3,14 +3,21 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "kvflowdispatch", - srcs = ["kvflowdispatch.go"], + srcs = [ + "dummy.go", + "kvflowdispatch.go", + "kvflowdispatch_metrics.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/kv/kvserver/kvflowcontrol", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/roachpb", "//pkg/util/admission/admissionpb", + "//pkg/util/log", + "//pkg/util/metric", "//pkg/util/syncutil", ], ) @@ -22,12 +29,15 @@ go_test( data = glob(["testdata/**"]), embed = [":kvflowdispatch"], deps = [ + "//pkg/base", + "//pkg/kv/kvserver/kvflowcontrol", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/roachpb", "//pkg/testutils/datapathutils", "//pkg/util/admission/admissionpb", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/metric", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_stretchr_testify//require", ], diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/dummy.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/dummy.go new file mode 100644 index 000000000000..a41ed702ad78 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/dummy.go @@ -0,0 +1,40 @@ +// Copyright 2023 The Cockroach Authors. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowdispatch + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" +) + +type dummy struct{} + +var _ kvflowcontrol.Dispatch = &dummy{} + +// NewDummyDispatch returns a dummy implementation of kvflowcontrol.Dispatch, +// for use in tests. +func NewDummyDispatch() kvflowcontrol.Dispatch { + return &dummy{} +} + +func (d *dummy) Dispatch(context.Context, roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries) { +} + +func (d *dummy) PendingDispatch() []roachpb.NodeID { + return nil +} + +func (d *dummy) PendingDispatchFor(id roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries { + return nil +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go index fddb450425cb..0a3576ceb285 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go @@ -11,10 +11,15 @@ package kvflowdispatch import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) @@ -27,6 +32,11 @@ type Dispatch struct { // outbox maintains pending dispatches on a per-node basis. outbox map[roachpb.NodeID]dispatches } + metrics *metrics + // handles is used to dispatch tokens locally. Remote token dispatches are + // driven by the RaftTransport. + handles kvflowcontrol.Handles + nodeID *base.NodeIDContainer } // dispatchKey is used to coalesce dispatches bound for a given node. If @@ -44,29 +54,77 @@ type dispatches map[dispatchKey]kvflowcontrolpb.RaftLogPosition var _ kvflowcontrol.Dispatch = &Dispatch{} // New constructs a new Dispatch. -func New() *Dispatch { - d := &Dispatch{} +func New( + registry *metric.Registry, handles kvflowcontrol.Handles, nodeID *base.NodeIDContainer, +) *Dispatch { + d := &Dispatch{ + handles: handles, + nodeID: nodeID, + } d.mu.outbox = make(map[roachpb.NodeID]dispatches) + d.metrics = newMetrics() + registry.AddMetricStruct(d.metrics) return d } // Dispatch is part of the kvflowcontrol.Dispatch interface. 
-func (d *Dispatch) Dispatch(nodeID roachpb.NodeID, entries kvflowcontrolpb.AdmittedRaftLogEntries) { +func (d *Dispatch) Dispatch( + ctx context.Context, nodeID roachpb.NodeID, entries kvflowcontrolpb.AdmittedRaftLogEntries, +) { + if log.V(1) { + log.Infof(ctx, "dispatching %s to n%s", entries, nodeID) + } + pri := admissionpb.WorkPriority(entries.AdmissionPriority) + wc := admissionpb.WorkClassFromPri(pri) + if nodeID == d.nodeID.Get() { // local fast-path + handle, found := d.handles.Lookup(entries.RangeID) + if found { + handle.ReturnTokensUpto( + ctx, + admissionpb.WorkPriority(entries.AdmissionPriority), + entries.UpToRaftLogPosition, kvflowcontrol.Stream{ + StoreID: entries.StoreID, + }) + } + // If we've not found the local kvflowcontrol.Handle, it's because the + // range leaseholder/leader has recently been moved elsewhere. It's ok + // to drop these tokens on the floor since we already returned it when + // moving the leaseholder/leader. + d.metrics.LocalDispatch[wc].Inc(1) + return + } + d.metrics.RemoteDispatch[wc].Inc(1) + d.mu.Lock() defer d.mu.Unlock() if _, ok := d.mu.outbox[nodeID]; !ok { d.mu.outbox[nodeID] = dispatches{} + d.metrics.PendingNodes.Inc(1) } dk := dispatchKey{ entries.RangeID, entries.StoreID, - admissionpb.WorkPriority(entries.AdmissionPriority), + pri, } + existing, found := d.mu.outbox[nodeID][dk] if !found || existing.Less(entries.UpToRaftLogPosition) { d.mu.outbox[nodeID][dk] = entries.UpToRaftLogPosition + + if !found { + d.metrics.PendingDispatches[wc].Inc(1) + } else { + // We're replacing an existing dispatch with one with a higher log + // position. Increment the coalesced metric. + d.metrics.CoalescedDispatches[wc].Inc(1) + } + } + if found && !existing.Less(entries.UpToRaftLogPosition) { + // We're dropping a dispatch given we already have a pending one with a + // higher log position. Increment the coalesced metric. + d.metrics.CoalescedDispatches[wc].Inc(1) } } @@ -101,7 +159,16 @@ func (d *Dispatch) PendingDispatchFor( AdmissionPriority: int32(key.WorkPriority), UpToRaftLogPosition: dispatch, }) + wc := admissionpb.WorkClassFromPri(key.WorkPriority) + d.metrics.PendingDispatches[wc].Dec(1) } + delete(d.mu.outbox, nodeID) + d.metrics.PendingNodes.Dec(1) return entries } + +// testingMetrics returns the underlying metrics struct for testing purposes. +func (d *Dispatch) testingMetrics() *metrics { + return d.metrics +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_metrics.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_metrics.go new file mode 100644 index 000000000000..f5b59086043b --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_metrics.go @@ -0,0 +1,102 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package kvflowdispatch + +import ( + "fmt" + + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/metric" +) + +var ( + pendingDispatches = metric.Metadata{ + Name: "kvadmission.flow_token_dispatch.pending_%s", + Help: "Number of pending %s flow token dispatches", + Measurement: "Dispatches", + Unit: metric.Unit_COUNT, + } + + pendingNodes = metric.Metadata{ + Name: "kvadmission.flow_token_dispatch.pending_nodes", + Help: "Number of nodes pending flow token dispatches", + Measurement: "Nodes", + Unit: metric.Unit_COUNT, + } + + localDispatches = metric.Metadata{ + Name: "kvadmission.flow_token_dispatch.local_%s", + Help: "Number of local %s flow token dispatches", + Measurement: "Dispatches", + Unit: metric.Unit_COUNT, + } + + remoteDispatches = metric.Metadata{ + Name: "kvadmission.flow_token_dispatch.remote_%s", + Help: "Number of remote %s flow token dispatches", + Measurement: "Dispatches", + Unit: metric.Unit_COUNT, + } + + coalescedDispatches = metric.Metadata{ + Name: "kvadmission.flow_token_dispatch.coalesced_%s", + Help: "Number of coalesced %s flow token dispatches (where we're informing the sender of a higher log entry being admitted)", + Measurement: "Dispatches", + Unit: metric.Unit_COUNT, + } +) + +type metrics struct { + PendingDispatches [admissionpb.NumWorkClasses]*metric.Gauge + CoalescedDispatches [admissionpb.NumWorkClasses]*metric.Counter + LocalDispatch [admissionpb.NumWorkClasses]*metric.Counter + RemoteDispatch [admissionpb.NumWorkClasses]*metric.Counter + PendingNodes *metric.Gauge +} + +var _ metric.Struct = &metrics{} + +func newMetrics() *metrics { + var m metrics + for _, wc := range []admissionpb.WorkClass{ + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass, + } { + wc := wc // copy loop variable + m.PendingDispatches[wc] = metric.NewGauge( + annotateMetricTemplateWithWorkClass(wc, pendingDispatches), + ) + m.LocalDispatch[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, localDispatches), + ) + m.RemoteDispatch[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, remoteDispatches), + ) + m.CoalescedDispatches[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, coalescedDispatches), + ) + } + m.PendingNodes = metric.NewGauge(pendingNodes) + return &m +} + +func (m *metrics) MetricStruct() {} + +// annotateMetricTemplateWithWorkClass uses the given metric template to build +// one suitable for the specific work class. 
+func annotateMetricTemplateWithWorkClass( + wc admissionpb.WorkClass, tmpl metric.Metadata, +) metric.Metadata { + rv := tmpl + rv.Name = fmt.Sprintf(tmpl.Name, wc) + rv.Help = fmt.Sprintf(tmpl.Help, wc) + return rv +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go index 6610f902ba15..909d30e5c86b 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go @@ -11,18 +11,22 @@ package kvflowdispatch import ( + "context" "fmt" "sort" "strconv" "strings" "testing" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/datadriven" "github.com/stretchr/testify/require" ) @@ -31,12 +35,23 @@ func TestDispatch(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { var dispatch *Dispatch datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "init": - dispatch = New() + nodeIDContainer := &base.NodeIDContainer{} + if d.HasArg("node") { + // Parse node=n. + var arg string + d.ScanArgs(t, "node", &arg) + ni, err := strconv.Atoi(strings.TrimPrefix(arg, "n")) + require.NoError(t, err) + nodeID := roachpb.NodeID(ni) + nodeIDContainer.Set(ctx, nodeID) + } + dispatch = New(metric.NewRegistry(), dummyHandles{}, nodeIDContainer) return "" case "dispatch": @@ -89,7 +104,7 @@ func TestDispatch(t *testing.T) { t.Fatalf("unrecognized prefix: %s", parts[i]) } } - dispatch.Dispatch(nodeID, entries) + dispatch.Dispatch(ctx, nodeID, entries) } return "" @@ -143,6 +158,9 @@ func TestDispatch(t *testing.T) { } return buf.String() + case "metrics": + return printMetrics(dispatch) + default: return fmt.Sprintf("unknown command: %s", d.Cmd) } @@ -162,3 +180,37 @@ func parseLogPosition(t *testing.T, input string) kvflowcontrolpb.RaftLogPositio Index: uint64(index), } } + +func printMetrics(d *Dispatch) string { + metrics := d.testingMetrics() + var buf strings.Builder + buf.WriteString(fmt.Sprintf(`pending-nodes=%d +[regular] pending=%d coalesced=%d dispatch{local=%d remote=%d} +[elastic] pending=%d coalesced=%d dispatch{local=%d remote=%d} +`, + metrics.PendingNodes.Value(), + metrics.PendingDispatches[admissionpb.RegularWorkClass].Value(), + metrics.CoalescedDispatches[admissionpb.RegularWorkClass].Count(), + metrics.LocalDispatch[admissionpb.RegularWorkClass].Count(), + metrics.RemoteDispatch[admissionpb.RegularWorkClass].Count(), + metrics.PendingDispatches[admissionpb.ElasticWorkClass].Value(), + metrics.CoalescedDispatches[admissionpb.ElasticWorkClass].Count(), + metrics.LocalDispatch[admissionpb.ElasticWorkClass].Count(), + metrics.RemoteDispatch[admissionpb.ElasticWorkClass].Count(), + )) + return buf.String() +} + +type dummyHandles struct{} + +func (d dummyHandles) Lookup(id roachpb.RangeID) (kvflowcontrol.Handle, bool) { + return nil, false +} + +func (d 
dummyHandles) ResetStreams(ctx context.Context) {} + +func (d dummyHandles) Inspect() []roachpb.RangeID { + return nil +} + +var _ kvflowcontrol.Handles = dummyHandles{} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/local_remote_dispatch b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/local_remote_dispatch new file mode 100644 index 000000000000..52bcf5848fe0 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/local_remote_dispatch @@ -0,0 +1,28 @@ +# Verify that the local dispatch fast-path works as expected. + +# Initialize the dispatch on n1. +init node=n1 +---- + +# Issue dispatches to n1 and n2. +dispatch +node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=4/20 +node=n2 range=r2 pri=normal-pri store=s2 up-to-log-position=4/20 +---- + +# Verify that the metrics show only 1 pending node (the remote one, n2) and +# increment the dispatch{local=...} metric appropriately. +metrics +---- +pending-nodes=1 +[regular] pending=1 coalesced=0 dispatch{local=1 remote=1} +[elastic] pending=0 coalesced=0 dispatch{local=0 remote=0} + +pending-dispatch +---- +node=n2 + +pending-dispatch-for node=n1 +---- + +# vim:ft=sh diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/log_position_ordering b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/log_position_ordering index 947588448e74..e65891823f52 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/log_position_ordering +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/log_position_ordering @@ -9,15 +9,34 @@ node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=4/20 node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=5/20 ---- +# Verify that the metrics indicate one pending and one coalesced flow token +# dispatch for 4/20. +metrics +---- +pending-nodes=1 +[regular] pending=1 coalesced=1 dispatch{local=0 remote=2} +[elastic] pending=0 coalesced=0 dispatch{local=0 remote=0} + +# Read off pending dispatches, observing only the one with the higher log +# position. pending-dispatch-for node=n1 ---- range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=5/20 +# Do the same as above, but dispatching them out of order. The higher position +# still takes precedence. dispatch node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=6/20 node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=6/19 ---- +# Verify that the metrics increment the coalesced count for 6/19. +metrics +---- +pending-nodes=1 +[regular] pending=1 coalesced=2 dispatch{local=0 remote=4} +[elastic] pending=0 coalesced=0 dispatch{local=0 remote=0} + pending-dispatch-for node=n1 ---- range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=6/20 diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/multiple_nodes_priorities_stores b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/multiple_nodes_priorities_stores index 629b4be08ca3..1902ac1f36bb 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/multiple_nodes_priorities_stores +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/multiple_nodes_priorities_stores @@ -15,6 +15,14 @@ node=n2 range=r3 pri=normal-pri store=s3 up-to-log-position=5/21 node=n2 range=r3 pri=high-pri store=s3 up-to-log-position=5/22 ---- +# Verify that the metrics show 2 pending nodes with 4 pending flow tokens +# dispatches (nothing gets coalesced).
+metrics +---- +pending-nodes=2 +[regular] pending=4 coalesced=0 dispatch{local=0 remote=4} +[elastic] pending=0 coalesced=0 dispatch{local=0 remote=0} + pending-dispatch ---- node=n1 @@ -24,6 +32,14 @@ pending-dispatch-for node=n1 ---- range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=4/20 +# Verify that there's only 1 pending node left (n1 was cleared) and the pending flow +# tokens dispatch count was reduced by 1. +metrics +---- +pending-nodes=1 +[regular] pending=3 coalesced=0 dispatch{local=0 remote=4} +[elastic] pending=0 coalesced=0 dispatch{local=0 remote=0} + pending-dispatch ---- node=n2 @@ -37,4 +53,11 @@ range=r3 pri=high-pri store=s3 up-to-log-position=log-position=5/22 pending-dispatch ---- +# Verify no pending nodes or flow tokens dispatches. +metrics +---- +pending-nodes=0 +[regular] pending=0 coalesced=0 dispatch{local=0 remote=4} +[elastic] pending=0 coalesced=0 dispatch{local=0 remote=0} + # vim:ft=sh diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/single_dispatch b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/single_dispatch index c74a021ecce3..3e482c0bc681 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/single_dispatch +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/single_dispatch @@ -8,6 +8,14 @@ dispatch node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=4/20 ---- +# Verify that the metrics show 1 pending node, with 1 pending flow tokens +# dispatch. +metrics +---- +pending-nodes=1 +[regular] pending=1 coalesced=0 dispatch{local=0 remote=1} +[elastic] pending=0 coalesced=0 dispatch{local=0 remote=0} + pending-dispatch ---- node=n1 @@ -22,4 +30,12 @@ pending-dispatch pending-dispatch-for node=n1 ---- +# Verify that the metrics show 0 pending nodes, with 0 pending flow +# tokens dispatches. +metrics +---- +pending-nodes=0 +[regular] pending=0 coalesced=0 dispatch{local=0 remote=1} +[elastic] pending=0 coalesced=0 dispatch{local=0 remote=0} + # vim:ft=sh diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 37104d621bfa..ac8030298d90 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -100,6 +100,35 @@ message RaftMessageRequest { message RaftMessageRequestBatch { repeated RaftMessageRequest requests = 1 [(gogoproto.nullable) = false]; + + // StoreIDs identifies all the stores on the client node. It's populated on + // the first RaftMessageRequestBatch sent along the MultiRaft.RaftMessageBatch + // gRPC stream identifying at least one store, and then populated once more + // if any additional stores have been initialized[^1]. This data is used by + // the kvflowcontrol machinery to track the exact set of stores on the client + // node. It uses this information to react to the gRPC streams breaking. Since + // these streams are used to piggyback information about which log entries were + // admitted below raft[^2] in order for the server side to free up flow + // tokens, if the stream breaks we possibly risk leaking these tokens. So + // when these streams break, we use information about the client's stores to + // release all held tokens[^3]. + // + // [^1]: This two-step process is because of how and when we allocate + // StoreIDs. Ignoring nodes that are bootstrapping the cluster (which + // just picks the initial set of StoreIDs -- see + // pkg/server.bootstrapCluster), whenever a new node is added, it's + // assigned a node ID and store ID by an existing node in CRDB (see + // kvpb.JoinNodeResponse).
Subsequent store IDs, for multi-store nodes, + // are generated by the joining node by incrementing a sequence ID + // generator (see pkg/server.(*Node).initializeAdditionalStores). All of + // which is to say that the very first time we issue a + // RaftMessageRequestBatch, we might not have all the StoreIDs. But we + // will very shortly after, and certainly before any replicas get + // allocated to the additional store. + // [^2]: See kvflowcontrolpb.AdmittedRaftLogEntries and its use in + // RaftMessageRequest. + // [^3]: See I1 from kvflowcontrol/doc.go. + repeated uint64 store_ids = 2 [(gogoproto.customname) = "StoreIDs", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID"]; } message RaftMessageResponseUnion { diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 1e7f5d150bbf..7892290fcb68 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -18,8 +18,13 @@ import ( "time" "unsafe" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -27,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -149,14 +155,105 @@ type RaftTransport struct { stopper *stop.Stopper metrics *RaftTransportMetrics - queues [rpc.NumConnectionClasses]syncutil.IntMap // map[roachpb.NodeID]*raftSendQueue + // Queues maintains a map[roachpb.NodeID]*raftSendQueue on a per rpc-class + // level. When new raftSendQueues are instantiated, or old ones deleted, we + // also maintain kvflowControl.mu.connectionTracker. So writing to this field + // is done while holding kvflowControl.mu. + queues [rpc.NumConnectionClasses]syncutil.IntMap + dialer *nodedialer.Dialer handlers syncutil.IntMap // map[roachpb.StoreID]*RaftMessageHandler + + kvflowControl struct { + // Everything nested under this struct is used to return flow tokens + // from the receiver (where work was admitted) up to the sender (where + // work originated and tokens were deducted). There are a few parts to the + // protocol, which can be experimented with using TestFlowControlRaftTransport + // and various TestFlowControl* tests in this package. We briefly sketch + // how the various pieces fit below, repeating some of what is described + // in kvflowcontrol/doc.go (which details how/why we integrate + // with the RaftTransport so intimately). + // + // Background: Each CRDB node acts as both the "server" and the "client" + // of bidirectional RaftTransport streams. That is, raft messages from + // n1->n2 are sent from the client code on n1 to server code on n2. + // Responses to those raft messages are sent from the client code on n2 + // to server code on n1.
+ // + // - When flow tokens are deducted where the MsgApps originate, and + // after they're admitted below-raft, the remote nodes send back + // kvflowcontrolpb.AdmittedRaftLogEntries through the RaftMessageBatch + // stream. This happens on the client side, and we read from + // dispatchReader and either attach the protos to already outbound + // messages, or fire off one-off messages periodically if there's + // little raft activity but still tokens to be returned. + // + // - On the server side, we intercept every message and look for these + // protos, and inform the storesForFlowControl integration interface + // of this fact. + // + // - The server side maintains the set of client-side stores it's + // currently connected to, from the POV of these server side streams. + // Since tokens are returned to the server along these streams, when + // they disconnect, it's possible for us to be leaking tokens in + // transit. So it uses information about the set of client-side stores + // it's no longer connected to, to simply free up all held tokens. See + // uses of connectionTracker below and how the storesForFlowControl + // interface is informed about stores we're no longer connected to. + // + // - There's some complexity in how this set of connected stores is + // tracked. For one it's a "connected" set, not a "disconnected" + // one. It's hard to do the latter. Ignoring memory maintenance + // issues that come from nodes that are no longer part of the + // cluster, we have multiple RPC classes from the client side and + // clients are also free to establish/disconnect many streams + // concurrently without synchronization on their end. The server is + // not guaranteed to learn about these streams connecting or + // disconnecting in any particular order, so it's difficult to track + // directly by looking at connection/disconnection events alone + // whether we're connected somehow to a given client. Some form of + // heartbeat scheme comes to mind, which fits more naturally with + // tracking the set of "connected" stores. We can still react to + // streams disconnecting by clearing the relevant stores from the + // tracked set, relying on a subsequent heartbeat (from, say, a + // different stream/RPC class) to re-track that client+its store as + // connected. + // + // - How does the server learn about what stores are present on each + // client? When establishing the stream, the client simply sends this + // information over. If the client learns of newly added stores + // (possible during early startup), it sends it again. See uses of + // localStoreIDs and setAdditionalStoreIDs below. + // + // - Idle streams are periodically culled from the client side. It's + // possible that we cull a stream without having delivered all flow + // tokens to the sender (for example, if below-raft admission happens + // after the stream is culled). The server side detected these + // disconnected streams and releases tokens, but on the client side we + // don't want to accumulate to-be-delivered flow tokens unboundedly. + // So we track the set of servers we're connected to, across all RPC + // classes, and periodically just clear our outbox if there are + // dispatches bound for nodes we're simply not connected to, across + // all RPC classes. See uses of connectedNodes below. 
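To make the client-side bookkeeping described above concrete, the following is a minimal, self-contained Go sketch of the two operations the comment calls out: piggybacking pending token dispatches onto a batch headed for a still-connected node, and periodically dropping dispatches bound for nodes we've lost all streams to (safe because the server side frees the corresponding tokens when its streams break). The types are stand-ins for illustration only, not the actual kvflowcontrol interfaces.

package main

import "fmt"

// nodeID and dispatch stand in for roachpb.NodeID and
// kvflowcontrolpb.AdmittedRaftLogEntries in this sketch.
type nodeID int

type dispatch struct {
	node nodeID
	desc string
}

// outbox is a hypothetical client-side holding area for flow token
// dispatches that still need to reach remote nodes.
type outbox struct {
	pending   map[nodeID][]dispatch
	connected map[nodeID]bool // client-side view of live streams, across all RPC classes
}

// takeFor drains dispatches bound for a node we're about to send a raft
// message batch to; the caller piggybacks them onto that batch.
func (o *outbox) takeFor(n nodeID) []dispatch {
	ds := o.pending[n]
	delete(o.pending, n)
	return ds
}

// dropDisconnected mimics the periodic pruning pass: dispatches bound for
// nodes we have no streams to are dropped outright, since the server side
// releases all held tokens when its streams break anyway.
func (o *outbox) dropDisconnected() (dropped int) {
	for n, ds := range o.pending {
		if !o.connected[n] {
			dropped += len(ds)
			delete(o.pending, n)
		}
	}
	return dropped
}

func main() {
	o := &outbox{
		pending: map[nodeID][]dispatch{
			1: {{node: 1, desc: "r1 normal-pri up-to 4/20"}},
			2: {{node: 2, desc: "r2 normal-pri up-to 4/21"}},
		},
		connected: map[nodeID]bool{1: true},
	}
	fmt.Println(o.takeFor(1))         // piggybacked on the next batch to n1
	fmt.Println(o.dropDisconnected()) // n2 has no streams, so its dispatch is dropped
}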
+ mu struct { + syncutil.RWMutex + localStoreIDs []roachpb.StoreID // sent to servers to track client-side stores + connectionTracker *connectionTrackerForFlowControl + } + setAdditionalStoreIDs atomic.Bool + dispatchReader kvflowcontrol.DispatchReader + handles kvflowcontrol.Handles + disconnectListener RaftTransportDisconnectListener + } + + knobs *RaftTransportTestingKnobs } // raftSendQueue is a queue of outgoing RaftMessageRequest messages. type raftSendQueue struct { reqs chan *kvserverpb.RaftMessageRequest + // The specific node this queue is sending RaftMessageRequests to. + nodeID roachpb.NodeID // The number of bytes in flight. Must be updated *atomically* on sending and // receiving from the reqs channel. bytes atomic.Int64 @@ -169,7 +266,11 @@ func NewDummyRaftTransport(st *cluster.Settings, tracer *tracing.Tracer) *RaftTr return nil, errors.New("dummy resolver") } return NewRaftTransport(log.MakeTestingAmbientContext(tracer), st, tracer, - nodedialer.New(nil, resolver), nil, nil) + nodedialer.New(nil, resolver), nil, nil, + kvflowdispatch.NewDummyDispatch(), NoopStoresFlowControlIntegration{}, + NoopRaftTransportDisconnectListener{}, + nil, + ) } // NewRaftTransport creates a new RaftTransport. @@ -180,14 +281,27 @@ func NewRaftTransport( dialer *nodedialer.Dialer, grpcServer *grpc.Server, stopper *stop.Stopper, + kvflowTokenDispatch kvflowcontrol.DispatchReader, + kvflowHandles kvflowcontrol.Handles, + disconnectListener RaftTransportDisconnectListener, + knobs *RaftTransportTestingKnobs, ) *RaftTransport { + if knobs == nil { + knobs = &RaftTransportTestingKnobs{} + } t := &RaftTransport{ AmbientContext: ambient, st: st, tracer: tracer, stopper: stopper, dialer: dialer, + knobs: knobs, } + t.kvflowControl.dispatchReader = kvflowTokenDispatch + t.kvflowControl.handles = kvflowHandles + t.kvflowControl.disconnectListener = disconnectListener + t.kvflowControl.mu.connectionTracker = newConnectionTrackerForFlowControl() + t.initMetrics() if grpcServer != nil { RegisterMultiRaftServer(grpcServer, t) @@ -195,6 +309,40 @@ func NewRaftTransport( return t } +// Start various internal goroutines needed for the RaftTransport's internal +// functioning. +func (t *RaftTransport) Start(ctx context.Context) error { + if err := t.startDroppingFlowTokensForDisconnectedNodes(ctx); err != nil { + return errors.Wrapf(err, "failed to run flow token dispatch loop") + } + return nil +} + +// SetInitialStoreIDs informs the RaftTransport of the initial set of store +// IDs the local node starts off with. If it's a restarting node, restarted with +// no additional stores, this is just all the local store IDs. For nodes newly +// added to the cluster, it's just the first initialized store. If there are +// additional stores post-restart, or stores other than the first for a newly +// added node, they're provided using (*RaftTransport).SetAdditionalStoreIDs. +func (t *RaftTransport) SetInitialStoreIDs(storeIDs []roachpb.StoreID) { + t.kvflowControl.mu.Lock() + defer t.kvflowControl.mu.Unlock() + t.kvflowControl.mu.localStoreIDs = storeIDs +} + +// SetAdditionalStoreIDs informs the RaftTransport of any additional stores the +// local node starts off with. See commentary on SetInitialStoreIDs for more +// details. +func (t *RaftTransport) SetAdditionalStoreIDs(storeIDs []roachpb.StoreID) { + if len(storeIDs) == 0 { + return // nothing to do + } + t.kvflowControl.mu.Lock() + defer t.kvflowControl.mu.Unlock() + t.kvflowControl.mu.localStoreIDs = append(t.kvflowControl.mu.localStoreIDs, storeIDs...) 
+ t.kvflowControl.setAdditionalStoreIDs.Store(true) +} + // Metrics returns metrics tracking this transport. func (t *RaftTransport) Metrics() *RaftTransportMetrics { return t.metrics @@ -235,6 +383,33 @@ func (t *RaftTransport) getHandler(storeID roachpb.StoreID) (RaftMessageHandler, func (t *RaftTransport) handleRaftRequest( ctx context.Context, req *kvserverpb.RaftMessageRequest, respStream RaftMessageResponseStream, ) *kvpb.Error { + for i := range req.AdmittedRaftLogEntries { + // Process any flow tokens that were returned over the RaftTransport. Do + // this first thing, before these requests enter the receive queues + // which could drop them if full and cause token leaks, or bail after + // processing the raft heartbeats. See I8 from kvflowcontrol/doc.go. + admittedEntries := req.AdmittedRaftLogEntries[i] + handle, found := t.kvflowControl.handles.Lookup(admittedEntries.RangeID) + if found { + handle.ReturnTokensUpto( + ctx, + admissionpb.WorkPriority(admittedEntries.AdmissionPriority), + admittedEntries.UpToRaftLogPosition, + kvflowcontrol.Stream{StoreID: admittedEntries.StoreID}, + ) + } + + if log.V(1) { + log.Infof(ctx, "informed of below-raft %s", admittedEntries) + } + } + if req.ToReplica.StoreID == roachpb.StoreID(0) && len(req.AdmittedRaftLogEntries) > 0 { + // The fallback token dispatch mechanism does not specify a destination + // replica, and as such, there's no handler for it. We don't want to + // return StoreNotFoundErrors in such cases. + return nil + } + handler, ok := t.getHandler(req.ToReplica.StoreID) if !ok { log.Warningf(ctx, "unable to accept Raft message from %+v: no handler registered for %+v", @@ -263,12 +438,26 @@ func newRaftMessageResponse( } // RaftMessageBatch proxies the incoming requests to the listening server interface. -func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer) error { +func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer) (lastErr error) { errCh := make(chan error, 1) // Node stopping error is caught below in the select. taskCtx, cancel := t.stopper.WithCancelOnQuiesce(stream.Context()) + taskCtx = t.AnnotateCtx(taskCtx) defer cancel() + + var storeIDs []roachpb.StoreID + defer func() { + ctx := t.AnnotateCtx(context.Background()) + t.kvflowControl.mu.Lock() + t.kvflowControl.mu.connectionTracker.markStoresDisconnected(storeIDs) + t.kvflowControl.mu.Unlock() + t.kvflowControl.disconnectListener.OnRaftTransportDisconnected(ctx, storeIDs...) + if fn := t.knobs.OnServerStreamDisconnected; fn != nil { + fn() + } + }() + if err := t.stopper.RunAsyncTaskEx( taskCtx, stop.TaskOpts{ @@ -282,10 +471,18 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer if err != nil { return err } + if len(batch.StoreIDs) > 0 { + // Collect the set of store IDs from the client side to + // later free up relevant flow tokens once the gRPC + // stream breaks/disconnects. + storeIDs = batch.StoreIDs + } + t.kvflowControl.mu.Lock() + t.kvflowControl.mu.connectionTracker.markStoresConnected(storeIDs) + t.kvflowControl.mu.Unlock() if len(batch.Requests) == 0 { continue } - for i := range batch.Requests { req := &batch.Requests[i] t.metrics.MessagesRcvd.Inc(1) @@ -398,7 +595,7 @@ func (t *RaftTransport) Stop(storeID roachpb.StoreID) { // lost and a new instance of processQueue will be started by the next message // to be sent. 
func (t *RaftTransport) processQueue( - q *raftSendQueue, stream MultiRaft_RaftMessageBatchClient, + q *raftSendQueue, stream MultiRaft_RaftMessageBatchClient, class rpc.ConnectionClass, ) error { errCh := make(chan error, 1) @@ -429,25 +626,104 @@ func (t *RaftTransport) processQueue( return err } + maybeAnnotateWithAdmittedRaftLogEntries := func( + req *kvserverpb.RaftMessageRequest, + admitted []kvflowcontrolpb.AdmittedRaftLogEntries, + ) { + if len(admitted) == 0 { + return // nothing to do + } + req.AdmittedRaftLogEntries = append(req.AdmittedRaftLogEntries, admitted...) + flowTokenDispatchCount := len(req.AdmittedRaftLogEntries) + if log.V(2) && flowTokenDispatchCount > 0 { + for i, admittedEntries := range req.AdmittedRaftLogEntries { + log.Infof(ctx, "informing n%s of below-raft %s: %d out of %d dispatches", + q.nodeID, admittedEntries, + i+1, flowTokenDispatchCount, + ) + } + } + } + + var sentInitialStoreIDs, sentAdditionalStoreIDs bool + maybeAnnotateWithStoreIDs := func(batch *kvserverpb.RaftMessageRequestBatch) { + shouldSendAdditionalStoreIDs := t.kvflowControl.setAdditionalStoreIDs.Load() && !sentAdditionalStoreIDs + if !sentInitialStoreIDs || shouldSendAdditionalStoreIDs { + t.kvflowControl.mu.RLock() + batch.StoreIDs = nil + batch.StoreIDs = append(batch.StoreIDs, t.kvflowControl.mu.localStoreIDs...) + t.kvflowControl.mu.RUnlock() + // Unconditionally set sentInitialStoreIDs, since we always have + // the initial store IDs before the additional ones. + sentInitialStoreIDs = true + // Mark that we've sent the additional store IDs, to not need to + // re-send it again. + sentAdditionalStoreIDs = shouldSendAdditionalStoreIDs + log.VInfof(ctx, 1, "informing n%d of %d local store ID(s) (%s) over the raft transport[%s]", + q.nodeID, len(batch.StoreIDs), roachpb.StoreIDSlice(batch.StoreIDs), class) + } + } + + clearRequestBatch := func(batch *kvserverpb.RaftMessageRequestBatch) { + // Reuse the Requests slice, but zero out the contents to avoid delaying + // GC of memory referenced from within. + for i := range batch.Requests { + batch.Requests[i] = kvserverpb.RaftMessageRequest{} + } + batch.Requests = batch.Requests[:0] + batch.StoreIDs = nil + } + var raftIdleTimer timeutil.Timer defer raftIdleTimer.Stop() + + var dispatchPendingFlowTokensTimer timeutil.Timer + defer dispatchPendingFlowTokensTimer.Stop() + dispatchPendingFlowTokensTimer.Reset(kvadmission.FlowTokenDispatchInterval.Get(&t.st.SV)) + + dispatchPendingFlowTokensCh := dispatchPendingFlowTokensTimer.C + if t.knobs.TriggerFallbackDispatchCh != nil { + dispatchPendingFlowTokensCh = t.knobs.TriggerFallbackDispatchCh + } + batch := &kvserverpb.RaftMessageRequestBatch{} for { raftIdleTimer.Reset(raftIdleTimeout) + select { case <-t.stopper.ShouldQuiesce(): return nil + case <-raftIdleTimer.C: raftIdleTimer.Read = true return nil + case err := <-errCh: return err + case req := <-q.reqs: size := int64(req.Size()) q.bytes.Add(-size) budget := targetRaftOutgoingBatchSize.Get(&t.st.SV) - size + + // Piggyback any pending flow token dispatches on raft transport + // messages already bound for the remote node. If the stream + // over which we're returning these flow tokens breaks, this is + // detected by the remote node, where tokens were originally + // deducted, who then frees up all held tokens (see I1 from + // kvflowcontrol/doc.go). If the stream is culled because it's + // idle, that's deducted remotely using the same stream-break + // mechanism. 
If there are no open streams to a given node and + // there's still pending flow tokens, we'll drop those tokens to + // reclaim memory in dropFlowTokensForDisconnectedNodes. For + // idle-but-not-culled connections, we have a fallback timer to + // periodically transmit one-off RaftMessageRequests for timely + // token returns. + pendingDispatches := t.kvflowControl.dispatchReader.PendingDispatchFor(q.nodeID) + maybeAnnotateWithAdmittedRaftLogEntries(req, pendingDispatches) batch.Requests = append(batch.Requests, *req) releaseRaftMessageRequest(req) + // Pull off as many queued requests as possible, within reason. for budget > 0 { select { @@ -462,18 +738,54 @@ func (t *RaftTransport) processQueue( } } - err := stream.Send(batch) - if err != nil { + maybeAnnotateWithStoreIDs(batch) + if err := stream.Send(batch); err != nil { + t.metrics.FlowTokenDispatchesDropped.Inc(int64(len(pendingDispatches))) + return err + } + t.metrics.MessagesSent.Inc(int64(len(batch.Requests))) + clearRequestBatch(batch) + + case <-dispatchPendingFlowTokensCh: + dispatchPendingFlowTokensTimer.Read = true + dispatchPendingFlowTokensTimer.Reset(kvadmission.FlowTokenDispatchInterval.Get(&t.st.SV)) + + pendingDispatches := t.kvflowControl.dispatchReader.PendingDispatchFor(q.nodeID) + if len(pendingDispatches) == 0 { + continue // nothing to do + } + + // TODO(irfansharif): There's no limit on how many pending + // dispatches are going to be attached to the outgoing raft + // messages, both here and above. It can be excessive -- limit this + // by some count/byte policy as part of #95563. + req := newRaftMessageRequest() + maybeAnnotateWithAdmittedRaftLogEntries(req, pendingDispatches) + batch.Requests = append(batch.Requests, *req) + releaseRaftMessageRequest(req) + + maybeAnnotateWithStoreIDs(batch) + if err := stream.Send(batch); err != nil { + t.metrics.FlowTokenDispatchesDropped.Inc(int64(len(pendingDispatches))) return err } t.metrics.MessagesSent.Inc(int64(len(batch.Requests))) + clearRequestBatch(batch) - // Reuse the Requests slice, but zero out the contents to avoid delaying - // GC of memory referenced from within. - for i := range batch.Requests { - batch.Requests[i] = kvserverpb.RaftMessageRequest{} + if fn := t.knobs.OnFallbackDispatch; fn != nil { + fn() + } + + case gotNodeID := <-t.knobs.MarkSendQueueAsIdleCh: + if q.nodeID == gotNodeID { + return nil + } + select { + // Echo the node ID back into MarkSendQueueAsIdleCh until it reaches + // a raftSendQueue for this ID. Only used in tests. 
+ case t.knobs.MarkSendQueueAsIdleCh <- gotNodeID: + default: } - batch.Requests = batch.Requests[:0] } } } @@ -486,8 +798,14 @@ func (t *RaftTransport) getQueue( queuesMap := &t.queues[class] value, ok := queuesMap.Load(int64(nodeID)) if !ok { - q := raftSendQueue{reqs: make(chan *kvserverpb.RaftMessageRequest, raftSendBufferSize)} + t.kvflowControl.mu.Lock() + q := raftSendQueue{ + reqs: make(chan *kvserverpb.RaftMessageRequest, raftSendBufferSize), + nodeID: nodeID, + } value, ok = queuesMap.LoadOrStore(int64(nodeID), unsafe.Pointer(&q)) + t.kvflowControl.mu.connectionTracker.markNodeConnected(nodeID, class) + t.kvflowControl.mu.Unlock() } return (*raftSendQueue)(value), ok } @@ -580,8 +898,19 @@ func (t *RaftTransport) startProcessNewQueue( if !existingQueue { log.Fatalf(ctx, "queue for n%d does not exist", toNodeID) } + defer func() { + if fn := t.knobs.OnWorkerTeardown; fn != nil { + fn(toNodeID) + } + }() defer cleanup(q) defer t.queues[class].Delete(int64(toNodeID)) + defer func() { + t.kvflowControl.mu.Lock() + t.queues[class].Delete(int64(toNodeID)) + t.kvflowControl.mu.connectionTracker.markNodeDisconnected(toNodeID, class) + t.kvflowControl.mu.Unlock() + }() conn, err := t.dialer.Dial(ctx, toNodeID, class) if err != nil { // DialNode already logs sufficiently, so just return. @@ -598,7 +927,7 @@ func (t *RaftTransport) startProcessNewQueue( return } - if err := t.processQueue(q, stream); err != nil { + if err := t.processQueue(q, stream, class); err != nil { log.Warningf(ctx, "while processing outgoing Raft queue to node %d: %s:", toNodeID, err) } } @@ -607,12 +936,102 @@ func (t *RaftTransport) startProcessNewQueue( pprof.Do(ctx, pprof.Labels("remote_node_id", toNodeID.String()), worker) }) if err != nil { + t.kvflowControl.mu.Lock() t.queues[class].Delete(int64(toNodeID)) + t.kvflowControl.mu.connectionTracker.markNodeDisconnected(toNodeID, class) + t.kvflowControl.mu.Unlock() return false } return true } +// startDroppingFlowTokensForDisconnectedNodes kicks off an asynchronous worker +// that periodically scans for nodes we're no longer connected to, and if there +// are any pending flow tokens bound for that node, it simply drops them. This +// "connected nodes" set is a client-side view of the world. On the server-side when +// gRPC streams disconnect, we release all held tokens. So simply dropping these +// pending dispatches on the client side does not cause token leaks; this worker +// exists only to prevent an unbounded accumulation of memory. +func (t *RaftTransport) startDroppingFlowTokensForDisconnectedNodes(ctx context.Context) error { + return t.stopper.RunAsyncTask( + ctx, + "kvserver.RaftTransport: drop flow tokens for disconnected nodes", + func(ctx context.Context) { + settingChangeCh := make(chan struct{}, 1) + kvadmission.FlowTokenDropInterval.SetOnChange( + &t.st.SV, func(ctx context.Context) { + select { + case settingChangeCh <- struct{}{}: + default: + } + }) + + timer := timeutil.NewTimer() + defer timer.Stop() + + for { + interval := kvadmission.FlowTokenDropInterval.Get(&t.st.SV) + if interval > 0 { + timer.Reset(interval) + } else { + // Disable the mechanism. + timer.Stop() + timer = timeutil.NewTimer() + } + select { + case <-timer.C: + timer.Read = true + t.dropFlowTokensForDisconnectedNodes() + continue + + case <-settingChangeCh: + // Loop around to use the updated timer.
+ continue + + case <-ctx.Done(): + return + + case <-t.stopper.ShouldQuiesce(): + return + } + } + }, + ) +} + +// TestingDropFlowTokensForDisconnectedNodes exports +// dropFlowTokensForDisconnectedNodes for testing purposes. +func (t *RaftTransport) TestingDropFlowTokensForDisconnectedNodes() { + t.dropFlowTokensForDisconnectedNodes() +} + +// TestingPrintFlowControlConnectionTracker renders the state of the underlying +// connection tracker. +func (t *RaftTransport) TestingPrintFlowControlConnectionTracker() string { + t.kvflowControl.mu.RLock() + defer t.kvflowControl.mu.RUnlock() + return t.kvflowControl.mu.connectionTracker.testingPrint() +} + +func (t *RaftTransport) dropFlowTokensForDisconnectedNodes() { + t.kvflowControl.mu.RLock() + defer t.kvflowControl.mu.RUnlock() + for _, nodeID := range t.kvflowControl.dispatchReader.PendingDispatch() { + if t.kvflowControl.mu.connectionTracker.isNodeConnected(nodeID) { + // If there's already a queue active, there's nothing to do. We rely + // on timely piggybacking of flow tokens on existing raft transport + // messages. It's only when all queues are idle/culled that we use + // this worker to drop any held tokens. The expectation is that on + // the server-side of the stream, we'll have returned all tokens + // when streams disconnect. + continue + } + // Drop any held tokens for that node. + pendingDispatches := t.kvflowControl.dispatchReader.PendingDispatchFor(nodeID) + t.metrics.FlowTokenDispatchesDropped.Inc(int64(len(pendingDispatches))) + } +} + // SendSnapshot streams the given outgoing snapshot. The caller is responsible // for closing the OutgoingSnapshot. func (t *RaftTransport) SendSnapshot( @@ -695,9 +1114,35 @@ func (t *RaftTransport) DelegateSnapshot( errors.Wrapf(resp.Error(), "error sending couldn't accept %v", req), errMarkSnapshotError) case kvserverpb.DelegateSnapshotResponse_APPLIED: // This is the response we're expecting. Snapshot successfully applied. - log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", resp) + log.VEventf(ctx, 3, "%s: delegated snapshot was successfully applied", resp) return nil default: return err } } + +// RaftTransportTestingKnobs provides fine-grained control over the RaftTransport +// for tests. +type RaftTransportTestingKnobs struct { + // MarkSendQueueAsIdleCh is used to selectively mark a raft send queue as + // idle, identified by the remote node ID. The remote node ID must be + // connected to by this transport. + MarkSendQueueAsIdleCh chan roachpb.NodeID + // OnWorkerTeardown, if set, is invoked when a worker thread for a given + // send queue is torn down. + OnWorkerTeardown func(roachpb.NodeID) + // OnServerStreamDisconnected is invoked whenever the RaftMessageBatch + // stream is disconnected on the server side. + OnServerStreamDisconnected func() + // TriggerFallbackDispatchCh is used to manually trigger the fallback + // dispatch in tests. + TriggerFallbackDispatchCh chan time.Time + // OnFallbackDispatch is invoked whenever the fallback token dispatch + // mechanism is used. + OnFallbackDispatch func() +} + +// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
+func (t *RaftTransportTestingKnobs) ModuleTestingKnobs() {} + +var _ base.ModuleTestingKnobs = (*RaftTransportTestingKnobs)(nil) diff --git a/pkg/kv/kvserver/raft_transport_metrics.go b/pkg/kv/kvserver/raft_transport_metrics.go index 971903c9503f..cea7d5eae8c6 100644 --- a/pkg/kv/kvserver/raft_transport_metrics.go +++ b/pkg/kv/kvserver/raft_transport_metrics.go @@ -23,6 +23,8 @@ type RaftTransportMetrics struct { ReverseSent *metric.Counter ReverseRcvd *metric.Counter + + FlowTokenDispatchesDropped *metric.Counter } func (t *RaftTransport) initMetrics() { @@ -91,5 +93,12 @@ responses to Raft messages. Responses are received over another stream.`, Measurement: "Messages", Unit: metric.Unit_COUNT, }), + + FlowTokenDispatchesDropped: metric.NewCounter(metric.Metadata{ + Name: "raft.transport.flow-token-dispatches-dropped", + Help: "Number of flow token dispatches dropped by the Raft Transport", + Measurement: "Dispatches", + Unit: metric.Unit_COUNT, + }), } } diff --git a/pkg/kv/kvserver/raft_transport_test.go b/pkg/kv/kvserver/raft_transport_test.go index c478056a11ea..fd9ed75045a7 100644 --- a/pkg/kv/kvserver/raft_transport_test.go +++ b/pkg/kv/kvserver/raft_transport_test.go @@ -22,6 +22,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -112,15 +114,17 @@ type raftTransportTestContext struct { transports map[roachpb.NodeID]*kvserver.RaftTransport nodeRPCContext *rpc.Context gossip *gossip.Gossip + st *cluster.Settings } -func newRaftTransportTestContext(t testing.TB) *raftTransportTestContext { +func newRaftTransportTestContext(t testing.TB, st *cluster.Settings) *raftTransportTestContext { ctx := context.Background() tr := tracing.NewTracer() rttc := &raftTransportTestContext{ t: t, stopper: stop.NewStopper(stop.WithTracer(tr)), transports: map[roachpb.NodeID]*kvserver.RaftTransport{}, + st: st, } rttc.nodeRPCContext = rpc.NewContext(ctx, rpc.ContextOptions{ TenantID: roachpb.SystemTenantID, @@ -128,7 +132,7 @@ func newRaftTransportTestContext(t testing.TB) *raftTransportTestContext { Clock: &timeutil.DefaultTimeSource{}, ToleratedOffset: time.Nanosecond, Stopper: rttc.stopper, - Settings: cluster.MakeTestingClusterSettings(), + Settings: st, }) // Ensure that tests using this test context and restart/shut down // their servers do not inadvertently start talking to servers from @@ -152,7 +156,12 @@ func (rttc *raftTransportTestContext) Stop() { // before they can be used in other methods of // raftTransportTestContext. The node will be gossiped immediately. func (rttc *raftTransportTestContext) AddNode(nodeID roachpb.NodeID) *kvserver.RaftTransport { - transport, addr := rttc.AddNodeWithoutGossip(nodeID, util.TestAddr, rttc.stopper) + transport, addr := rttc.AddNodeWithoutGossip( + nodeID, util.TestAddr, rttc.stopper, + kvflowdispatch.NewDummyDispatch(), kvserver.NoopStoresFlowControlIntegration{}, + kvserver.NoopRaftTransportDisconnectListener{}, + nil, + ) rttc.GossipNode(nodeID, addr) return transport } @@ -162,18 +171,28 @@ func (rttc *raftTransportTestContext) AddNode(nodeID roachpb.NodeID) *kvserver.R // raftTransportTestContext. 
Unless you are testing the effects of // delaying gossip, use AddNode instead. func (rttc *raftTransportTestContext) AddNodeWithoutGossip( - nodeID roachpb.NodeID, addr net.Addr, stopper *stop.Stopper, + nodeID roachpb.NodeID, + addr net.Addr, + stopper *stop.Stopper, + kvflowTokenDispatch kvflowcontrol.DispatchReader, + kvflowHandles kvflowcontrol.Handles, + disconnectListener kvserver.RaftTransportDisconnectListener, + knobs *kvserver.RaftTransportTestingKnobs, ) (*kvserver.RaftTransport, net.Addr) { grpcServer, err := rpc.NewServer(rttc.nodeRPCContext) require.NoError(rttc.t, err) ctwWithTracer := log.MakeTestingAmbientCtxWithNewTracer() transport := kvserver.NewRaftTransport( ctwWithTracer, - cluster.MakeTestingClusterSettings(), + rttc.st, ctwWithTracer.Tracer, nodedialer.New(rttc.nodeRPCContext, gossip.AddressResolver(rttc.gossip)), grpcServer, rttc.stopper, + kvflowTokenDispatch, + kvflowHandles, + disconnectListener, + knobs, ) rttc.transports[nodeID] = transport ln, err := netutil.ListenAndServeGRPC(stopper, grpcServer, addr) @@ -223,7 +242,7 @@ func (rttc *raftTransportTestContext) Send( func TestSendAndReceive(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - rttc := newRaftTransportTestContext(t) + rttc := newRaftTransportTestContext(t, cluster.MakeTestingClusterSettings()) defer rttc.Stop() // Create several servers, each of which has two stores (A raft @@ -391,7 +410,7 @@ func TestSendAndReceive(t *testing.T) { func TestInOrderDelivery(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - rttc := newRaftTransportTestContext(t) + rttc := newRaftTransportTestContext(t, cluster.MakeTestingClusterSettings()) defer rttc.Stop() const numMessages = 100 @@ -429,7 +448,7 @@ func TestInOrderDelivery(t *testing.T) { func TestRaftTransportCircuitBreaker(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - rttc := newRaftTransportTestContext(t) + rttc := newRaftTransportTestContext(t, cluster.MakeTestingClusterSettings()) defer rttc.Stop() serverReplica := roachpb.ReplicaDescriptor{ @@ -437,7 +456,15 @@ func TestRaftTransportCircuitBreaker(t *testing.T) { StoreID: 2, ReplicaID: 2, } - _, serverAddr := rttc.AddNodeWithoutGossip(serverReplica.NodeID, util.TestAddr, rttc.stopper) + _, serverAddr := rttc.AddNodeWithoutGossip( + serverReplica.NodeID, + util.TestAddr, + rttc.stopper, + kvflowdispatch.NewDummyDispatch(), + kvserver.NoopStoresFlowControlIntegration{}, + kvserver.NoopRaftTransportDisconnectListener{}, + nil, + ) serverChannel := rttc.ListenStore(serverReplica.NodeID, serverReplica.StoreID) clientReplica := roachpb.ReplicaDescriptor{ @@ -484,7 +511,7 @@ func TestRaftTransportCircuitBreaker(t *testing.T) { func TestRaftTransportIndependentRanges(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - rttc := newRaftTransportTestContext(t) + rttc := newRaftTransportTestContext(t, cluster.MakeTestingClusterSettings()) defer rttc.Stop() server := roachpb.ReplicaDescriptor{ @@ -531,7 +558,7 @@ func TestRaftTransportIndependentRanges(t *testing.T) { func TestReopenConnection(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - rttc := newRaftTransportTestContext(t) + rttc := newRaftTransportTestContext(t, cluster.MakeTestingClusterSettings()) defer rttc.Stop() // Use a special stopper for the initial server so that we can fully stop it @@ -543,7 +570,15 @@ func TestReopenConnection(t *testing.T) { ReplicaID: 2, } serverTransport, serverAddr := - 
rttc.AddNodeWithoutGossip(serverReplica.NodeID, util.TestAddr, serverStopper) + rttc.AddNodeWithoutGossip( + serverReplica.NodeID, + util.TestAddr, + serverStopper, + kvflowdispatch.NewDummyDispatch(), + kvserver.NoopStoresFlowControlIntegration{}, + kvserver.NoopRaftTransportDisconnectListener{}, + nil, + ) rttc.GossipNode(serverReplica.NodeID, serverAddr) rttc.ListenStore(serverReplica.NodeID, serverReplica.StoreID) @@ -574,7 +609,15 @@ func TestReopenConnection(t *testing.T) { ReplicaID: 3, } - rttc.AddNodeWithoutGossip(replacementReplica.NodeID, serverAddr, rttc.stopper) + rttc.AddNodeWithoutGossip( + replacementReplica.NodeID, + serverAddr, + rttc.stopper, + kvflowdispatch.NewDummyDispatch(), + kvserver.NoopStoresFlowControlIntegration{}, + kvserver.NoopRaftTransportDisconnectListener{}, + nil, + ) replacementChannel := rttc.ListenStore(replacementReplica.NodeID, replacementReplica.StoreID) // Try sending a message to the old server's store (at the address its @@ -630,7 +673,7 @@ func TestReopenConnection(t *testing.T) { func TestSendFailureToConnectDoesNotHangRaft(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - rttc := newRaftTransportTestContext(t) + rttc := newRaftTransportTestContext(t, cluster.MakeTestingClusterSettings()) defer rttc.Stop() // Create a single server from which we're going to call send. diff --git a/pkg/kv/kvserver/raft_transport_unit_test.go b/pkg/kv/kvserver/raft_transport_unit_test.go index ec4f04f5a2e1..35907a3a7bd8 100644 --- a/pkg/kv/kvserver/raft_transport_unit_test.go +++ b/pkg/kv/kvserver/raft_transport_unit_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -77,6 +78,10 @@ func TestRaftTransportStartNewQueue(t *testing.T) { nodedialer.New(rpcC, resolver), grpcServer, stopper, + kvflowdispatch.NewDummyDispatch(), + NoopStoresFlowControlIntegration{}, + NoopRaftTransportDisconnectListener{}, + nil, /* knobs */ ) ln, err := netutil.ListenAndServeGRPC(stopper, grpcServer, &util.UnresolvedAddr{NetworkField: "tcp", AddressField: "localhost:0"}) diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 090acdbf7a65..c5e2f1e373d5 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -38,6 +38,7 @@ import ( aload "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" @@ -232,7 +233,18 @@ func createTestStoreWithoutStart( // and it's important that this doesn't cause crashes. Just set up the // "real thing" since it's straightforward enough. 
cfg.NodeDialer = nodedialer.New(rpcContext, gossip.AddressResolver(cfg.Gossip)) - cfg.Transport = NewRaftTransport(cfg.AmbientCtx, cfg.Settings, cfg.Tracer(), cfg.NodeDialer, server, stopper) + cfg.Transport = NewRaftTransport( + cfg.AmbientCtx, + cfg.Settings, + cfg.Tracer(), + cfg.NodeDialer, + server, + stopper, + kvflowdispatch.NewDummyDispatch(), + NoopStoresFlowControlIntegration{}, + NoopRaftTransportDisconnectListener{}, + nil, /* knobs */ + ) stores := NewStores(cfg.AmbientCtx, cfg.Clock) nodeDesc := &roachpb.NodeDescriptor{NodeID: 1} diff --git a/pkg/kv/kvserver/testdata/flow_control_raft_transport/basic_piggyback b/pkg/kv/kvserver/testdata/flow_control_raft_transport/basic_piggyback new file mode 100644 index 000000000000..6f66b7b02425 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_raft_transport/basic_piggyback @@ -0,0 +1,40 @@ +# Walk through the basics of the data-driven syntax. + +init +---- + +# Set up two nodes, each with one store. +add node=n1 store=s1 +---- + +add node=n2 store=s2 +---- + +# Dispatch flow tokens from n1 to n2. +dispatch from=n1 +node=n2 store=s1 range=r1 pri=normal-pri up-to-log-position=5/20 +node=n2 store=s1 range=r1 pri=low-pri up-to-log-position=5/21 +---- + +# Note that they're still pending delivery. +pending-dispatch from=n1 to=n2 +---- +range=r1 pri=low-pri store=s1 up-to-log-position=log-position=5/21 +range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=5/20 + +# Send a raft message for r1 from n1 to n2, each node holding a replica with +# id=1,2 respectively. We expect the tokens to get piggybacked here. +send range=r1 from=n1/s1/1 to=n2/s2/2 commit=1 +---- + +# Note that there are no more pending dispatches, and not because they were +# dropped. +pending-dispatch from=n1 to=n2 +---- + +metrics +---- +node=n1: dispatches-dropped=0 +node=n2: dispatches-dropped=0 + +# vim:ft=sh diff --git a/pkg/kv/kvserver/testdata/flow_control_raft_transport/fallback_dispatch b/pkg/kv/kvserver/testdata/flow_control_raft_transport/fallback_dispatch new file mode 100644 index 000000000000..206ae3e53a5c --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_raft_transport/fallback_dispatch @@ -0,0 +1,48 @@ +# Walk through what happens in the fallback path of token dispatch, i.e. when +# we don't have raft messages to piggyback token returns on, and instead fire +# one-off messages to deliver tokens. + +init +---- + +# Set up two nodes, each with one store. +add node=n1 store=s1 +---- + +add node=n2 store=s2 +---- + +# Send a raft message for r1 from n1 to n2 and vice versa, each node holding a +# replica with id=1,2 respectively. We do this to create the non-idle connection +# between the two nodes (it's done on demand). +send range=r1 from=n1/s1/1 to=n2/s2/2 commit=1 +---- + +send range=r1 from=n2/s2/2 to=n1/s1/1 commit=1 +---- + +dispatch from=n2 +node=n1 store=s1 range=r1 pri=normal-pri up-to-log-position=6/20 +node=n1 store=s1 range=r1 pri=low-pri up-to-log-position=6/21 +---- + +# Note that they're still pending delivery. +pending-dispatch from=n2 to=n1 +---- +range=r1 pri=low-pri store=s1 up-to-log-position=log-position=6/21 +range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=6/20 + +fallback-dispatch from=n2 +---- + +# Note that there are no more pending dispatches, and not because they were +# dropped.
+pending-dispatch from=n2 to=n1 +---- + +metrics +---- +node=n1: dispatches-dropped=0 +node=n2: dispatches-dropped=0 + +# vim:ft=sh diff --git a/pkg/kv/kvserver/testdata/flow_control_raft_transport/idle_connection b/pkg/kv/kvserver/testdata/flow_control_raft_transport/idle_connection new file mode 100644 index 000000000000..eb9e5b24b189 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_raft_transport/idle_connection @@ -0,0 +1,92 @@ +# Walk through what exactly happens when a client marks its outbound raft +# transport stream as idle. It's helpful to also read the store_data test in +# this directory. + +init +---- + +# Set up two nodes, each with one store. +add node=n1 store=s1 +---- + +add node=n2 store=s2 +---- + +# Send a raft message for r1 from n1 to n2 and vice versa, each node holding a +# replica with id=1,2 respectively. We do this to create the two non-idle +# client-> server connections between the two nodes, going both ways. It's done +# on demand. +send range=r1 from=n1/s1/1 to=n2/s2/2 commit=1 +---- + +send range=r1 from=n2/s2/2 to=n1/s1/1 commit=1 +---- + +# Verify that n1's marked n2 as something its connected to (as a client), and +# vice-versa. This test isn't making use of the +# set-{initial,additional}-store-ids directives used in the store_data test, +# which is triggered in deployments and showcases the server's POV. This test +# is narrowly showing what happens from the client's POV -- so ignore all +# server POV details. +connection-tracker from=n1 +---- +connected-stores (server POV): +connected-nodes (client POV): n2 + +connection-tracker from=n2 +---- +connected-stores (server POV): +connected-nodes (client POV): n1 + +# Return some flow tokens from n2 back to n1, where these tokens originally +# would have been deducted. +dispatch from=n2 +node=n1 store=s1 range=r1 pri=normal-pri up-to-log-position=6/20 +node=n1 store=s1 range=r1 pri=low-pri up-to-log-position=6/21 +---- + +# Note that they're pending delivery. +pending-dispatch from=n2 to=n1 +---- +range=r1 pri=low-pri store=s1 up-to-log-position=log-position=6/21 +range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=6/20 + + +# Mark the client-initiated stream from n2->n1 as idle. +client-mark-idle from=n2 to=n1 +---- + +# n2's wound down its transport streams and has marked n1 as something it's +# disconnected from as a client. +connection-tracker from=n2 +---- +connected-stores (server POV): +connected-nodes (client POV): + +# n1's not done the same since it still is connected to n2 as a client. We +# don't actually care about n1's perspective in this test. +connection-tracker from=n1 +---- +connected-stores (server POV): +connected-nodes (client POV): n2 + +# Note that they're still pending delivery. They'll only get pruned out by the +# periodic pruning process, which is invoked next. +pending-dispatch from=n2 to=n1 +---- +range=r1 pri=low-pri store=s1 up-to-log-position=log-position=6/21 +range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=6/20 + +drop-disconnected-tokens from=n2 +---- + +# Observe that there's nothing pending dispatch and the metrics indicate as much. 
+pending-dispatch from=n2 to=n1 +---- + +metrics +---- +node=n1: dispatches-dropped=0 +node=n2: dispatches-dropped=2 + +# vim:ft=sh diff --git a/pkg/kv/kvserver/testdata/flow_control_raft_transport/store_data b/pkg/kv/kvserver/testdata/flow_control_raft_transport/store_data new file mode 100644 index 000000000000..3e9474aadd47 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_raft_transport/store_data @@ -0,0 +1,83 @@ +# Walk through the basics of the data-driven syntax. + +init +---- + +# Set up two nodes, each with one store. +add node=n1 store=s1 +---- + +add node=n2 store=s2 +---- + +set-initial-store-ids from=n1 stores=s1 +---- + +set-initial-store-ids from=n2 stores=s2,s3 +---- + +set-additional-store-ids from=n1 stores=s4 +---- + +# Send a raft message for r1 from n1 to n2 and vice versa, each node holding a +# replica with id=1,2 respectively. We do this to create the non-idle +# connection between the two nodes (it's done on demand). We should transmit +# (s2,s3) from n2->n1, and (s1,s4) from n1->n2. So effectively n1 should be +# connected to n2:s2,s3, and n2 to n1:s1,s4. +send range=r1 from=n1/s1/1 to=n2/s2/2 commit=1 +---- + +send range=r1 from=n2/s2/2 to=n1/s1/1 commit=1 +---- + +# Confirm the node+store connectivity, as seen by each node. The server POV is +# always store-oriented, since the server cares most about what remote stores +# got disconnected in order to release tokens that are identified partly by +# store IDs. The raft transport client cares most about what servers it's +# connected to, identified by node ID, since the client is the one managing the +# outbox of tokens to return to the server. If it's no longer connected to some +# server, two things happen: +# - The server detects this, and releases tokens (as described above). +# - The client can reclaim some memory in its outbox, where messages destined +# for the server just no longer need delivery. +connection-tracker from=n1 +---- +connected-stores (server POV): s2,s3 +connected-nodes (client POV): n2 + +connection-tracker from=n2 +---- +connected-stores (server POV): s1,s4 +connected-nodes (client POV): n1 + +# Mark the client-initiated stream from n2->n1 as idle. +client-mark-idle from=n2 to=n1 +---- + +# Confirm that n1 detects this on the server side, and marks that it's no +# longer connected to s2 and s3. It still show's that it's connected to n2 as a +# client, but this test primarily cares about the empty server POV. +# +# The set of connected nodes however is unchanged from n1's perspective, as +# there's still an n1->n2 link. The connected-nodes tracking is done on the +# client side. +connection-tracker from=n1 +---- +connected-stores (server POV): +connected-nodes (client POV): n2 + +# Confirm that the RaftTransport has informed the flow-control integration +# layer of this fact. +disconnect-listener from=n1 +---- +disconnected-from: s2,s3 + +# Sanity check that n2 marks that it's no longer connected to n1 as a client. +# Since it acts as the server for the n1->n2 link, it shows that it's connected +# to n1's stores. 
+connection-tracker from=n2 +---- +connected-stores (server POV): s1,s4 +connected-nodes (client POV): + +# vim:ft=sh diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index 45a0201023ab..3b482f9420b6 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -44,6 +44,17 @@ func (s NodeIDSlice) Len() int { return len(s) } func (s NodeIDSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s NodeIDSlice) Less(i, j int) bool { return s[i] < s[j] } +func (s NodeIDSlice) String() string { + var sb strings.Builder + for i, ni := range s { + if i > 0 { + sb.WriteRune(',') + } + fmt.Fprintf(&sb, "n%d", ni) + } + return sb.String() +} + // StoreID is a custom type for a cockroach store ID. type StoreID int32 @@ -54,6 +65,20 @@ func (s StoreIDSlice) Len() int { return len(s) } func (s StoreIDSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s StoreIDSlice) Less(i, j int) bool { return s[i] < s[j] } +func (s StoreIDSlice) String() string { + var sb strings.Builder + for i, st := range s { + if i > 0 { + sb.WriteRune(',') + } + fmt.Fprintf(&sb, "s%d", st) + } + return sb.String() +} + +// SafeValue implements the redact.SafeValue interface. +func (s StoreIDSlice) SafeValue() {} + // String implements the fmt.Stringer interface. // It is used to format the ID for use in Gossip keys. func (n StoreID) String() string { diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 7386ed296d2d..219b1e095f71 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -126,6 +126,7 @@ go_library( "//pkg/kv/kvserver/closedts/ctpb", "//pkg/kv/kvserver/closedts/sidetransport", "//pkg/kv/kvserver/kvadmission", + "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", "//pkg/kv/kvserver/kvstorage", diff --git a/pkg/server/init.go b/pkg/server/init.go index 90e40d2dbc47..9e406fffd2e1 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -158,6 +158,33 @@ func (i *initState) validate() error { return nil } +// initialStoreIDs returns the initial set of store IDs a node starts off with. +// If it's a restarting node, restarted with no additional stores, this is just +// all the local store IDs. If there's an additional store post-restart that's +// yet to be initialized, it's not found in this list. For nodes newly added to +// the cluster, it's just the first initialized store. For the remaining stores +// in all these cases, they're accessible through +// (*initState).additionalStoreIDs once they're initialized. +func (i *initState) initialStoreIDs() ([]roachpb.StoreID, error) { + return getStoreIDsInner(i.initializedEngines) +} + +func (i *initState) additionalStoreIDs() ([]roachpb.StoreID, error) { + return getStoreIDsInner(i.uninitializedEngines) +} + +func getStoreIDsInner(engines []storage.Engine) ([]roachpb.StoreID, error) { + storeIDs := make([]roachpb.StoreID, 0, len(engines)) + for _, eng := range engines { + storeID, err := eng.GetStoreID() + if err != nil { + return nil, err + } + storeIDs = append(storeIDs, roachpb.StoreID(storeID)) + } + return storeIDs, nil +} + // joinResult is used to represent the result of a node attempting to join // an already bootstrapped cluster. 
type joinResult struct { @@ -706,6 +733,9 @@ func inspectEngines( } nodeID = storeIdent.NodeID + if err := eng.SetStoreID(ctx, int32(storeIdent.StoreID)); err != nil { + return nil, err + } initializedEngines = append(initializedEngines, eng) } clusterVersion, err := kvstorage.SynthesizeClusterVersionFromEngines( diff --git a/pkg/server/server.go b/pkg/server/server.go index 7f0640c56ae2..9fd7b2dd7924 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -43,6 +43,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -547,8 +548,19 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { /* deterministic */ false, ) + storesForFlowControl := kvserver.MakeStoresForFlowControl(stores) + kvflowTokenDispatch := kvflowdispatch.New(registry, storesForFlowControl, nodeIDContainer) raftTransport := kvserver.NewRaftTransport( - cfg.AmbientCtx, st, cfg.AmbientCtx.Tracer, nodeDialer, grpcServer.Server, stopper, + cfg.AmbientCtx, + st, + cfg.AmbientCtx.Tracer, + nodeDialer, + grpcServer.Server, + stopper, + kvflowTokenDispatch, + storesForFlowControl, + storesForFlowControl, + nil, /* knobs */ ) registry.AddMetricStruct(raftTransport.Metrics()) @@ -1662,6 +1674,14 @@ func (s *Server) PreStart(ctx context.Context) error { // initState -- and everything after it is actually starting the server, // using the listeners and init state. + initialStoreIDs, err := state.initialStoreIDs() + if err != nil { + return err + } + + // Inform the raft transport of these initial store IDs. + s.raftTransport.SetInitialStoreIDs(initialStoreIDs) + // Spawn a goroutine that will print a nice message when Gossip connects. // Note that we already know the clusterID, but we don't know that Gossip // has connected. The pertinent case is that of restarting an entire @@ -1810,6 +1830,14 @@ func (s *Server) PreStart(ctx context.Context) error { // stores) s.node.waitForAdditionalStoreInit() + additionalStoreIDs, err := state.additionalStoreIDs() + if err != nil { + return err + } + + // Inform the raft transport of these additional store IDs. + s.raftTransport.SetAdditionalStoreIDs(additionalStoreIDs) + // Connect the engines to the disk stats map constructor. This needs to // wait until after waitForAdditionalStoreInit returns since it realizes on // wholly initialized stores (it reads the StoreIdentKeys). It also needs @@ -1976,6 +2004,11 @@ func (s *Server) PreStart(ctx context.Context) error { // Start the closed timestamp loop. s.ctSender.Run(workersCtx, state.nodeID) + // Start dispatching extant flow tokens. + if err := s.raftTransport.Start(workersCtx); err != nil { + return err + } + // Attempt to upgrade cluster version now that the sql server has been // started. 
At this point we know that all startupmigrations and permanent // upgrades have successfully been run so it is safe to upgrade to the diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 53a677e04359..ae214cbffaa2 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -984,6 +984,9 @@ type Engine interface { // Used to show the store ID in logs and to initialize the shared object // creator ID (if shared object storage is configured). SetStoreID(ctx context.Context, storeID int32) error + + // GetStoreID is used to retrieve the configured store ID. + GetStoreID() (int32, error) } // Batch is the interface for batch specific operations. diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index df1cde671daa..f696176823ad 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -859,6 +859,21 @@ func (p *Pebble) SetStoreID(ctx context.Context, storeID int32) error { return nil } +// GetStoreID returns to configured store ID. +func (p *Pebble) GetStoreID() (int32, error) { + if p == nil { + return 0, errors.AssertionFailedf("GetStoreID requires non-nil Pebble") + } + if p.storeIDPebbleLog == nil { + return 0, errors.AssertionFailedf("GetStoreID requires an initialized store ID container") + } + storeID := p.storeIDPebbleLog.Get() + if storeID == 0 { + return 0, errors.AssertionFailedf("GetStoreID must be called after calling SetStoreID") + } + return storeID, nil +} + // ResolveEncryptedEnvOptions creates the EncryptionEnv and associated file // registry if this store has encryption-at-rest enabled; otherwise returns a // nil EncryptionEnv. diff --git a/pkg/testutils/lint/passes/redactcheck/redactcheck.go b/pkg/testutils/lint/passes/redactcheck/redactcheck.go index 00ddddb11aa2..6effa04c3120 100644 --- a/pkg/testutils/lint/passes/redactcheck/redactcheck.go +++ b/pkg/testutils/lint/passes/redactcheck/redactcheck.go @@ -117,6 +117,7 @@ func runAnalyzer(pass *analysis.Pass) (interface{}, error) { "ReplicaID": {}, "ReplicaType": {}, "StoreID": {}, + "StoreIDSlice": {}, "TenantID": {}, "TransactionStatus": {}, }, diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go index b12f8e1f195b..bae737636ac5 100644 --- a/pkg/util/admission/granter_test.go +++ b/pkg/util/admission/granter_test.go @@ -487,3 +487,17 @@ func (m *testMetricsProvider) setMetricsForStores(stores []int32, metrics pebble }) } } + +type noopOnLogEntryAdmitted struct{} + +func (n *noopOnLogEntryAdmitted) AdmittedLogEntry( + context.Context, + roachpb.NodeID, + admissionpb.WorkPriority, + roachpb.StoreID, + roachpb.RangeID, + LogPosition, +) { +} + +var _ OnLogEntryAdmitted = &noopOnLogEntryAdmitted{} diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 44c5c0a8cb77..5d3ca55c968e 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -855,7 +855,16 @@ func (q *WorkQueue) granted(grantChainID grantChainID) int64 { } else { // NB: We don't use grant chains for store tokens, so they don't apply // to replicated writes. 
- + if log.V(1) { + log.Infof(q.ambientCtx, "async-path: len(waiting-work)=%d dequeued t%d pri=%s r%s origin=n%s log-position=%s ingested=%t", + tenant.waitingWorkHeap.Len(), + tenant.id, item.priority, + item.replicated.RangeID, + item.replicated.Origin, + item.replicated.LogPosition, + item.replicated.Ingested, + ) + } defer releaseWaitingWork(item) q.onAdmittedReplicatedWork.admittedReplicatedWork( roachpb.MustMakeTenantID(tenant.id), @@ -1968,6 +1977,7 @@ func (q *StoreWorkQueue) admittedReplicatedWork( // have a separate goroutine invoke these callbacks (without holding // coord.mu). We could directly invoke here too if not holding the lock. q.onLogEntryAdmitted.AdmittedLogEntry( + q.q[wc].ambientCtx, rwi.Origin, pri, q.storeID, @@ -1982,6 +1992,7 @@ func (q *StoreWorkQueue) admittedReplicatedWork( // post-admission bookkeeping. type OnLogEntryAdmitted interface { AdmittedLogEntry( + ctx context.Context, origin roachpb.NodeID, /* node where the entry originated */ pri admissionpb.WorkPriority, /* admission priority of the entry */ storeID roachpb.StoreID, /* store on which the entry was admitted */ @@ -1997,7 +2008,12 @@ type NoopOnLogEntryAdmitted struct{} var _ OnLogEntryAdmitted = &NoopOnLogEntryAdmitted{} func (n *NoopOnLogEntryAdmitted) AdmittedLogEntry( - roachpb.NodeID, admissionpb.WorkPriority, roachpb.StoreID, roachpb.RangeID, LogPosition, + context.Context, + roachpb.NodeID, + admissionpb.WorkPriority, + roachpb.StoreID, + roachpb.RangeID, + LogPosition, ) { }
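The widened OnLogEntryAdmitted interface above only requires implementors to accept the admission metadata. As a minimal sketch of what an implementation can look like (the package name and recorder type here are hypothetical; only the interface and argument types come from this change), a recorder that simply notes what was admitted satisfies it:

package admissiondemo // hypothetical package, for illustration only

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/admission"
	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
)

// recordingOnLogEntryAdmitted records a human-readable line for every
// below-raft admission it's told about, instead of returning flow tokens
// the way the production integration does.
type recordingOnLogEntryAdmitted struct {
	admitted []string
}

var _ admission.OnLogEntryAdmitted = (*recordingOnLogEntryAdmitted)(nil)

// AdmittedLogEntry implements admission.OnLogEntryAdmitted.
func (r *recordingOnLogEntryAdmitted) AdmittedLogEntry(
	_ context.Context,
	origin roachpb.NodeID,
	pri admissionpb.WorkPriority,
	storeID roachpb.StoreID,
	rangeID roachpb.RangeID,
	pos admission.LogPosition,
) {
	r.admitted = append(r.admitted, fmt.Sprintf(
		"origin=n%d pri=%s store=s%d range=r%d log-position=%v",
		origin, pri, storeID, rangeID, pos))
}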