Skip to content

Commit

Permalink
compactor: don't block during suggestion in Start()
Browse files Browse the repository at this point in the history
When we `Start` the `Compactor`, it may already have received
`Suggestion`s, deadlocking the previously blocking write to a full
channel.

Release note (bug fix): Fixed a scenario in which a node could deadlock
while starting up.
  • Loading branch information
tbg committed Apr 16, 2018
1 parent dddb257 commit c4b5e85
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
6 changes: 5 additions & 1 deletion pkg/storage/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ func NewCompactor(
// compactionMinInterval, but only if there are compactions pending.
func (c *Compactor) Start(ctx context.Context, tracer opentracing.Tracer, stopper *stop.Stopper) {
// Wake up immediately to examine the queue and set the bytes queued metric.
c.ch <- struct{}{}
select {
// The compactor can already have compactions waiting on it, so don't try to block here.
case c.ch <- struct{}{}:
default:
}

stopper.RunWorker(ctx, func(ctx context.Context) {
var timer timeutil.Timer
Expand Down
24 changes: 24 additions & 0 deletions pkg/storage/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/pkg/errors"
)

const testCompactionLatency = 1 * time.Millisecond
Expand Down Expand Up @@ -533,6 +534,29 @@ func TestCompactorThresholds(t *testing.T) {
}
}

// TestCompactorDeadlockOnStart prevents regression of an issue that
// could cause nodes to lock up during the boot sequence. The
// compactor may receive suggestions before starting the goroutine,
// yet starting the goroutine could block on the suggestions channel,
// deadlocking the call to (Compactor).Start and thus the main node
// boot goroutine. This was observed in practice.
func TestCompactorDeadlockOnStart(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

eng := newWrappedEngine()
stopper.AddCloser(eng)
capFn := func() (roachpb.StoreCapacity, error) {
return roachpb.StoreCapacity{}, errors.New("never called")
}
doneFn := func(_ context.Context) {}
compactor := NewCompactor(eng, capFn, doneFn)

compactor.ch <- struct{}{}

compactor.Start(context.Background(), tracing.NewTracer(), stopper)
}

// TestCompactorProcessingInitialization verifies that a compactor gets
// started with processing if the queue is non-empty.
func TestCompactorProcessingInitialization(t *testing.T) {
Expand Down

0 comments on commit c4b5e85

Please sign in to comment.