Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
120030: kvserver: opt-in "fast" external file snapshotting  r=itsbilal a=stevendanna

Like SharedSSTs, we may want to send a pebble.ExternalFile's metadata
rather than its content during a snapshot.

This is opt-in via a cluster setting and is only attempted when the
store appears to actually have external files.

Epic: none
Release note: None

120709: pkg/sql/sqlstats: rework SQL stats flush metrics r=xinhaoz a=abarganier

Informs: cockroachdb#119779

Epic: CRDB-24527

This patch renames a few of the core metrics used to track the SQL stats
flush job to be more in-line with how the SQL activity update job's
metrics work.

Primarily, it renames them for consistency. It also changes the general
`count` metric to only track successful executions, so we can more
easily discern between failed and successful executions.

Release note (ops change): Metrics used to track the SQL stats
subsystem's task that flushes in-memory stats to persisted storage have
been reworked slightly to be more consistent with other metrics used in
the subsystem. The metrics are:

- `sql.stats.flushes.successful`: Number of times SQL Stats are flushed
 successfully to persistent storage
- `sql.stats.flushes.failed`: Number of attempted SQL Stats flushes that
 failed with errors
- `sql.stats.flush.latency`: The latency of SQL Stats flushes to
 persistent storage. Includes failed flush attempts

Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Alex Barganier <[email protected]>
  • Loading branch information
3 people committed Mar 20, 2024
3 parents aa2e1fb + 85386fd + 78fee7a commit ecedafa
Show file tree
Hide file tree
Showing 17 changed files with 231 additions and 81 deletions.
8 changes: 4 additions & 4 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -1474,15 +1474,15 @@
<tr><td>APPLICATION</td><td>sql.statements.active</td><td>Number of currently active user SQL statements</td><td>Active Statements</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>sql.statements.active.internal</td><td>Number of currently active user SQL statements (internal queries)</td><td>SQL Internal Statements</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.activity.update.latency</td><td>The latency of updates made by the SQL activity updater job. Includes failed update attempts</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.activity.updates.failed</td><td>Number of update attempts made by the SQL activity updater job that failed with errors</td><td>failed updatesgi</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.activity.updates.failed</td><td>Number of update attempts made by the SQL activity updater job that failed with errors</td><td>failed updates</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.activity.updates.successful</td><td>Number of successful updates made by the SQL activity updater job</td><td>successful updates</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.cleanup.rows_removed</td><td>Number of stale statistics rows that are removed</td><td>SQL Stats Cleanup</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.discarded.current</td><td>Number of fingerprint statistics being discarded</td><td>Discarded SQL Stats</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.flush.count</td><td>Number of times SQL Stats are flushed to persistent storage</td><td>SQL Stats Flush</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.flush.done_signals.ignored</td><td>Number of times the SQL Stats activity update job ignored the signal sent to it indicating a flush has completed</td><td>flush done signals ignored</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.flush.duration</td><td>Time took to in nanoseconds to complete SQL Stats flush</td><td>SQL Stats Flush</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.flush.error</td><td>Number of errors encountered when flushing SQL Stats</td><td>SQL Stats Flush</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.flush.fingerprint.count</td><td>The number of unique statement and transaction fingerprints included in the SQL Stats flush</td><td>statement &amp; transaction fingerprints</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.flush.latency</td><td>The latency of SQL Stats flushes to persistent storage. Includes failed flush attempts</td><td>nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.flushes.failed</td><td>Number of attempted SQL Stats flushes that failed with errors</td><td>failed flushes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.flushes.successful</td><td>Number of times SQL Stats are flushed successfully to persistent storage</td><td>successful flushes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.mem.current</td><td>Current memory usage for fingerprint storage</td><td>Memory</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.mem.max</td><td>Memory usage for fingerprint storage</td><td>Memory</td><td>HISTOGRAM</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>sql.stats.reported.mem.current</td><td>Current memory usage for reported fingerprint storage</td><td>Memory</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/storageccl/engineccl/shared_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func iterateReplicaKeySpansShared(
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
visitExternalFile func(sst *pebble.ExternalFile) error,
) error {
if !reader.ConsistentIterators() {
panic("reader must provide consistent iterators")
Expand All @@ -65,7 +66,7 @@ func iterateReplicaKeySpansShared(
ReplicatedBySpan: desc.RSpan(),
})
span := spans[0]
return reader.ScanInternal(ctx, span.Key, span.EndKey, visitPoint, visitRangeDel, visitRangeKey, visitSharedFile)
return reader.ScanInternal(ctx, span.Key, span.EndKey, visitPoint, visitRangeDel, visitRangeKey, visitSharedFile, visitExternalFile)
}

