From 40151deb29884a9990d253336436dd5d4b95783c Mon Sep 17 00:00:00 2001 From: e-mbrown Date: Wed, 30 Mar 2022 13:34:45 -0400 Subject: [PATCH 1/4] kv: alter error for SET TRANSACTION AS OF SYSTEM TIME if reads or writes are already performed When a txn performed a read or write before setting up a historical timestamp a crash report was generated. This commit handles the error case. Release note: None --- pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 26 +++++++++++++++++++-- pkg/kv/mock_transactional_sender.go | 10 ++++++++ pkg/kv/sender.go | 6 +++++ pkg/sql/conn_executor.go | 3 +++ pkg/sql/logictest/testdata/logic_test/as_of | 24 +++++++++++++++++++ pkg/sql/txn_state.go | 25 ++++++++++++++++++++ 6 files changed, 92 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 6d840203e246..cb6f71799d48 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -1084,11 +1084,11 @@ func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestam tc.mu.Lock() defer tc.mu.Unlock() // The transaction must not have already been used in this epoch. - if !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty() { + if tc.hasPerformedReadsLocked() { return errors.WithContextTags(errors.AssertionFailedf( "cannot set fixed timestamp, txn %s already performed reads", tc.mu.txn), ctx) } - if tc.mu.txn.Sequence != 0 { + if tc.hasPerformedWritesLocked() { return errors.WithContextTags(errors.AssertionFailedf( "cannot set fixed timestamp, txn %s already performed writes", tc.mu.txn), ctx) } @@ -1406,3 +1406,25 @@ func (tc *TxnCoordSender) ClearTxnRetryableErr(ctx context.Context) { tc.mu.txnState = txnPending } } + +// HasPerformedReads is part of the TxnSender interface. +func (tc *TxnCoordSender) HasPerformedReads() bool { + tc.mu.Lock() + defer tc.mu.Unlock() + return tc.hasPerformedReadsLocked() +} + +// HasPerformedWrites is part of the TxnSender interface. +func (tc *TxnCoordSender) HasPerformedWrites() bool { + tc.mu.Lock() + defer tc.mu.Unlock() + return tc.hasPerformedWritesLocked() +} + +func (tc *TxnCoordSender) hasPerformedReadsLocked() bool { + return !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty() +} + +func (tc *TxnCoordSender) hasPerformedWritesLocked() bool { + return tc.mu.txn.Sequence != 0 +} diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index 621d2c06712c..0af00a2da39c 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -231,6 +231,16 @@ func (m *MockTransactionalSender) GetTxnRetryableErr( func (m *MockTransactionalSender) ClearTxnRetryableErr(ctx context.Context) { } +// HasPerformedReads is part of TxnSenderFactory. +func (m *MockTransactionalSender) HasPerformedReads() bool { + panic("unimplemented") +} + +// HasPerformedWrites is part of TxnSenderFactory. +func (m *MockTransactionalSender) HasPerformedWrites() bool { + panic("unimplemented") +} + // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) ( diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index 1495aa6d4f80..71fc61d7590c 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -336,6 +336,12 @@ type TxnSender interface { // ClearTxnRetryableErr clears the retryable error, if any. ClearTxnRetryableErr(ctx context.Context) + + // HasPerformedReads returns true if a read has been performed. + HasPerformedReads() bool + + // HasPerformedWrites returns true if a write has been performed. + HasPerformedWrites() bool } // SteppingMode is the argument type to ConfigureStepping. diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 38808a341309..2a6dbaf4ee05 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2585,6 +2585,9 @@ func (ex *connExecutor) setTransactionModes( return errors.AssertionFailedf("expected an evaluated AS OF timestamp") } if !asOfTs.IsEmpty() { + if err := ex.state.checkReadsAndWrites(); err != nil { + return err + } if err := ex.state.setHistoricalTimestamp(ctx, asOfTs); err != nil { return err } diff --git a/pkg/sql/logictest/testdata/logic_test/as_of b/pkg/sql/logictest/testdata/logic_test/as_of index 18cf7fdf8a81..ef020468f763 100644 --- a/pkg/sql/logictest/testdata/logic_test/as_of +++ b/pkg/sql/logictest/testdata/logic_test/as_of @@ -108,3 +108,27 @@ SELECT * FROM t AS OF SYSTEM TIME with_min_timestamp(statement_timestamp()) skipif config 3node-tenant statement error pgcode XXC01 with_max_staleness can only be used with a CCL distribution SELECT * FROM t AS OF SYSTEM TIME with_max_staleness('1s'::interval) + +statement ok +BEGIN + +statement ok +SELECT * from t + +statement error cannot set fixed timestamp, .* already performed reads +SET TRANSACTION AS OF system time '-1s' + +statement ok +ROLLBACK + +statement ok +BEGIN + +statement ok +INSERT INTO t VALUES(1) + +statement error cannot set fixed timestamp, .* already performed writes +SET TRANSACTION AS OF system time '-1s' + +statement ok +ROLLBACK diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index e59b14f2e5ea..253b756795d8 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -17,6 +17,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -287,6 +289,7 @@ func (ts *txnState) setHistoricalTimestamp( ) error { ts.mu.Lock() defer ts.mu.Unlock() + if err := ts.mu.txn.SetFixedTimestamp(ctx, historicalTimestamp); err != nil { return err } @@ -465,3 +468,25 @@ func (ts *txnState) consumeAdvanceInfo() advanceInfo { ts.adv = advanceInfo{} return adv } + +// checkReadsAndWrites returns an error if the transaction has performed reads +// or writes. +func (ts *txnState) checkReadsAndWrites() error { + ts.mu.Lock() + defer ts.mu.Unlock() + + if ts.mu.txn.Sender().HasPerformedReads() { + return pgerror.Newf( + pgcode.InvalidTransactionState, + "cannot set fixed timestamp, txn %s already performed reads", + ts.mu.txn) + } + + if ts.mu.txn.Sender().HasPerformedWrites() { + return pgerror.Newf( + pgcode.InvalidTransactionState, + "cannot set fixed timestamp, txn %s already performed writes", + ts.mu.txn) + } + return nil +} From 37b6ca79a7e2bc99dd30540d3c78048072c67b28 Mon Sep 17 00:00:00 2001 From: Suraj Date: Wed, 1 Jun 2022 10:50:04 -0400 Subject: [PATCH 2/4] authors: add surajr10 to authors Release note: None --- AUTHORS | 1 + 1 file changed, 1 insertion(+) diff --git a/AUTHORS b/AUTHORS index 050316d880b1..2ad3e94daae4 100644 --- a/AUTHORS +++ b/AUTHORS @@ -404,6 +404,7 @@ Steven Hand <@cockroachlabs.com> hand <@cockroachlabs.com> hand-crdb <@cockroach Steven Hubbard Sumeer Bhola sumeerbhola sum12 +Suraj Rao Syd <324760805@qq.com> Takuya Kuwahara Tamir Duberstein <@cockroachlabs.com> From 3e9776d3f8f95f01de14f4809fa3c039bf35638c Mon Sep 17 00:00:00 2001 From: rimadeodhar Date: Wed, 1 Jun 2022 10:15:06 -0700 Subject: [PATCH 3/4] tests: update version check for tenant scoping This PR fixes the version gate check to add tenant scoping while creating client certs for tenant roachtests. It is erroneously gated on v22.1 when it should be v22.2. Release note: None --- pkg/cmd/roachtest/tests/multitenant_utils.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go index a17b2b2e846d..994bf41ebe58 100644 --- a/pkg/cmd/roachtest/tests/multitenant_utils.go +++ b/pkg/cmd/roachtest/tests/multitenant_utils.go @@ -68,8 +68,8 @@ func createTenantNode( versionStr, err := fetchCockroachVersion(ctx, t.L(), c, n[0]) v := version.MustParse(versionStr) require.NoError(t, err) - // Tenant scoped certificates were introduced in version 22.1. - if v.AtLeast(version.MustParse("v22.1.0")) { + // Tenant scoped certificates were introduced in version 22.2. + if v.AtLeast(version.MustParse("v22.2.0")) { tn.recreateClientCertsWithTenantScope(ctx, c) } tn.createTenantCert(ctx, t, c) From 3a3009c9b6c76c139448ad81e318173dea781974 Mon Sep 17 00:00:00 2001 From: Ryan Zhao Date: Wed, 25 May 2022 16:12:34 -0400 Subject: [PATCH 4/4] kvserver: implement granular metrics for snapshot bytes sent/rcvd This commit adds the following new metrics - `range.snapshots.unknown.rcvd-bytes` - `range.snapshots.unknown.sent-bytes` - `range.snapshots.rebalancing.rcvd-bytes` - `range.snapshots.rebalancing.sent-bytes` - `range.snapshots.recovery.rcvd-bytes` - `range.snapshots.recovery.sent-bytes` These metrics tracks the bytes send/received for each type of snapshot (rebalance, recovery, and unknown snapshots). The sum of these three new metrics should equal the existing `range.snapshots.(sent|rcvd)-bytes` that tracks the total number of snapshot bytes sent and received. Additionally, this commit changes the `snapshotStrategy` interface such that the `Receive` and `Send` methods take `snapshotBytesMetricFn` as a parameter rather than `*metric.Counter`. Finally, this commit adds a new SendSnapshot TestingKnob that hooks into the send snapshot flow after a DelegateSnapshotRequest is handled but before any throttling or sending logic takes place. Resolves #81047 Release note (ops change): Added new 6 metrics (range.snapshots.shapshots.(unknown|recovery|rebalancing).sent-bytes and range.snapshots.shapshots.(unknown|recovery|rebalancing).rcvd-bytes) to the metrics dashboard. This will allow users to track the number of bytes sent/received for each type of metric in addition to the total bytes sent/received. --- pkg/kv/kvserver/metrics.go | 48 +++++ pkg/kv/kvserver/raft_transport.go | 5 +- pkg/kv/kvserver/replica_command.go | 17 +- pkg/kv/kvserver/replica_learner_test.go | 271 ++++++++++++++++++++++++ pkg/kv/kvserver/store_raft.go | 5 + pkg/kv/kvserver/store_snapshot.go | 50 +++-- pkg/kv/kvserver/store_test.go | 4 +- pkg/kv/kvserver/testing_knobs.go | 3 + pkg/ts/catalog/chart_catalog.go | 6 + 9 files changed, 387 insertions(+), 22 deletions(-) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 45bff48a0ad2..cd4ac3e7fafc 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -569,6 +569,42 @@ var ( Measurement: "Bytes", Unit: metric.Unit_COUNT, } + metaRangeSnapshotUnknownRcvdBytes = metric.Metadata{ + Name: "range.snapshots.unknown.rcvd-bytes", + Help: "Number of unknown snapshot bytes received", + Measurement: "Bytes", + Unit: metric.Unit_COUNT, + } + metaRangeSnapshotUnknownSentBytes = metric.Metadata{ + Name: "range.snapshots.unknown.sent-bytes", + Help: "Number of unknown snapshot bytes sent", + Measurement: "Bytes", + Unit: metric.Unit_COUNT, + } + metaRangeSnapshotRebalancingRcvdBytes = metric.Metadata{ + Name: "range.snapshots.rebalancing.rcvd-bytes", + Help: "Number of rebalancing snapshot bytes received", + Measurement: "Bytes", + Unit: metric.Unit_COUNT, + } + metaRangeSnapshotRebalancingSentBytes = metric.Metadata{ + Name: "range.snapshots.rebalancing.sent-bytes", + Help: "Number of rebalancing snapshot bytes sent", + Measurement: "Bytes", + Unit: metric.Unit_COUNT, + } + metaRangeSnapshotRecoveryRcvdBytes = metric.Metadata{ + Name: "range.snapshots.recovery.rcvd-bytes", + Help: "Number of recovery snapshot bytes received", + Measurement: "Bytes", + Unit: metric.Unit_COUNT, + } + metaRangeSnapshotRecoverySentBytes = metric.Metadata{ + Name: "range.snapshots.recovery.sent-bytes", + Help: "Number of recovery snapshot bytes sent", + Measurement: "Bytes", + Unit: metric.Unit_COUNT, + } metaRangeRaftLeaderTransfers = metric.Metadata{ Name: "range.raftleadertransfers", Help: "Number of raft leader transfers", @@ -1522,6 +1558,12 @@ type StoreMetrics struct { RangeSnapshotsAppliedByNonVoters *metric.Counter RangeSnapshotRcvdBytes *metric.Counter RangeSnapshotSentBytes *metric.Counter + RangeSnapshotUnknownRcvdBytes *metric.Counter + RangeSnapshotUnknownSentBytes *metric.Counter + RangeSnapshotRecoveryRcvdBytes *metric.Counter + RangeSnapshotRecoverySentBytes *metric.Counter + RangeSnapshotRebalancingRcvdBytes *metric.Counter + RangeSnapshotRebalancingSentBytes *metric.Counter // Raft processing metrics. RaftTicks *metric.Counter @@ -1977,6 +2019,12 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { RangeSnapshotsAppliedByNonVoters: metric.NewCounter(metaRangeSnapshotsAppliedByNonVoter), RangeSnapshotRcvdBytes: metric.NewCounter(metaRangeSnapshotRcvdBytes), RangeSnapshotSentBytes: metric.NewCounter(metaRangeSnapshotSentBytes), + RangeSnapshotUnknownRcvdBytes: metric.NewCounter(metaRangeSnapshotUnknownRcvdBytes), + RangeSnapshotUnknownSentBytes: metric.NewCounter(metaRangeSnapshotUnknownSentBytes), + RangeSnapshotRecoveryRcvdBytes: metric.NewCounter(metaRangeSnapshotRecoveryRcvdBytes), + RangeSnapshotRecoverySentBytes: metric.NewCounter(metaRangeSnapshotRecoverySentBytes), + RangeSnapshotRebalancingRcvdBytes: metric.NewCounter(metaRangeSnapshotRebalancingRcvdBytes), + RangeSnapshotRebalancingSentBytes: metric.NewCounter(metaRangeSnapshotRebalancingSentBytes), RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers), RangeLossOfQuorumRecoveries: metric.NewCounter(metaRangeLossOfQuorumRecoveries), diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index ce524e166fdf..23633f02912b 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -29,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -775,7 +774,7 @@ func (t *RaftTransport) SendSnapshot( snap *OutgoingSnapshot, newBatch func() storage.Batch, sent func(), - bytesSentCounter *metric.Counter, + recordBytesSent snapshotRecordMetrics, ) error { nodeID := header.RaftMessageRequest.ToReplica.NodeID @@ -794,7 +793,7 @@ func (t *RaftTransport) SendSnapshot( log.Warningf(ctx, "failed to close snapshot stream: %+v", err) } }() - return sendSnapshot(ctx, t.st, stream, storePool, header, snap, newBatch, sent, bytesSentCounter) + return sendSnapshot(ctx, t.st, stream, storePool, header, snap, newBatch, sent, recordBytesSent) } // DelegateSnapshot creates a rpc stream between the leaseholder and the diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index c37e3a8edbc2..c9107829bdac 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2767,6 +2767,21 @@ func (r *Replica) followerSendSnapshot( r.store.metrics.RangeSnapshotsGenerated.Inc(1) } + recordBytesSent := func(inc int64) { + r.store.metrics.RangeSnapshotSentBytes.Inc(inc) + + switch header.Priority { + case kvserverpb.SnapshotRequest_RECOVERY: + r.store.metrics.RangeSnapshotRecoverySentBytes.Inc(inc) + case kvserverpb.SnapshotRequest_REBALANCE: + r.store.metrics.RangeSnapshotRebalancingSentBytes.Inc(inc) + default: + // If a snapshot is not a RECOVERY or REBALANCE snapshot, it must be of + // type UNKNOWN. + r.store.metrics.RangeSnapshotUnknownSentBytes.Inc(inc) + } + } + err = contextutil.RunWithTimeout( ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { return r.store.cfg.Transport.SendSnapshot( @@ -2776,7 +2791,7 @@ func (r *Replica) followerSendSnapshot( snap, newBatchFn, sent, - r.store.metrics.RangeSnapshotSentBytes, + recordBytesSent, ) }, ) diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 70bab6b8aa10..a965a3651801 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -464,6 +465,8 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) { }) } +// In addition to testing Raft snapshots to non-voters, this test also verifies +// that recorded metrics for Raft snapshot bytes sent are also accurate. func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { skip.UnderShort(t, "this test sleeps for a few seconds") @@ -486,6 +489,19 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { // below. ltk.storeKnobs.DisableRaftSnapshotQueue = true + // Synchronize on the moment before the snapshot gets sent so we can measure + // the state at that time & gather metrics. + blockUntilSnapshotSendCh := make(chan struct{}) + blockSnapshotSendCh := make(chan struct{}) + ltk.storeKnobs.SendSnapshot = func() { + close(blockUntilSnapshotSendCh) + select { + case <-blockSnapshotSendCh: + case <-time.After(10 * time.Second): + return + } + } + tc := testcluster.StartTestCluster( t, 2, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: knobs}, @@ -496,6 +512,10 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { scratchStartKey := tc.ScratchRange(t) g, ctx := errgroup.WithContext(ctx) + // Record the snapshot metrics before anything has been sent / received. + senderTotalBefore, senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */) + receiverTotalBefore, receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */) + // Add a new voting replica, but don't initialize it. Note that // `tc.AddNonVoters` will not return until the newly added non-voter is // initialized, which we will do below via the snapshot queue. @@ -552,7 +572,51 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { } return nil }) + + // Wait until the snapshot is about to be sent before calculating what the + // snapshot size should be. This allows our snapshot measurement to account + // for any state changes that happen between calling AddNonVoters and the + // snapshot being sent. + <-blockUntilSnapshotSendCh + store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey) + snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE, tc.Server(1).GetFirstStoreID()) + require.NoError(t, err) + + close(blockSnapshotSendCh) require.NoError(t, g.Wait()) + + // Record the snapshot metrics for the sender after the raft snapshot was sent. + senderTotalAfter, senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0) + + // Asserts that the raft snapshot (aka recovery snapshot) bytes sent have been + // recorded and that it was not double counted in a different metric. + senderTotalDelta, senderMapDelta := getSnapshotMetricsDiff(senderTotalBefore, senderMetricsMapBefore, senderTotalAfter, senderMetricsMapAfter) + + senderTotalExpected := snapshotBytesMetrics{sentBytes: snapshotLength, rcvdBytes: 0} + senderMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{ + kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: 0, rcvdBytes: 0}, + kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: snapshotLength, rcvdBytes: 0}, + kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0}, + } + require.Equal(t, senderTotalExpected, senderTotalDelta) + require.Equal(t, senderMapExpected, senderMapDelta) + + // Record the snapshot metrics for the receiver after the raft snapshot was + // received. + receiverTotalAfter, receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1) + + // Asserts that the raft snapshot (aka recovery snapshot) bytes received have + // been recorded and that it was not double counted in a different metric. + receiverTotalDelta, receiverMapDelta := getSnapshotMetricsDiff(receiverTotalBefore, receiverMetricsMapBefore, receiverTotalAfter, receiverMetricsMapAfter) + + receiverTotalExpected := snapshotBytesMetrics{sentBytes: 0, rcvdBytes: snapshotLength} + receiverMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{ + kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: 0, rcvdBytes: 0}, + kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: 0, rcvdBytes: snapshotLength}, + kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0}, + } + require.Equal(t, receiverTotalExpected, receiverTotalDelta) + require.Equal(t, receiverMapExpected, receiverMapDelta) } func drain(ctx context.Context, t *testing.T, client serverpb.AdminClient, drainingNodeID int) { @@ -1533,3 +1597,210 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) { require.False(t, desc.Replicas().InAtomicReplicationChange(), desc) } } + +type snapshotBytesMetrics struct { + sentBytes int64 + rcvdBytes int64 +} + +// getSnapshotBytesMetrics returns metrics on the number of snapshot bytes sent +// and received by a server. tc and serverIdx specify the index of the target +// server on the TestCluster TC. The function returns the total number of +// snapshot bytes sent/received, as well as a map with granular metrics on the +// number of snapshot bytes sent and received for each type of snapshot. The +// return value is of the form (totalBytes, granularMetrics), where totalBytes +// is a `snapshotBytesMetrics` struct containing the total bytes sent/received, +// and granularMetrics is the map mentioned above. +func getSnapshotBytesMetrics( + t *testing.T, tc *testcluster.TestCluster, serverIdx int, +) (snapshotBytesMetrics, map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics) { + granularMetrics := make(map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics) + + granularMetrics[kvserverpb.SnapshotRequest_UNKNOWN] = snapshotBytesMetrics{ + sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.unknown.sent-bytes"), + rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.unknown.rcvd-bytes"), + } + granularMetrics[kvserverpb.SnapshotRequest_RECOVERY] = snapshotBytesMetrics{ + sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.recovery.sent-bytes"), + rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.recovery.rcvd-bytes"), + } + granularMetrics[kvserverpb.SnapshotRequest_REBALANCE] = snapshotBytesMetrics{ + sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.rebalancing.sent-bytes"), + rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.rebalancing.rcvd-bytes"), + } + + totalBytes := snapshotBytesMetrics{ + sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.sent-bytes"), + rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.rcvd-bytes"), + } + + return totalBytes, granularMetrics +} + +// getSnapshotMetricsDiff returns the delta between snapshot byte metrics +// recorded at different times. Metrics can be recorded using the +// getSnapshotBytesMetrics helper function, and the delta is returned in the +// form (totalBytes, granularMetrics). totalBytes is a +// snapshotBytesMetrics struct containing the difference in total bytes +// sent/received, and granularMetrics is the map of snapshotBytesMetrics structs +// containing deltas for each type of snapshot. +func getSnapshotMetricsDiff( + beforeTotal snapshotBytesMetrics, + beforeMap map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics, + afterTotal snapshotBytesMetrics, + afterMap map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics, +) (snapshotBytesMetrics, map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics) { + diffTotal := snapshotBytesMetrics{ + sentBytes: afterTotal.sentBytes - beforeTotal.sentBytes, + rcvdBytes: afterTotal.rcvdBytes - beforeTotal.rcvdBytes, + } + diffMap := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{ + kvserverpb.SnapshotRequest_REBALANCE: { + sentBytes: afterMap[kvserverpb.SnapshotRequest_REBALANCE].sentBytes - beforeMap[kvserverpb.SnapshotRequest_REBALANCE].sentBytes, + rcvdBytes: afterMap[kvserverpb.SnapshotRequest_REBALANCE].rcvdBytes - beforeMap[kvserverpb.SnapshotRequest_REBALANCE].rcvdBytes, + }, + kvserverpb.SnapshotRequest_RECOVERY: { + sentBytes: afterMap[kvserverpb.SnapshotRequest_RECOVERY].sentBytes - beforeMap[kvserverpb.SnapshotRequest_RECOVERY].sentBytes, + rcvdBytes: afterMap[kvserverpb.SnapshotRequest_RECOVERY].rcvdBytes - beforeMap[kvserverpb.SnapshotRequest_RECOVERY].rcvdBytes, + }, + kvserverpb.SnapshotRequest_UNKNOWN: { + sentBytes: afterMap[kvserverpb.SnapshotRequest_UNKNOWN].sentBytes - beforeMap[kvserverpb.SnapshotRequest_UNKNOWN].sentBytes, + rcvdBytes: afterMap[kvserverpb.SnapshotRequest_UNKNOWN].rcvdBytes - beforeMap[kvserverpb.SnapshotRequest_UNKNOWN].rcvdBytes, + }, + } + + return diffTotal, diffMap +} + +// This function returns the number of bytes sent for a snapshot. It follows the +// sending logic of kvBatchSnapshotStrategy.Send() but has one key difference. +// +// NB: This calculation assumes the snapshot size is less than +// `kv.snapshot_sender.batch_size` and will fit in a single storage.Batch. +func getExpectedSnapshotSizeBytes( + ctx context.Context, + originStore *kvserver.Store, + originRepl *kvserver.Replica, + snapType kvserverpb.SnapshotRequest_Type, + recipientStoreID roachpb.StoreID, +) (int64, error) { + snap, err := originRepl.GetSnapshot(ctx, snapType, recipientStoreID) + if err != nil { + return 0, err + } + defer snap.Close() + + totalBytes := int64(0) + var b storage.Batch + defer func() { + b.Close() + }() + b = originStore.Engine().NewUnindexedBatch(true) + for iter := snap.Iter; ; iter.Next() { + if ok, err := iter.Valid(); err != nil { + return 0, err + } else if !ok { + break + } + unsafeKey := iter.UnsafeKey() + unsafeValue := iter.UnsafeValue() + + if err := b.PutEngineKey(unsafeKey, unsafeValue); err != nil { + return 0, err + } + } + totalBytes += int64(b.Len()) + + return totalBytes, nil +} + +// Tests the accuracy of the 'range.snapshots.rebalancing.rcvd-bytes' and +// 'range.snapshots.rebalancing.sent-bytes' metrics. This test adds a new +// replica to a cluster, and during the process, a learner snapshot is sent to +// the new replica. +func TestRebalancingSnapshotMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + knobs, ltk := makeReplicationTestKnobs() + ltk.storeKnobs.DisableRaftSnapshotQueue = true + + // Synchronize on the moment before the snapshot gets sent so we can measure + // the state at that time. + blockUntilSnapshotSendCh := make(chan struct{}) + blockSnapshotSendCh := make(chan struct{}) + ltk.storeKnobs.SendSnapshot = func() { + close(blockUntilSnapshotSendCh) + select { + case <-blockSnapshotSendCh: + case <-time.After(10 * time.Second): + return + } + } + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{Knobs: knobs}, + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(ctx) + + scratchStartKey := tc.ScratchRange(t) + + // Record the snapshot metrics before anything has been sent / received. + senderTotalBefore, senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */) + receiverTotalBefore, receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */) + + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) error { + _, err := tc.AddVoters(scratchStartKey, tc.Target(1)) + return err + }) + + // Wait until the snapshot is about to be sent before calculating what the + // snapshot size should be. This allows our snapshot measurement to account + // for any state changes that happen between calling AddVoters and the + // snapshot being sent. + <-blockUntilSnapshotSendCh + store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey) + snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_INITIAL, tc.Server(1).GetFirstStoreID()) + require.NoError(t, err) + + close(blockSnapshotSendCh) + require.NoError(t, g.Wait()) + + // Record the snapshot metrics for the sender after a voter has been added. A + // learner snapshot should have been sent from the sender to the receiver. + senderTotalAfter, senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0) + + // Asserts that the learner snapshot (aka rebalancing snapshot) bytes sent + // have been recorded and that it was not double counted in a different + // metric. + senderTotalDelta, senderMapDelta := getSnapshotMetricsDiff(senderTotalBefore, senderMetricsMapBefore, senderTotalAfter, senderMetricsMapAfter) + + senderTotalExpected := snapshotBytesMetrics{sentBytes: snapshotLength, rcvdBytes: 0} + senderMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{ + kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: snapshotLength, rcvdBytes: 0}, + kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: 0, rcvdBytes: 0}, + kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0}, + } + require.Equal(t, senderTotalExpected, senderTotalDelta) + require.Equal(t, senderMapExpected, senderMapDelta) + + // Record the snapshot metrics for the receiver after a voter has been added. + receiverTotalAfter, receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1) + + // Asserts that the learner snapshot (aka rebalancing snapshot) bytes received + // have been recorded and that it was not double counted in a different + // metric. + receiverTotalDelta, receiverMapDelta := getSnapshotMetricsDiff(receiverTotalBefore, receiverMetricsMapBefore, receiverTotalAfter, receiverMetricsMapAfter) + + receiverTotalExpected := snapshotBytesMetrics{sentBytes: 0, rcvdBytes: snapshotLength} + receiverMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{ + kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: 0, rcvdBytes: snapshotLength}, + kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: 0, rcvdBytes: 0}, + kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0}, + } + require.Equal(t, receiverTotalExpected, receiverTotalDelta) + require.Equal(t, receiverMapExpected, receiverMapDelta) +} diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 19a104a6018d..16503188b10c 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -160,6 +160,11 @@ func (s *Store) HandleDelegatedSnapshot( ) error { ctx = s.AnnotateCtx(ctx) const name = "storage.Store: handle snapshot delegation" + + if fn := s.cfg.TestingKnobs.SendSnapshot; fn != nil { + fn() + } + return s.stopper.RunTaskWithErr( ctx, name, func(ctx context.Context) error { sender, err := s.GetReplica(req.RangeID) diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index ce855ee11643..30bea72998ef 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -31,7 +31,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -72,6 +71,11 @@ type outgoingDelegatedStream interface { Recv() (*kvserverpb.DelegateSnapshotResponse, error) } +// snapshotRecordMetrics is a wrapper function that increments a set of metrics +// related to the number of snapshot bytes sent/received. The definer of the +// function specifies which metrics are incremented. +type snapshotRecordMetrics func(inc int64) + // snapshotStrategy is an approach to sending and receiving Range snapshots. // Each implementation corresponds to a SnapshotRequest_Strategy, and it is // expected that the implementation that matches the Strategy specified in the @@ -83,7 +87,7 @@ type snapshotStrategy interface { context.Context, incomingSnapshotStream, kvserverpb.SnapshotRequest_Header, - *metric.Counter, + snapshotRecordMetrics, ) (IncomingSnapshot, error) // Send streams SnapshotRequests created from the OutgoingSnapshot in to the @@ -93,7 +97,7 @@ type snapshotStrategy interface { outgoingSnapshotStream, kvserverpb.SnapshotRequest_Header, *OutgoingSnapshot, - *metric.Counter, + snapshotRecordMetrics, ) (int64, error) // Status provides a status report on the work performed during the @@ -264,7 +268,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( ctx context.Context, stream incomingSnapshotStream, header kvserverpb.SnapshotRequest_Header, - bytesRcvdCounter *metric.Counter, + recordBytesReceived snapshotRecordMetrics, ) (IncomingSnapshot, error) { assertStrategy(ctx, header, kvserverpb.SnapshotRequest_KV_BATCH) @@ -288,7 +292,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( } if req.KVBatch != nil { - bytesRcvdCounter.Inc(int64(len(req.KVBatch))) + recordBytesReceived(int64(len(req.KVBatch))) batchReader, err := storage.NewRocksDBBatchReader(req.KVBatch) if err != nil { return noSnap, errors.Wrap(err, "failed to decode batch") @@ -350,10 +354,9 @@ func (kvSS *kvBatchSnapshotStrategy) Send( stream outgoingSnapshotStream, header kvserverpb.SnapshotRequest_Header, snap *OutgoingSnapshot, - bytesSentMetric *metric.Counter, + recordBytesSent snapshotRecordMetrics, ) (int64, error) { assertStrategy(ctx, header, kvserverpb.SnapshotRequest_KV_BATCH) - // bytesSent is updated as key-value batches are sent with sendBatch. It does // not reflect the log entries sent (which are never sent in newer versions of // CRDB, as of VersionUnreplicatedTruncatedState). @@ -389,7 +392,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( return 0, err } bytesSent += bLen - bytesSentMetric.Inc(bLen) + recordBytesSent(bLen) b.Close() b = nil } @@ -399,7 +402,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( return 0, err } bytesSent += int64(b.Len()) - bytesSentMetric.Inc(int64(b.Len())) + recordBytesSent(int64(b.Len())) } kvSS.status = redact.Sprintf("kv pairs: %d", kvs) @@ -774,7 +777,22 @@ func (s *Store) receiveSnapshot( log.Infof(ctx, "accepted snapshot reservation for r%d", header.State.Desc.RangeID) } - inSnap, err := ss.Receive(ctx, stream, *header, s.metrics.RangeSnapshotRcvdBytes) + recordBytesReceived := func(inc int64) { + s.metrics.RangeSnapshotRcvdBytes.Inc(inc) + + switch header.Priority { + case kvserverpb.SnapshotRequest_RECOVERY: + s.metrics.RangeSnapshotRecoveryRcvdBytes.Inc(inc) + case kvserverpb.SnapshotRequest_REBALANCE: + s.metrics.RangeSnapshotRebalancingRcvdBytes.Inc(inc) + default: + // If a snapshot is not a RECOVERY or REBALANCE snapshot, it must be of + // type UNKNOWN. + s.metrics.RangeSnapshotUnknownRcvdBytes.Inc(inc) + } + } + + inSnap, err := ss.Receive(ctx, stream, *header, recordBytesReceived) if err != nil { return err } @@ -1141,7 +1159,7 @@ func SendEmptySnapshot( &outgoingSnap, eng.NewBatch, func() {}, - nil, /* bytesSentCounter */ + nil, /* recordBytesSent */ ) } @@ -1160,13 +1178,13 @@ func sendSnapshot( snap *OutgoingSnapshot, newBatch func() storage.Batch, sent func(), - bytesSentCounter *metric.Counter, + recordBytesSent snapshotRecordMetrics, ) error { - if bytesSentCounter == nil { + if recordBytesSent == nil { // NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshot` - // with a nil counter. We pass in a fake metrics counter here that isn't + // with a nil metrics tracking function. We pass in a fake metrics tracking function here that isn't // hooked up to anything. - bytesSentCounter = metric.NewCounter(metric.Metadata{Name: "range.snapshots.sent-bytes"}) + recordBytesSent = func(inc int64) {} } start := timeutil.Now() to := header.RaftMessageRequest.ToReplica @@ -1229,7 +1247,7 @@ func sendSnapshot( log.Fatalf(ctx, "unknown snapshot strategy: %s", header.Strategy) } - numBytesSent, err := ss.Send(ctx, stream, header, snap, bytesSentCounter) + numBytesSent, err := ss.Send(ctx, stream, header, snap, recordBytesSent) if err != nil { return err } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 2cb131e961f6..1d1bb595f0bb 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -2814,7 +2814,7 @@ func TestSendSnapshotThrottling(t *testing.T) { expectedErr := errors.New("") c := fakeSnapshotStream{nil, expectedErr} err := sendSnapshot( - ctx, st, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* bytesSentCounter */ + ctx, st, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */ ) if sp.failedThrottles != 1 { t.Fatalf("expected 1 failed throttle, but found %d", sp.failedThrottles) @@ -2832,7 +2832,7 @@ func TestSendSnapshotThrottling(t *testing.T) { } c := fakeSnapshotStream{resp, nil} err := sendSnapshot( - ctx, st, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* bytesSentCounter */ + ctx, st, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */ ) if sp.failedThrottles != 1 { t.Fatalf("expected 1 failed throttle, but found %d", sp.failedThrottles) diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index f2bd279ee140..16cb5ed51ba3 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -268,6 +268,9 @@ type StoreTestingKnobs struct { // in handleRaftReady to refreshReasonNewLeaderOrConfigChange. EnableUnconditionalRefreshesInRaftReady bool + // SendSnapshot is run after receiving a DelegateRaftSnapshot request but + // before any throttling or sending logic. + SendSnapshot func() // ReceiveSnapshot is run after receiving a snapshot header but before // acquiring snapshot quota or doing shouldAcceptSnapshotData checks. If an // error is returned from the hook, it's sent as an ERROR SnapshotResponse. diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 6a40f28ccbd3..13b7d039ee48 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -609,6 +609,12 @@ var charts = []sectionDescription{ Metrics: []string{ "range.snapshots.rcvd-bytes", "range.snapshots.sent-bytes", + "range.snapshots.recovery.rcvd-bytes", + "range.snapshots.recovery.sent-bytes", + "range.snapshots.rebalancing.rcvd-bytes", + "range.snapshots.rebalancing.sent-bytes", + "range.snapshots.unknown.rcvd-bytes", + "range.snapshots.unknown.sent-bytes", }, }, },