diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 6883c4a9cc18..0a6cdfaaea3b 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -430,6 +430,8 @@
STORAGE | raft.entrycache.read_bytes | Counter of bytes in entries returned from the Raft entry cache | Bytes | COUNTER | BYTES | AVG | NON_NEGATIVE_DERIVATIVE |
STORAGE | raft.entrycache.size | Number of Raft entries in the Raft entry cache | Entry Count | GAUGE | COUNT | AVG | NONE |
STORAGE | raft.heartbeats.pending | Number of pending heartbeats and responses waiting to be coalesced | Messages | GAUGE | COUNT | AVG | NONE |
+STORAGE | raft.loaded_entries.bytes | Bytes allocated by raft Storage.Entries calls that are still kept in memory | Bytes | GAUGE | BYTES | AVG | NONE |
+STORAGE | raft.loaded_entries.reserved.bytes | Bytes allocated by raft Storage.Entries calls that are still kept in memory | Memory | GAUGE | BYTES | AVG | NONE |
STORAGE | raft.process.applycommitted.latency | Latency histogram for applying all committed Raft commands in a Raft ready.
This measures the end-to-end latency of applying all commands in a Raft ready. Note that this closes over possibly multiple measurements of the 'raft.process.commandcommit.latency' metric, which receives datapoints for each sub-batch processed in the process. | Latency | HISTOGRAM | NANOSECONDS | AVG | NONE |
STORAGE | raft.process.commandcommit.latency | Latency histogram for applying a batch of Raft commands to the state machine.
This metric is misnamed: it measures the latency for *applying* a batch of committed Raft commands to a Replica state machine. This requires only non-durable I/O (except for replication configuration changes).
Note that a "batch" in this context is really a sub-batch of the batch received for application during raft ready handling. The 'raft.process.applycommitted.latency' histogram is likely more suitable in most cases, as it measures the total latency across all sub-batches (i.e. the sum of commandcommit.latency for a complete batch).
| Latency | HISTOGRAM | NANOSECONDS | AVG | NONE |
STORAGE | raft.process.handleready.latency | Latency histogram for handling a Raft ready.
This measures the end-to-end latency of the Raft state advancement loop, including: - snapshot application - SST ingestion - durably appending to the Raft log (i.e. includes fsync) - entry application (incl. replicated side effects, notably log truncation)
These include work measured in 'raft.process.commandcommit.latency' and 'raft.process.applycommitted.latency'. However, matching percentiles of these metrics may be *higher* than handleready, since not every handleready cycle leads to an update of the others. For example, under tpcc-100 on a single node, the handleready count is approximately twice the logcommit count (and logcommit count tracks closely with applycommitted count).
High percentile outliers can be caused by individual large Raft commands or storage layer blips. Lower percentile (e.g. 50th) increases are often driven by CPU exhaustion or storage layer slowdowns.
| Latency | HISTOGRAM | NANOSECONDS | AVG | NONE |
diff --git a/pkg/kv/kvserver/logstore/BUILD.bazel b/pkg/kv/kvserver/logstore/BUILD.bazel
index 208399bb8192..c3d586fc036a 100644
--- a/pkg/kv/kvserver/logstore/BUILD.bazel
+++ b/pkg/kv/kvserver/logstore/BUILD.bazel
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "logstore",
srcs = [
+ "bytes_tracker.go",
"logstore.go",
"sideload.go",
"sideload_disk.go",
@@ -47,6 +48,7 @@ go_library(
go_test(
name = "logstore_test",
srcs = [
+ "bytes_tracker_test.go",
"logstore_bench_test.go",
"sideload_test.go",
"sync_waiter_test.go",
diff --git a/pkg/kv/kvserver/logstore/bytes_tracker.go b/pkg/kv/kvserver/logstore/bytes_tracker.go
new file mode 100644
index 000000000000..ec55e02a4909
--- /dev/null
+++ b/pkg/kv/kvserver/logstore/bytes_tracker.go
@@ -0,0 +1,189 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package logstore
+
+import (
+ "github.com/cockroachdb/cockroach/pkg/util/envutil"
+ "github.com/cockroachdb/cockroach/pkg/util/metric"
+)
+
+// raftEntriesMemoryLimit is the "global" soft limit for the total size of raft
+// log entries pulled into memory simultaneously. Currently, this only includes
+// the entries pulled as part of the local state machine application flow.
+//
+// No limit if <= 0.
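+//
+// For example, setting COCKROACH_RAFT_ENTRIES_MEMORY_LIMIT=268435456 sets the
+// soft limit to 256 MiB; the default of 0 leaves the limit disabled.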
+var raftEntriesMemoryLimit = envutil.EnvOrDefaultBytes(
+ "COCKROACH_RAFT_ENTRIES_MEMORY_LIMIT", 0)
+
+// NewRaftEntriesSoftLimit returns the SoftLimit configured with the default
+// memory limit.
+func NewRaftEntriesSoftLimit() *SoftLimit {
+ reservedBytesMetric := metric.NewGauge(metric.Metadata{
+ Name: "raft.loaded_entries.reserved.bytes",
+ Help: "Bytes allocated by raft Storage.Entries calls that are still kept in memory",
+ Measurement: "Memory",
+ Unit: metric.Unit_BYTES,
+ })
+ return &SoftLimit{Metric: reservedBytesMetric, Limit: raftEntriesMemoryLimit}
+}
+
+// The reservation sizes are optimized with the following constraints in mind:
+//
+// - Per kvserver.defaultRaftSchedulerConcurrency, there are <= 128 raft
+// scheduler workers.
+// - Per bulk.IngestBatchSize and kvserverbase.MaxCommandSize, raft entries are
+// up to 16 MB in size. Entries are typically small, but can reach this
+// limit, e.g. if these are AddSSTable commands or large batch INSERTs.
+// Typically, compression reduces the max size to < 8MB.
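+//
+// For example, roundSize (defined below) rounds a 10 KiB request up to 256 KiB
+// (minReserveSize) and a 300 KiB request up to 512 KiB, while requests of at
+// least smallReserveSize (e.g. 5 MiB) are taken verbatim.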
+const (
+ // minReserveSize is the granularity at which bytes are reserved from the
+ // SoftLimit for small allocations (below smallReserveSize). Batching small
+ // allocations this way amortizes the runtime cost of the SoftLimit, which is
+ // typically shared by many goroutines.
+ minReserveSize = 256 << 10
+ // smallReserveSize is the threshold below which a size is considered "small"
+ // and is reserved in multiples of minReserveSize at a time.
+ smallReserveSize = 4 << 20
+)
+
+// SoftLimit is a byte size limit with weak guarantees. It tracks the global
+// usage in a metric, and gives the user a hint when the usage has reached the
+// soft limit.
+//
+// When at the limit, the user is supposed to back off acquiring resources, but
+// is not strictly required to. There can be overflows when the user acquires
+// resources optimistically. It is recommended to make sure that the possible
+// overflows are bounded (e.g. there is a bounded number of workers, and each
+// can optimistically acquire a bounded amount), and to account for these
+// overflows when picking the soft limit.
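+//
+// For example, with at most 128 raft scheduler workers and raft entries of up
+// to 16 MB (see the reservation size comment above), each worker overshoots
+// the limit by at most roughly one max-sized entry, so the tracked usage stays
+// within approximately Limit + 128 * 16 MB.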
+//
+// TODO(pav-kv): integrate with mon.BytesMonitor.
+type SoftLimit struct {
+ Metric *metric.Gauge // the "global" usage metric
+ Limit int64 // the soft limit
+}
+
+// acquire accepts the given number of bytes for tracking.
+func (s *SoftLimit) acquire(x uint64) {
+ s.Metric.Inc(int64(x))
+}
+
+// withinLimit returns true iff the bytes usage hasn't reached the soft limit.
+func (s *SoftLimit) withinLimit() bool {
+ return s.Limit <= 0 || s.Metric.Value() < s.Limit
+}
+
+// release removes the given number of bytes from tracking.
+func (s *SoftLimit) release(x uint64) {
+ s.Metric.Dec(int64(x))
+}
+
+// BytesAccount acquires bytes from a SoftLimit, and releases them at the end
+// of its lifetime.
+type BytesAccount struct {
+ lim *SoftLimit
+ metric *metric.Gauge // the "local" usage metric
+ used uint64
+ reserved uint64 // reserved bytes are not used yet but are accounted for in lim
+}
+
+// NewAccount creates a BytesAccount consuming from this SoftLimit.
+func (s *SoftLimit) NewAccount(metric *metric.Gauge) BytesAccount {
+ return BytesAccount{lim: s, metric: metric}
+}
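+
+// A typical lifecycle, for illustration (lim is a *SoftLimit and entries is a
+// slice of raftpb.Entry, both stand-ins for the caller's state):
+//
+//	acc := lim.NewAccount(nil /* metric */)
+//	for _, ent := range entries {
+//		if !acc.Grow(uint64(ent.Size())) {
+//			break // the soft limit is reached, back off loading more entries
+//		}
+//	}
+//	// ... process the loaded entries ...
+//	acc.Clear() // return all tracked bytes to the SoftLimit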
+
+// Initialized returns true iff this BytesAccount is usable.
+func (b *BytesAccount) Initialized() bool {
+ return b != nil && b.lim != nil
+}
+
+// Grow optimistically acquires and accounts for the given number of bytes.
+// Returns false when this leads to a SoftLimit overflow, in which case the
+// user should back off acquiring any more resources.
+func (b *BytesAccount) Grow(x uint64) (withinLimit bool) {
+ if !b.Initialized() {
+ return true
+ }
+ withinLimit = true
+ if x > b.reserved {
+ need := roundSize(x - b.reserved)
+ b.lim.acquire(need)
+ b.reserved += need
+ if withinLimit = b.lim.withinLimit(); !withinLimit && b.reserved > x {
+ // If we have reached the soft limit, return the remainder of the reserved
+ // bytes to the limiter. We will not use them: the client typically stops
+ // calling Grow after this.
+ b.lim.release(b.reserved - x)
+ b.reserved = x
+ }
+ }
+ b.reserved -= x
+ b.used += x
+ if b.metric != nil {
+ b.metric.Inc(int64(x))
+ }
+ return withinLimit
+}
+
+// Clear returns all the tracked bytes (used and reserved) to the SoftLimit.
+func (b *BytesAccount) Clear() {
+ if !b.Initialized() {
+ return
+ }
+ if b.metric != nil {
+ b.metric.Dec(int64(b.used))
+ }
+ b.lim.release(b.used + b.reserved)
+ b.used, b.reserved = 0, 0
+}
+
+func roundSize(size uint64) uint64 {
+ if size >= smallReserveSize {
+ // Don't round the size up if the allocation is large. This also avoids edge
+ // cases in the math below if size == math.MaxInt64.
+ return size
+ }
+ return (size + minReserveSize - 1) / minReserveSize * minReserveSize
+}
+
+// sizeHelper helps to build a batch of entries with the total size not
+// exceeding the static and dynamic byte limits. The user calls the add() method
+// until sizeHelper.done becomes true. For all add() calls that returned true,
+// the corresponding data can be added to the batch.
+//
+// In some exceptional cases, the size of the batch can exceed the limits:
+// - maxBytes can be exceeded by the first added entry,
+// - the soft limit can be exceeded by the last added entry.
+type sizeHelper struct {
+ bytes uint64
+ maxBytes uint64
+ account *BytesAccount
+ done bool
+}
+
+// add returns true if the given number of bytes can be added to the batch
+// without exceeding the maxBytes limit, or overflowing the bytes account. The
+// first add call always returns true.
+//
+// Must not be called after sizeHelper.done becomes true.
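+//
+// For example, with maxBytes=100: add(60) returns true, and a subsequent
+// add(50) returns false and sets done, since 60+50 exceeds maxBytes. A first
+// call add(200) still returns true (and sets done), so the batch is never
+// empty.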
+func (s *sizeHelper) add(bytes uint64) bool {
+ if s.bytes == 0 { // this is the first entry, always take it
+ s.bytes += bytes
+ s.done = !s.account.Grow(bytes) || s.bytes > s.maxBytes
+ return true
+ } else if s.bytes+bytes > s.maxBytes {
+ s.done = true
+ return false
+ }
+ s.bytes += bytes
+ s.done = !s.account.Grow(bytes)
+ return true
+}
diff --git a/pkg/kv/kvserver/logstore/bytes_tracker_test.go b/pkg/kv/kvserver/logstore/bytes_tracker_test.go
new file mode 100644
index 000000000000..be63c76c9411
--- /dev/null
+++ b/pkg/kv/kvserver/logstore/bytes_tracker_test.go
@@ -0,0 +1,122 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package logstore
+
+import (
+ "testing"
+
+ "github.com/cockroachdb/cockroach/pkg/util/metric"
+ "github.com/stretchr/testify/require"
+)
+
+func TestSoftLimiter(t *testing.T) {
+ lim := SoftLimit{Metric: metric.NewGauge(metric.Metadata{}), Limit: 1000}
+ require.Zero(t, lim.Metric.Value())
+ lim.acquire(100)
+ require.Equal(t, int64(100), lim.Metric.Value())
+ require.True(t, lim.withinLimit())
+ lim.acquire(900)
+ require.Equal(t, int64(1000), lim.Metric.Value())
+ require.False(t, lim.withinLimit())
+ lim.release(100)
+ require.Equal(t, int64(900), lim.Metric.Value())
+ require.True(t, lim.withinLimit())
+ lim.acquire(10000)
+ require.Equal(t, int64(10900), lim.Metric.Value())
+ require.False(t, lim.withinLimit())
+ lim.release(900)
+ require.Equal(t, int64(10000), lim.Metric.Value())
+ require.False(t, lim.withinLimit())
+ lim.release(10000)
+ require.Zero(t, lim.Metric.Value())
+ require.True(t, lim.withinLimit())
+
+ lim.Limit = 0 // no limit
+ lim.acquire(100)
+ require.True(t, lim.withinLimit())
+ lim.acquire(1000000)
+ require.True(t, lim.withinLimit())
+}
+
+func TestBytesAccount(t *testing.T) {
+ lim := SoftLimit{Metric: metric.NewGauge(metric.Metadata{}), Limit: 1 << 20}
+ a1, a2 := lim.NewAccount(nil), lim.NewAccount(nil)
+ require.True(t, a1.Grow(256<<10))
+
+ require.True(t, a2.Grow(128<<10))
+ require.True(t, a2.Grow(256<<10))
+ require.False(t, a2.Grow(512<<10))
+ require.Equal(t, uint64(512+256+128)<<10, a2.used)
+ require.Zero(t, a2.reserved)
+ // a2 returns all the reserved bytes to the limiter.
+ require.Equal(t, int64(a1.used+a1.reserved+a2.used), lim.Metric.Value())
+
+ require.False(t, a1.Grow(10))
+ require.Equal(t, uint64(10+256<<10), a1.used)
+ require.Zero(t, a1.reserved)
+ // a1 returns all the reserved bytes to the limiter.
+ require.Equal(t, int64(a1.used+a2.used), lim.Metric.Value())
+
+ require.False(t, lim.withinLimit())
+ a2.Clear()
+ require.True(t, lim.withinLimit())
+ require.True(t, a1.Grow(1000))
+ a1.Clear()
+
+ require.Zero(t, lim.Metric.Value())
+}
+
+func TestSizeHelper(t *testing.T) {
+ lim := SoftLimit{Metric: metric.NewGauge(metric.Metadata{}), Limit: 1 << 20}
+ for _, tt := range []struct {
+ max uint64
+ sizes []uint64
+ take int
+ }{
+ // Limits are not reached.
+ {max: 1023, sizes: []uint64{10}, take: 1},
+ {max: 100, sizes: []uint64{10, 10, 20, 10}, take: 4},
+ // Max size limit is reached.
+ {max: 100, sizes: []uint64{30, 30, 30, 30}, take: 3},
+ {max: 100, sizes: []uint64{100, 1}, take: 1},
+ {max: 100, sizes: []uint64{200, 1}, take: 1},
+ {max: 1000, sizes: []uint64{100, 900, 100}, take: 2},
+ {max: 1000, sizes: []uint64{100, 500, 10000}, take: 2},
+ {max: 1000, sizes: []uint64{100, 1000}, take: 1},
+ // Soft limiter kicks in.
+ {max: 64 << 20, sizes: []uint64{8 << 20}, take: 1},
+ {max: 64 << 20, sizes: []uint64{4 << 20, 4 << 20}, take: 1},
+ {max: 64 << 20, sizes: []uint64{1 << 20, 1 << 20, 1 << 20}, take: 1},
+ {max: 64 << 20, sizes: []uint64{100, 1 << 20, 1 << 20}, take: 2},
+ {max: 64 << 20, sizes: []uint64{100, 1000, 1234, 1 << 20, 1 << 20}, take: 4},
+ } {
+ t.Run("", func(t *testing.T) {
+ acc := lim.NewAccount(nil)
+ defer acc.Clear()
+ sh := sizeHelper{maxBytes: tt.max, account: &acc}
+
+ took := 0
+ for ln := len(tt.sizes); took < ln && !sh.done; took++ {
+ if size := tt.sizes[took]; !sh.add(size) {
+ require.True(t, sh.done)
+ break
+ }
+ }
+ require.Equal(t, tt.take, took)
+
+ var want uint64
+ for _, size := range tt.sizes[:tt.take] {
+ want += size
+ }
+ require.Equal(t, want, sh.bytes)
+ })
+ }
+}
diff --git a/pkg/kv/kvserver/logstore/logstore.go b/pkg/kv/kvserver/logstore/logstore.go
index 3c0ceae72455..6cc0bca145b5 100644
--- a/pkg/kv/kvserver/logstore/logstore.go
+++ b/pkg/kv/kvserver/logstore/logstore.go
@@ -15,6 +15,7 @@ import (
"context"
"fmt"
"math/rand"
+ "slices"
"sync"
"time"
@@ -530,6 +531,7 @@ func LoadEntries(
sideloaded SideloadStorage,
lo, hi kvpb.RaftIndex,
maxBytes uint64,
+ account *BytesAccount,
) (_ []raftpb.Entry, _cachedSize uint64, _loadedSize uint64, _ error) {
if lo > hi {
return nil, 0, 0, errors.Errorf("lo:%d is greater than hi:%d", lo, hi)
@@ -540,17 +542,29 @@ func LoadEntries(
n = 100
}
ents := make([]raftpb.Entry, 0, n)
-
- ents, cachedSize, hitIndex, exceededMaxBytes := eCache.Scan(ents, rangeID, lo, hi, maxBytes)
-
- // Return results if the correct number of results came back or if
- // we ran into the max bytes limit.
- if kvpb.RaftIndex(len(ents)) == hi-lo || exceededMaxBytes {
+ ents, _, hitIndex, _ := eCache.Scan(ents, rangeID, lo, hi, maxBytes)
+
+ // TODO(pav-kv): pass the sizeHelper to eCache.Scan above, to avoid scanning
+ // the same entries twice, and computing their sizes.
+ sh := sizeHelper{maxBytes: maxBytes, account: account}
+ for i, entry := range ents {
+ if sh.done || !sh.add(uint64(entry.Size())) {
+ // Remove the remaining entries, and drop the references to the memory they hold.
+ ents = slices.Delete(ents, i, len(ents))
+ break
+ }
+ }
+ // NB: if we couldn't get quota for all the cached entries, return only the
+ // prefix for which we did. Even though all the cached entries are already in
+ // memory, returning all of them would increase their lifetime, incur size
+ // amplification when processing them, and risk reaching an out-of-memory state.
+ cachedSize := sh.bytes
+ // Return results if the correct number of results came back, or we ran into
+ // the max bytes limit, or reached the memory budget limit.
+ if len(ents) == int(hi-lo) || sh.done {
return ents, cachedSize, 0, nil
}
- combinedSize := cachedSize // size tracks total size of ents.
-
// Scan over the log to find the requested entries in the range [lo, hi),
// stopping once we have enough.
expectedIndex := hitIndex
@@ -578,17 +592,12 @@ func LoadEntries(
}
}
- // Note that we track the size of proposals with payloads inlined.
- combinedSize += uint64(ent.Size())
- if combinedSize > maxBytes {
- exceededMaxBytes = true
- if len(ents) == 0 { // make sure to return at least one entry
- ents = append(ents, ent)
- }
+ if sh.add(uint64(ent.Size())) {
+ ents = append(ents, ent)
+ }
+ if sh.done {
return iterutil.StopIteration()
}
-
- ents = append(ents, ent)
return nil
}
@@ -600,13 +609,9 @@ func LoadEntries(
eCache.Add(rangeID, ents, false /* truncate */)
// Did the correct number of results come back? If so, we're all good.
- if kvpb.RaftIndex(len(ents)) == hi-lo {
- return ents, cachedSize, combinedSize - cachedSize, nil
- }
-
- // Did we hit the size limit? If so, return what we have.
- if exceededMaxBytes {
- return ents, cachedSize, combinedSize - cachedSize, nil
+ // Did we hit the size limits? If so, return what we have.
+ if len(ents) == int(hi-lo) || sh.done {
+ return ents, cachedSize, sh.bytes - cachedSize, nil
}
// Did we get any results at all? Because something went wrong.
diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go
index 865e0801d7d6..5ae4f6faac4f 100644
--- a/pkg/kv/kvserver/metrics.go
+++ b/pkg/kv/kvserver/metrics.go
@@ -1221,6 +1221,13 @@ or the delegate being too busy to send.
// (0 to 1.0) so it probably won't produce useful results here.
Unit: metric.Unit_COUNT,
}
+ // Raft entry bytes loaded in memory.
+ metaRaftLoadedEntriesBytes = metric.Metadata{
+ Name: "raft.loaded_entries.bytes",
+ Help: `Bytes allocated by raft Storage.Entries calls that are still kept in memory`,
+ Measurement: "Bytes",
+ Unit: metric.Unit_BYTES,
+ }
// Raft processing metrics.
metaRaftTicks = metric.Metadata{
@@ -2650,6 +2657,7 @@ type StoreMetrics struct {
RaftProposalsDropped *metric.Counter
RaftProposalsDroppedLeader *metric.Counter
RaftQuotaPoolPercentUsed metric.IHistogram
+ RaftLoadedEntriesBytes *metric.Gauge
RaftWorkingDurationNanos *metric.Counter
RaftTickingDurationNanos *metric.Counter
RaftCommandsProposed *metric.Counter
@@ -3351,6 +3359,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
SigFigs: 1,
BucketConfig: metric.Percent100Buckets,
}),
+ RaftLoadedEntriesBytes: metric.NewGauge(metaRaftLoadedEntriesBytes),
RaftWorkingDurationNanos: metric.NewCounter(metaRaftWorkingDurationNanos),
RaftTickingDurationNanos: metric.NewCounter(metaRaftTickingDurationNanos),
RaftCommandsProposed: metric.NewCounter(metaRaftCommandsProposed),
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index fcb62b66a27a..9270036a7ef0 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -324,6 +324,11 @@ type Replica struct {
stateMachine replicaStateMachine
// decoder is used to decode committed raft entries.
decoder replicaDecoder
+
+ // bytesAccount accounts for the bytes used by various Raft components, such
+ // as entries to be applied. Currently, it only tracks bytes used by committed
+ // entries being applied to the state machine.
+ bytesAccount logstore.BytesAccount
}
// localMsgs contains a collection of raftpb.Message that target the local
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 78ead347f4ca..dc6d27b7aa39 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -750,6 +750,19 @@ func (r *Replica) handleRaftReady(
return r.handleRaftReadyRaftMuLocked(ctx, inSnap)
}
+func (r *Replica) attachRaftEntriesMonitorRaftMuLocked() {
+ r.raftMu.bytesAccount = r.store.cfg.RaftEntriesMonitor.NewAccount(
+ r.store.metrics.RaftLoadedEntriesBytes)
+}
+
+func (r *Replica) detachRaftEntriesMonitorRaftMuLocked() {
+ // Return all the used bytes to the limiter.
+ r.raftMu.bytesAccount.Clear()
+ // De-initialize the account so that log storage Entries() calls don't track
+ // the entries anymore.
+ r.raftMu.bytesAccount = logstore.BytesAccount{}
+}
+
// handleRaftReadyRaftMuLocked is the same as handleRaftReady but requires that
// the replica's raftMu be held.
//
@@ -799,7 +812,20 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
return false, err
}
if hasReady = raftGroup.HasReady(); hasReady {
+ // Since we are holding raftMu, only this Ready() call will use
+ // raftMu.bytesAccount. It tracks memory usage that this Ready incurs.
+ r.attachRaftEntriesMonitorRaftMuLocked()
+ // TODO(pav-kv): currently, Ready() only accounts for entry bytes loaded
+ // from log storage, and ignores the in-memory unstable entries. Pass a
+ // flow control struct down the stack, and do a more complete accounting
+ // in raft. This will also eliminate the "side channel" plumbing hack with
+ // this bytesAccount.
syncRd := raftGroup.Ready()
+ // We apply committed entries during this handleRaftReady, so it is ok to
+ // release the corresponding memory tokens at the end of this func. Next
+ // time we enter this function, the account will be empty again.
+ defer r.detachRaftEntriesMonitorRaftMuLocked()
+
logRaftReady(ctx, syncRd)
asyncRd := makeAsyncReady(syncRd)
softState = asyncRd.SoftState
diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go
index 442fed27f7ac..a48f06fd7df6 100644
--- a/pkg/kv/kvserver/replica_raftstorage.go
+++ b/pkg/kv/kvserver/replica_raftstorage.go
@@ -115,7 +115,7 @@ func (r *replicaRaftStorage) TypedEntries(
return nil, errors.New("sideloaded storage is uninitialized")
}
ents, _, loadedSize, err := logstore.LoadEntries(ctx, r.mu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
- r.store.raftEntryCache, r.raftMu.sideloaded, lo, hi, maxBytes)
+ r.store.raftEntryCache, r.raftMu.sideloaded, lo, hi, maxBytes, &r.raftMu.bytesAccount)
r.store.metrics.RaftStorageReadBytes.Inc(int64(loadedSize))
return ents, err
}
diff --git a/pkg/kv/kvserver/replica_sideload_test.go b/pkg/kv/kvserver/replica_sideload_test.go
index d7b3644fac5f..e29a06f91476 100644
--- a/pkg/kv/kvserver/replica_sideload_test.go
+++ b/pkg/kv/kvserver/replica_sideload_test.go
@@ -185,8 +185,7 @@ func TestRaftSSTableSideloading(t *testing.T) {
tc.store.raftEntryCache.Clear(tc.repl.RangeID, hi)
ents, cachedBytes, _, err := logstore.LoadEntries(
ctx, rsl, tc.store.TODOEngine(), tc.repl.RangeID, tc.store.raftEntryCache,
- tc.repl.raftMu.sideloaded, lo, hi, math.MaxUint64,
- )
+ tc.repl.raftMu.sideloaded, lo, hi, math.MaxUint64, nil /* account */)
require.NoError(t, err)
require.Len(t, ents, int(hi-lo))
require.Zero(t, cachedBytes)
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index c950bc71f1fd..53a0ba2d3987 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -1233,6 +1233,7 @@ type StoreConfig struct {
// tests.
KVMemoryMonitor *mon.BytesMonitor
RangefeedBudgetFactory *rangefeed.BudgetFactory
+ RaftEntriesMonitor *logstore.SoftLimit // tracks memory used by raft entries
// SpanConfigsDisabled determines whether we're able to use the span configs
// infrastructure or not.
diff --git a/pkg/server/server.go b/pkg/server/server.go
index b38439a20419..c6447048bb0c 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -54,6 +54,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptprovider"
@@ -707,13 +708,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
if rangeFeedBudgetFactory != nil {
nodeRegistry.AddMetricStruct(rangeFeedBudgetFactory.Metrics())
}
+
+ raftEntriesMonitor := logstore.NewRaftEntriesSoftLimit()
+ nodeRegistry.AddMetric(raftEntriesMonitor.Metric)
+
// Closer order is important with BytesMonitor.
- stopper.AddCloser(stop.CloserFn(func() {
- rangeFeedBudgetFactory.Stop(ctx)
- }))
- stopper.AddCloser(stop.CloserFn(func() {
- kvMemoryMonitor.Stop(ctx)
- }))
+ stopper.AddCloser(stop.CloserFn(func() { rangeFeedBudgetFactory.Stop(ctx) }))
+ stopper.AddCloser(stop.CloserFn(func() { kvMemoryMonitor.Stop(ctx) }))
tsDB := ts.NewDB(db, cfg.Settings)
nodeRegistry.AddMetricStruct(tsDB.Metrics())
@@ -855,6 +856,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
EagerLeaseAcquisitionLimiter: eagerLeaseAcquisitionLimiter,
KVMemoryMonitor: kvMemoryMonitor,
RangefeedBudgetFactory: rangeFeedBudgetFactory,
+ RaftEntriesMonitor: raftEntriesMonitor,
SharedStorageEnabled: cfg.SharedStorage != "",
SystemConfigProvider: systemConfigWatcher,
SpanConfigSubscriber: spanConfig.subscriber,