func init() {
Expand Down
55 changes: 55 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ message SnapshotRequest {
// metadata present in the snapshot, but not file contents.
bool shared_replicate = 12;

// If true, the snapshot could contain external files. Such files
// will have their metadata present in the snapshot but not the
// file contents.
bool external_replicate = 13;

reserved 1, 4, 6, 7, 8, 9;
}

Expand Down Expand Up @@ -242,6 +247,54 @@ message SnapshotRequest {
uint64 size = 10;
}

// ExternalTable represents one SSTable present in external storage.
// Intended to be the protobuf version of pebble.ExternalFile.
message ExternalTable {
// Used by the Pebble objstorage package to generate new blob storage drivers.
// Reserved for future use.
bytes locator = 1;

// ObjName is the unique name of this sstable on Locator.
string object_name = 2;

// Physical size of the sstable in bytes.
uint64 size = 3;

// StartKey is the loose, inclusive start key of the sstable.
bytes StartKey = 4;
// EndKey is the loose, end key of the sstable. Whether it is
// inclusive or exclusive is controlled by EndKeyIsInclusive.
bytes EndKey = 5;

// EndKeyIsInclusive is true if the EndKey is inclusive.
bool EndKeyIsInclusive = 6;

// HasPointKey denote whether this file contains point keys.
bool has_point_key = 7;

// HasRangeKey denote whether this file contains range keys.
bool has_range_key = 8;

// SyntheticPrefix will prepend this suffix to all keys in the file during
// iteration. Note that the backing file itself is not modified.
bytes synthetic_prefix = 9;

// SyntheticSuffix will replace the suffix of every key in the file during
// iteration. Note that the file itself is not modified, rather, every key
// returned by an iterator will have the synthetic suffix.
//
// SyntheticSuffix can only be used under the following conditions:
// - the synthetic suffix must sort before any non-empty suffixes in the
// backing sst (the entire sst, not just the part restricted to Bounds).
// - the backing sst must not contain multiple keys with the same prefix.
bytes synthetic_suffix = 10;

// LSM level of the original sstable. This sstable will go into the same
// level in the destination LSM.
int32 level = 11;

}

Header header = 1;

// A BatchRepr. Multiple kv_batches may be sent across multiple request messages.
Expand All @@ -259,6 +312,8 @@ message SnapshotRequest {
// flag must be set to true before any user keys are streamed.
bool transition_from_shared_to_regular_replicate = 6;

repeated ExternalTable external_tables = 7 [(gogoproto.nullable) = false];

reserved 3;
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rditer/replica_data_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ var IterateReplicaKeySpansShared func(
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
visitExternalFile func(sst *pebble.ExternalFile) error,
) error

// IterateOptions instructs how points and ranges should be presented to visitor
Expand Down
30 changes: 29 additions & 1 deletion pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3078,6 +3078,12 @@ var traceSnapshotThreshold = settings.RegisterDurationSetting(
"trace logged (set to 0 to disable);", 0,
)

var externalFileSnapshotting = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.snapshot.external_files.enabled",
"enables sending external files as metadata during a snapshot",
false)

// followerSendSnapshot receives a delegate snapshot request and generates the
// snapshot from this replica. The entire process of generating and transmitting
// the snapshot is handled, and errors are propagated back to the leaseholder.
Expand Down Expand Up @@ -3157,7 +3163,28 @@ func (r *Replica) followerSendSnapshot(
// a snapshot for a non-system range. This allows us to send metadata of
// sstables in shared storage as opposed to streaming their contents. Keys
// in higher levels of the LSM are still streamed in the snapshot.
sharedReplicate := r.store.cfg.SharedStorageEnabled && snap.State.Desc.StartKey.AsRawKey().Compare(keys.TableDataMin) >= 0
nonSystemRange := snap.State.Desc.StartKey.AsRawKey().Compare(keys.TableDataMin) >= 0
sharedReplicate := r.store.cfg.SharedStorageEnabled && nonSystemRange

// Use external replication if we aren't using shared
// replication, are dealing with a non-system range, are on at
// least 24.1, and our store has external files.
externalReplicate := !sharedReplicate && nonSystemRange &&
r.store.ClusterSettings().Version.IsActive(ctx, clusterversion.V24_1) &&
externalFileSnapshotting.Get(&r.store.ClusterSettings().SV)
if externalReplicate {
start := snap.State.Desc.StartKey.AsRawKey()
end := snap.State.Desc.EndKey.AsRawKey()
total, _, external, err := r.store.StateEngine().ApproximateDiskBytes(start, end)
if err != nil {
log.Warningf(ctx, "could not determine if store has external bytes: %v", err)
externalReplicate = false
}

// Enable ExternalReplicate only if we have more
// external bytes than local bytes.
externalReplicate = external > (total - external)
}

// Create new snapshot request header using the delegate snapshot request.
header := kvserverpb.SnapshotRequest_Header{
Expand All @@ -3178,6 +3205,7 @@ func (r *Replica) followerSendSnapshot(
SenderQueueName: req.SenderQueueName,
SenderQueuePriority: req.SenderQueuePriority,
SharedReplicate: sharedReplicate,
ExternalReplicate: externalReplicate,
}
newBatchFn := func() storage.WriteBatch {
return r.store.TODOEngine().NewWriteBatch()
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ type IncomingSnapshot struct {
raftAppliedIndex kvpb.RaftIndex // logging only
msgAppRespCh chan raftpb.Message // receives MsgAppResp if/when snap is applied
sharedSSTs []pebble.SharedSSTMeta
externalSSTs []pebble.ExternalFile
doExcise bool
// clearedSpans represents the key spans in the existing store that will be
// cleared by doing the Ingest*. This is tracked so that we can convert the
Expand Down Expand Up @@ -673,7 +674,7 @@ func (r *Replica) applySnapshot(
if inSnap.doExcise {
exciseSpan := desc.KeySpan().AsRawSpanWithNoLocals()
if ingestStats, err =
r.store.TODOEngine().IngestAndExciseFiles(ctx, inSnap.SSTStorageScratch.SSTs(), inSnap.sharedSSTs, exciseSpan); err != nil {
r.store.TODOEngine().IngestAndExciseFiles(ctx, inSnap.SSTStorageScratch.SSTs(), inSnap.sharedSSTs, inSnap.externalSSTs, exciseSpan); err != nil {
return errors.Wrapf(err, "while ingesting %s and excising %s-%s", inSnap.SSTStorageScratch.SSTs(), exciseSpan.Key, exciseSpan.EndKey)
}
} else {
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,9 @@ func (s spanSetReader) ScanInternal(
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
visitExternalFile func(sst *pebble.ExternalFile) error,
) error {
return s.r.ScanInternal(ctx, lower, upper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
return s.r.ScanInternal(ctx, lower, upper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile, visitExternalFile)
}

func (s spanSetReader) Close() {
Expand Down Expand Up @@ -788,6 +789,7 @@ func (s spanSetBatch) ScanInternal(
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
visitExternalFile func(sst *pebble.ExternalFile) error,
) error {
// Only used on Engine.
panic("unimplemented")
Expand Down
Loading

0 comments on commit ecedafa

Please sign in to comment.