102271: kvserver,storage: BatchCommitStats plumbing for raft appends and metrics r=tbg a=sumeerbhola

Pebble now produces detailed stats for each batch commit. These are partially
plumbed into kvserver for (a) log statements related to raft appends, and
(b) cumulative duration metrics that apply to all writes (raft and state
machine).
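
For illustration, a minimal, self-contained sketch of the pattern this plumbing follows: read the commit stats off a batch after it has been committed and synced, then fold them into cumulative counters. The `batchCommitStats` and `aggregate` types below are local stand-ins, not the real `storage.BatchCommitStats` or kvserver gauges; the field names simply mirror the ones used later in this diff.

```go
// Sketch only: stand-ins for storage.BatchCommitStats and the cumulative
// kvserver gauges; the real plumbing is in logstore.go and metrics.go below.
package main

import (
	"fmt"
	"time"
)

// batchCommitStats mirrors a few of the duration fields referenced in this diff.
type batchCommitStats struct {
	TotalDuration         time.Duration
	SemaphoreWaitDuration time.Duration
	WALQueueWaitDuration  time.Duration
}

// committedBatch stands in for the write batch that the non-blocking sync
// waiter callback holds on to; stats are read only after the sync completes.
type committedBatch struct{ stats batchCommitStats }

func (b committedBatch) CommitStats() batchCommitStats { return b.stats }

// aggregate accumulates cumulative durations, as the new gauges do.
type aggregate struct {
	count    int64
	total    time.Duration
	semWait  time.Duration
	walQueue time.Duration
}

func (a *aggregate) record(s batchCommitStats) {
	a.count++
	a.total += s.TotalDuration
	a.semWait += s.SemaphoreWaitDuration
	a.walQueue += s.WALQueueWaitDuration
}

func main() {
	b := committedBatch{stats: batchCommitStats{
		TotalDuration:         3 * time.Millisecond,
		SemaphoreWaitDuration: 200 * time.Microsecond,
	}}
	var agg aggregate
	agg.record(b.CommitStats()) // read stats only after the commit has synced
	fmt.Printf("commits=%d total=%s sem-wait=%s\n", agg.count, agg.total, agg.semWait)
}
```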

Informs cockroachdb/pebble#1943

Epic: none

Release note: None

103261: roachtest: add `failover` variants for replica deadlocks r=erikgrinaker a=erikgrinaker

**builtins: add `crdb_internal.unsafe_lock_replica()`**

This patch adds `crdb_internal.unsafe_lock_replica()`, which can be used to (un)lock a given replica's mutex on the gateway node. It requires the env var `COCKROACH_ENABLE_UNSAFE_TEST_BUILTINS=true`. This is useful for testing replica deadlocks or stalls in end-to-end resiliency tests.
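
A rough usage sketch, modeled on the roachtest code further down in this diff: lock a replica's mutex via SQL, then later unlock it. The connection string, range ID, and driver choice are placeholders; the builtin only works on a node started with `COCKROACH_ENABLE_UNSAFE_TEST_BUILTINS=true`, and the locked replica must live on the gateway node.

```go
// Sketch: lock and later unlock a replica's mutex via the new builtin.
// The DSN and range ID below are placeholders; any Postgres-wire driver works.
package main

import (
	"context"
	"database/sql"
	"log"
	"time"

	_ "github.com/lib/pq" // Postgres-wire driver, registered as "postgres"
)

func main() {
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	const rangeID = 42 // placeholder: a range whose replica is on the gateway node

	var locked bool
	if err := db.QueryRowContext(ctx,
		`SELECT crdb_internal.unsafe_lock_replica($1, true)`, rangeID).Scan(&locked); err != nil {
		log.Fatal(err)
	}
	log.Printf("locked r%d: %t", rangeID, locked)

	// ... exercise the deadlocked replica here ...

	var unlocked bool
	if err := db.QueryRowContext(ctx,
		`SELECT crdb_internal.unsafe_lock_replica($1, false)`, rangeID).Scan(&unlocked); err != nil {
		log.Fatal(err)
	}
	log.Printf("unlocked r%d: %t", rangeID, unlocked)
}
```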
  
**roachtest: add `failover` variants for replica deadlocks**

This patch adds `failover` variants that benchmark the pMax unavailability when 5 random leaseholder replicas are deadlocked on a node. The node remains alive and continues to heartbeat via both liveness and RPC, as do the other replicas on the node. The deadlocked replicas, however, are entirely unresponsive, and depending on the timing this can also cause deadlocks on other mutexes.

This failure mode is representative of all failure modes that leave a replica unresponsive. That includes disk stalls, which also hold replica locks for the duration of the stall; however, disk stalls have specialized handling (e.g. in Pebble and during node heartbeats) that eventually resolves them, while deadlocks do not.

We currently don't handle this failure mode at all, and expect permanent unavailability.

Resolves cockroachdb#103192.
Epic: none
Release note: None

Co-authored-by: sumeerbhola <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
3 people committed May 26, 2023
3 parents c6255a1 + 4e3ea0a + eabc4fb commit 7709f15
Showing 20 changed files with 409 additions and 30 deletions.
108 changes: 108 additions & 0 deletions pkg/cmd/roachtest/tests/failover.go
@@ -22,8 +22,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

@@ -70,6 +72,7 @@ func registerFailover(r registry.Registry) {
failureModeBlackholeRecv,
failureModeBlackholeSend,
failureModeCrash,
failureModeDeadlock,
failureModeDiskStall,
failureModePause,
} {
@@ -606,6 +609,7 @@ func runFailoverNonSystem(
// Create cluster.
opts := option.DefaultStartOpts()
settings := install.MakeClusterSettings()
settings.Env = append(settings.Env, "COCKROACH_ENABLE_UNSAFE_TEST_BUILTINS=true")

failer := makeFailer(t, c, failureMode, opts, settings)
failer.Setup(ctx)
@@ -745,6 +749,7 @@ func runFailoverLiveness(
// Create cluster. Don't schedule a backup as this roachtest reports to roachperf.
opts := option.DefaultStartOptsNoBackups()
settings := install.MakeClusterSettings()
settings.Env = append(settings.Env, "COCKROACH_ENABLE_UNSAFE_TEST_BUILTINS=true")

failer := makeFailer(t, c, failureMode, opts, settings)
failer.Setup(ctx)
@@ -888,6 +893,7 @@ func runFailoverSystemNonLiveness(
// Create cluster.
opts := option.DefaultStartOpts()
settings := install.MakeClusterSettings()
settings.Env = append(settings.Env, "COCKROACH_ENABLE_UNSAFE_TEST_BUILTINS=true")

failer := makeFailer(t, c, failureMode, opts, settings)
failer.Setup(ctx)
@@ -1006,6 +1012,7 @@ const (
failureModeBlackholeRecv failureMode = "blackhole-recv"
failureModeBlackholeSend failureMode = "blackhole-send"
failureModeCrash failureMode = "crash"
failureModeDeadlock failureMode = "deadlock"
failureModeDiskStall failureMode = "disk-stall"
failureModePause failureMode = "pause"
failureModeNoop failureMode = "noop"
@@ -1064,6 +1071,15 @@ func makeFailerWithoutLocalNoop(
startOpts: opts,
startSettings: settings,
}
case failureModeDeadlock:
return &deadlockFailer{
t: t,
c: c,
startOpts: opts,
startSettings: settings,
onlyLeaseholders: true,
numReplicas: 5,
}
case failureModeDiskStall:
return &diskStallFailer{
t: t,
@@ -1249,6 +1265,98 @@ func (f *crashFailer) Recover(ctx context.Context, nodeID int) {
f.c.Start(ctx, f.t.L(), f.startOpts, f.startSettings, f.c.Node(nodeID))
}

// deadlockFailer deadlocks replicas. In addition to deadlocks, this failure
// mode is representative of all failure modes that leave a replica unresponsive
// while the node is otherwise still functional.
type deadlockFailer struct {
t test.Test
c cluster.Cluster
m cluster.Monitor
startOpts option.StartOpts
startSettings install.ClusterSettings
onlyLeaseholders bool
numReplicas int
locks map[int][]roachpb.RangeID // track locks by node
}

func (f *deadlockFailer) String() string { return string(failureModeDeadlock) }
func (f *deadlockFailer) CanUseLocal() bool { return true }
func (f *deadlockFailer) Setup(context.Context) {}
func (f *deadlockFailer) Ready(_ context.Context, m cluster.Monitor) { f.m = m }
func (f *deadlockFailer) Cleanup(context.Context) {}

func (f *deadlockFailer) Fail(ctx context.Context, nodeID int) {
require.NotZero(f.t, f.numReplicas)
if f.locks == nil {
f.locks = map[int][]roachpb.RangeID{}
}

ctx, cancel := context.WithTimeout(ctx, 20*time.Second) // can take a while to lock
defer cancel()

predicate := `$1 = ANY(replicas)`
if f.onlyLeaseholders {
predicate += ` AND lease_holder = $1`
}

conn := f.c.Conn(ctx, f.t.L(), nodeID)
rows, err := conn.QueryContext(ctx, fmt.Sprintf(
`SELECT range_id, crdb_internal.unsafe_lock_replica(range_id, true) FROM [
SELECT range_id FROM [SHOW CLUSTER RANGES WITH DETAILS]
WHERE %s
ORDER BY random()
LIMIT $2
]`, predicate), nodeID, f.numReplicas)
require.NoError(f.t, err)
for rows.Next() {
var rangeID roachpb.RangeID
var locked bool
require.NoError(f.t, rows.Scan(&rangeID, &locked))
require.True(f.t, locked)
f.t.Status(fmt.Sprintf("locked r%d on n%d", rangeID, nodeID))
f.locks[nodeID] = append(f.locks[nodeID], rangeID)
}
require.NoError(f.t, rows.Err())
}

func (f *deadlockFailer) Recover(ctx context.Context, nodeID int) {
if f.locks == nil || len(f.locks[nodeID]) == 0 {
return
}

err := func() error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

conn, err := f.c.ConnE(ctx, f.t.L(), nodeID)
if err != nil {
return err
}
for _, rangeID := range f.locks[nodeID] {
var unlocked bool
err := conn.QueryRowContext(ctx,
`SELECT crdb_internal.unsafe_lock_replica($1, false)`, rangeID).Scan(&unlocked)
if err != nil {
return err
} else if !unlocked {
return errors.Errorf("r%d was not unlocked", rangeID)
} else {
f.t.Status(fmt.Sprintf("unlocked r%d on n%d", rangeID, nodeID))
}
}
return nil
}()
// We may have locked replicas that prevent us from connecting to the node
// again, so we fall back to restarting the node.
if err != nil {
f.t.Status(fmt.Sprintf("failed to unlock replicas on n%d, restarting node: %s", nodeID, err))
f.m.ExpectDeath()
f.c.Stop(ctx, f.t.L(), option.DefaultStopOpts(), f.c.Node(nodeID))
f.c.Start(ctx, f.t.L(), f.startOpts, f.startSettings, f.c.Node(nodeID))
}
delete(f.locks, nodeID)
}

// diskStallFailer stalls the disk indefinitely. This should cause the node to
// eventually self-terminate, but we'd want leases to move off before then.
type diskStallFailer struct {
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvserverbase/BUILD.bazel
@@ -26,6 +26,7 @@ go_library(
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/quotapool",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_errors//:errors",
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/kvserverbase/stores.go
@@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
)

@@ -38,6 +39,10 @@ type Store interface {

// SetQueueActive disables/enables the named queue.
SetQueueActive(active bool, queue string) error

// GetReplicaMutexForTesting returns the mutex of the replica with the given
// range ID, or nil if no replica was found. This is used for testing.
GetReplicaMutexForTesting(rangeID roachpb.RangeID) *syncutil.RWMutex
}

// UnsupportedStoresIterator is a StoresIterator that only returns "unsupported"
15 changes: 12 additions & 3 deletions pkg/kv/kvserver/logstore/logstore.go
@@ -87,6 +87,9 @@ type AppendStats struct {
PebbleBegin time.Time
PebbleEnd time.Time
PebbleBytes int64
// Only set when !NonBlocking, which means almost never, since
// kv.raft_log.non_blocking_synchronization.enabled defaults to true.
PebbleCommitStats storage.BatchCommitStats

Sync bool
// If true, PebbleEnd-PebbleBegin does not include the sync time.
@@ -113,8 +116,9 @@ type LogStore struct {
// SyncCallback is a callback that is notified when a raft log write has been
// durably committed to disk. The function is handed the response messages that
// are associated with the MsgStorageAppend that triggered the fsync.
// commitStats is populated iff this was a non-blocking sync.
type SyncCallback interface {
OnLogSync(context.Context, []raftpb.Message)
OnLogSync(context.Context, []raftpb.Message, storage.BatchCommitStats)
}

func newStoreEntriesBatch(eng storage.Engine) storage.Batch {
@@ -257,6 +261,7 @@ func (s *LogStore) storeEntriesAndCommitBatch(
ctx: ctx,
cb: cb,
msgs: m.Responses,
batch: batch,
metrics: s.Metrics,
logCommitBegin: stats.PebbleBegin,
}
@@ -269,10 +274,11 @@
return RaftState{}, errors.Wrap(err, expl)
}
stats.PebbleEnd = timeutil.Now()
stats.PebbleCommitStats = batch.CommitStats()
if wantsSync {
logCommitEnd := stats.PebbleEnd
s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds())
cb.OnLogSync(ctx, m.Responses)
cb.OnLogSync(ctx, m.Responses, storage.BatchCommitStats{})
}
}
stats.Sync = wantsSync
@@ -325,6 +331,8 @@ type nonBlockingSyncWaiterCallback struct {
ctx context.Context
cb SyncCallback
msgs []raftpb.Message
// Used to extract stats. This is the batch that has been synced.
batch storage.WriteBatch
// Used to record Metrics.
metrics Metrics
logCommitBegin time.Time
@@ -334,7 +342,8 @@
func (cb *nonBlockingSyncWaiterCallback) run() {
dur := timeutil.Since(cb.logCommitBegin).Nanoseconds()
cb.metrics.RaftLogCommitLatency.RecordValue(dur)
cb.cb.OnLogSync(cb.ctx, cb.msgs)
commitStats := cb.batch.CommitStats()
cb.cb.OnLogSync(cb.ctx, cb.msgs, commitStats)
cb.release()
}

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/logstore/logstore_bench_test.go
@@ -38,7 +38,7 @@ func (b *discardBatch) Commit(bool) error {

type noopSyncCallback struct{}

func (noopSyncCallback) OnLogSync(context.Context, []raftpb.Message) {}
func (noopSyncCallback) OnLogSync(context.Context, []raftpb.Message, storage.BatchCommitStats) {}

func BenchmarkLogStore_StoreEntries(b *testing.B) {
defer log.Scope(b).Close(b)
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/logstore/sync_waiter.go
@@ -95,8 +95,8 @@ func (w *SyncWaiterLoop) waitLoop(ctx context.Context, stopper *stop.Stopper) {
if err := w.wg.SyncWait(); err != nil {
log.Fatalf(ctx, "SyncWait error: %+v", err)
}
w.wg.Close()
w.cb.run()
w.wg.Close()
case <-stopper.ShouldQuiesce():
return
}
@@ -108,7 +108,9 @@
// durably committed. It may never be called in case the stopper stops.
//
// The syncWaiter will be Closed after its SyncWait method completes. It must
// not be Closed by the caller.
// not be Closed by the caller. The cb is called before the syncWaiter is
// closed, in case the cb implementation needs to extract something from the
// syncWaiter.
//
// If the SyncWaiterLoop has already been stopped, the callback will never be
// called.
80 changes: 80 additions & 0 deletions pkg/kv/kvserver/metrics.go
@@ -759,6 +759,61 @@ bytes preserved during flushes and compactions over the lifetime of the process.
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaBatchCommitCount = metric.Metadata{
Name: "storage.batch-commit.count",
Help: "Count of batch commits. See storage.AggregatedBatchCommitStats for details.",
Measurement: "Commit Ops",
Unit: metric.Unit_COUNT,
}
metaBatchCommitDuration = metric.Metadata{
Name: "storage.batch-commit.duration",
Help: "Cumulative time spent in batch commit. " +
"See storage.AggregatedBatchCommitStats for details.",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaBatchCommitSemWaitDuration = metric.Metadata{
Name: "storage.batch-commit.sem-wait.duration",
Help: "Cumulative time spent in semaphore wait, for batch commit. " +
"See storage.AggregatedBatchCommitStats for details.",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaBatchCommitWALQWaitDuration = metric.Metadata{
Name: "storage.batch-commit.wal-queue-wait.duration",
Help: "Cumulative time spent waiting for memory blocks in the WAL queue, for batch commit. " +
"See storage.AggregatedBatchCommitStats for details.",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaBatchCommitMemStallDuration = metric.Metadata{
Name: "storage.batch-commit.mem-stall.duration",
Help: "Cumulative time spent in a write stall due to too many memtables, for batch commit. " +
"See storage.AggregatedBatchCommitStats for details.",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaBatchCommitL0StallDuration = metric.Metadata{
Name: "storage.batch-commit.l0-stall.duration",
Help: "Cumulative time spent in a write stall due to high read amplification in L0, for batch commit. " +
"See storage.AggregatedBatchCommitStats for details.",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaBatchCommitWALRotDuration = metric.Metadata{
Name: "storage.batch-commit.wal-rotation.duration",
Help: "Cumulative time spent waiting for WAL rotation, for batch commit. " +
"See storage.AggregatedBatchCommitStats for details.",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaBatchCommitCommitWaitDuration = metric.Metadata{
Name: "storage.batch-commit.commit-wait.duration",
Help: "Cumulative time spent waiting for WAL sync, for batch commit. " +
"See storage.AggregatedBatchCommitStats for details.",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
)

var (
@@ -2068,6 +2123,14 @@ type StoreMetrics struct {
FlushableIngestCount *metric.Gauge
FlushableIngestTableCount *metric.Gauge
FlushableIngestTableSize *metric.Gauge
BatchCommitCount *metric.Gauge
BatchCommitDuration *metric.Gauge
BatchCommitSemWaitDuration *metric.Gauge
BatchCommitWALQWaitDuration *metric.Gauge
BatchCommitMemStallDuration *metric.Gauge
BatchCommitL0StallDuration *metric.Gauge
BatchCommitWALRotWaitDuration *metric.Gauge
BatchCommitCommitWaitDuration *metric.Gauge

RdbCheckpoints *metric.Gauge

@@ -2684,6 +2747,14 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
FlushableIngestCount: metric.NewGauge(metaFlushableIngestCount),
FlushableIngestTableCount: metric.NewGauge(metaFlushableIngestTableCount),
FlushableIngestTableSize: metric.NewGauge(metaFlushableIngestTableBytes),
BatchCommitCount: metric.NewGauge(metaBatchCommitCount),
BatchCommitDuration: metric.NewGauge(metaBatchCommitDuration),
BatchCommitSemWaitDuration: metric.NewGauge(metaBatchCommitSemWaitDuration),
BatchCommitWALQWaitDuration: metric.NewGauge(metaBatchCommitWALQWaitDuration),
BatchCommitMemStallDuration: metric.NewGauge(metaBatchCommitMemStallDuration),
BatchCommitL0StallDuration: metric.NewGauge(metaBatchCommitL0StallDuration),
BatchCommitWALRotWaitDuration: metric.NewGauge(metaBatchCommitWALRotDuration),
BatchCommitCommitWaitDuration: metric.NewGauge(metaBatchCommitCommitWaitDuration),

RdbCheckpoints: metric.NewGauge(metaRdbCheckpoints),

@@ -3035,6 +3106,15 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) {
sm.FlushableIngestCount.Update(int64(m.Flush.AsIngestCount))
sm.FlushableIngestTableCount.Update(int64(m.Flush.AsIngestTableCount))
sm.FlushableIngestTableSize.Update(int64(m.Flush.AsIngestBytes))
sm.BatchCommitCount.Update(int64(m.BatchCommitStats.Count))
sm.BatchCommitDuration.Update(int64(m.BatchCommitStats.TotalDuration))
sm.BatchCommitSemWaitDuration.Update(int64(m.BatchCommitStats.SemaphoreWaitDuration))
sm.BatchCommitWALQWaitDuration.Update(int64(m.BatchCommitStats.WALQueueWaitDuration))
sm.BatchCommitMemStallDuration.Update(int64(m.BatchCommitStats.MemTableWriteStallDuration))
sm.BatchCommitL0StallDuration.Update(int64(m.BatchCommitStats.L0ReadAmpWriteStallDuration))
sm.BatchCommitWALRotWaitDuration.Update(int64(m.BatchCommitStats.WALRotationDuration))
sm.BatchCommitCommitWaitDuration.Update(int64(m.BatchCommitStats.CommitWaitDuration))

// Update the maximum number of L0 sub-levels seen.
sm.l0SublevelsTracker.Lock()
sm.l0SublevelsTracker.swag.Record(timeutil.Now(), float64(m.Levels[0].Sublevels))