From 4e3ea0accc741ffe3357d5b9532879a1f672a125 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Tue, 25 Apr 2023 14:30:33 -0400 Subject: [PATCH 1/3] kvserver,storage: BatchCommitStats plumbing for raft appends and metrics Pebble now produces detailed stats for a batch commit. These are partially plumbed into kvserver, for (a) log statements related to raft appends, (b) cumulative duration metrics applicable for all writes (raft and state machine). Informs https://github.com/cockroachdb/pebble/issues/1943 Epic: none Release note: None --- pkg/kv/kvserver/logstore/logstore.go | 15 +++- .../kvserver/logstore/logstore_bench_test.go | 2 +- pkg/kv/kvserver/logstore/sync_waiter.go | 6 +- pkg/kv/kvserver/metrics.go | 80 +++++++++++++++++++ pkg/kv/kvserver/replica_raft.go | 13 ++- pkg/kv/kvserver/replica_raft_test.go | 15 +++- pkg/kv/kvserver/spanset/batch.go | 4 + .../testdata/handle_raft_ready_stats.txt | 2 +- pkg/storage/engine.go | 44 +++++++++- pkg/storage/pebble.go | 27 ++++++- pkg/storage/pebble_batch.go | 35 +++++--- pkg/storage/pebble_iterator.go | 10 +-- 12 files changed, 223 insertions(+), 30 deletions(-) diff --git a/pkg/kv/kvserver/logstore/logstore.go b/pkg/kv/kvserver/logstore/logstore.go index 5db622574419..ece2ffd87f5b 100644 --- a/pkg/kv/kvserver/logstore/logstore.go +++ b/pkg/kv/kvserver/logstore/logstore.go @@ -87,6 +87,9 @@ type AppendStats struct { PebbleBegin time.Time PebbleEnd time.Time PebbleBytes int64 + // Only set when !NonBlocking, which means almost never, since + // kv.raft_log.non_blocking_synchronization.enabled defaults to true. + PebbleCommitStats storage.BatchCommitStats Sync bool // If true, PebbleEnd-PebbleBegin does not include the sync time. @@ -113,8 +116,9 @@ type LogStore struct { // SyncCallback is a callback that is notified when a raft log write has been // durably committed to disk. The function is handed the response messages that // are associated with the MsgStorageAppend that triggered the fsync. +// commitStats is populated iff this was a non-blocking sync. type SyncCallback interface { - OnLogSync(context.Context, []raftpb.Message) + OnLogSync(context.Context, []raftpb.Message, storage.BatchCommitStats) } func newStoreEntriesBatch(eng storage.Engine) storage.Batch { @@ -257,6 +261,7 @@ func (s *LogStore) storeEntriesAndCommitBatch( ctx: ctx, cb: cb, msgs: m.Responses, + batch: batch, metrics: s.Metrics, logCommitBegin: stats.PebbleBegin, } @@ -269,10 +274,11 @@ func (s *LogStore) storeEntriesAndCommitBatch( return RaftState{}, errors.Wrap(err, expl) } stats.PebbleEnd = timeutil.Now() + stats.PebbleCommitStats = batch.CommitStats() if wantsSync { logCommitEnd := stats.PebbleEnd s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds()) - cb.OnLogSync(ctx, m.Responses) + cb.OnLogSync(ctx, m.Responses, storage.BatchCommitStats{}) } } stats.Sync = wantsSync @@ -325,6 +331,8 @@ type nonBlockingSyncWaiterCallback struct { ctx context.Context cb SyncCallback msgs []raftpb.Message + // Used to extract stats. This is the batch that has been synced. + batch storage.WriteBatch // Used to record Metrics. 
metrics Metrics logCommitBegin time.Time @@ -334,7 +342,8 @@ type nonBlockingSyncWaiterCallback struct { func (cb *nonBlockingSyncWaiterCallback) run() { dur := timeutil.Since(cb.logCommitBegin).Nanoseconds() cb.metrics.RaftLogCommitLatency.RecordValue(dur) - cb.cb.OnLogSync(cb.ctx, cb.msgs) + commitStats := cb.batch.CommitStats() + cb.cb.OnLogSync(cb.ctx, cb.msgs, commitStats) cb.release() } diff --git a/pkg/kv/kvserver/logstore/logstore_bench_test.go b/pkg/kv/kvserver/logstore/logstore_bench_test.go index da7604427ed6..a22268c24057 100644 --- a/pkg/kv/kvserver/logstore/logstore_bench_test.go +++ b/pkg/kv/kvserver/logstore/logstore_bench_test.go @@ -38,7 +38,7 @@ func (b *discardBatch) Commit(bool) error { type noopSyncCallback struct{} -func (noopSyncCallback) OnLogSync(context.Context, []raftpb.Message) {} +func (noopSyncCallback) OnLogSync(context.Context, []raftpb.Message, storage.BatchCommitStats) {} func BenchmarkLogStore_StoreEntries(b *testing.B) { defer log.Scope(b).Close(b) diff --git a/pkg/kv/kvserver/logstore/sync_waiter.go b/pkg/kv/kvserver/logstore/sync_waiter.go index 31880796dfc0..9e422b294008 100644 --- a/pkg/kv/kvserver/logstore/sync_waiter.go +++ b/pkg/kv/kvserver/logstore/sync_waiter.go @@ -95,8 +95,8 @@ func (w *SyncWaiterLoop) waitLoop(ctx context.Context, stopper *stop.Stopper) { if err := w.wg.SyncWait(); err != nil { log.Fatalf(ctx, "SyncWait error: %+v", err) } - w.wg.Close() w.cb.run() + w.wg.Close() case <-stopper.ShouldQuiesce(): return } @@ -108,7 +108,9 @@ func (w *SyncWaiterLoop) waitLoop(ctx context.Context, stopper *stop.Stopper) { // durably committed. It may never be called in case the stopper stops. // // The syncWaiter will be Closed after its SyncWait method completes. It must -// not be Closed by the caller. +// not be Closed by the caller. The cb is called before the syncWaiter is +// closed, in case the cb implementation needs to extract something form the +// syncWaiter. // // If the SyncWaiterLoop has already been stopped, the callback will never be // called. diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index df31b2c6cdf7..3712fd528669 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -759,6 +759,61 @@ bytes preserved during flushes and compactions over the lifetime of the process. Measurement: "Bytes", Unit: metric.Unit_BYTES, } + metaBatchCommitCount = metric.Metadata{ + Name: "storage.batch-commit.count", + Help: "Count of batch commits. See storage.AggregatedBatchCommitStats for details.", + Measurement: "Commit Ops", + Unit: metric.Unit_COUNT, + } + metaBatchCommitDuration = metric.Metadata{ + Name: "storage.batch-commit.duration", + Help: "Cumulative time spent in batch commit. " + + "See storage.AggregatedBatchCommitStats for details.", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } + metaBatchCommitSemWaitDuration = metric.Metadata{ + Name: "storage.batch-commit.sem-wait.duration", + Help: "Cumulative time spent in semaphore wait, for batch commit. " + + "See storage.AggregatedBatchCommitStats for details.", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } + metaBatchCommitWALQWaitDuration = metric.Metadata{ + Name: "storage.batch-commit.wal-queue-wait.duration", + Help: "Cumulative time spent waiting for memory blocks in the WAL queue, for batch commit. 
" + + "See storage.AggregatedBatchCommitStats for details.", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } + metaBatchCommitMemStallDuration = metric.Metadata{ + Name: "storage.batch-commit.mem-stall.duration", + Help: "Cumulative time spent in a write stall due to too many memtables, for batch commit. " + + "See storage.AggregatedBatchCommitStats for details.", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } + metaBatchCommitL0StallDuration = metric.Metadata{ + Name: "storage.batch-commit.l0-stall.duration", + Help: "Cumulative time spent in a write stall due to high read amplification in L0, for batch commit. " + + "See storage.AggregatedBatchCommitStats for details.", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } + metaBatchCommitWALRotDuration = metric.Metadata{ + Name: "storage.batch-commit.wal-rotation.duration", + Help: "Cumulative time spent waiting for WAL rotation, for batch commit. " + + "See storage.AggregatedBatchCommitStats for details.", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } + metaBatchCommitCommitWaitDuration = metric.Metadata{ + Name: "storage.batch-commit.commit-wait.duration", + Help: "Cumulative time spent waiting for WAL sync, for batch commit. " + + "See storage.AggregatedBatchCommitStats for details.", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } ) var ( @@ -2068,6 +2123,14 @@ type StoreMetrics struct { FlushableIngestCount *metric.Gauge FlushableIngestTableCount *metric.Gauge FlushableIngestTableSize *metric.Gauge + BatchCommitCount *metric.Gauge + BatchCommitDuration *metric.Gauge + BatchCommitSemWaitDuration *metric.Gauge + BatchCommitWALQWaitDuration *metric.Gauge + BatchCommitMemStallDuration *metric.Gauge + BatchCommitL0StallDuration *metric.Gauge + BatchCommitWALRotWaitDuration *metric.Gauge + BatchCommitCommitWaitDuration *metric.Gauge RdbCheckpoints *metric.Gauge @@ -2684,6 +2747,14 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { FlushableIngestCount: metric.NewGauge(metaFlushableIngestCount), FlushableIngestTableCount: metric.NewGauge(metaFlushableIngestTableCount), FlushableIngestTableSize: metric.NewGauge(metaFlushableIngestTableBytes), + BatchCommitCount: metric.NewGauge(metaBatchCommitCount), + BatchCommitDuration: metric.NewGauge(metaBatchCommitDuration), + BatchCommitSemWaitDuration: metric.NewGauge(metaBatchCommitSemWaitDuration), + BatchCommitWALQWaitDuration: metric.NewGauge(metaBatchCommitWALQWaitDuration), + BatchCommitMemStallDuration: metric.NewGauge(metaBatchCommitMemStallDuration), + BatchCommitL0StallDuration: metric.NewGauge(metaBatchCommitL0StallDuration), + BatchCommitWALRotWaitDuration: metric.NewGauge(metaBatchCommitWALRotDuration), + BatchCommitCommitWaitDuration: metric.NewGauge(metaBatchCommitCommitWaitDuration), RdbCheckpoints: metric.NewGauge(metaRdbCheckpoints), @@ -3035,6 +3106,15 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) { sm.FlushableIngestCount.Update(int64(m.Flush.AsIngestCount)) sm.FlushableIngestTableCount.Update(int64(m.Flush.AsIngestTableCount)) sm.FlushableIngestTableSize.Update(int64(m.Flush.AsIngestBytes)) + sm.BatchCommitCount.Update(int64(m.BatchCommitStats.Count)) + sm.BatchCommitDuration.Update(int64(m.BatchCommitStats.TotalDuration)) + sm.BatchCommitSemWaitDuration.Update(int64(m.BatchCommitStats.SemaphoreWaitDuration)) + sm.BatchCommitWALQWaitDuration.Update(int64(m.BatchCommitStats.WALQueueWaitDuration)) + 
sm.BatchCommitMemStallDuration.Update(int64(m.BatchCommitStats.MemTableWriteStallDuration)) + sm.BatchCommitL0StallDuration.Update(int64(m.BatchCommitStats.L0ReadAmpWriteStallDuration)) + sm.BatchCommitWALRotWaitDuration.Update(int64(m.BatchCommitStats.WALRotationDuration)) + sm.BatchCommitCommitWaitDuration.Update(int64(m.BatchCommitStats.CommitWaitDuration)) + // Update the maximum number of L0 sub-levels seen. sm.l0SublevelsTracker.Lock() sm.l0SublevelsTracker.swag.Record(timeutil.Now(), float64(m.Levels[0].Sublevels)) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 80d3b162410b..d7dd421e8db7 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -688,6 +688,9 @@ func (s handleRaftReadyStats) SafeFormat(p redact.SafePrinter, _ rune) { p.Printf(", snapshot ignored") } } + if !(s.append.PebbleCommitStats == storage.BatchCommitStats{}) { + p.Printf(" pebble stats: [%s]", s.append.PebbleCommitStats) + } } func (s handleRaftReadyStats) String() string { @@ -1515,9 +1518,15 @@ func (r *Replica) maybeCoalesceHeartbeat( // replicaSyncCallback implements the logstore.SyncCallback interface. type replicaSyncCallback Replica -func (r *replicaSyncCallback) OnLogSync(ctx context.Context, msgs []raftpb.Message) { +func (r *replicaSyncCallback) OnLogSync( + ctx context.Context, msgs []raftpb.Message, commitStats storage.BatchCommitStats, +) { + repl := (*Replica)(r) // Send MsgStorageAppend's responses. - (*Replica)(r).sendRaftMessages(ctx, msgs, nil /* blocked */, false /* willDeliverLocal */) + repl.sendRaftMessages(ctx, msgs, nil /* blocked */, false /* willDeliverLocal */) + if commitStats.TotalDuration > defaultReplicaRaftMuWarnThreshold { + log.Infof(repl.raftCtx, "slow non-blocking raft commit: %s", commitStats) + } } // sendRaftMessages sends a slice of Raft messages. 
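As a minimal standalone sketch of the plumbing above (not the kvserver code itself), the following commits a synced Pebble batch and reads its BatchCommitStats, with a hypothetical slowThreshold standing in for defaultReplicaRaftMuWarnThreshold:

package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// Open an in-memory Pebble instance so the sketch is self-contained.
	db, err := pebble.Open("demo", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Write and commit a batch with a sync, the blocking analogue of the raft
	// append path above.
	b := db.NewBatch()
	defer b.Close()
	if err := b.Set([]byte("k"), []byte("v"), nil); err != nil {
		panic(err)
	}
	if err := b.Commit(pebble.Sync); err != nil {
		panic(err)
	}

	// CommitStats is only meaningful once Commit (or SyncWait, for a
	// non-blocking commit) has returned.
	stats := b.CommitStats()

	// slowThreshold is a stand-in for defaultReplicaRaftMuWarnThreshold.
	const slowThreshold = 500 * time.Millisecond
	if stats.TotalDuration > slowThreshold {
		fmt.Printf("slow commit: %+v\n", stats)
	} else {
		fmt.Printf("commit took %s (sem-wait %s, commit-wait %s)\n",
			stats.TotalDuration, stats.SemaphoreWaitDuration, stats.CommitWaitDuration)
	}
}

The non-blocking path in logstore.go differs only in that the batch is handed to the SyncWaiterLoop and CommitStats is read in the callback, before the batch is closed.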
diff --git a/pkg/kv/kvserver/replica_raft_test.go b/pkg/kv/kvserver/replica_raft_test.go index 95c344c4f59a..5a6c80869d38 100644 --- a/pkg/kv/kvserver/replica_raft_test.go +++ b/pkg/kv/kvserver/replica_raft_test.go @@ -17,11 +17,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/testutils/echotest" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/pebble" "github.com/cockroachdb/redact" "github.com/stretchr/testify/assert" "go.etcd.io/raft/v3/tracker" @@ -98,7 +100,18 @@ func Test_handleRaftReadyStats_SafeFormat(t *testing.T) { PebbleBegin: ts(3), PebbleEnd: ts(4), PebbleBytes: 1024 * 5, - Sync: true, + PebbleCommitStats: storage.BatchCommitStats{ + BatchCommitStats: pebble.BatchCommitStats{ + TotalDuration: 100 * time.Millisecond, + SemaphoreWaitDuration: 2 * time.Millisecond, + WALQueueWaitDuration: 5 * time.Millisecond, + MemTableWriteStallDuration: 8 * time.Millisecond, + L0ReadAmpWriteStallDuration: 11 * time.Millisecond, + WALRotationDuration: 14 * time.Millisecond, + CommitWaitDuration: 17 * time.Millisecond, + }, + }, + Sync: true, }, tSnapBegin: ts(4), tSnapEnd: ts(5), diff --git a/pkg/kv/kvserver/spanset/batch.go b/pkg/kv/kvserver/spanset/batch.go index 805fef2bcd8d..5d2a773379ae 100644 --- a/pkg/kv/kvserver/spanset/batch.go +++ b/pkg/kv/kvserver/spanset/batch.go @@ -785,6 +785,10 @@ func (s spanSetBatch) Repr() []byte { return s.b.Repr() } +func (s spanSetBatch) CommitStats() storage.BatchCommitStats { + return s.b.CommitStats() +} + // NewBatch returns a storage.Batch that asserts access of the underlying // Batch against the given SpanSet. We only consider span boundaries, associated // timestamps are not considered. diff --git a/pkg/kv/kvserver/testdata/handle_raft_ready_stats.txt b/pkg/kv/kvserver/testdata/handle_raft_ready_stats.txt index 0dfb67499ba7..1713c37c67dc 100644 --- a/pkg/kv/kvserver/testdata/handle_raft_ready_stats.txt +++ b/pkg/kv/kvserver/testdata/handle_raft_ready_stats.txt @@ -1,3 +1,3 @@ echo ---- -raft ready handling: 5.00s [append=1.00s, apply=1.00s, sync=1.00s, snap=1.00s, other=1.00s], wrote [append-batch=5.0 KiB, append-ent=1.0 KiB (7), append-sst=5.0 MiB (3), apply=3 B (2 in 9 batches)], state_assertions=4, snapshot applied +raft ready handling: 5.00s [append=1.00s, apply=1.00s, sync=1.00s, snap=1.00s, other=1.00s], wrote [append-batch=5.0 KiB, append-ent=1.0 KiB (7), append-sst=5.0 MiB (3), apply=3 B (2 in 9 batches)], state_assertions=4, snapshot applied pebble stats: [commit-wait 17ms wal-q 5ms mem-stall 8ms l0-stall 11ms wal-rot 14ms sem 2ms] diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 16f3904c7367..3ae1dd5a5123 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/vfs" + "github.com/cockroachdb/redact" prometheusgo "github.com/prometheus/client_model/go" ) @@ -392,7 +393,7 @@ type EngineIterator interface { // a clone of an existing iterator. type CloneContext struct { rawIter pebbleiter.Iterator - statsReporter statsReporter + statsReporter iterStatsReporter } // IterOptions contains options used to create an {MVCC,Engine}Iterator. 
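The testdata line above is produced by a redact.SafeFormatter that prints only the non-zero components (added to engine.go below). A cut-down, self-contained sketch of that formatting pattern, using a hypothetical commitStats struct rather than the real storage.BatchCommitStats:

package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/redact"
)

// commitStats is a simplified stand-in for storage.BatchCommitStats.
type commitStats struct {
	CommitWaitDuration    time.Duration
	WALQueueWaitDuration  time.Duration
	SemaphoreWaitDuration time.Duration
}

// SafeFormat prints the commit wait unconditionally and the remaining
// components only when non-zero, keeping the log line compact.
func (s commitStats) SafeFormat(p redact.SafePrinter, _ rune) {
	p.Printf("commit-wait %s", s.CommitWaitDuration)
	if s.WALQueueWaitDuration > 0 {
		p.Printf(" wal-q %s", s.WALQueueWaitDuration)
	}
	if s.SemaphoreWaitDuration > 0 {
		p.Printf(" sem %s", s.SemaphoreWaitDuration)
	}
}

func main() {
	s := commitStats{
		CommitWaitDuration:    17 * time.Millisecond,
		WALQueueWaitDuration:  5 * time.Millisecond,
		SemaphoreWaitDuration: 2 * time.Millisecond,
	}
	// StripMarkers drops any redaction markers around the durations.
	fmt.Println(redact.Sprint(s).StripMarkers()) // commit-wait 17ms wal-q 5ms sem 2ms
}

Omitting fields that are usually zero is what keeps the appended "pebble stats: [...]" suffix short in the common case.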
@@ -1024,13 +1025,43 @@ type WriteBatch interface { // Repr returns the underlying representation of the batch and can be used to // reconstitute the batch on a remote node using Writer.ApplyBatchRepr(). Repr() []byte + // CommitStats returns stats related to committing the batch. Should be + // called after Batch.Commit. If CommitNoSyncWait is used, it should be + // called after the call to SyncWait. + CommitStats() BatchCommitStats +} + +type BatchCommitStats struct { + pebble.BatchCommitStats +} + +// SafeFormat implements redact.SafeFormatter. It does not print the total +// duration. +func (stats BatchCommitStats) SafeFormat(p redact.SafePrinter, _ rune) { + p.Printf("commit-wait %s", stats.CommitWaitDuration) + if stats.WALQueueWaitDuration > 0 { + p.Printf(" wal-q %s", stats.WALQueueWaitDuration) + } + if stats.MemTableWriteStallDuration > 0 { + p.Printf(" mem-stall %s", stats.MemTableWriteStallDuration) + } + if stats.L0ReadAmpWriteStallDuration > 0 { + p.Printf(" l0-stall %s", stats.L0ReadAmpWriteStallDuration) + } + if stats.WALRotationDuration > 0 { + p.Printf(" wal-rot %s", stats.WALRotationDuration) + } + if stats.SemaphoreWaitDuration > 0 { + p.Printf(" sem %s", stats.SemaphoreWaitDuration) + } } // Metrics is a set of Engine metrics. Most are contained in the embedded // *pebble.Metrics struct, which has its own documentation. type Metrics struct { *pebble.Metrics - Iterator AggregatedIteratorStats + Iterator AggregatedIteratorStats + BatchCommitStats AggregatedBatchCommitStats // DiskSlowCount counts the number of times Pebble records disk slowness. DiskSlowCount int64 // DiskStallCount counts the number of times Pebble observes slow writes @@ -1092,6 +1123,15 @@ type AggregatedIteratorStats struct { InternalSteps int } +// AggregatedBatchCommitStats hold cumulative stats summed over all the +// batches that committed at the engine. Since these are durations, only the +// mean (over an interval) can be recovered. We can change some of these to +// histograms once we know which ones are more useful. +type AggregatedBatchCommitStats struct { + Count uint64 + BatchCommitStats +} + // MetricsForInterval is a set of pebble.Metrics that need to be saved in order to // compute metrics according to an interval. type MetricsForInterval struct { diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 815920edf785..dbbd22a57c5a 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -773,7 +773,10 @@ type Pebble struct { syncutil.Mutex AggregatedIteratorStats } - + batchCommitStats struct { + syncutil.Mutex + AggregatedBatchCommitStats + } // Relevant options copied over from pebble.Options. 
unencryptedFS vfs.FS logCtx context.Context @@ -1350,6 +1353,19 @@ func (p *Pebble) aggregateIterStats(stats IteratorStats) { p.iterStats.InternalSteps += stats.Stats.ForwardStepCount[pebble.InternalIterCall] + stats.Stats.ReverseStepCount[pebble.InternalIterCall] } +func (p *Pebble) aggregateBatchCommitStats(stats BatchCommitStats) { + p.batchCommitStats.Lock() + p.batchCommitStats.Count++ + p.batchCommitStats.TotalDuration += stats.TotalDuration + p.batchCommitStats.SemaphoreWaitDuration += stats.SemaphoreWaitDuration + p.batchCommitStats.WALQueueWaitDuration += stats.WALQueueWaitDuration + p.batchCommitStats.MemTableWriteStallDuration += stats.MemTableWriteStallDuration + p.batchCommitStats.L0ReadAmpWriteStallDuration += stats.L0ReadAmpWriteStallDuration + p.batchCommitStats.WALRotationDuration += stats.WALRotationDuration + p.batchCommitStats.CommitWaitDuration += stats.CommitWaitDuration + p.batchCommitStats.Unlock() +} + // Closed implements the Engine interface. func (p *Pebble) Closed() bool { return p.closed @@ -1785,6 +1801,9 @@ func (p *Pebble) GetMetrics() Metrics { p.iterStats.Lock() m.Iterator = p.iterStats.AggregatedIteratorStats p.iterStats.Unlock() + p.batchCommitStats.Lock() + m.BatchCommitStats = p.batchCommitStats.AggregatedBatchCommitStats + p.batchCommitStats.Unlock() return m } @@ -1892,7 +1911,7 @@ func (p *Pebble) GetAuxiliaryDir() string { // NewBatch implements the Engine interface. func (p *Pebble) NewBatch() Batch { - return newPebbleBatch(p.db, p.db.NewIndexedBatch(), false /* writeOnly */, p.settings, p) + return newPebbleBatch(p.db, p.db.NewIndexedBatch(), false /* writeOnly */, p.settings, p, p) } // NewReadOnly implements the Engine interface. @@ -1902,12 +1921,12 @@ func (p *Pebble) NewReadOnly(durability DurabilityRequirement) ReadWriter { // NewUnindexedBatch implements the Engine interface. func (p *Pebble) NewUnindexedBatch() Batch { - return newPebbleBatch(p.db, p.db.NewBatch(), false /* writeOnly */, p.settings, p) + return newPebbleBatch(p.db, p.db.NewBatch(), false /* writeOnly */, p.settings, p, p) } // NewWriteBatch implements the Engine interface. func (p *Pebble) NewWriteBatch() WriteBatch { - return newPebbleBatch(p.db, p.db.NewBatch(), true /* writeOnly */, p.settings, p) + return newPebbleBatch(p.db, p.db.NewBatch(), true /* writeOnly */, p.settings, p, p) } // NewSnapshot implements the Engine interface. diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index ec152ca8f12c..fcfa7e7789f0 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -55,7 +55,8 @@ type pebbleBatch struct { // scratch space for wrappedIntentWriter. scratch []byte - statsReporter statsReporter + iterStatsReporter iterStatsReporter + batchStatsReporter batchStatsReporter settings *cluster.Settings shouldWriteLocalTimestamps bool shouldWriteLocalTimestampsCached bool @@ -69,13 +70,18 @@ var pebbleBatchPool = sync.Pool{ }, } +type batchStatsReporter interface { + aggregateBatchCommitStats(stats BatchCommitStats) +} + // Instantiates a new pebbleBatch. 
func newPebbleBatch( db *pebble.DB, batch *pebble.Batch, writeOnly bool, settings *cluster.Settings, - statsReporter statsReporter, + iterStatsReporter iterStatsReporter, + batchStatsReporter batchStatsReporter, ) *pebbleBatch { pb := pebbleBatchPool.Get().(*pebbleBatch) *pb = pebbleBatch{ @@ -102,9 +108,10 @@ func newPebbleBatch( upperBoundBuf: pb.normalEngineIter.upperBoundBuf, reusable: true, }, - writeOnly: writeOnly, - statsReporter: statsReporter, - settings: settings, + writeOnly: writeOnly, + iterStatsReporter: iterStatsReporter, + batchStatsReporter: batchStatsReporter, + settings: settings, } pb.wrappedIntentWriter = wrapIntentWriter(pb) return pb @@ -185,14 +192,14 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M if iter.inuse { return newPebbleIteratorByCloning(CloneContext{ rawIter: p.iter, - statsReporter: p.statsReporter, + statsReporter: p.iterStatsReporter, }, opts, StandardDurability) } if iter.iter != nil { iter.setOptions(opts, StandardDurability) } else { - iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.statsReporter) + iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.iterStatsReporter) if p.iter == nil { // For future cloning. p.iter = iter.iter @@ -221,14 +228,14 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator { if iter.inuse { return newPebbleIteratorByCloning(CloneContext{ rawIter: p.iter, - statsReporter: p.statsReporter, + statsReporter: p.iterStatsReporter, }, opts, StandardDurability) } if iter.iter != nil { iter.setOptions(opts, StandardDurability) } else { - iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.statsReporter) + iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.iterStatsReporter) if p.iter == nil { // For future cloning. p.iter = iter.iter @@ -557,6 +564,8 @@ func (p *pebbleBatch) Commit(sync bool) error { // or don't have after they receive an error from this method. panic(err) } + p.batchStatsReporter.aggregateBatchCommitStats( + BatchCommitStats{p.batch.CommitStats()}) return err } @@ -595,6 +604,9 @@ func (p *pebbleBatch) SyncWait() error { // or don't have after they receive an error from this method. panic(err) } + p.batchStatsReporter.aggregateBatchCommitStats( + BatchCommitStats{p.batch.CommitStats()}) + return err } @@ -624,6 +636,11 @@ func (p *pebbleBatch) Repr() []byte { return reprCopy } +// CommitStats implements the Batch interface. +func (p *pebbleBatch) CommitStats() BatchCommitStats { + return BatchCommitStats{BatchCommitStats: p.batch.CommitStats()} +} + // ShouldWriteLocalTimestamps implements the Writer interface. func (p *pebbleBatch) ShouldWriteLocalTimestamps(ctx context.Context) bool { // pebbleBatch is short-lived, so cache the value for performance. diff --git a/pkg/storage/pebble_iterator.go b/pkg/storage/pebble_iterator.go index a69ae1b9dc74..973cfe5e007a 100644 --- a/pkg/storage/pebble_iterator.go +++ b/pkg/storage/pebble_iterator.go @@ -49,7 +49,7 @@ type pebbleIterator struct { // statsReporter is used to sum iterator stats across all the iterators // during the lifetime of the Engine when the iterator is closed or its // stats reset. It's intended to be used with (*Pebble). It must not be nil. - statsReporter statsReporter + statsReporter iterStatsReporter // Set to true to govern whether to call SeekPrefixGE or SeekGE. Skips // SSTables based on MVCC/Engine key when true. 
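The reporter pattern above (aggregateBatchCommitStats on *Pebble plus the calls from pebbleBatch.Commit and SyncWait) reduces to a mutex-guarded running sum that GetMetrics snapshots. A rough standalone sketch under those assumptions, with batchStatsAggregator and snapshot as illustrative names and plain sync.Mutex in place of syncutil:

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/cockroachdb/pebble"
)

// aggregatedBatchCommitStats mirrors the shape of
// storage.AggregatedBatchCommitStats: a count plus summed durations.
type aggregatedBatchCommitStats struct {
	Count uint64
	pebble.BatchCommitStats
}

// batchStatsAggregator plays the batchStatsReporter role of *Pebble:
// batches report their commit stats here, and the metrics code reads a
// snapshot.
type batchStatsAggregator struct {
	mu  sync.Mutex
	agg aggregatedBatchCommitStats
}

// aggregateBatchCommitStats sums one batch's stats into the running totals,
// as pebbleBatch.Commit and SyncWait do against the engine.
func (a *batchStatsAggregator) aggregateBatchCommitStats(s pebble.BatchCommitStats) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.agg.Count++
	a.agg.TotalDuration += s.TotalDuration
	a.agg.SemaphoreWaitDuration += s.SemaphoreWaitDuration
	a.agg.WALQueueWaitDuration += s.WALQueueWaitDuration
	a.agg.MemTableWriteStallDuration += s.MemTableWriteStallDuration
	a.agg.L0ReadAmpWriteStallDuration += s.L0ReadAmpWriteStallDuration
	a.agg.WALRotationDuration += s.WALRotationDuration
	a.agg.CommitWaitDuration += s.CommitWaitDuration
}

// snapshot returns a copy taken under the lock, as GetMetrics does.
func (a *batchStatsAggregator) snapshot() aggregatedBatchCommitStats {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.agg
}

func main() {
	var a batchStatsAggregator
	// Two fake commits; in the real code these come from Batch.CommitStats().
	a.aggregateBatchCommitStats(pebble.BatchCommitStats{TotalDuration: 3 * time.Millisecond})
	a.aggregateBatchCommitStats(pebble.BatchCommitStats{TotalDuration: 7 * time.Millisecond})
	s := a.snapshot()
	fmt.Printf("count=%d total=%s\n", s.Count, s.TotalDuration) // count=2 total=10ms
}

Because only cumulative sums are kept, the metrics layer can recover per-interval means but not distributions, which is why these are exported as cumulative gauges rather than histograms for now.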
@@ -75,7 +75,7 @@ type pebbleIterator struct { mvccDone bool } -type statsReporter interface { +type iterStatsReporter interface { aggregateIterStats(IteratorStats) } @@ -99,7 +99,7 @@ func newPebbleIterator( handle pebble.Reader, opts IterOptions, durability DurabilityRequirement, - statsReporter statsReporter, + statsReporter iterStatsReporter, ) *pebbleIterator { p := pebbleIterPool.Get().(*pebbleIterator) p.reusable = false // defensive @@ -158,7 +158,7 @@ func (p *pebbleIterator) init( iter pebbleiter.Iterator, opts IterOptions, durability DurabilityRequirement, - statsReporter statsReporter, + statsReporter iterStatsReporter, ) { *p = pebbleIterator{ iter: iter, @@ -185,7 +185,7 @@ func (p *pebbleIterator) initReuseOrCreate( clone bool, opts IterOptions, durability DurabilityRequirement, - statsReporter statsReporter, + statsReporter iterStatsReporter, ) { if iter != nil && !clone { p.init(iter, opts, durability, statsReporter) From 2512ef450cb13a7efe2f435b1a31914a25f1047f Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 14 May 2023 12:09:26 +0000 Subject: [PATCH 2/3] builtins: add `crdb_internal.unsafe_lock_replica()` This patch adds `crdb_internal.unsafe_lock_replica()` which can be used to (un)lock a given replica mutex on the gateway node. It requires the env var `COCKROACH_ENABLE_UNSAFE_TEST_BUILTINS=true`. This is useful to test replica deadlocks or stalls in end-to-end resiliency tests. Epic: none Release note: None --- pkg/kv/kvserver/kvserverbase/BUILD.bazel | 1 + pkg/kv/kvserver/kvserverbase/stores.go | 5 ++ pkg/kv/kvserver/stores_base.go | 10 ++++ pkg/sql/sem/builtins/BUILD.bazel | 1 + .../builtins/builtinconstants/constants.go | 1 + pkg/sql/sem/builtins/builtins.go | 59 +++++++++++++++++++ pkg/sql/sem/builtins/fixed_oids.go | 1 + 7 files changed, 78 insertions(+) diff --git a/pkg/kv/kvserver/kvserverbase/BUILD.bazel b/pkg/kv/kvserver/kvserverbase/BUILD.bazel index 1bb735f64459..7ef13954ba2e 100644 --- a/pkg/kv/kvserver/kvserverbase/BUILD.bazel +++ b/pkg/kv/kvserver/kvserverbase/BUILD.bazel @@ -26,6 +26,7 @@ go_library( "//pkg/util/humanizeutil", "//pkg/util/log", "//pkg/util/quotapool", + "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing/tracingpb", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/kv/kvserver/kvserverbase/stores.go b/pkg/kv/kvserver/kvserverbase/stores.go index e0a0c0fd2c05..d3725d5aa59b 100644 --- a/pkg/kv/kvserver/kvserverbase/stores.go +++ b/pkg/kv/kvserver/kvserverbase/stores.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/errorutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" ) @@ -38,6 +39,10 @@ type Store interface { // SetQueueActive disables/enables the named queue. SetQueueActive(active bool, queue string) error + + // GetReplicaMutexForTesting returns the mutex of the replica with the given + // range ID, or nil if no replica was found. This is used for testing. 
+ GetReplicaMutexForTesting(rangeID roachpb.RangeID) *syncutil.RWMutex } // UnsupportedStoresIterator is a StoresIterator that only returns "unsupported" diff --git a/pkg/kv/kvserver/stores_base.go b/pkg/kv/kvserver/stores_base.go index dc608d9d86cd..018ca8125097 100644 --- a/pkg/kv/kvserver/stores_base.go +++ b/pkg/kv/kvserver/stores_base.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" ) @@ -93,3 +94,12 @@ func (s *baseStore) SetQueueActive(active bool, queue string) error { kvQueue.SetDisabled(!active) return nil } + +// GetReplicaMutexForTesting is part of kvserverbase.Store. +func (s *baseStore) GetReplicaMutexForTesting(rangeID roachpb.RangeID) *syncutil.RWMutex { + store := (*Store)(s) + if repl := store.GetReplicaIfExists(rangeID); repl != nil { + return &repl.mu.RWMutex + } + return nil +} diff --git a/pkg/sql/sem/builtins/BUILD.bazel b/pkg/sql/sem/builtins/BUILD.bazel index 885c0eb5993c..7685e2e0bb85 100644 --- a/pkg/sql/sem/builtins/BUILD.bazel +++ b/pkg/sql/sem/builtins/BUILD.bazel @@ -104,6 +104,7 @@ go_library( "//pkg/util/bitarray", "//pkg/util/duration", "//pkg/util/encoding", + "//pkg/util/envutil", "//pkg/util/errorutil/unimplemented", "//pkg/util/fuzzystrmatch", "//pkg/util/hlc", diff --git a/pkg/sql/sem/builtins/builtinconstants/constants.go b/pkg/sql/sem/builtins/builtinconstants/constants.go index e9474062119d..13eb43f35465 100644 --- a/pkg/sql/sem/builtins/builtinconstants/constants.go +++ b/pkg/sql/sem/builtins/builtinconstants/constants.go @@ -57,6 +57,7 @@ const ( CategorySystemInfo = "System info" CategorySystemRepair = "System repair" CategoryStreamIngestion = "Stream Ingestion" + CategoryTesting = "Testing" ) const ( diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go index 0d4081555afd..0f083bda30d2 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -79,6 +79,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/duration" "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/fuzzystrmatch" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -87,6 +88,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeofday" "github.com/cockroachdb/cockroach/pkg/util/timetz" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -149,6 +151,10 @@ CockroachDB supports the following flags: | p | no | no | | m/n | no | yes |` +// enableUnsafeTestBuiltins enables unsafe builtins for testing purposes. +var enableUnsafeTestBuiltins = envutil.EnvOrDefaultBool( + "COCKROACH_ENABLE_UNSAFE_TEST_BUILTINS", false) + // builtinDefinition represents a built-in function before it becomes // a tree.FunctionDefinition. 
type builtinDefinition struct { @@ -6277,6 +6283,59 @@ DO NOT USE -- USE 'CREATE TENANT' INSTEAD`, Volatility: volatility.Volatile, }, ), + "crdb_internal.unsafe_lock_replica": makeBuiltin( + tree.FunctionProperties{ + Category: builtinconstants.CategoryTesting, + DistsqlBlocklist: true, + Undocumented: true, + }, + tree.Overload{ + Types: tree.ParamTypes{ + {Name: "range_id", Typ: types.Int}, + {Name: "lock", Typ: types.Bool}, // true to lock, false to unlock + }, + ReturnType: tree.FixedReturnType(types.Bool), + Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { + if !enableUnsafeTestBuiltins { + return nil, errors.Errorf("requires COCKROACH_ENABLE_UNSAFE_TEST_BUILTINS=true") + } else if isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx); err != nil { + return nil, err + } else if !isAdmin { + return nil, errInsufficientPriv + } + + rangeID := roachpb.RangeID(*args[0].(*tree.DInt)) + lock := *args[1].(*tree.DBool) + + var replicaMu *syncutil.RWMutex + if err := evalCtx.KVStoresIterator.ForEachStore(func(store kvserverbase.Store) error { + if replicaMu == nil { + replicaMu = store.GetReplicaMutexForTesting(rangeID) + } + return nil + }); err != nil { + return nil, err + } else if replicaMu == nil { + return nil, kvpb.NewRangeNotFoundError(rangeID, 0) + } + + log.Warningf(ctx, "crdb_internal.unsafe_lock_replica on r%d with lock=%t", rangeID, lock) + + if lock { + replicaMu.Lock() // deadlocks if called twice + } else { + // Unlocking a non-locked mutex will irrecoverably fatal the process. + // We do TryLock() as a best-effort guard against this, but it will be + // racey. The caller is expected to have locked the mutex first. + replicaMu.TryLock() + replicaMu.Unlock() + } + return tree.DBoolTrue, nil + }, + Info: "This function is used only by CockroachDB's developers for testing purposes.", + Volatility: volatility.Volatile, + }, + ), "crdb_internal.upsert_dropped_relation_gc_ttl": makeBuiltin( tree.FunctionProperties{ Category: builtinconstants.CategorySystemRepair, diff --git a/pkg/sql/sem/builtins/fixed_oids.go b/pkg/sql/sem/builtins/fixed_oids.go index 54e64c0d484b..f619f7cd50d5 100644 --- a/pkg/sql/sem/builtins/fixed_oids.go +++ b/pkg/sql/sem/builtins/fixed_oids.go @@ -2382,6 +2382,7 @@ var builtinOidsArray = []string{ 2409: `st_bdpolyfromtext(str: string, srid: int) -> geometry`, 2410: `crdb_internal.pretty_value(raw_value: bytes) -> string`, 2411: `to_char(date: date, format: string) -> string`, + 2412: `crdb_internal.unsafe_lock_replica(range_id: int, lock: bool) -> bool`, } var builtinOidsBySignature map[string]oid.Oid From eabc4fb9e3b7b99351ce81f471f7813c90021a55 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 14 May 2023 12:59:19 +0000 Subject: [PATCH 3/3] roachtest: add `failover` variants for replica deadlocks This patch adds `failover` variants that benchmark the pMax unavailability when 5 random leaseholder replicas are deadlocked on a node. The node remains alive, and continues to heartbeat both via liveness and RPC, as do other replicas on the node. The deadlocked replicas will be entirely unresponsive however, and depending on the timing this can also cause deadlocks on other mutexes. This failure mode is representative of all failure modes that leave a replica unresponsive. This includes disk stalls, which will also hold replica locks during the stall, but disk stalls have specialized handling in e.g. Pebble and during node heartbeats that eventually resolve them while deadlocks don't. 
We currently don't handle this failure mode at all, and expect permanent unavailability. Epic: none Release note: None --- pkg/cmd/roachtest/tests/failover.go | 108 ++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/pkg/cmd/roachtest/tests/failover.go b/pkg/cmd/roachtest/tests/failover.go index 2643f03e69a3..cd52c1b1104a 100644 --- a/pkg/cmd/roachtest/tests/failover.go +++ b/pkg/cmd/roachtest/tests/failover.go @@ -22,8 +22,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -70,6 +72,7 @@ func registerFailover(r registry.Registry) { failureModeBlackholeRecv, failureModeBlackholeSend, failureModeCrash, + failureModeDeadlock, failureModeDiskStall, failureModePause, } { @@ -606,6 +609,7 @@ func runFailoverNonSystem( // Create cluster. opts := option.DefaultStartOpts() settings := install.MakeClusterSettings() + settings.Env = append(settings.Env, "COCKROACH_ENABLE_UNSAFE_TEST_BUILTINS=true") failer := makeFailer(t, c, failureMode, opts, settings) failer.Setup(ctx) @@ -745,6 +749,7 @@ func runFailoverLiveness( // Create cluster. Don't schedule a backup as this roachtest reports to roachperf. opts := option.DefaultStartOptsNoBackups() settings := install.MakeClusterSettings() + settings.Env = append(settings.Env, "COCKROACH_ENABLE_UNSAFE_TEST_BUILTINS=true") failer := makeFailer(t, c, failureMode, opts, settings) failer.Setup(ctx) @@ -888,6 +893,7 @@ func runFailoverSystemNonLiveness( // Create cluster. opts := option.DefaultStartOpts() settings := install.MakeClusterSettings() + settings.Env = append(settings.Env, "COCKROACH_ENABLE_UNSAFE_TEST_BUILTINS=true") failer := makeFailer(t, c, failureMode, opts, settings) failer.Setup(ctx) @@ -1006,6 +1012,7 @@ const ( failureModeBlackholeRecv failureMode = "blackhole-recv" failureModeBlackholeSend failureMode = "blackhole-send" failureModeCrash failureMode = "crash" + failureModeDeadlock failureMode = "deadlock" failureModeDiskStall failureMode = "disk-stall" failureModePause failureMode = "pause" failureModeNoop failureMode = "noop" @@ -1064,6 +1071,15 @@ func makeFailerWithoutLocalNoop( startOpts: opts, startSettings: settings, } + case failureModeDeadlock: + return &deadlockFailer{ + t: t, + c: c, + startOpts: opts, + startSettings: settings, + onlyLeaseholders: true, + numReplicas: 5, + } case failureModeDiskStall: return &diskStallFailer{ t: t, @@ -1249,6 +1265,98 @@ func (f *crashFailer) Recover(ctx context.Context, nodeID int) { f.c.Start(ctx, f.t.L(), f.startOpts, f.startSettings, f.c.Node(nodeID)) } +// deadlockFailer deadlocks replicas. In addition to deadlocks, this failure +// mode is representative of all failure modes that leave a replica unresponsive +// while the node is otherwise still functional. 
+type deadlockFailer struct { + t test.Test + c cluster.Cluster + m cluster.Monitor + startOpts option.StartOpts + startSettings install.ClusterSettings + onlyLeaseholders bool + numReplicas int + locks map[int][]roachpb.RangeID // track locks by node +} + +func (f *deadlockFailer) String() string { return string(failureModeDeadlock) } +func (f *deadlockFailer) CanUseLocal() bool { return true } +func (f *deadlockFailer) Setup(context.Context) {} +func (f *deadlockFailer) Ready(_ context.Context, m cluster.Monitor) { f.m = m } +func (f *deadlockFailer) Cleanup(context.Context) {} + +func (f *deadlockFailer) Fail(ctx context.Context, nodeID int) { + require.NotZero(f.t, f.numReplicas) + if f.locks == nil { + f.locks = map[int][]roachpb.RangeID{} + } + + ctx, cancel := context.WithTimeout(ctx, 20*time.Second) // can take a while to lock + defer cancel() + + predicate := `$1 = ANY(replicas)` + if f.onlyLeaseholders { + predicate += ` AND lease_holder = $1` + } + + conn := f.c.Conn(ctx, f.t.L(), nodeID) + rows, err := conn.QueryContext(ctx, fmt.Sprintf( + `SELECT range_id, crdb_internal.unsafe_lock_replica(range_id, true) FROM [ + SELECT range_id FROM [SHOW CLUSTER RANGES WITH DETAILS] + WHERE %s + ORDER BY random() + LIMIT $2 + ]`, predicate), nodeID, f.numReplicas) + require.NoError(f.t, err) + for rows.Next() { + var rangeID roachpb.RangeID + var locked bool + require.NoError(f.t, rows.Scan(&rangeID, &locked)) + require.True(f.t, locked) + f.t.Status(fmt.Sprintf("locked r%d on n%d", rangeID, nodeID)) + f.locks[nodeID] = append(f.locks[nodeID], rangeID) + } + require.NoError(f.t, rows.Err()) +} + +func (f *deadlockFailer) Recover(ctx context.Context, nodeID int) { + if f.locks == nil || len(f.locks[nodeID]) == 0 { + return + } + + err := func() error { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + conn, err := f.c.ConnE(ctx, f.t.L(), nodeID) + if err != nil { + return err + } + for _, rangeID := range f.locks[nodeID] { + var unlocked bool + err := conn.QueryRowContext(ctx, + `SELECT crdb_internal.unsafe_lock_replica($1, false)`, rangeID).Scan(&unlocked) + if err != nil { + return err + } else if !unlocked { + return errors.Errorf("r%d was not unlocked", rangeID) + } else { + f.t.Status(fmt.Sprintf("unlocked r%d on n%d", rangeID, nodeID)) + } + } + return nil + }() + // We may have locked replicas that prevent us from connecting to the node + // again, so we fall back to restarting the node. + if err != nil { + f.t.Status(fmt.Sprintf("failed to unlock replicas on n%d, restarting node: %s", nodeID, err)) + f.m.ExpectDeath() + f.c.Stop(ctx, f.t.L(), option.DefaultStopOpts(), f.c.Node(nodeID)) + f.c.Start(ctx, f.t.L(), f.startOpts, f.startSettings, f.c.Node(nodeID)) + } + delete(f.locks, nodeID) +} + // diskStallFailer stalls the disk indefinitely. This should cause the node to // eventually self-terminate, but we'd want leases to move off before then. type diskStallFailer struct {