Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
79090: kv: alter error for SET TRANSACTION AS OF SYSTEM TIME r=e-mbrown a=e-mbrown

if reads or writes  are already performed

Resolves #77265 

When a txn performed a read or write before setting up a historical timestamp a crash report was generated. This commit handles the error case.

Release note: None

81860: kvserver: implement granular metrics for snapshot recovery and rebalance r=KnightAsterial a=KnightAsterial

This commit adds the following new metrics
- `range.snapshots.unknown.rcvd-bytes`
- `range.snapshots.unknown.sent-bytes`
- `range.snapshots.rebalancing.rcvd-bytes`
- `range.snapshots.rebalancing.sent-bytes`
- `range.snapshots.recovery.rcvd-bytes`
- `range.snapshots.recovery.sent-bytes`
These metrics tracks the bytes send/received for each type of
snapshot (rebalance, recovery, and unknown snapshots).
The sum of these three new metrics should equal the existing
`range.snapshots.(sent|rcvd)-bytes` that tracks the total number
of snapshot bytes sent and received.

Additionally, this commit changes the `snapshotStrategy` interface
such that the `Receive` and `Send` methods take
`snapshotBytesMetricFn` as a parameter rather than `*metric.Counter`.

Finally, this commit adds a new SendSnapshot TestingKnob that hooks
into the send snapshot flow after a DelegateSnapshotRequest is
handled but before any throttling or sending logic takes place.

Resolves #81047

Release note (ops change): Added new 6 metrics
(`range.snapshots.shapshots.(unknown|recovery|rebalancing).sent-bytes`
and `range.snapshots.shapshots.(unknown|recovery|rebalancing).rcvd-bytes`)
to the metrics dashboard. This will allow users to track the number
of bytes sent/received for each type of metric in addition to the
total bytes sent/received.

82277: authors: add surajr10 to authors r=surajr10 a=surajr10

Release note: None

82296: tests: update version check for tenant scoping r=dhartunian a=rimadeodhar

This PR fixes the version gate check to add tenant
scoping while creating client certs for tenant
roachtests. It is erroneously gated on v22.1 when it
should be v22.2.

Release note: None

Co-authored-by: e-mbrown <[email protected]>
Co-authored-by: Ryan Zhao <[email protected]>
Co-authored-by: Suraj <[email protected]>
Co-authored-by: rimadeodhar <[email protected]>
  • Loading branch information
