Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
61456: kvserver: deflake TestBackpressureNotAppliedWhenReducingRangeSize r=ajwerner a=tbg

Fixes cockroachdb#61453.

One question I have is why `(*storage.Pebble).Close` swallows the error from
closing the underlying pebble instance, i.e. why don't we add this diff:

```diff
diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go
index a85e0e1d35..88ebc20025 100644
--- a/pkg/storage/pebble.go
+++ b/pkg/storage/pebble.go
@@ -635,7 +635,9 @@ func (p *Pebble) Close() {
                return
        }
        p.closed = true
-       _ = p.db.Close()
+       if err := p.db.Close(); err != nil {
+               panic(err)
+       }
 }
 
 // Closed implements the Engine interface.
```

From my cursory reading of the pebble codebase, leaked iterators are one
source of errors here.

Release justification: non-production code changes
Release note: None


Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
craig[bot] and tbg committed Mar 4, 2021
2 parents ceab9c0 + 8a8605d commit a8d8f06
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/client_replica_backpressure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,10 @@ func TestBackpressureNotAppliedWhenReducingRangeSize(t *testing.T) {
require.Len(t, repl.Desc().Replicas().Descriptors(), 1)
// We really need to make sure that the split queue has hit this range,
// otherwise we'll fail to backpressure.
go func() { _ = s.ForceSplitScanAndProcess() }()
_ = tc.Stopper().RunAsyncTask(ctx, "force-split", func(context.Context) {
_ = s.ForceSplitScanAndProcess()
})

waitForBlocked(repl.RangeID)

// Observe backpressure now that the range is just over the limit.
Expand All @@ -287,11 +290,11 @@ func TestBackpressureNotAppliedWhenReducingRangeSize(t *testing.T) {
ctxWithCancel, cancel := context.WithCancel(ctx)
defer cancel()
upsertErrCh := make(chan error)
go func() {
_ = tc.Stopper().RunAsyncTask(ctx, "upsert", func(ctx context.Context) {
_, err := c.ExecEx(ctxWithCancel, "UPSERT INTO foo VALUES ($1, $2)",
nil /* options */, rRand.Intn(numRows), randutil.RandBytes(rRand, rowSize))
upsertErrCh <- err
}()
})

select {
case <-time.After(10 * time.Millisecond):
Expand Down

0 comments on commit a8d8f06

Please sign in to comment.