storage: add permitLargeSnapshots flag to replica
In a privately reported user issue, we've seen that [our attempts](cockroachdb#7788)
at [preventing large snapshots](cockroachdb#7581)
can result in replica unavailability. Our current approach to limiting
large snapshots assumes that it's OK to block snapshots indefinitely
while waiting for a range to first split. Unfortunately, this can create
a dependency cycle where a range requires a snapshot to split (because it
can't achieve an up-to-date quorum without it) but isn't allowed to perform
a snapshot until its size is reduced below the threshold. This can result
in unavailability even when a majority of replicas remain live.

Currently, we still need this snapshot size limit because unbounded snapshots
can result in OOM errors that crash entire nodes. However, once snapshots
are streamed from disk to disk, never needing to buffer in-memory on the
sending or receiving side, we should be able to remove any snapshot size
limit (see cockroachdb#16954).

As a holdover, this change introduces a `permitLargeSnapshots` flag on a
replica which is set when the replica is too large to snapshot but observes
splits failing. When set, the flag allows snapshots to ignore the size
limit until the snapshot goes through and splits are able to succeed
again.

Release note: None
nvanbenschoten committed Dec 9, 2017
1 parent cfd7dcd commit 5c21fa6
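
For orientation, the flag's lifecycle can be modeled with a small, self-contained Go sketch. This is a toy model with hypothetical names and no locking; the real code in the diff below lives on Replica, is guarded by r.mu, and uses exceedsDoubleSplitSizeRLocked.

package main

import "fmt"

// replica models only the fields relevant to the permitLargeSnapshots flag.
type replica struct {
	sizeBytes            int64
	maxBytes             int64
	permitLargeSnapshots bool
}

// exceedsDoubleSplitSize mirrors the size check used by Replica.GetSnapshot.
func (r *replica) exceedsDoubleSplitSize() bool {
	return r.sizeBytes > 2*r.maxBytes
}

// canSnapshot: a snapshot is allowed if the range is small enough, or if the
// escape hatch has been opened.
func (r *replica) canSnapshot() bool {
	return !r.exceedsDoubleSplitSize() || r.permitLargeSnapshots
}

// onSplitFailure is what the split queue does on a failed split: if the range
// is also too large to snapshot, open the escape hatch.
func (r *replica) onSplitFailure() {
	if r.exceedsDoubleSplitSize() {
		r.permitLargeSnapshots = true
	}
}

// setMaxBytes resets the escape hatch, e.g. once the split finally succeeds.
func (r *replica) setMaxBytes(maxBytes int64) {
	r.maxBytes = maxBytes
	r.permitLargeSnapshots = false
}

func main() {
	r := &replica{sizeBytes: 3 << 16, maxBytes: 1 << 16}
	fmt.Println(r.canSnapshot()) // false: too large, but the split needs a snapshot
	r.onSplitFailure()           // split queue observes the failed split
	fmt.Println(r.canSnapshot()) // true: large snapshot now permitted
	r.setMaxBytes(1 << 16)       // split succeeded (or zone config changed)
	fmt.Println(r.canSnapshot()) // false: back to enforcing the limit
}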
Showing 6 changed files with 184 additions and 7 deletions.
126 changes: 123 additions & 3 deletions pkg/storage/client_split_test.go
@@ -790,7 +790,7 @@ func TestStoreRangeSplitStatsWithMerges(t *testing.T) {
// fillRange writes keys with the given prefix and associated values
// until bytes bytes have been written or the given range has split.
func fillRange(
store *storage.Store, rangeID roachpb.RangeID, prefix roachpb.Key, bytes int64, t *testing.T,
t *testing.T, store *storage.Store, rangeID roachpb.RangeID, prefix roachpb.Key, bytes int64,
) {
src := rand.New(rand.NewSource(0))
for {
@@ -803,7 +803,7 @@ func fillRange(
return
}
key := append(append([]byte(nil), prefix...), randutil.RandBytes(src, 100)...)
key = keys.MakeFamilyKey(key, 0)
key = keys.MakeFamilyKey(key, src.Uint32())
val := randutil.RandBytes(src, int(src.Int31n(1<<8)))
pArgs := putArgs(key, val)
_, pErr := client.SendWrappedWith(context.Background(), store, roachpb.Header{
@@ -862,7 +862,7 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) {
}

// Look in the range after prefix we're writing to.
fillRange(store, repl.RangeID, tableBoundary, maxBytes, t)
fillRange(t, store, repl.RangeID, tableBoundary, maxBytes)
}

// Verify that the range is in fact split.
@@ -913,6 +913,126 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) {
})
}

// TestStoreRangeSplitAfterLargeSnapshot tests a scenario where a range is too
// large to snapshot a follower, but is unable to split because it cannot
// achieve quorum. The leader of the range should adapt to this, eventually
// permitting the large snapshot so that it can recover and then split
// successfully.
func TestStoreRangeSplitAfterLargeSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()

// Set maxBytes to something small so we can exceed the maximum snapshot
// size without adding 2x64MB of data.
const maxBytes = 1 << 16
defer config.TestingSetDefaultZoneConfig(config.ZoneConfig{
RangeMaxBytes: maxBytes,
})()

// Create a three node cluster.
sc := storage.TestStoreConfig(nil)
sc.RaftElectionTimeoutTicks = 1000000
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
mtc.Start(t, 3)
store0 := mtc.stores[0]

// The behindNode falls behind far enough to require a snapshot.
const behindNode = 1
// The crashingNode crashes after its single range becomes too large to
// snapshot.
const crashingNode = 2

// First, do a write; we'll use this to determine when the dust has settled.
keyPrefix := append(keys.UserTableDataMin, []byte("key")...)
incArgs := incrementArgs(keyPrefix, 1)
if _, pErr := client.SendWrapped(context.Background(), rg1(store0), incArgs); pErr != nil {
t.Fatal(pErr)
}

// Replicate to the other nodes.
mtc.replicateRange(1, behindNode, crashingNode)
mtc.waitForValues(keyPrefix, []int64{1, 1, 1})

// Activate queues and wait for initial splits.
for _, store := range mtc.stores {
store.SetSplitQueueActive(true)
store.SetRaftSnapshotQueueActive(true)
store.ForceSplitScanAndProcess()
}
if err := server.WaitForInitialSplits(store0.DB()); err != nil {
t.Fatal(err)
}

repl := store0.LookupReplica(roachpb.RKey(keyPrefix), nil)
rangeID := repl.RangeID
header := roachpb.Header{RangeID: rangeID}

// Deactivate split and snapshot queue.
for _, store := range mtc.stores {
store.SetSplitQueueActive(false)
store.SetRaftSnapshotQueueActive(false)
}

// Fill the range so that it will try to split once the splitQueue is
// re-enabled. Fill it past the snapshot size limit enforced in
// Replica.GetSnapshot. We do this before stopping behindNode so that
// the quotaPool does not throttle progress.
fillRange(t, store0, rangeID, keyPrefix, 2*maxBytes+1)

// Stop behindNode so it falls behind and will require a snapshot.
mtc.stopStore(behindNode)

// Let behindNode fall behind.
if _, pErr := client.SendWrappedWith(context.Background(), store0, header, incArgs); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(keyPrefix, []int64{2, 1, 2})

// Truncate the replica's log. This ensures that the only way behindNode can
// recover is through a snapshot.
index, err := repl.GetLastIndex()
if err != nil {
t.Fatal(err)
}
truncArgs := truncateLogArgs(index+1, rangeID)
truncArgs.Key = repl.Desc().StartKey.AsRawKey()
if _, err := client.SendWrappedWith(context.Background(), store0, header, truncArgs); err != nil {
t.Fatal(err)
}

// Stop crashingNode so that we lose quorum and can no longer split.
// Bring behindNode back up.
mtc.stopStore(crashingNode)
mtc.restartStore(behindNode)

// Turn the queues back on.
for i, store := range mtc.stores {
if i != crashingNode {
store.SetSplitQueueActive(true)
store.SetRaftSnapshotQueueActive(true)
}
}

// Verify that the range is eventually able to split after behindNode is
// sent a snapshot.
expectedInitialRanges, err := server.ExpectedInitialRangeCount(store0.DB())
if err != nil {
t.Fatal(err)
}
testutils.SucceedsSoon(t, func() error {
// Eagerly force split queue to process.
for i, store := range mtc.stores {
if i != crashingNode {
store.ForceSplitScanAndProcess()
}
}
if store0.ReplicaCount() < expectedInitialRanges+1 {
return errors.Errorf("expected new range created by split")
}
return nil
})
}

// TestStoreRangeSystemSplits verifies that splits are based on the contents of
// the SystemConfig span.
func TestStoreRangeSystemSplits(t *testing.T) {
21 changes: 18 additions & 3 deletions pkg/storage/replica.go
@@ -384,6 +384,14 @@ type Replica struct {
minLeaseProposedTS hlc.Timestamp
// Max bytes before split.
maxBytes int64
// Allow snapshots of any size instead of waiting for a split. Set to
// true when a split that is required for snapshots fails. Reset to
// false when the splits eventually succeed. The reasoning here is that
// in certain situations the split is dependent on the snapshot
// succeeding (either directly or transitively), so blocking the
// snapshot on the split can create a deadlock.
// TODO(nvanbenschoten): remove after #16954 is addressed.
permitLargeSnapshots bool
// proposals stores the Raft in-flight commands which
// originated at this Replica, i.e. all commands for which
// propose has been called, but which have not yet
@@ -1192,21 +1200,28 @@ func (r *Replica) getEstimatedBehindCountRLocked(raftStatus *raft.Status) int64
return 0
}

// GetMaxBytes atomically gets the range maximum byte limit.
// GetMaxBytes gets the range maximum byte limit.
func (r *Replica) GetMaxBytes() int64 {
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.maxBytes
}

// SetMaxBytes atomically sets the maximum byte limit before
// split. This value is cached by the range for efficiency.
// SetMaxBytes sets the maximum byte limit before split.
func (r *Replica) SetMaxBytes(maxBytes int64) {
r.mu.Lock()
defer r.mu.Unlock()
r.mu.maxBytes = maxBytes

// Whenever we change maxBytes, reset permitLargeSnapshots.
r.mu.permitLargeSnapshots = false
}

// func (r *Replica) DoubleMaxBytes() {
// r.mu.Lock()
// defer r.mu.Unlock()
// }

// IsFirstRange returns true if this is the first range.
func (r *Replica) IsFirstRange() bool {
return r.RangeID == 1
14 changes: 13 additions & 1 deletion pkg/storage/replica_raftstorage.go
@@ -391,7 +391,19 @@ func (r *Replica) GetSnapshot(
defer r.mu.RUnlock()
rangeID := r.RangeID

if r.exceedsDoubleSplitSizeRLocked() {
// TODO(nvanbenschoten): We should never block snapshots indefinitely. Doing
// so can reduce a range's ability to recover from an under-replicated state
// and can cause unavailability even when a majority of replicas remain
// live. For instance, if a range gets too large to snapshot and requires a
// split in order to do so again, the loss of one up-to-date replica could
// cause it to permanently lose quorum.
//
// For now we still need this check because unbounded snapshots can result
// in OOM errors that crash entire nodes. However, once snapshots are
// streamed from disk to disk, never needing to buffer in-memory on the
// sending or receiving side, we should be able to remove any snapshot size
// limit. See #16954 for more.
if r.exceedsDoubleSplitSizeRLocked() && !r.mu.permitLargeSnapshots {
maxBytes := r.mu.maxBytes
size := r.mu.state.Stats.Total()
err := errors.Errorf(
14 changes: 14 additions & 0 deletions pkg/storage/replica_raftstorage_test.go
@@ -83,6 +83,7 @@ func TestSkipLargeReplicaSnapshot(t *testing.T) {
t.Fatal(err)
}

// Snapshot should succeed.
if snap, err := rep.GetSnapshot(context.Background(), "test"); err != nil {
t.Fatal(err)
} else {
@@ -93,6 +94,7 @@ func TestSkipLargeReplicaSnapshot(t *testing.T) {
t.Fatal(err)
}

// Snapshot should fail.
const expected = "not generating test snapshot because replica is too large"
if _, err := rep.GetSnapshot(context.Background(), "test"); !testutils.IsError(err, expected) {
rep.mu.Lock()
@@ -104,4 +106,16 @@
rep.needsSplitBySize(), rep.exceedsDoubleSplitSizeRLocked(), err,
)
}

// Set the permitLargeSnapshots flag, which bypasses the snapshot size check.
rep.mu.Lock()
rep.mu.permitLargeSnapshots = true
rep.mu.Unlock()

// Snapshot should succeed.
if snap, err := rep.GetSnapshot(context.Background(), "test"); err != nil {
t.Fatal(err)
} else {
snap.Close()
}
}
11 changes: 11 additions & 0 deletions pkg/storage/split_queue.go
@@ -114,6 +114,17 @@ func (sq *splitQueue) process(ctx context.Context, r *Replica, sysCfg config.Sys
roachpb.AdminSplitRequest{},
desc,
); pErr != nil {
// If we failed to split the range and the range is too large to snapshot,
// set the permitLargeSnapshots flag so that we don't continue to block
// large snapshots, which could result in unavailability. The flag is reset
// whenever the split size is adjusted, which includes when the split
// finally succeeds.
// TODO(nvanbenschoten): remove after #16954.
r.mu.Lock()
defer r.mu.Unlock()
if r.exceedsDoubleSplitSizeRLocked() {
r.mu.permitLargeSnapshots = true
}
return pErr.GoError()
} else if !validSplitKey {
// If we couldn't find a split key, set the max-bytes for the range to
5 changes: 5 additions & 0 deletions pkg/storage/store.go
@@ -724,6 +724,8 @@ type StoreTestingKnobs struct {
// DisableTimeSeriesMaintenanceQueue disables the time series maintenance
// queue.
DisableTimeSeriesMaintenanceQueue bool
// DisableRaftSnapshotQueue disables the raft snapshot queue.
DisableRaftSnapshotQueue bool
// DisableScanner disables the replica scanner.
DisableScanner bool
// DisablePeriodicGossips disables periodic gossiping.
@@ -902,6 +904,9 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
if cfg.TestingKnobs.DisableTimeSeriesMaintenanceQueue {
s.setTimeSeriesMaintenanceQueueActive(false)
}
if cfg.TestingKnobs.DisableRaftSnapshotQueue {
s.setRaftSnapshotQueueActive(false)
}
if cfg.TestingKnobs.DisableScanner {
s.setScannerActive(false)
}
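
The new DisableRaftSnapshotQueue knob follows the same pattern as the other queue knobs above. A hypothetical usage sketch for a test is shown here; note that the test added in this commit instead toggles the queue at runtime via SetRaftSnapshotQueueActive.

// Hypothetical: disable the raft snapshot queue for all stores in a test cluster.
sc := storage.TestStoreConfig(nil)
sc.TestingKnobs.DisableRaftSnapshotQueue = true
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
mtc.Start(t, 3)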
