Skip to content

Commit

Permalink
Merge pull request #38642 from tbg/backport19.1-38484
Browse files Browse the repository at this point in the history
backport-19.1: storage: truncate raft log less aggressively when replica is missing
  • Loading branch information
tbg authored Jul 4, 2019
2 parents f1c9693 + 9edee5b commit 1a14d34
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 66 deletions.
6 changes: 4 additions & 2 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ var (
"COCKROACH_RAFT_ELECTION_TIMEOUT_TICKS", 15)

// defaultRaftLogTruncationThreshold specifies the upper bound that a single
// Range's Raft log can grow to before log truncations are triggered, even
// if that means a snapshot will be required for a straggling follower.
// Range's Raft log can grow to before log truncations are triggered while at
// least one follower is missing. If all followers are active, the quota pool
// is responsible for ensuring the raft log doesn't grow without bound by
// making sure the leader doesn't get too far ahead.
defaultRaftLogTruncationThreshold = envutil.EnvOrDefaultInt64(
"COCKROACH_RAFT_LOG_TRUNCATION_THRESHOLD", 4<<20 /* 4 MB */)

Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func registerTests(r *registry) {
registerQueue(r)
registerRebalanceLoad(r)
registerReplicaGC(r)
registerRestart(r)
registerRestore(r)
registerRoachmart(r)
registerScaleData(r)
Expand Down
95 changes: 95 additions & 0 deletions pkg/cmd/roachtest/restart.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package main

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

func runRestart(ctx context.Context, t *test, c *cluster, downDuration time.Duration) {
crdbNodes := c.Range(1, c.nodes)
workloadNode := c.Node(1)
const restartNode = 3

t.Status("installing cockroach")
c.Put(ctx, cockroach, "./cockroach", crdbNodes)
c.Start(ctx, t, crdbNodes, startArgs(`--args=--vmodule=raft_log_queue=3`))

// We don't really need tpcc, we just need a good amount of traffic and a good
// amount of data.
t.Status("importing tpcc fixture")
c.Run(ctx, workloadNode,
"./cockroach workload fixtures import tpcc --warehouses=100 --fks=false --checks=false")

// Wait a full scanner cycle (10m) for the raft log queue to truncate the
// sstable entries from the import. They're huge and are not representative of
// normal traffic.
//
// NB: less would probably do a good enough job, but let's play it safe.
//
// TODO(dan/tbg): It's awkward that this is necessary. We should be able to
// do a better job here, for example by truncating only a smaller prefix of
// the log instead of all of it (right now there's no notion of per-entry
// size when we do truncate). Also having quiescing ranges truncate to
// lastIndex will be helpful because that drives the log size down eagerly
// when things are healthy.
t.Status("waiting for addsstable truncations")
time.Sleep(11 * time.Minute)

// Stop a node.
c.Stop(ctx, c.Node(restartNode))

// Wait for between 10s and `server.time_until_store_dead` while sending
// traffic to one of the nodes that are not down. This used to cause lots of
// raft log truncation, which caused node 3 to need lots of snapshots when it
// came back up.
c.Run(ctx, workloadNode, "./cockroach workload run tpcc --warehouses=100 "+
fmt.Sprintf("--tolerate-errors --wait=false --duration=%s", downDuration))

// Bring it back up and make sure it can serve a query within a reasonable
// time limit. For now, less time than it was down for.
c.Start(ctx, t, c.Node(restartNode))

// Dialing the formerly down node may still be prevented by the circuit breaker
// for a short moment (seconds) after n3 restarts. If it happens, the COUNT(*)
// can fail with a "no inbound stream connection" error. This is not what we
// want to catch in this test, so work around it.
//
// See
time.Sleep(15 * time.Second)

start := timeutil.Now()
restartNodeDB := c.Conn(ctx, restartNode)
if _, err := restartNodeDB.Exec(`SELECT count(*) FROM tpcc.order_line`); err != nil {
t.Fatal(err)
}
if took := timeutil.Since(start); took > downDuration {
t.Fatalf(`expected to recover within %s took %s`, downDuration, took)
} else {
c.l.Printf(`connecting and query finished in %s`, took)
}
}

func registerRestart(r *registry) {
r.Add(testSpec{
Name: fmt.Sprintf("restart/down-for-2m"),
Cluster: makeClusterSpec(3),
// "cockroach workload is only in 19.1+"
MinVersion: "v19.1.0",
Run: func(ctx context.Context, t *test, c *cluster) {
runRestart(ctx, t, c, 2*time.Minute)
},
})
}
116 changes: 63 additions & 53 deletions pkg/storage/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,19 @@ func (td *truncateDecision) ShouldTruncate() bool {
(n > 0 && td.Input.LogSize >= RaftLogQueueStaleSize)
}

// ProtectIndex attempts to "protect" a position in the log by making sure it's
// not truncated away. Specifically it lowers the proposed truncation point
// (which will be the new first index after the truncation) to the given index
// if it would be truncating at a point past it. If a change is made, the
// ChosenVia is updated with the one given. This protection is not guaranteed if
// the protected index is outside of the existing [FirstIndex,LastIndex] bounds.
func (td *truncateDecision) ProtectIndex(index uint64, chosenVia string) {
if td.NewFirstIndex > index {
td.NewFirstIndex = index
td.ChosenVia = chosenVia
}
}

// computeTruncateDecision returns the oldest index that cannot be
// truncated. If there is a behind node, we want to keep old raft logs so it
// can catch up without having to send a full snapshot. However, if a node down
Expand All @@ -380,70 +393,67 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
decision := truncateDecision{Input: input}
decision.QuorumIndex = getQuorumIndex(&input.RaftStatus)

decision.NewFirstIndex = decision.QuorumIndex
decision.ChosenVia = truncatableIndexChosenViaQuorumIndex
// The last index is most aggressive possible truncation that we could do.
// Everything else in this method makes the truncation less aggressive.
decision.NewFirstIndex = decision.Input.LastIndex
decision.ChosenVia = truncatableIndexChosenViaLastIndex

// Start by trying to truncate at the quorum index. Naively, you would expect
// lastIndex to never be smaller than quorumIndex, but
// RaftStatus.Progress.Match is updated on the leader when a command is
// proposed and in a single replica Raft group this also means that
// RaftStatus.Commit is updated at propose time.
decision.ProtectIndex(decision.QuorumIndex, truncatableIndexChosenViaQuorumIndex)

for _, progress := range input.RaftStatus.Progress {
if !progress.RecentActive {
// If a follower isn't recently active, don't lower the truncation
// index for it as the follower is likely not online at all and would
// block log truncation forever.
continue
}
// Snapshots are expensive, so we try our best to avoid truncating past
// where a follower is.

// Generally we truncate to the quorum commit index when the log becomes
// too large, but we make an exception for live followers which are
// being probed (i.e. the leader doesn't know how far they've caught
// up). In that case the Match index is too large, and so the quorum
// index can be, too. We don't want these followers to require a
// snapshot since they are most likely going to be caught up very soon
// (they respond with the "right index" to the first probe or don't
// respond, in which case they should end up as not recently active).
// But we also don't know their index, so we can't possible make a
// truncation decision that avoids that at this point and make the
// truncation a no-op.
// First, we never truncate off a recently active follower, no matter how
// large the log gets. Recently active shares the (currently 10s) constant
// as the quota pool, so the quota pool should put a bound on how much the
// raft log can grow due to this.
//
// The scenario in which this is most relevant is during restores, where
// we split off new ranges that rapidly receive very large log entries
// while the Raft group is still in a state of discovery (a new leader
// starts probing followers at its own last index). Additionally, these
// ranges will be split many times over, resulting in a flurry of
// snapshots with overlapping bounds that put significant stress on the
// Raft snapshot queue.
if progress.State == raft.ProgressStateProbe {
if decision.NewFirstIndex > decision.Input.FirstIndex {
decision.NewFirstIndex = decision.Input.FirstIndex
decision.ChosenVia = truncatableIndexChosenViaProbingFollower
// For live followers which are being probed (i.e. the leader doesn't know
// how far they've caught up), the Match index is too large, and so the
// quorum index can be, too. We don't want these followers to require a
// snapshot since they are most likely going to be caught up very soon (they
// respond with the "right index" to the first probe or don't respond, in
// which case they should end up as not recently active). But we also don't
// know their index, so we can't possible make a truncation decision that
// avoids that at this point and make the truncation a no-op.
//
// The scenario in which this is most relevant is during restores, where we
// split off new ranges that rapidly receive very large log entries while
// the Raft group is still in a state of discovery (a new leader starts
// probing followers at its own last index). Additionally, these ranges will
// be split many times over, resulting in a flurry of snapshots with
// overlapping bounds that put significant stress on the Raft snapshot
// queue.
if progress.RecentActive {
if progress.State == raft.ProgressStateProbe {
decision.ProtectIndex(decision.Input.FirstIndex, truncatableIndexChosenViaProbingFollower)
} else {
decision.ProtectIndex(progress.Match, truncatableIndexChosenViaFollowers)
}
} else if !input.LogTooLarge() && decision.NewFirstIndex > progress.Match {
decision.NewFirstIndex = progress.Match
decision.ChosenVia = truncatableIndexChosenViaFollowers
continue
}

// Second, if the follower has not been recently active, we don't
// truncate it off as long as the raft log is not too large.
if !input.LogTooLarge() {
decision.ProtectIndex(progress.Match, truncatableIndexChosenViaFollowers)
}

// Otherwise, we let it truncate to the quorum index.
}

// The pending snapshot index acts as a placeholder for a replica that is
// about to be added to the range (or is in Raft recovery). We don't want to
// truncate the log in a way that will require that new replica to be caught
// up via yet another Raft snapshot.
if input.PendingPreemptiveSnapshotIndex > 0 && decision.NewFirstIndex > input.PendingPreemptiveSnapshotIndex {
decision.NewFirstIndex = input.PendingPreemptiveSnapshotIndex
decision.ChosenVia = truncatableIndexChosenViaPendingSnap
}

// Advance to the first index, but never truncate past the quorum commit
// index.
if decision.NewFirstIndex < input.FirstIndex && input.FirstIndex <= decision.QuorumIndex {
decision.NewFirstIndex = input.FirstIndex
decision.ChosenVia = truncatableIndexChosenViaFirstIndex
}
// Never truncate past the last index. Naively, you would expect lastIndex to
// never be smaller than quorumIndex, but RaftStatus.Progress.Match is
// updated on the leader when a command is proposed and in a single replica
// Raft group this also means that RaftStatus.Commit is updated at propose
// time.
if decision.NewFirstIndex > input.LastIndex {
decision.NewFirstIndex = input.LastIndex
decision.ChosenVia = truncatableIndexChosenViaLastIndex
if input.PendingPreemptiveSnapshotIndex > 0 {
decision.ProtectIndex(input.PendingPreemptiveSnapshotIndex, truncatableIndexChosenViaPendingSnap)
}

// If new first index dropped below first index, make them equal (resulting
Expand Down Expand Up @@ -497,7 +507,7 @@ func (rlq *raftLogQueue) shouldQueue(
}

// shouldQueueImpl returns whether the given truncate decision should lead to
// a log truncation. This is either the case if the decision says so or of
// a log truncation. This is either the case if the decision says so or if
// we want to recompute the log size (in which case `recomputeRaftLogSize` and
// `shouldQ` are both true and a reasonable priority is returned).
func (rlq *raftLogQueue) shouldQueueImpl(
Expand Down
22 changes: 11 additions & 11 deletions pkg/storage/raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,13 @@ func TestComputeTruncateDecision(t *testing.T) {
{
// Nothing to truncate.
[]uint64{1, 2}, 100, 1, 1, 0,
"should truncate: false [truncate 0 entries to first index 1 (chosen via: quorum)]"},
"should truncate: false [truncate 0 entries to first index 1 (chosen via: last index)]"},
{
// Nothing to truncate on this replica, though a quorum elsewhere has more progress.
// NB this couldn't happen if we're truly the Raft leader, unless we appended to our
// own log asynchronously.
[]uint64{1, 5, 5}, 100, 1, 1, 0,
"should truncate: false [truncate 0 entries to first index 1 (chosen via: followers)]",
"should truncate: false [truncate 0 entries to first index 1 (chosen via: last index)]",
},
{
// We're not truncating anything, but one follower is already cut off. There's no pending
Expand All @@ -149,7 +149,7 @@ func TestComputeTruncateDecision(t *testing.T) {
{
// The happy case.
[]uint64{5, 5, 5}, 100, 2, 5, 0,
"should truncate: false [truncate 3 entries to first index 5 (chosen via: quorum)]",
"should truncate: false [truncate 3 entries to first index 5 (chosen via: last index)]",
},
{
// No truncation, but the outstanding snapshot is made obsolete by the truncation. However
Expand All @@ -174,10 +174,10 @@ func TestComputeTruncateDecision(t *testing.T) {
[]uint64{1, 2, 3, 4}, 100, 2, 2, 0,
"should truncate: false [truncate 0 entries to first index 2 (chosen via: first index)]",
},
// If over targetSize, should truncate to quorum committed index. Minority will need snapshots.
// Don't truncate off active followers, even if over targetSize.
{
[]uint64{1, 3, 3, 4}, 2000, 1, 3, 0,
"should truncate: false [truncate 2 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B); implies 1 Raft snapshot]",
"should truncate: false [truncate 0 entries to first index 1 (chosen via: followers); log too large (2.0 KiB > 1000 B)]",
},
// Don't truncate away pending snapshot, even when log too large.
{
Expand All @@ -186,11 +186,11 @@ func TestComputeTruncateDecision(t *testing.T) {
},
{
[]uint64{1, 3, 3, 4}, 2000, 2, 3, 0,
"should truncate: false [truncate 1 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B)]",
"should truncate: false [truncate 0 entries to first index 2 (chosen via: first index); log too large (2.0 KiB > 1000 B)]",
},
{
[]uint64{1, 3, 3, 4}, 2000, 3, 3, 0,
"should truncate: false [truncate 0 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B)]",
"should truncate: false [truncate 0 entries to first index 3 (chosen via: first index); log too large (2.0 KiB > 1000 B)]",
},
// The pending snapshot index affects the quorum commit index.
{
Expand All @@ -213,7 +213,7 @@ func TestComputeTruncateDecision(t *testing.T) {
"should truncate: false [truncate 0 entries to first index 2 (chosen via: first index)]",
}}
for i, c := range testCases {
t.Run(fmt.Sprintf("%+v", c), func(t *testing.T) {
t.Run("", func(t *testing.T) {
status := raft.Status{
Progress: make(map[uint64]raft.Progress),
}
Expand All @@ -231,7 +231,7 @@ func TestComputeTruncateDecision(t *testing.T) {
}
decision := computeTruncateDecision(input)
if act, exp := decision.String(), c.exp; act != exp {
t.Errorf("%d: got:\n%s\nwanted:\n%s", i, act, exp)
t.Errorf("%d:\ngot:\n%s\nwanted:\n%s", i, act, exp)
}

// Verify the triggers that queue a range for recomputation. In
Expand Down Expand Up @@ -270,11 +270,11 @@ func TestComputeTruncateDecisionProgressStatusProbe(t *testing.T) {
exp := map[bool]map[bool]string{ // (tooLarge, active)
false: {
true: "should truncate: false [truncate 0 entries to first index 10 (chosen via: probing follower)]",
false: "should truncate: true [truncate 190 entries to first index 200 (chosen via: followers)]",
false: "should truncate: false [truncate 0 entries to first index 10 (chosen via: first index)]",
},
true: {
true: "should truncate: false [truncate 0 entries to first index 10 (chosen via: probing follower); log too large (2.0 KiB > 1.0 KiB)]",
false: "should truncate: true [truncate 290 entries to first index 300 (chosen via: quorum); log too large (2.0 KiB > 1.0 KiB); implies 1 Raft snapshot]",
false: "should truncate: true [truncate 190 entries to first index 200 (chosen via: followers); log too large (2.0 KiB > 1.0 KiB)]",
},
}

Expand Down

0 comments on commit 1a14d34

Please sign in to comment.