diff --git a/pkg/storage/compactor/compactor.go b/pkg/storage/compactor/compactor.go index 5328ec12d5c4..241244d3005b 100644 --- a/pkg/storage/compactor/compactor.go +++ b/pkg/storage/compactor/compactor.go @@ -129,7 +129,11 @@ func NewCompactor( // compactionMinInterval, but only if there are compactions pending. func (c *Compactor) Start(ctx context.Context, tracer opentracing.Tracer, stopper *stop.Stopper) { // Wake up immediately to examine the queue and set the bytes queued metric. - c.ch <- struct{}{} + select { + // The compactor can already have compactions waiting on it, so don't try to block here. + case c.ch <- struct{}{}: + default: + } stopper.RunWorker(ctx, func(ctx context.Context) { var timer timeutil.Timer diff --git a/pkg/storage/compactor/compactor_test.go b/pkg/storage/compactor/compactor_test.go index e979ba77d1e2..5306f68eb539 100644 --- a/pkg/storage/compactor/compactor_test.go +++ b/pkg/storage/compactor/compactor_test.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/pkg/errors" ) const testCompactionLatency = 1 * time.Millisecond @@ -533,6 +534,29 @@ func TestCompactorThresholds(t *testing.T) { } } +// TestCompactorDeadlockOnStart prevents regression of an issue that +// could cause nodes to lock up during the boot sequence. The +// compactor may receive suggestions before starting the goroutine, +// yet starting the goroutine could block on the suggestions channel, +// deadlocking the call to (Compactor).Start and thus the main node +// boot goroutine. This was observed in practice. +func TestCompactorDeadlockOnStart(t *testing.T) { + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + + eng := newWrappedEngine() + stopper.AddCloser(eng) + capFn := func() (roachpb.StoreCapacity, error) { + return roachpb.StoreCapacity{}, errors.New("never called") + } + doneFn := func(_ context.Context) {} + compactor := NewCompactor(eng, capFn, doneFn) + + compactor.ch <- struct{}{} + + compactor.Start(context.Background(), tracing.NewTracer(), stopper) +} + // TestCompactorProcessingInitialization verifies that a compactor gets // started with processing if the queue is non-empty. func TestCompactorProcessingInitialization(t *testing.T) {