Merge #34265 #34266
34265: storage: assorted queue logging improvements r=andreimatei a=andreimatei

see individual commits

34266: storage: log decisions to not truncate a range r=andreimatei a=andreimatei

Release note: None
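
For background, a minimal illustrative sketch of the logging pattern these commits move toward (not part of the diff; the helper name `logSkip` is made up, and the description of `log.VEventf` reflects CockroachDB's `util/log` package as used in the hunks below: it records a trace event on the context and also writes a log line once the verbosity threshold is met):

```go
package storage

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// logSkip contrasts the old and new styles side by side.
func logSkip(ctx context.Context) {
	// Old style: a verbosity-gated log line only; nothing lands on the
	// context's trace span unless a separate log.Eventf call is added.
	if log.V(3) {
		log.Infof(ctx, "split needed; skipping")
	}

	// New style: one call that attaches the message to the trace span (if
	// any) and also logs it when verbosity is at least 3.
	log.VEventf(ctx, 3, "split needed; skipping")
}
```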

Co-authored-by: Andrei Matei <[email protected]>
craig[bot] and andreimatei committed Jan 29, 2019
3 parents 7c8dbff + 755fbf3 + b8eab7a commit 1c02b84
Showing 4 changed files with 53 additions and 59 deletions.
58 changes: 20 additions & 38 deletions pkg/storage/queue.go
@@ -178,9 +178,8 @@ type queueImpl interface {
context.Context, hlc.Timestamp, *Replica, *config.SystemConfig,
) (shouldQueue bool, priority float64)

- // process accepts lease status, a replica, and the system config
- // and executes queue-specific work on it. The Replica is guaranteed
- // to be initialized.
+ // process accepts a replica, and the system config and executes
+ // queue-specific work on it. The Replica is guaranteed to be initialized.
process(context.Context, *Replica, *config.SystemConfig) error

// timer returns a duration to wait between processing the next item
@@ -638,17 +637,17 @@ func (bq *baseQueue) processLoop(stopper *stop.Stopper) {
annotatedCtx := repl.AnnotateCtx(ctx)
if stopper.RunAsyncTask(
annotatedCtx, fmt.Sprintf("storage.%s: processing replica", bq.name),
- func(annotatedCtx context.Context) {
+ func(ctx context.Context) {
// Release semaphore when finished processing.
defer func() { <-bq.processSem }()

start := timeutil.Now()
- err := bq.processReplica(annotatedCtx, repl)
+ err := bq.processReplica(ctx, repl)

duration := timeutil.Since(start)
- bq.recordProcessDuration(annotatedCtx, duration)
+ bq.recordProcessDuration(ctx, duration)

- bq.finishProcessingReplica(annotatedCtx, stopper, repl, err)
+ bq.finishProcessingReplica(ctx, stopper, repl, err)
}) != nil {
// Release semaphore on task failure.
<-bq.processSem
@@ -693,39 +692,31 @@ func (bq *baseQueue) recordProcessDuration(ctx context.Context, dur time.Duratio
// processReplica processes a single replica. This should not be
// called externally to the queue. bq.mu.Lock must not be held
// while calling this method.
- func (bq *baseQueue) processReplica(queueCtx context.Context, repl *Replica) error {
+ //
+ // ctx should already be annotated by repl.AnnotateCtx().
+ func (bq *baseQueue) processReplica(ctx context.Context, repl *Replica) error {
// Load the system config if it's needed.
var cfg *config.SystemConfig
if bq.needsSystemConfig {
cfg = bq.gossip.GetSystemConfig()
if cfg == nil {
- if log.V(1) {
- log.Infof(queueCtx, "no system config available. skipping")
- }
+ log.VEventf(ctx, 1, "no system config available. skipping")
return nil
}
}

if cfg != nil && bq.requiresSplit(cfg, repl) {
// Range needs to be split due to zone configs, but queue does
// not accept unsplit ranges.
- if log.V(3) {
- log.Infof(queueCtx, "split needed; skipping")
- }
+ log.VEventf(ctx, 3, "split needed; skipping")
return nil
}

- // Putting a span in a context means that events will no longer go to the
- // event log. Use queueCtx for events that are intended for the event log.
- ctx, span := bq.AnnotateCtxWithSpan(queueCtx, bq.name)
+ ctx, span := bq.AnnotateCtxWithSpan(ctx, bq.name)
defer span.Finish()
- // Also add the Replica annotations to ctx.
- ctx = repl.AnnotateCtx(ctx)
ctx, cancel := context.WithTimeout(ctx, bq.processTimeout)
defer cancel()
- if log.V(1) {
- log.Infof(ctx, "processing replica")
- }
+ log.VEventf(ctx, 1, "processing replica")

if !repl.IsInitialized() {
// We checked this when adding the replica, but we need to check it again
@@ -735,9 +726,7 @@ func (bq *baseQueue) processReplica(queueCtx context.Context, repl *Replica) err

if reason, err := repl.IsDestroyed(); err != nil {
if !bq.queueConfig.processDestroyedReplicas || reason == destroyReasonRemoved {
- if log.V(3) {
- log.Infof(queueCtx, "replica destroyed (%s); skipping", err)
- }
+ log.VEventf(ctx, 3, "replica destroyed (%s); skipping", err)
return nil
}
}
@@ -749,10 +738,7 @@ func (bq *baseQueue) processReplica(queueCtx context.Context, repl *Replica) err
if _, pErr := repl.redirectOnOrAcquireLease(ctx); pErr != nil {
switch v := pErr.GetDetail().(type) {
case *roachpb.NotLeaseHolderError, *roachpb.RangeNotFoundError:
- if log.V(3) {
- log.Infof(queueCtx, "%s; skipping", v)
- }
- log.Eventf(ctx, "%s; skipping", v)
+ log.VEventf(ctx, 3, "%s; skipping", v)
return nil
default:
log.VErrEventf(ctx, 2, "could not obtain lease: %s", pErr)
@@ -761,15 +747,11 @@ func (bq *baseQueue) processReplica(queueCtx context.Context, repl *Replica) err
}
}

- if log.V(3) {
- log.Infof(queueCtx, "processing")
- }
+ log.VEventf(ctx, 3, "processing...")
if err := bq.impl.process(ctx, repl, cfg); err != nil {
return err
}
- if log.V(3) {
- log.Infof(ctx, "done")
- }
+ log.VEventf(ctx, 3, "processing... done")
bq.successes.Inc(1)
return nil
}
@@ -915,9 +897,9 @@ func (bq *baseQueue) addToPurgatoryLocked(
annotatedCtx := repl.AnnotateCtx(ctx)
if stopper.RunTask(
annotatedCtx, fmt.Sprintf("storage.%s: purgatory processing replica", bq.name),
- func(annotatedCtx context.Context) {
- err := bq.processReplica(annotatedCtx, repl)
- bq.finishProcessingReplica(annotatedCtx, stopper, repl, err)
+ func(ctx context.Context) {
+ err := bq.processReplica(ctx, repl)
+ bq.finishProcessingReplica(ctx, stopper, repl, err)
}) != nil {
return
}
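
The closures above rename their parameter from `annotatedCtx` to `ctx`. A self-contained sketch of that shadowing idiom (assumed names, not the repo's code): naming the task body's parameter `ctx` means the annotated context handed in by the task runner is the only context the body can reach by that name.

```go
package main

import (
	"context"
	"fmt"
)

type tagKey struct{}

// runTask stands in for stopper.RunAsyncTask: it hands the given context to
// the task body.
func runTask(ctx context.Context, task func(context.Context)) {
	task(ctx)
}

func main() {
	ctx := context.Background()
	annotatedCtx := context.WithValue(ctx, tagKey{}, "n1,s1,r42/1")

	runTask(annotatedCtx, func(ctx context.Context) {
		// Here `ctx` is the annotated context; the outer `ctx` is shadowed
		// and cannot be picked up by mistake.
		fmt.Println(ctx.Value(tagKey{})) // prints: n1,s1,r42/1
	})
}
```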
6 changes: 5 additions & 1 deletion pkg/storage/raft_log_queue.go
@@ -256,6 +256,7 @@ func (td *truncateDecision) NumNewRaftSnapshots() int {

func (td *truncateDecision) String() string {
var buf strings.Builder
_, _ = fmt.Fprintf(&buf, "should truncate: %t [", td.ShouldTruncate())
_, _ = fmt.Fprintf(
&buf,
"truncate %d entries to first index %d (chosen via: %s)",
@@ -272,6 +273,7 @@ func (td *truncateDecision) String() string {
if n := td.NumNewRaftSnapshots(); n > 0 {
_, _ = fmt.Fprintf(&buf, "; implies %d Raft snapshot%s", n, util.Pluralize(int64(n)))
}
+ buf.WriteRune(']')

return buf.String()
}
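
For a concrete picture of the new format (a standalone sketch mirroring the builder calls above, with made-up values), the decision string is now wrapped in a `should truncate: <bool> [...]` envelope:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	var buf strings.Builder
	// Prefix added by this commit: the overall verdict plus an opening bracket.
	fmt.Fprintf(&buf, "should truncate: %t [", false)
	// Pre-existing detail clause(s).
	fmt.Fprintf(&buf, "truncate %d entries to first index %d (chosen via: %s)", 0, 1, "quorum")
	// Closing bracket added by this commit.
	buf.WriteRune(']')

	fmt.Println(buf.String())
	// Output: should truncate: false [truncate 0 entries to first index 1 (chosen via: quorum)]
}
```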
@@ -429,7 +431,7 @@ func (rlq *raftLogQueue) process(ctx context.Context, r *Replica, _ *config.Syst
// Can and should the raft logs be truncated?
if decision.ShouldTruncate() {
if n := decision.NumNewRaftSnapshots(); log.V(1) || n > 0 && rlq.logSnapshots.ShouldProcess(timeutil.Now()) {
- log.Info(ctx, decision)
+ log.Info(ctx, decision.String())
} else {
log.VEvent(ctx, 1, decision.String())
}
@@ -443,6 +445,8 @@ func (rlq *raftLogQueue) process(ctx context.Context, r *Replica, _ *config.Syst
return err
}
r.store.metrics.RaftLogTruncated.Inc(int64(decision.NumTruncatableIndexes()))
+ } else {
+ log.VEventf(ctx, 3, decision.String())
}
return nil
}
46 changes: 26 additions & 20 deletions pkg/storage/raft_log_queue_test.go
@@ -106,6 +106,9 @@ func TestComputeTruncateDecision(t *testing.T) {

const targetSize = 1000

+ // NB: all tests here have a truncateDecision which starts with "should
+ // truncate: false", because these tests don't simulate enough data to be over
+ // the truncation threshold.
testCases := []struct {
progress []uint64
raftLogSize int64
@@ -117,85 +120,85 @@
{
// Nothing to truncate.
[]uint64{1, 2}, 100, 1, 1, 0,
"truncate 0 entries to first index 1 (chosen via: quorum)"},
"should truncate: false [truncate 0 entries to first index 1 (chosen via: quorum)]"},
{
// Nothing to truncate on this replica, though a quorum elsewhere has more progress.
// NB this couldn't happen if we're truly the Raft leader, unless we appended to our
// own log asynchronously.
[]uint64{1, 5, 5}, 100, 1, 1, 0,
"truncate 0 entries to first index 1 (chosen via: followers)",
"should truncate: false [truncate 0 entries to first index 1 (chosen via: followers)]",
},
{
// We're not truncating anything, but one follower is already cut off. There's no pending
// snapshot so we shouldn't be causing any additional snapshots.
[]uint64{1, 5, 5}, 100, 2, 2, 0,
"truncate 0 entries to first index 2 (chosen via: first index)",
"should truncate: false [truncate 0 entries to first index 2 (chosen via: first index)]",
},
{
// The happy case.
[]uint64{5, 5, 5}, 100, 2, 5, 0,
"truncate 3 entries to first index 5 (chosen via: quorum)",
"should truncate: false [truncate 3 entries to first index 5 (chosen via: quorum)]",
},
{
// No truncation, but the outstanding snapshot is made obsolete by the truncation. However
// it was already obsolete before. (This example is also not one you could manufacture in
// a real system).
[]uint64{5, 5, 5}, 100, 2, 2, 1,
"truncate 0 entries to first index 2 (chosen via: first index)",
"should truncate: false [truncate 0 entries to first index 2 (chosen via: first index)]",
},
{
// Respecting the pending snapshot.
[]uint64{5, 5, 5}, 100, 2, 5, 3,
"truncate 1 entries to first index 3 (chosen via: pending snapshot)",
"should truncate: false [truncate 1 entries to first index 3 (chosen via: pending snapshot)]",
},
{
// Log is below target size, so respecting the slowest follower.
[]uint64{1, 2, 3, 4}, 100, 1, 5, 0,
"truncate 0 entries to first index 1 (chosen via: followers)",
"should truncate: false [truncate 0 entries to first index 1 (chosen via: followers)]",
},
{
// Truncating since local log starts at 2. One follower is already cut off without a pending
// snapshot.
[]uint64{1, 2, 3, 4}, 100, 2, 2, 0,
"truncate 0 entries to first index 2 (chosen via: first index)",
"should truncate: false [truncate 0 entries to first index 2 (chosen via: first index)]",
},
// If over targetSize, should truncate to quorum committed index. Minority will need snapshots.
{
[]uint64{1, 3, 3, 4}, 2000, 1, 3, 0,
"truncate 2 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B); implies 1 Raft snapshot",
"should truncate: false [truncate 2 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B); implies 1 Raft snapshot]",
},
// Don't truncate away pending snapshot, even when log too large.
{
[]uint64{100, 100}, 2000, 1, 100, 50,
"truncate 49 entries to first index 50 (chosen via: pending snapshot); log too large (2.0 KiB > 1000 B)",
"should truncate: false [truncate 49 entries to first index 50 (chosen via: pending snapshot); log too large (2.0 KiB > 1000 B)]",
},
{
[]uint64{1, 3, 3, 4}, 2000, 2, 3, 0,
"truncate 1 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B)",
"should truncate: false [truncate 1 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B)]",
},
{
[]uint64{1, 3, 3, 4}, 2000, 3, 3, 0,
"truncate 0 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B)",
"should truncate: false [truncate 0 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B)]",
},
// The pending snapshot index affects the quorum commit index.
{
[]uint64{4}, 2000, 1, 7, 1,
"truncate 0 entries to first index 1 (chosen via: pending snapshot); log too large (2.0 KiB > 1000 B)",
"should truncate: false [truncate 0 entries to first index 1 (chosen via: pending snapshot); log too large (2.0 KiB > 1000 B)]",
},
// Never truncate past the quorum commit index.
{
[]uint64{3, 3, 6}, 100, 2, 7, 0,
"truncate 1 entries to first index 3 (chosen via: quorum)",
"should truncate: false [truncate 1 entries to first index 3 (chosen via: quorum)]",
},
// Never truncate past the last index.
{
[]uint64{5}, 100, 1, 3, 0,
"truncate 2 entries to first index 3 (chosen via: last index)",
"should truncate: false [truncate 2 entries to first index 3 (chosen via: last index)]",
},
// Never truncate "before the first index".
{
[]uint64{5}, 100, 2, 3, 1,
"truncate 0 entries to first index 2 (chosen via: first index)",
"should truncate: false [truncate 0 entries to first index 2 (chosen via: first index)]",
}}
for i, c := range testCases {
status := &raft.Status{
@@ -224,14 +227,17 @@
func TestComputeTruncateDecisionProgressStatusProbe(t *testing.T) {
defer leaktest.AfterTest(t)()

+ // NB: most tests here have a truncateDecision which starts with "should
+ // truncate: false", because these tests don't simulate enough data to be over
+ // the truncation threshold.
exp := map[bool]map[bool]string{ // (tooLarge, active)
false: {
true: "truncate 0 entries to first index 10 (chosen via: probing follower)",
false: "truncate 90 entries to first index 100 (chosen via: followers)",
true: "should truncate: false [truncate 0 entries to first index 10 (chosen via: probing follower)]",
false: "should truncate: false [truncate 90 entries to first index 100 (chosen via: followers)]",
},
true: {
true: "truncate 0 entries to first index 10 (chosen via: probing follower); log too large (2.0 KiB > 1.0 KiB)",
false: "truncate 290 entries to first index 300 (chosen via: quorum); log too large (2.0 KiB > 1.0 KiB); implies 2 Raft snapshots",
true: "should truncate: false [truncate 0 entries to first index 10 (chosen via: probing follower); log too large (2.0 KiB > 1.0 KiB)]",
false: "should truncate: true [truncate 290 entries to first index 300 (chosen via: quorum); log too large (2.0 KiB > 1.0 KiB); implies 2 Raft snapshots]",
},
}

2 changes: 2 additions & 0 deletions pkg/storage/store.go
@@ -4446,6 +4446,8 @@ func (s *Store) AllocatorDryRun(
func (s *Store) ManuallyEnqueue(
ctx context.Context, queueName string, repl *Replica, skipShouldQueue bool,
) ([]tracing.RecordedSpan, string, error) {
+ ctx = repl.AnnotateCtx(ctx)
+
var queue queueImpl
var needsLease bool
for _, replicaQueue := range s.scanner.queues {
