diff --git a/AUTHORS b/AUTHORS index b8b8b47ee953..9d42915ddad7 100644 --- a/AUTHORS +++ b/AUTHORS @@ -405,6 +405,7 @@ Steven Hand <@cockroachlabs.com> hand <@cockroachlabs.com> hand-crdb <@cockroach Steven Hubbard Sumeer Bhola sumeerbhola sum12 +Suraj Rao Syd <324760805@qq.com> Takuya Kuwahara Tamir Duberstein <@cockroachlabs.com> diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go index a17b2b2e846d..994bf41ebe58 100644 --- a/pkg/cmd/roachtest/tests/multitenant_utils.go +++ b/pkg/cmd/roachtest/tests/multitenant_utils.go @@ -68,8 +68,8 @@ func createTenantNode( versionStr, err := fetchCockroachVersion(ctx, t.L(), c, n[0]) v := version.MustParse(versionStr) require.NoError(t, err) - // Tenant scoped certificates were introduced in version 22.1. - if v.AtLeast(version.MustParse("v22.1.0")) { + // Tenant scoped certificates were introduced in version 22.2. + if v.AtLeast(version.MustParse("v22.2.0")) { tn.recreateClientCertsWithTenantScope(ctx, c) } tn.createTenantCert(ctx, t, c) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 6d840203e246..cb6f71799d48 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -1084,11 +1084,11 @@ func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestam tc.mu.Lock() defer tc.mu.Unlock() // The transaction must not have already been used in this epoch. - if !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty() { + if tc.hasPerformedReadsLocked() { return errors.WithContextTags(errors.AssertionFailedf( "cannot set fixed timestamp, txn %s already performed reads", tc.mu.txn), ctx) } - if tc.mu.txn.Sequence != 0 { + if tc.hasPerformedWritesLocked() { return errors.WithContextTags(errors.AssertionFailedf( "cannot set fixed timestamp, txn %s already performed writes", tc.mu.txn), ctx) } @@ -1406,3 +1406,25 @@ func (tc *TxnCoordSender) ClearTxnRetryableErr(ctx context.Context) { tc.mu.txnState = txnPending } } + +// HasPerformedReads is part of the TxnSender interface. +func (tc *TxnCoordSender) HasPerformedReads() bool { + tc.mu.Lock() + defer tc.mu.Unlock() + return tc.hasPerformedReadsLocked() +} + +// HasPerformedWrites is part of the TxnSender interface. +func (tc *TxnCoordSender) HasPerformedWrites() bool { + tc.mu.Lock() + defer tc.mu.Unlock() + return tc.hasPerformedWritesLocked() +} + +func (tc *TxnCoordSender) hasPerformedReadsLocked() bool { + return !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty() +} + +func (tc *TxnCoordSender) hasPerformedWritesLocked() bool { + return tc.mu.txn.Sequence != 0 +} diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 530e2808c6fc..918fac8ed656 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -565,6 +565,42 @@ var ( Measurement: "Bytes", Unit: metric.Unit_COUNT, } + metaRangeSnapshotUnknownRcvdBytes = metric.Metadata{ + Name: "range.snapshots.unknown.rcvd-bytes", + Help: "Number of unknown snapshot bytes received", + Measurement: "Bytes", + Unit: metric.Unit_COUNT, + } + metaRangeSnapshotUnknownSentBytes = metric.Metadata{ + Name: "range.snapshots.unknown.sent-bytes", + Help: "Number of unknown snapshot bytes sent", + Measurement: "Bytes", + Unit: metric.Unit_COUNT, + } + metaRangeSnapshotRebalancingRcvdBytes = metric.Metadata{ + Name: "range.snapshots.rebalancing.rcvd-bytes", + Help: "Number of rebalancing snapshot bytes received", + Measurement: "Bytes", + Unit: metric.Unit_COUNT, + } + metaRangeSnapshotRebalancingSentBytes = metric.Metadata{ + Name: "range.snapshots.rebalancing.sent-bytes", + Help: "Number of rebalancing snapshot bytes sent", + Measurement: "Bytes", + Unit: metric.Unit_COUNT, + } + metaRangeSnapshotRecoveryRcvdBytes = metric.Metadata{ + Name: "range.snapshots.recovery.rcvd-bytes", + Help: "Number of recovery snapshot bytes received", + Measurement: "Bytes", + Unit: metric.Unit_COUNT, + } + metaRangeSnapshotRecoverySentBytes = metric.Metadata{ + Name: "range.snapshots.recovery.sent-bytes", + Help: "Number of recovery snapshot bytes sent", + Measurement: "Bytes", + Unit: metric.Unit_COUNT, + } metaRangeRaftLeaderTransfers = metric.Metadata{ Name: "range.raftleadertransfers", Help: "Number of raft leader transfers", @@ -1524,6 +1560,12 @@ type StoreMetrics struct { RangeSnapshotsAppliedByNonVoters *metric.Counter RangeSnapshotRcvdBytes *metric.Counter RangeSnapshotSentBytes *metric.Counter + RangeSnapshotUnknownRcvdBytes *metric.Counter + RangeSnapshotUnknownSentBytes *metric.Counter + RangeSnapshotRecoveryRcvdBytes *metric.Counter + RangeSnapshotRecoverySentBytes *metric.Counter + RangeSnapshotRebalancingRcvdBytes *metric.Counter + RangeSnapshotRebalancingSentBytes *metric.Counter // Raft processing metrics. RaftTicks *metric.Counter @@ -1973,6 +2015,12 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { RangeSnapshotsAppliedByNonVoters: metric.NewCounter(metaRangeSnapshotsAppliedByNonVoter), RangeSnapshotRcvdBytes: metric.NewCounter(metaRangeSnapshotRcvdBytes), RangeSnapshotSentBytes: metric.NewCounter(metaRangeSnapshotSentBytes), + RangeSnapshotUnknownRcvdBytes: metric.NewCounter(metaRangeSnapshotUnknownRcvdBytes), + RangeSnapshotUnknownSentBytes: metric.NewCounter(metaRangeSnapshotUnknownSentBytes), + RangeSnapshotRecoveryRcvdBytes: metric.NewCounter(metaRangeSnapshotRecoveryRcvdBytes), + RangeSnapshotRecoverySentBytes: metric.NewCounter(metaRangeSnapshotRecoverySentBytes), + RangeSnapshotRebalancingRcvdBytes: metric.NewCounter(metaRangeSnapshotRebalancingRcvdBytes), + RangeSnapshotRebalancingSentBytes: metric.NewCounter(metaRangeSnapshotRebalancingSentBytes), RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers), RangeLossOfQuorumRecoveries: metric.NewCounter(metaRangeLossOfQuorumRecoveries), diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index ce524e166fdf..23633f02912b 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -29,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -775,7 +774,7 @@ func (t *RaftTransport) SendSnapshot( snap *OutgoingSnapshot, newBatch func() storage.Batch, sent func(), - bytesSentCounter *metric.Counter, + recordBytesSent snapshotRecordMetrics, ) error { nodeID := header.RaftMessageRequest.ToReplica.NodeID @@ -794,7 +793,7 @@ func (t *RaftTransport) SendSnapshot( log.Warningf(ctx, "failed to close snapshot stream: %+v", err) } }() - return sendSnapshot(ctx, t.st, stream, storePool, header, snap, newBatch, sent, bytesSentCounter) + return sendSnapshot(ctx, t.st, stream, storePool, header, snap, newBatch, sent, recordBytesSent) } // DelegateSnapshot creates a rpc stream between the leaseholder and the diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index c37e3a8edbc2..c9107829bdac 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2767,6 +2767,21 @@ func (r *Replica) followerSendSnapshot( r.store.metrics.RangeSnapshotsGenerated.Inc(1) } + recordBytesSent := func(inc int64) { + r.store.metrics.RangeSnapshotSentBytes.Inc(inc) + + switch header.Priority { + case kvserverpb.SnapshotRequest_RECOVERY: + r.store.metrics.RangeSnapshotRecoverySentBytes.Inc(inc) + case kvserverpb.SnapshotRequest_REBALANCE: + r.store.metrics.RangeSnapshotRebalancingSentBytes.Inc(inc) + default: + // If a snapshot is not a RECOVERY or REBALANCE snapshot, it must be of + // type UNKNOWN. + r.store.metrics.RangeSnapshotUnknownSentBytes.Inc(inc) + } + } + err = contextutil.RunWithTimeout( ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { return r.store.cfg.Transport.SendSnapshot( @@ -2776,7 +2791,7 @@ func (r *Replica) followerSendSnapshot( snap, newBatchFn, sent, - r.store.metrics.RangeSnapshotSentBytes, + recordBytesSent, ) }, ) diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 70bab6b8aa10..a965a3651801 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -464,6 +465,8 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) { }) } +// In addition to testing Raft snapshots to non-voters, this test also verifies +// that recorded metrics for Raft snapshot bytes sent are also accurate. func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { skip.UnderShort(t, "this test sleeps for a few seconds") @@ -486,6 +489,19 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { // below. ltk.storeKnobs.DisableRaftSnapshotQueue = true + // Synchronize on the moment before the snapshot gets sent so we can measure + // the state at that time & gather metrics. + blockUntilSnapshotSendCh := make(chan struct{}) + blockSnapshotSendCh := make(chan struct{}) + ltk.storeKnobs.SendSnapshot = func() { + close(blockUntilSnapshotSendCh) + select { + case <-blockSnapshotSendCh: + case <-time.After(10 * time.Second): + return + } + } + tc := testcluster.StartTestCluster( t, 2, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: knobs}, @@ -496,6 +512,10 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { scratchStartKey := tc.ScratchRange(t) g, ctx := errgroup.WithContext(ctx) + // Record the snapshot metrics before anything has been sent / received. + senderTotalBefore, senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */) + receiverTotalBefore, receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */) + // Add a new voting replica, but don't initialize it. Note that // `tc.AddNonVoters` will not return until the newly added non-voter is // initialized, which we will do below via the snapshot queue. @@ -552,7 +572,51 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { } return nil }) + + // Wait until the snapshot is about to be sent before calculating what the + // snapshot size should be. This allows our snapshot measurement to account + // for any state changes that happen between calling AddNonVoters and the + // snapshot being sent. + <-blockUntilSnapshotSendCh + store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey) + snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE, tc.Server(1).GetFirstStoreID()) + require.NoError(t, err) + + close(blockSnapshotSendCh) require.NoError(t, g.Wait()) + + // Record the snapshot metrics for the sender after the raft snapshot was sent. + senderTotalAfter, senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0) + + // Asserts that the raft snapshot (aka recovery snapshot) bytes sent have been + // recorded and that it was not double counted in a different metric. + senderTotalDelta, senderMapDelta := getSnapshotMetricsDiff(senderTotalBefore, senderMetricsMapBefore, senderTotalAfter, senderMetricsMapAfter) + + senderTotalExpected := snapshotBytesMetrics{sentBytes: snapshotLength, rcvdBytes: 0} + senderMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{ + kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: 0, rcvdBytes: 0}, + kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: snapshotLength, rcvdBytes: 0}, + kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0}, + } + require.Equal(t, senderTotalExpected, senderTotalDelta) + require.Equal(t, senderMapExpected, senderMapDelta) + + // Record the snapshot metrics for the receiver after the raft snapshot was + // received. + receiverTotalAfter, receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1) + + // Asserts that the raft snapshot (aka recovery snapshot) bytes received have + // been recorded and that it was not double counted in a different metric. + receiverTotalDelta, receiverMapDelta := getSnapshotMetricsDiff(receiverTotalBefore, receiverMetricsMapBefore, receiverTotalAfter, receiverMetricsMapAfter) + + receiverTotalExpected := snapshotBytesMetrics{sentBytes: 0, rcvdBytes: snapshotLength} + receiverMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{ + kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: 0, rcvdBytes: 0}, + kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: 0, rcvdBytes: snapshotLength}, + kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0}, + } + require.Equal(t, receiverTotalExpected, receiverTotalDelta) + require.Equal(t, receiverMapExpected, receiverMapDelta) } func drain(ctx context.Context, t *testing.T, client serverpb.AdminClient, drainingNodeID int) { @@ -1533,3 +1597,210 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) { require.False(t, desc.Replicas().InAtomicReplicationChange(), desc) } } + +type snapshotBytesMetrics struct { + sentBytes int64 + rcvdBytes int64 +} + +// getSnapshotBytesMetrics returns metrics on the number of snapshot bytes sent +// and received by a server. tc and serverIdx specify the index of the target +// server on the TestCluster TC. The function returns the total number of +// snapshot bytes sent/received, as well as a map with granular metrics on the +// number of snapshot bytes sent and received for each type of snapshot. The +// return value is of the form (totalBytes, granularMetrics), where totalBytes +// is a `snapshotBytesMetrics` struct containing the total bytes sent/received, +// and granularMetrics is the map mentioned above. +func getSnapshotBytesMetrics( + t *testing.T, tc *testcluster.TestCluster, serverIdx int, +) (snapshotBytesMetrics, map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics) { + granularMetrics := make(map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics) + + granularMetrics[kvserverpb.SnapshotRequest_UNKNOWN] = snapshotBytesMetrics{ + sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.unknown.sent-bytes"), + rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.unknown.rcvd-bytes"), + } + granularMetrics[kvserverpb.SnapshotRequest_RECOVERY] = snapshotBytesMetrics{ + sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.recovery.sent-bytes"), + rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.recovery.rcvd-bytes"), + } + granularMetrics[kvserverpb.SnapshotRequest_REBALANCE] = snapshotBytesMetrics{ + sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.rebalancing.sent-bytes"), + rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.rebalancing.rcvd-bytes"), + } + + totalBytes := snapshotBytesMetrics{ + sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.sent-bytes"), + rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.rcvd-bytes"), + } + + return totalBytes, granularMetrics +} + +// getSnapshotMetricsDiff returns the delta between snapshot byte metrics +// recorded at different times. Metrics can be recorded using the +// getSnapshotBytesMetrics helper function, and the delta is returned in the +// form (totalBytes, granularMetrics). totalBytes is a +// snapshotBytesMetrics struct containing the difference in total bytes +// sent/received, and granularMetrics is the map of snapshotBytesMetrics structs +// containing deltas for each type of snapshot. +func getSnapshotMetricsDiff( + beforeTotal snapshotBytesMetrics, + beforeMap map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics, + afterTotal snapshotBytesMetrics, + afterMap map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics, +) (snapshotBytesMetrics, map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics) { + diffTotal := snapshotBytesMetrics{ + sentBytes: afterTotal.sentBytes - beforeTotal.sentBytes, + rcvdBytes: afterTotal.rcvdBytes - beforeTotal.rcvdBytes, + } + diffMap := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{ + kvserverpb.SnapshotRequest_REBALANCE: { + sentBytes: afterMap[kvserverpb.SnapshotRequest_REBALANCE].sentBytes - beforeMap[kvserverpb.SnapshotRequest_REBALANCE].sentBytes, + rcvdBytes: afterMap[kvserverpb.SnapshotRequest_REBALANCE].rcvdBytes - beforeMap[kvserverpb.SnapshotRequest_REBALANCE].rcvdBytes, + }, + kvserverpb.SnapshotRequest_RECOVERY: { + sentBytes: afterMap[kvserverpb.SnapshotRequest_RECOVERY].sentBytes - beforeMap[kvserverpb.SnapshotRequest_RECOVERY].sentBytes, + rcvdBytes: afterMap[kvserverpb.SnapshotRequest_RECOVERY].rcvdBytes - beforeMap[kvserverpb.SnapshotRequest_RECOVERY].rcvdBytes, + }, + kvserverpb.SnapshotRequest_UNKNOWN: { + sentBytes: afterMap[kvserverpb.SnapshotRequest_UNKNOWN].sentBytes - beforeMap[kvserverpb.SnapshotRequest_UNKNOWN].sentBytes, + rcvdBytes: afterMap[kvserverpb.SnapshotRequest_UNKNOWN].rcvdBytes - beforeMap[kvserverpb.SnapshotRequest_UNKNOWN].rcvdBytes, + }, + } + + return diffTotal, diffMap +} + +// This function returns the number of bytes sent for a snapshot. It follows the +// sending logic of kvBatchSnapshotStrategy.Send() but has one key difference. +// +// NB: This calculation assumes the snapshot size is less than +// `kv.snapshot_sender.batch_size` and will fit in a single storage.Batch. +func getExpectedSnapshotSizeBytes( + ctx context.Context, + originStore *kvserver.Store, + originRepl *kvserver.Replica, + snapType kvserverpb.SnapshotRequest_Type, + recipientStoreID roachpb.StoreID, +) (int64, error) { + snap, err := originRepl.GetSnapshot(ctx, snapType, recipientStoreID) + if err != nil { + return 0, err + } + defer snap.Close() + + totalBytes := int64(0) + var b storage.Batch + defer func() { + b.Close() + }() + b = originStore.Engine().NewUnindexedBatch(true) + for iter := snap.Iter; ; iter.Next() { + if ok, err := iter.Valid(); err != nil { + return 0, err + } else if !ok { + break + } + unsafeKey := iter.UnsafeKey() + unsafeValue := iter.UnsafeValue() + + if err := b.PutEngineKey(unsafeKey, unsafeValue); err != nil { + return 0, err + } + } + totalBytes += int64(b.Len()) + + return totalBytes, nil +} + +// Tests the accuracy of the 'range.snapshots.rebalancing.rcvd-bytes' and +// 'range.snapshots.rebalancing.sent-bytes' metrics. This test adds a new +// replica to a cluster, and during the process, a learner snapshot is sent to +// the new replica. +func TestRebalancingSnapshotMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + knobs, ltk := makeReplicationTestKnobs() + ltk.storeKnobs.DisableRaftSnapshotQueue = true + + // Synchronize on the moment before the snapshot gets sent so we can measure + // the state at that time. + blockUntilSnapshotSendCh := make(chan struct{}) + blockSnapshotSendCh := make(chan struct{}) + ltk.storeKnobs.SendSnapshot = func() { + close(blockUntilSnapshotSendCh) + select { + case <-blockSnapshotSendCh: + case <-time.After(10 * time.Second): + return + } + } + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{Knobs: knobs}, + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(ctx) + + scratchStartKey := tc.ScratchRange(t) + + // Record the snapshot metrics before anything has been sent / received. + senderTotalBefore, senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */) + receiverTotalBefore, receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */) + + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) error { + _, err := tc.AddVoters(scratchStartKey, tc.Target(1)) + return err + }) + + // Wait until the snapshot is about to be sent before calculating what the + // snapshot size should be. This allows our snapshot measurement to account + // for any state changes that happen between calling AddVoters and the + // snapshot being sent. + <-blockUntilSnapshotSendCh + store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey) + snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_INITIAL, tc.Server(1).GetFirstStoreID()) + require.NoError(t, err) + + close(blockSnapshotSendCh) + require.NoError(t, g.Wait()) + + // Record the snapshot metrics for the sender after a voter has been added. A + // learner snapshot should have been sent from the sender to the receiver. + senderTotalAfter, senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0) + + // Asserts that the learner snapshot (aka rebalancing snapshot) bytes sent + // have been recorded and that it was not double counted in a different + // metric. + senderTotalDelta, senderMapDelta := getSnapshotMetricsDiff(senderTotalBefore, senderMetricsMapBefore, senderTotalAfter, senderMetricsMapAfter) + + senderTotalExpected := snapshotBytesMetrics{sentBytes: snapshotLength, rcvdBytes: 0} + senderMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{ + kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: snapshotLength, rcvdBytes: 0}, + kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: 0, rcvdBytes: 0}, + kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0}, + } + require.Equal(t, senderTotalExpected, senderTotalDelta) + require.Equal(t, senderMapExpected, senderMapDelta) + + // Record the snapshot metrics for the receiver after a voter has been added. + receiverTotalAfter, receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1) + + // Asserts that the learner snapshot (aka rebalancing snapshot) bytes received + // have been recorded and that it was not double counted in a different + // metric. + receiverTotalDelta, receiverMapDelta := getSnapshotMetricsDiff(receiverTotalBefore, receiverMetricsMapBefore, receiverTotalAfter, receiverMetricsMapAfter) + + receiverTotalExpected := snapshotBytesMetrics{sentBytes: 0, rcvdBytes: snapshotLength} + receiverMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{ + kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: 0, rcvdBytes: snapshotLength}, + kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: 0, rcvdBytes: 0}, + kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0}, + } + require.Equal(t, receiverTotalExpected, receiverTotalDelta) + require.Equal(t, receiverMapExpected, receiverMapDelta) +} diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 19a104a6018d..16503188b10c 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -160,6 +160,11 @@ func (s *Store) HandleDelegatedSnapshot( ) error { ctx = s.AnnotateCtx(ctx) const name = "storage.Store: handle snapshot delegation" + + if fn := s.cfg.TestingKnobs.SendSnapshot; fn != nil { + fn() + } + return s.stopper.RunTaskWithErr( ctx, name, func(ctx context.Context) error { sender, err := s.GetReplica(req.RangeID) diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index ce855ee11643..30bea72998ef 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -31,7 +31,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -72,6 +71,11 @@ type outgoingDelegatedStream interface { Recv() (*kvserverpb.DelegateSnapshotResponse, error) } +// snapshotRecordMetrics is a wrapper function that increments a set of metrics +// related to the number of snapshot bytes sent/received. The definer of the +// function specifies which metrics are incremented. +type snapshotRecordMetrics func(inc int64) + // snapshotStrategy is an approach to sending and receiving Range snapshots. // Each implementation corresponds to a SnapshotRequest_Strategy, and it is // expected that the implementation that matches the Strategy specified in the @@ -83,7 +87,7 @@ type snapshotStrategy interface { context.Context, incomingSnapshotStream, kvserverpb.SnapshotRequest_Header, - *metric.Counter, + snapshotRecordMetrics, ) (IncomingSnapshot, error) // Send streams SnapshotRequests created from the OutgoingSnapshot in to the @@ -93,7 +97,7 @@ type snapshotStrategy interface { outgoingSnapshotStream, kvserverpb.SnapshotRequest_Header, *OutgoingSnapshot, - *metric.Counter, + snapshotRecordMetrics, ) (int64, error) // Status provides a status report on the work performed during the @@ -264,7 +268,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( ctx context.Context, stream incomingSnapshotStream, header kvserverpb.SnapshotRequest_Header, - bytesRcvdCounter *metric.Counter, + recordBytesReceived snapshotRecordMetrics, ) (IncomingSnapshot, error) { assertStrategy(ctx, header, kvserverpb.SnapshotRequest_KV_BATCH) @@ -288,7 +292,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( } if req.KVBatch != nil { - bytesRcvdCounter.Inc(int64(len(req.KVBatch))) + recordBytesReceived(int64(len(req.KVBatch))) batchReader, err := storage.NewRocksDBBatchReader(req.KVBatch) if err != nil { return noSnap, errors.Wrap(err, "failed to decode batch") @@ -350,10 +354,9 @@ func (kvSS *kvBatchSnapshotStrategy) Send( stream outgoingSnapshotStream, header kvserverpb.SnapshotRequest_Header, snap *OutgoingSnapshot, - bytesSentMetric *metric.Counter, + recordBytesSent snapshotRecordMetrics, ) (int64, error) { assertStrategy(ctx, header, kvserverpb.SnapshotRequest_KV_BATCH) - // bytesSent is updated as key-value batches are sent with sendBatch. It does // not reflect the log entries sent (which are never sent in newer versions of // CRDB, as of VersionUnreplicatedTruncatedState). @@ -389,7 +392,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( return 0, err } bytesSent += bLen - bytesSentMetric.Inc(bLen) + recordBytesSent(bLen) b.Close() b = nil } @@ -399,7 +402,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( return 0, err } bytesSent += int64(b.Len()) - bytesSentMetric.Inc(int64(b.Len())) + recordBytesSent(int64(b.Len())) } kvSS.status = redact.Sprintf("kv pairs: %d", kvs) @@ -774,7 +777,22 @@ func (s *Store) receiveSnapshot( log.Infof(ctx, "accepted snapshot reservation for r%d", header.State.Desc.RangeID) } - inSnap, err := ss.Receive(ctx, stream, *header, s.metrics.RangeSnapshotRcvdBytes) + recordBytesReceived := func(inc int64) { + s.metrics.RangeSnapshotRcvdBytes.Inc(inc) + + switch header.Priority { + case kvserverpb.SnapshotRequest_RECOVERY: + s.metrics.RangeSnapshotRecoveryRcvdBytes.Inc(inc) + case kvserverpb.SnapshotRequest_REBALANCE: + s.metrics.RangeSnapshotRebalancingRcvdBytes.Inc(inc) + default: + // If a snapshot is not a RECOVERY or REBALANCE snapshot, it must be of + // type UNKNOWN. + s.metrics.RangeSnapshotUnknownRcvdBytes.Inc(inc) + } + } + + inSnap, err := ss.Receive(ctx, stream, *header, recordBytesReceived) if err != nil { return err } @@ -1141,7 +1159,7 @@ func SendEmptySnapshot( &outgoingSnap, eng.NewBatch, func() {}, - nil, /* bytesSentCounter */ + nil, /* recordBytesSent */ ) } @@ -1160,13 +1178,13 @@ func sendSnapshot( snap *OutgoingSnapshot, newBatch func() storage.Batch, sent func(), - bytesSentCounter *metric.Counter, + recordBytesSent snapshotRecordMetrics, ) error { - if bytesSentCounter == nil { + if recordBytesSent == nil { // NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshot` - // with a nil counter. We pass in a fake metrics counter here that isn't + // with a nil metrics tracking function. We pass in a fake metrics tracking function here that isn't // hooked up to anything. - bytesSentCounter = metric.NewCounter(metric.Metadata{Name: "range.snapshots.sent-bytes"}) + recordBytesSent = func(inc int64) {} } start := timeutil.Now() to := header.RaftMessageRequest.ToReplica @@ -1229,7 +1247,7 @@ func sendSnapshot( log.Fatalf(ctx, "unknown snapshot strategy: %s", header.Strategy) } - numBytesSent, err := ss.Send(ctx, stream, header, snap, bytesSentCounter) + numBytesSent, err := ss.Send(ctx, stream, header, snap, recordBytesSent) if err != nil { return err } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 467774d522b6..aadeb33a9785 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -2814,7 +2814,7 @@ func TestSendSnapshotThrottling(t *testing.T) { expectedErr := errors.New("") c := fakeSnapshotStream{nil, expectedErr} err := sendSnapshot( - ctx, st, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* bytesSentCounter */ + ctx, st, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */ ) if sp.failedThrottles != 1 { t.Fatalf("expected 1 failed throttle, but found %d", sp.failedThrottles) @@ -2832,7 +2832,7 @@ func TestSendSnapshotThrottling(t *testing.T) { } c := fakeSnapshotStream{resp, nil} err := sendSnapshot( - ctx, st, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* bytesSentCounter */ + ctx, st, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */ ) if sp.failedThrottles != 1 { t.Fatalf("expected 1 failed throttle, but found %d", sp.failedThrottles) diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index f2bd279ee140..16cb5ed51ba3 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -268,6 +268,9 @@ type StoreTestingKnobs struct { // in handleRaftReady to refreshReasonNewLeaderOrConfigChange. EnableUnconditionalRefreshesInRaftReady bool + // SendSnapshot is run after receiving a DelegateRaftSnapshot request but + // before any throttling or sending logic. + SendSnapshot func() // ReceiveSnapshot is run after receiving a snapshot header but before // acquiring snapshot quota or doing shouldAcceptSnapshotData checks. If an // error is returned from the hook, it's sent as an ERROR SnapshotResponse. diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index 621d2c06712c..0af00a2da39c 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -231,6 +231,16 @@ func (m *MockTransactionalSender) GetTxnRetryableErr( func (m *MockTransactionalSender) ClearTxnRetryableErr(ctx context.Context) { } +// HasPerformedReads is part of TxnSenderFactory. +func (m *MockTransactionalSender) HasPerformedReads() bool { + panic("unimplemented") +} + +// HasPerformedWrites is part of TxnSenderFactory. +func (m *MockTransactionalSender) HasPerformedWrites() bool { + panic("unimplemented") +} + // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) ( diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index 1495aa6d4f80..71fc61d7590c 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -336,6 +336,12 @@ type TxnSender interface { // ClearTxnRetryableErr clears the retryable error, if any. ClearTxnRetryableErr(ctx context.Context) + + // HasPerformedReads returns true if a read has been performed. + HasPerformedReads() bool + + // HasPerformedWrites returns true if a write has been performed. + HasPerformedWrites() bool } // SteppingMode is the argument type to ConfigureStepping. diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 38808a341309..2a6dbaf4ee05 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2585,6 +2585,9 @@ func (ex *connExecutor) setTransactionModes( return errors.AssertionFailedf("expected an evaluated AS OF timestamp") } if !asOfTs.IsEmpty() { + if err := ex.state.checkReadsAndWrites(); err != nil { + return err + } if err := ex.state.setHistoricalTimestamp(ctx, asOfTs); err != nil { return err } diff --git a/pkg/sql/logictest/testdata/logic_test/as_of b/pkg/sql/logictest/testdata/logic_test/as_of index 18cf7fdf8a81..ef020468f763 100644 --- a/pkg/sql/logictest/testdata/logic_test/as_of +++ b/pkg/sql/logictest/testdata/logic_test/as_of @@ -108,3 +108,27 @@ SELECT * FROM t AS OF SYSTEM TIME with_min_timestamp(statement_timestamp()) skipif config 3node-tenant statement error pgcode XXC01 with_max_staleness can only be used with a CCL distribution SELECT * FROM t AS OF SYSTEM TIME with_max_staleness('1s'::interval) + +statement ok +BEGIN + +statement ok +SELECT * from t + +statement error cannot set fixed timestamp, .* already performed reads +SET TRANSACTION AS OF system time '-1s' + +statement ok +ROLLBACK + +statement ok +BEGIN + +statement ok +INSERT INTO t VALUES(1) + +statement error cannot set fixed timestamp, .* already performed writes +SET TRANSACTION AS OF system time '-1s' + +statement ok +ROLLBACK diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index e59b14f2e5ea..253b756795d8 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -17,6 +17,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -287,6 +289,7 @@ func (ts *txnState) setHistoricalTimestamp( ) error { ts.mu.Lock() defer ts.mu.Unlock() + if err := ts.mu.txn.SetFixedTimestamp(ctx, historicalTimestamp); err != nil { return err } @@ -465,3 +468,25 @@ func (ts *txnState) consumeAdvanceInfo() advanceInfo { ts.adv = advanceInfo{} return adv } + +// checkReadsAndWrites returns an error if the transaction has performed reads +// or writes. +func (ts *txnState) checkReadsAndWrites() error { + ts.mu.Lock() + defer ts.mu.Unlock() + + if ts.mu.txn.Sender().HasPerformedReads() { + return pgerror.Newf( + pgcode.InvalidTransactionState, + "cannot set fixed timestamp, txn %s already performed reads", + ts.mu.txn) + } + + if ts.mu.txn.Sender().HasPerformedWrites() { + return pgerror.Newf( + pgcode.InvalidTransactionState, + "cannot set fixed timestamp, txn %s already performed writes", + ts.mu.txn) + } + return nil +} diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index ff43ef655642..d6460e9b283c 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -607,6 +607,12 @@ var charts = []sectionDescription{ Metrics: []string{ "range.snapshots.rcvd-bytes", "range.snapshots.sent-bytes", + "range.snapshots.recovery.rcvd-bytes", + "range.snapshots.recovery.sent-bytes", + "range.snapshots.rebalancing.rcvd-bytes", + "range.snapshots.rebalancing.sent-bytes", + "range.snapshots.unknown.rcvd-bytes", + "range.snapshots.unknown.sent-bytes", }, }, },