5 people committed Jun 1, 2022
5 parents b3589ca + 40151de + 3a3009c + 37b6ca7 + 3e9776d commit fd26d19
Show file tree
Hide file tree
Showing 17 changed files with 482 additions and 26 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ Steven Hand <@cockroachlabs.com> hand <@cockroachlabs.com> hand-crdb <@cockroach
Steven Hubbard <[email protected]> <[email protected]>
Sumeer Bhola <[email protected]> sumeerbhola <[email protected]>
sum12 <[email protected]>
Suraj Rao <[email protected]> <[email protected]>
Syd <[email protected]>
Takuya Kuwahara <[email protected]>
Tamir Duberstein <[email protected]> <@cockroachlabs.com>
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/multitenant_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func createTenantNode(
versionStr, err := fetchCockroachVersion(ctx, t.L(), c, n[0])
v := version.MustParse(versionStr)
require.NoError(t, err)
// Tenant scoped certificates were introduced in version 22.1.
if v.AtLeast(version.MustParse("v22.1.0")) {
// Tenant scoped certificates were introduced in version 22.2.
if v.AtLeast(version.MustParse("v22.2.0")) {
tn.recreateClientCertsWithTenantScope(ctx, c)
}
tn.createTenantCert(ctx, t, c)
Expand Down
26 changes: 24 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,11 +1084,11 @@ func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestam
tc.mu.Lock()
defer tc.mu.Unlock()
// The transaction must not have already been used in this epoch.
if !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty() {
if tc.hasPerformedReadsLocked() {
return errors.WithContextTags(errors.AssertionFailedf(
"cannot set fixed timestamp, txn %s already performed reads", tc.mu.txn), ctx)
}
if tc.mu.txn.Sequence != 0 {
if tc.hasPerformedWritesLocked() {
return errors.WithContextTags(errors.AssertionFailedf(
"cannot set fixed timestamp, txn %s already performed writes", tc.mu.txn), ctx)
}
Expand Down Expand Up @@ -1406,3 +1406,25 @@ func (tc *TxnCoordSender) ClearTxnRetryableErr(ctx context.Context) {
tc.mu.txnState = txnPending
}
}

// HasPerformedReads is part of the TxnSender interface.
func (tc *TxnCoordSender) HasPerformedReads() bool {
tc.mu.Lock()
defer tc.mu.Unlock()
return tc.hasPerformedReadsLocked()
}

// HasPerformedWrites is part of the TxnSender interface.
func (tc *TxnCoordSender) HasPerformedWrites() bool {
tc.mu.Lock()
defer tc.mu.Unlock()
return tc.hasPerformedWritesLocked()
}

func (tc *TxnCoordSender) hasPerformedReadsLocked() bool {
return !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty()
}

func (tc *TxnCoordSender) hasPerformedWritesLocked() bool {
return tc.mu.txn.Sequence != 0
}
48 changes: 48 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,42 @@ var (
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
metaRangeSnapshotUnknownRcvdBytes = metric.Metadata{
Name: "range.snapshots.unknown.rcvd-bytes",
Help: "Number of unknown snapshot bytes received",
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
metaRangeSnapshotUnknownSentBytes = metric.Metadata{
Name: "range.snapshots.unknown.sent-bytes",
Help: "Number of unknown snapshot bytes sent",
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
metaRangeSnapshotRebalancingRcvdBytes = metric.Metadata{
Name: "range.snapshots.rebalancing.rcvd-bytes",
Help: "Number of rebalancing snapshot bytes received",
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
metaRangeSnapshotRebalancingSentBytes = metric.Metadata{
Name: "range.snapshots.rebalancing.sent-bytes",
Help: "Number of rebalancing snapshot bytes sent",
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
metaRangeSnapshotRecoveryRcvdBytes = metric.Metadata{
Name: "range.snapshots.recovery.rcvd-bytes",
Help: "Number of recovery snapshot bytes received",
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
metaRangeSnapshotRecoverySentBytes = metric.Metadata{
Name: "range.snapshots.recovery.sent-bytes",
Help: "Number of recovery snapshot bytes sent",
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
metaRangeRaftLeaderTransfers = metric.Metadata{
Name: "range.raftleadertransfers",
Help: "Number of raft leader transfers",
Expand Down Expand Up @@ -1524,6 +1560,12 @@ type StoreMetrics struct {
RangeSnapshotsAppliedByNonVoters *metric.Counter
RangeSnapshotRcvdBytes *metric.Counter
RangeSnapshotSentBytes *metric.Counter
RangeSnapshotUnknownRcvdBytes *metric.Counter
RangeSnapshotUnknownSentBytes *metric.Counter
RangeSnapshotRecoveryRcvdBytes *metric.Counter
RangeSnapshotRecoverySentBytes *metric.Counter
RangeSnapshotRebalancingRcvdBytes *metric.Counter
RangeSnapshotRebalancingSentBytes *metric.Counter

// Raft processing metrics.
RaftTicks *metric.Counter
Expand Down Expand Up @@ -1973,6 +2015,12 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RangeSnapshotsAppliedByNonVoters: metric.NewCounter(metaRangeSnapshotsAppliedByNonVoter),
RangeSnapshotRcvdBytes: metric.NewCounter(metaRangeSnapshotRcvdBytes),
RangeSnapshotSentBytes: metric.NewCounter(metaRangeSnapshotSentBytes),
RangeSnapshotUnknownRcvdBytes: metric.NewCounter(metaRangeSnapshotUnknownRcvdBytes),
RangeSnapshotUnknownSentBytes: metric.NewCounter(metaRangeSnapshotUnknownSentBytes),
RangeSnapshotRecoveryRcvdBytes: metric.NewCounter(metaRangeSnapshotRecoveryRcvdBytes),
RangeSnapshotRecoverySentBytes: metric.NewCounter(metaRangeSnapshotRecoverySentBytes),
RangeSnapshotRebalancingRcvdBytes: metric.NewCounter(metaRangeSnapshotRebalancingRcvdBytes),
RangeSnapshotRebalancingSentBytes: metric.NewCounter(metaRangeSnapshotRebalancingSentBytes),
RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers),
RangeLossOfQuorumRecoveries: metric.NewCounter(metaRangeLossOfQuorumRecoveries),

Expand Down
5 changes: 2 additions & 3 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -775,7 +774,7 @@ func (t *RaftTransport) SendSnapshot(
snap *OutgoingSnapshot,
newBatch func() storage.Batch,
sent func(),
bytesSentCounter *metric.Counter,
recordBytesSent snapshotRecordMetrics,
) error {
nodeID := header.RaftMessageRequest.ToReplica.NodeID

Expand All @@ -794,7 +793,7 @@ func (t *RaftTransport) SendSnapshot(
log.Warningf(ctx, "failed to close snapshot stream: %+v", err)
}
}()
return sendSnapshot(ctx, t.st, stream, storePool, header, snap, newBatch, sent, bytesSentCounter)
return sendSnapshot(ctx, t.st, stream, storePool, header, snap, newBatch, sent, recordBytesSent)
}

// DelegateSnapshot creates a rpc stream between the leaseholder and the
Expand Down
17 changes: 16 additions & 1 deletion pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2767,6 +2767,21 @@ func (r *Replica) followerSendSnapshot(
r.store.metrics.RangeSnapshotsGenerated.Inc(1)
}

recordBytesSent := func(inc int64) {
r.store.metrics.RangeSnapshotSentBytes.Inc(inc)

switch header.Priority {
case kvserverpb.SnapshotRequest_RECOVERY:
r.store.metrics.RangeSnapshotRecoverySentBytes.Inc(inc)
case kvserverpb.SnapshotRequest_REBALANCE:
r.store.metrics.RangeSnapshotRebalancingSentBytes.Inc(inc)
default:
// If a snapshot is not a RECOVERY or REBALANCE snapshot, it must be of
// type UNKNOWN.
r.store.metrics.RangeSnapshotUnknownSentBytes.Inc(inc)
}
}

err = contextutil.RunWithTimeout(
ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
return r.store.cfg.Transport.SendSnapshot(
Expand All @@ -2776,7 +2791,7 @@ func (r *Replica) followerSendSnapshot(
snap,
newBatchFn,
sent,
r.store.metrics.RangeSnapshotSentBytes,
recordBytesSent,
)
},
)
Expand Down
Loading

0 comments on commit fd26d19

Please sign in to comment.