Skip to content

Commit

Permalink
Merge #105286
Browse files Browse the repository at this point in the history
105286: kvserver: introduce raft memory tracking r=nvanbenschoten a=pav-kv

This PR introduces bytes tracking/limiting for raft entries. At the moment, it only tracks entries returned by raft `Ready()`. This only includes committed entries pulled in order to be applied to the state machine. The lifetime of these entries is limited to `handleRaftReadyRaftMuLocked` duration, so the acquired bytes quota is released at the end of this call.

Currently, `Ready()` calls `Storage.Entries()` only to fetch committed entries to be applied. In the future, it will also fetch entries when sending `MsgApp` messages to followers. The tracking will be extended correspondingly.

The limit is controlled by `COCKROACH_RAFT_ENTRIES_MEMORY_LIMIT` env variable, which is currently disabled by default.

### Testing

The effect has been demonstrated on a test #120886 which creates a large backlog of unapplied committed entries across multiple ranges. When the server is started, it eagerly tries to apply all these entries. See the screenshots of a few metrics during the unconstrained run, and a run with memory limiting.

The memory footprint has dropped from 10GB to 6GB on this test, without negatively impacting the throughput (in fact, there is throughput improvement; this can be accidental, but I observed it multiple times).

<details>
<summary>Before</summary>

<img width="1062" alt="Screenshot 2024-03-22 at 13 09 56" src="https://github.com/cockroachdb/cockroach/assets/3757441/09cd14c0-54ab-4585-8983-5562d5a5b49e">
<img width="1050" alt="Screenshot 2024-03-22 at 13 10 10" src="https://github.com/cockroachdb/cockroach/assets/3757441/93d89241-5f51-41d8-9b81-c52fe428f869">
<img width="1061" alt="Screenshot 2024-03-22 at 13 10 22" src="https://github.com/cockroachdb/cockroach/assets/3757441/446e589b-8cef-455e-bd53-75fc98191791">

</details>

<details>
<summary>After (with COCKROACH_RAFT_ENTRIES_MEMORY_LIMIT=24M)</summary>

<img width="1053" alt="Screenshot 2024-03-22 at 13 13 30" src="https://github.com/cockroachdb/cockroach/assets/3757441/9b1e7d67-0d46-48b4-af5c-f1f381b208f1">
<img width="1048" alt="Screenshot 2024-03-22 at 13 13 38" src="https://github.com/cockroachdb/cockroach/assets/3757441/79a91937-46a7-4461-aca2-27ed6387f44c">
<img width="1051" alt="Screenshot 2024-03-22 at 13 13 48" src="https://github.com/cockroachdb/cockroach/assets/3757441/7b02344f-5494-4e3a-9d04-48c6bbedd097">

</details>

---

Part of #102840

Release note (performance improvement): This change introduced pacing for pulling raft entries from storage when applying them to the state machine. This helps avoiding OOMs or excessive resource usage when a node catches up on many committed entries, which ultimately affects availability and performance (e.g., tail latencies). New metrics `raft.loaded_entries.bytes` and `raft.loaded_entries.reserved.bytes` gauge the total size of log entries pulled from raft for applying to the state machine, and can be used for figuring out a reasonable soft limit. The soft limit can be set via `COCKROACH_RAFT_ENTRIES_MEMORY_LIMIT` env var which is currently disabled by default.

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Mar 22, 2024
2 parents 5a5a248 + b5c519c commit bd3288e
Show file tree
Hide file tree
Showing 12 changed files with 395 additions and 33 deletions.
2 changes: 2 additions & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,8 @@
<tr><td>STORAGE</td><td>raft.entrycache.read_bytes</td><td>Counter of bytes in entries returned from the Raft entry cache</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>raft.entrycache.size</td><td>Number of Raft entries in the Raft entry cache</td><td>Entry Count</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>raft.heartbeats.pending</td><td>Number of pending heartbeats and responses waiting to be coalesced</td><td>Messages</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>raft.loaded_entries.bytes</td><td>Bytes allocated by raft Storage.Entries calls that are still kept in memory</td><td>Bytes</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>raft.loaded_entries.reserved.bytes</td><td>Bytes allocated by raft Storage.Entries calls that are still kept in memory</td><td>Memory</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>raft.process.applycommitted.latency</td><td>Latency histogram for applying all committed Raft commands in a Raft ready.<br/><br/>This measures the end-to-end latency of applying all commands in a Raft ready. Note that<br/>this closes over possibly multiple measurements of the &#39;raft.process.commandcommit.latency&#39;<br/>metric, which receives datapoints for each sub-batch processed in the process.</td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>raft.process.commandcommit.latency</td><td>Latency histogram for applying a batch of Raft commands to the state machine.<br/><br/>This metric is misnamed: it measures the latency for *applying* a batch of<br/>committed Raft commands to a Replica state machine. This requires only<br/>non-durable I/O (except for replication configuration changes).<br/><br/>Note that a &#34;batch&#34; in this context is really a sub-batch of the batch received<br/>for application during raft ready handling. The<br/>&#39;raft.process.applycommitted.latency&#39; histogram is likely more suitable in most<br/>cases, as it measures the total latency across all sub-batches (i.e. the sum of<br/>commandcommit.latency for a complete batch).<br/></td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>raft.process.handleready.latency</td><td>Latency histogram for handling a Raft ready.<br/><br/>This measures the end-to-end-latency of the Raft state advancement loop, including:<br/>- snapshot application<br/>- SST ingestion<br/>- durably appending to the Raft log (i.e. includes fsync)<br/>- entry application (incl. replicated side effects, notably log truncation)<br/><br/>These include work measured in &#39;raft.process.commandcommit.latency&#39; and<br/>&#39;raft.process.applycommitted.latency&#39;. However, matching percentiles of these<br/>metrics may be *higher* than handleready, since not every handleready cycle<br/>leads to an update of the others. For example, under tpcc-100 on a single node,<br/>the handleready count is approximately twice the logcommit count (and logcommit<br/>count tracks closely with applycommitted count).<br/><br/>High percentile outliers can be caused by individual large Raft commands or<br/>storage layer blips. Lower percentile (e.g. 50th) increases are often driven by<br/>CPU exhaustion or storage layer slowdowns.<br/></td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/logstore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "logstore",
srcs = [
"bytes_tracker.go",
"logstore.go",
"sideload.go",
"sideload_disk.go",
Expand Down Expand Up @@ -47,6 +48,7 @@ go_library(
go_test(
name = "logstore_test",
srcs = [
"bytes_tracker_test.go",
"logstore_bench_test.go",
"sideload_test.go",
"sync_waiter_test.go",
Expand Down
189 changes: 189 additions & 0 deletions pkg/kv/kvserver/logstore/bytes_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package logstore

import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/metric"
)

// raftEntriesMemoryLimit is the "global" soft limit for the total size of raft
// log entries pulled into memory simultaneously. Currently, this only includes
// the entries pulled as part of the local state machine application flow.
//
// No limit if <= 0.
var raftEntriesMemoryLimit = envutil.EnvOrDefaultBytes(
"COCKROACH_RAFT_ENTRIES_MEMORY_LIMIT", 0)

// NewRaftEntriesSoftLimit returns the SoftLimit configured with the default
// memory limit.
func NewRaftEntriesSoftLimit() *SoftLimit {
reservedBytesMetric := metric.NewGauge(metric.Metadata{
Name: "raft.loaded_entries.reserved.bytes",
Help: "Bytes allocated by raft Storage.Entries calls that are still kept in memory",
Measurement: "Memory",
Unit: metric.Unit_BYTES,
})
return &SoftLimit{Metric: reservedBytesMetric, Limit: raftEntriesMemoryLimit}
}

// The reservation sizes are optimized with the following constraints in mind:
//
// - Per kvserver.defaultRaftSchedulerConcurrency, there are <= 128 raft
// scheduler workers.
// - Per bulk.IngestBatchSize and kvserverbase.MaxCommandSize, raft entries are
// up to 16 MB in size. Entries are typically small, but can reach this
// limit, e.g. if these are AddSSTable commands or large batch INSERTs.
// Typically, compression reduces the max size to < 8MB.
const (
// minReserveSize is the granularity at which bytes are reserved from the
// SoftLimit for small allocations (below smallReserveSize). Batching small
// allocations this way amortizes the runtime cost of SoftLimit which is
// typically shared by many goroutines.
minReserveSize = 256 << 10
// smallReserveSize is the threshold below which the size is considered
// "small", and causes reserving in multiples of minReserveSize at a time.
smallReserveSize = 4 << 20
)

// SoftLimit is a byte size limit with weak guarantees. It tracks the global
// usage in a metric, and gives the user a hint when the usage has reached the
// soft limit.
//
// When at the limit, the user is supposed to back off acquiring resources, but
// is not strictly required to. There can be overflows when the user acquires
// resources optimistically. It is recommended to make sure that the possible
// overflows are bounded (e.g. there is a bounded number of workers, and each
// can optimistically acquire a bounded amount), and to account for these
// overflows when picking the soft limit.
//
// TODO(pav-kv): integrate with mon.BytesMonitor.
type SoftLimit struct {
Metric *metric.Gauge // the "global" usage metric
Limit int64 // the soft limit
}

// acquire accepts the given number of bytes for tracking.
func (s *SoftLimit) acquire(x uint64) {
s.Metric.Inc(int64(x))
}

// withinLimit returns true iff the bytes usage hasn't reached the soft limit.
func (s *SoftLimit) withinLimit() bool {
return s.Limit <= 0 || s.Metric.Value() < s.Limit
}

// release removes the given number of bytes from tracking.
func (s *SoftLimit) release(x uint64) {
s.Metric.Dec(int64(x))
}

// BytesAccount acquires bytes from SoftLimit, and releases them at the end of
// the lifetime.
type BytesAccount struct {
lim *SoftLimit
metric *metric.Gauge // the "local" usage metric
used uint64
reserved uint64 // reserved bytes are not used yet but are accounted for in lim
}

// NewAccount creates a BytesAccount consuming from this SoftLimit.
func (s *SoftLimit) NewAccount(metric *metric.Gauge) BytesAccount {
return BytesAccount{lim: s, metric: metric}
}

// Initialized returns true iff this BytesAccount is usable.
func (b *BytesAccount) Initialized() bool {
return b != nil && b.lim != nil
}

// Grow optimistically acquires and accounts for the given number of bytes.
// Returns false when this leads to the SoftLimit overflow, in which case the
// user should back off acquiring any more resources.
func (b *BytesAccount) Grow(x uint64) (withinLimit bool) {
if !b.Initialized() {
return true
}
withinLimit = true
if x > b.reserved {
need := roundSize(x - b.reserved)
b.lim.acquire(need)
b.reserved += need
if withinLimit = b.lim.withinLimit(); !withinLimit && b.reserved > x {
// If we reached the soft limit, drain the remainder of the reserved bytes
// to the limiter. We will not use it - the client typically stops calling
// Grow after this.
b.lim.release(b.reserved - x)
b.reserved = x
}
}
b.reserved -= x
b.used += x
if b.metric != nil {
b.metric.Inc(int64(x))
}
return withinLimit
}

// Clear returns all the reserved bytes into the SoftLimit.
func (b *BytesAccount) Clear() {
if !b.Initialized() {
return
}
if b.metric != nil {
b.metric.Dec(int64(b.used))
}
b.lim.release(b.used + b.reserved)
b.used, b.reserved = 0, 0
}

func roundSize(size uint64) uint64 {
if size >= smallReserveSize {
// Don't round the size up if the allocation is large. This also avoids edge
// cases in the math below if size == math.MaxInt64.
return size
}
return (size + minReserveSize - 1) / minReserveSize * minReserveSize
}

// sizeHelper helps to build a batch of entries with the total size not
// exceeding the static and dynamic byte limits. The user calls the add() method
// until sizeHelper.done becomes true. For all add() calls that returned true,
// the corresponding data can be added to the batch.
//
// In some exceptional cases, the size of the batch can exceed the limits:
// - maxBytes can be exceeded by the first added entry,
// - the soft limit can be exceeded by the last added entry.
type sizeHelper struct {
bytes uint64
maxBytes uint64
account *BytesAccount
done bool
}

// add returns true if the given number of bytes can be added to the batch
// without exceeding the maxBytes limit, or overflowing the bytes account. The
// first add call always returns true.
//
// Must not be called after sizeHelper.done becomes true.
func (s *sizeHelper) add(bytes uint64) bool {
if s.bytes == 0 { // this is the first entry, always take it
s.bytes += bytes
s.done = !s.account.Grow(bytes) || s.bytes > s.maxBytes
return true
} else if s.bytes+bytes > s.maxBytes {
s.done = true
return false
}
s.bytes += bytes
s.done = !s.account.Grow(bytes)
return true
}
122 changes: 122 additions & 0 deletions pkg/kv/kvserver/logstore/bytes_tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package logstore

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/stretchr/testify/require"
)

func TestSoftLimiter(t *testing.T) {
lim := SoftLimit{Metric: metric.NewGauge(metric.Metadata{}), Limit: 1000}
require.Zero(t, lim.Metric.Value())
lim.acquire(100)
require.Equal(t, int64(100), lim.Metric.Value())
require.True(t, lim.withinLimit())
lim.acquire(900)
require.Equal(t, int64(1000), lim.Metric.Value())
require.False(t, lim.withinLimit())
lim.release(100)
require.Equal(t, int64(900), lim.Metric.Value())
require.True(t, lim.withinLimit())
lim.acquire(10000)
require.Equal(t, int64(10900), lim.Metric.Value())
require.False(t, lim.withinLimit())
lim.release(900)
require.Equal(t, int64(10000), lim.Metric.Value())
require.False(t, lim.withinLimit())
lim.release(10000)
require.Zero(t, lim.Metric.Value())
require.True(t, lim.withinLimit())

lim.Limit = 0 // no limit
lim.acquire(100)
require.True(t, lim.withinLimit())
lim.acquire(1000000)
require.True(t, lim.withinLimit())
}

func TestBytesAccount(t *testing.T) {
lim := SoftLimit{Metric: metric.NewGauge(metric.Metadata{}), Limit: 1 << 20}
a1, a2 := lim.NewAccount(nil), lim.NewAccount(nil)
require.True(t, a1.Grow(256<<10))

require.True(t, a2.Grow(128<<10))
require.True(t, a2.Grow(256<<10))
require.False(t, a2.Grow(512<<10))
require.Equal(t, uint64(512+256+128)<<10, a2.used)
require.Zero(t, a2.reserved)
// a2 returns all the reserved bytes to the limiter.
require.Equal(t, int64(a1.used+a1.reserved+a2.used), lim.Metric.Value())

require.False(t, a1.Grow(10))
require.Equal(t, uint64(10+256<<10), a1.used)
require.Zero(t, a1.reserved)
// a1 returns all the reserved bytes to the limiter.
require.Equal(t, int64(a1.used+a2.used), lim.Metric.Value())

require.False(t, lim.withinLimit())
a2.Clear()
require.True(t, lim.withinLimit())
require.True(t, a1.Grow(1000))
a1.Clear()

require.Zero(t, lim.Metric.Value())
}

func TestSizeHelper(t *testing.T) {
lim := SoftLimit{Metric: metric.NewGauge(metric.Metadata{}), Limit: 1 << 20}
for _, tt := range []struct {
max uint64
sizes []uint64
take int
}{
// Limits are not reached.
{max: 1023, sizes: []uint64{10}, take: 1},
{max: 100, sizes: []uint64{10, 10, 20, 10}, take: 4},
// Max size limit is reached.
{max: 100, sizes: []uint64{30, 30, 30, 30}, take: 3},
{max: 100, sizes: []uint64{100, 1}, take: 1},
{max: 100, sizes: []uint64{200, 1}, take: 1},
{max: 1000, sizes: []uint64{100, 900, 100}, take: 2},
{max: 1000, sizes: []uint64{100, 500, 10000}, take: 2},
{max: 1000, sizes: []uint64{100, 1000}, take: 1},
// Soft limiter kicks in.
{max: 64 << 20, sizes: []uint64{8 << 20}, take: 1},
{max: 64 << 20, sizes: []uint64{4 << 20, 4 << 20}, take: 1},
{max: 64 << 20, sizes: []uint64{1 << 20, 1 << 20, 1 << 20}, take: 1},
{max: 64 << 20, sizes: []uint64{100, 1 << 20, 1 << 20}, take: 2},
{max: 64 << 20, sizes: []uint64{100, 1000, 1234, 1 << 20, 1 << 20}, take: 4},
} {
t.Run("", func(t *testing.T) {
acc := lim.NewAccount(nil)
defer acc.Clear()
sh := sizeHelper{maxBytes: tt.max, account: &acc}

took := 0
for ln := len(tt.sizes); took < ln && !sh.done; took++ {
if size := tt.sizes[took]; !sh.add(size) {
require.True(t, sh.done)
break
}
}
require.Equal(t, tt.take, took)

var want uint64
for _, size := range tt.sizes[:tt.take] {
want += size
}
require.Equal(t, want, sh.bytes)
})
}
}
Loading

0 comments on commit bd3288e

Please sign in to comment.