66959: roachtest: unskip import/tpcc/warehouses=4000/geo r=pbardea a=pbardea

This test completed successfully 5/5 times. It was skipped a long time ago
(~2 years); this unskips it, and we will keep an eye on it in case it starts
OOMing again.

Will continue running it in the background to get more successful runs.

Release note: None

67003: kvserver: remove TestRollbackSyncRangedIntentResolution r=tbg a=erikgrinaker

This test is flaky since intent resolution is non-deterministic, and it
doesn't seem worth keeping around.

Release note: None

67005: kvserver: skip TestReliableIntentCleanup r=tbg a=erikgrinaker

Flakorama. It's unclear whether this test can be salvaged without
significant changes to txn cleanup.

Touches #66895.

Release note: None

67006: kvserver: fix data race on replicaScanner.stopper r=tbg a=erikgrinaker

In #65781, a race was introduced on `replicaScanner.stopper`: it is set by
`Start()` and read by `RemoveReplica()`, but `Start()` is called
asynchronously.

To avoid introducing a mutex around the stopper or refactoring the store
construction, this adds a hack that writes the stopper directly to
`replicaScanner.stopper` synchronously during `Store.Start()`.

Release note: None
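A minimal sketch of the pattern described above, assuming only that the
stopper field is written once by its owner before any goroutine reads it. The
type and function names below are hypothetical stand-ins, not the actual
CockroachDB code:

```go
package main

import (
	"fmt"
	"sync"
)

// stopper stands in for *stop.Stopper; all names here are hypothetical.
type stopper struct{ name string }

// replicaScanner mirrors the shape of the real scanner: the stopper field
// must be assigned before Start() runs.
type replicaScanner struct {
	stopper *stopper
}

// Start no longer takes the stopper as an argument; it relies on the field
// having been set synchronously by the owning store.
func (rs *replicaScanner) Start() {
	_ = rs.stopper // the scan loop would use rs.stopper here
}

// RemoveReplica reads rs.stopper; before the fix this read raced with the
// write that used to happen inside the asynchronously called Start().
func (rs *replicaScanner) RemoveReplica() {
	if rs.stopper != nil {
		fmt.Println("removing replica using", rs.stopper.name)
	}
}

type store struct {
	scanner *replicaScanner
	stopper *stopper
}

// Start assigns the scanner's stopper synchronously, then kicks off the
// scanner asynchronously (standing in for stopper.RunAsyncTask).
func (s *store) Start(wg *sync.WaitGroup) {
	s.scanner.stopper = s.stopper // the "hack": write is ordered before the goroutine starts

	wg.Add(1)
	go func() {
		defer wg.Done()
		s.scanner.Start()
	}()
}

func main() {
	var wg sync.WaitGroup
	s := &store{scanner: &replicaScanner{}, stopper: &stopper{name: "store-stopper"}}
	s.Start(&wg)
	s.scanner.RemoveReplica() // no concurrent write to rs.stopper anymore
	wg.Wait()
}
```

In this sketch, the write to `rs.stopper` happens before the goroutine that
calls `Start()` is launched and before any caller of `RemoveReplica()` can
run, so there is no longer a concurrent read/write pair for the race detector
to flag.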

/cc @cockroachdb/kv 

Co-authored-by: Paul Bardea <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
3 people committed Jun 29, 2021
Commit fb69c0c (5 parents: 8507369, ef97701, 570fecb, cf45a1e, 89d5d47)
Showing 5 changed files with 22 additions and 51 deletions.
1 change: 0 additions & 1 deletion pkg/cmd/roachtest/import.go
@@ -147,7 +147,6 @@ func registerImportTPCC(r *testRegistry) {
const geoWarehouses = 4000
const geoZones = "europe-west2-b,europe-west4-b,asia-northeast1-b,us-west1-b"
r.Add(TestSpec{
Skip: "#37349 - OOMing",
Name: fmt.Sprintf("import/tpcc/warehouses=%d/geo", geoWarehouses),
Owner: OwnerBulkIO,
Cluster: r.makeClusterSpec(8, spec.CPU(16), spec.Geo(), spec.Zones(geoZones)),
35 changes: 1 addition & 34 deletions pkg/kv/kvserver/intent_resolver_integration_test.go
@@ -14,7 +14,6 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math"
"math/rand"
"sync"
@@ -183,44 +182,12 @@ func TestContendedIntentWithDependencyCycle(t *testing.T) {
}
}

// Regression test for https://github.com/cockroachdb/cockroach/issues/64092
// which makes sure that synchronous ranged intent resolution during rollback
// completes in a reasonable time.
func TestRollbackSyncRangedIntentResolution(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderRace(t, "timing-sensitive test")

ctx := context.Background()
srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &StoreTestingKnobs{
DisableLoadBasedSplitting: true,
IntentResolverKnobs: kvserverbase.IntentResolverTestingKnobs{
ForceSyncIntentResolution: true,
},
},
},
})
defer srv.Stopper().Stop(ctx)

txn := srv.DB().NewTxn(ctx, "test")
batch := txn.NewBatch()
for i := 0; i < 100000; i++ {
batch.Put([]byte(fmt.Sprintf("key%v", i)), []byte("value"))
}
require.NoError(t, txn.Run(ctx, batch))
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
require.NoError(t, txn.Rollback(ctx))
require.NoError(t, ctx.Err())
}

// Tests that intents and transaction records are cleaned up within a reasonable
// timeframe in various scenarios.
func TestReliableIntentCleanup(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 66895, "Flaky due to non-deterministic txn cleanup")
skip.UnderShort(t) // takes 294s
skip.UnderRace(t, "timing-sensitive test")
skip.UnderStress(t, "memory-hungry test")
5 changes: 2 additions & 3 deletions pkg/kv/kvserver/scanner.go
@@ -121,10 +121,9 @@ func (rs *replicaScanner) AddQueues(queues ...replicaQueue) {
}

// Start spins up the scanning loop.
func (rs *replicaScanner) Start(stopper *stop.Stopper) {
rs.stopper = stopper
func (rs *replicaScanner) Start() {
for _, queue := range rs.queues {
queue.Start(stopper)
queue.Start(rs.stopper)
}
rs.scanLoop()
}
24 changes: 12 additions & 12 deletions pkg/kv/kvserver/scanner_test.go
@@ -212,10 +212,10 @@ func TestScannerAddToQueues(t *testing.T) {
clock := hlc.NewClock(mc.UnixNano, time.Nanosecond)
s := newReplicaScanner(makeAmbCtx(), clock, 1*time.Millisecond, 0, 0, ranges)
s.AddQueues(q1, q2)
stopper := stop.NewStopper()
s.stopper = stop.NewStopper()

// Start scanner and verify that all ranges are added to both queues.
s.Start(stopper)
s.Start()
testutils.SucceedsSoon(t, func() error {
if q1.count() != count || q2.count() != count {
return errors.Errorf("q1 or q2 count != %d; got %d, %d", count, q1.count(), q2.count())
@@ -239,7 +239,7 @@ func TestScannerAddToQueues(t *testing.T) {
})

// Stop scanner and verify both queues are stopped.
stopper.Stop(context.Background())
s.stopper.Stop(context.Background())
if !q1.isDone() || !q2.isDone() {
t.Errorf("expected all queues to stop; got %t, %t", q1.isDone(), q2.isDone())
}
@@ -265,10 +265,10 @@ func TestScannerTiming(t *testing.T) {
clock := hlc.NewClock(mc.UnixNano, time.Nanosecond)
s := newReplicaScanner(makeAmbCtx(), clock, duration, 0, 0, ranges)
s.AddQueues(q)
stopper := stop.NewStopper()
s.Start(stopper)
s.stopper = stop.NewStopper()
s.Start()
time.Sleep(runTime)
stopper.Stop(context.Background())
s.stopper.Stop(context.Background())

avg := s.avgScan()
log.Infof(context.Background(), "%d: average scan: %s", i, avg)
@@ -349,9 +349,9 @@ func TestScannerDisabled(t *testing.T) {
clock := hlc.NewClock(mc.UnixNano, time.Nanosecond)
s := newReplicaScanner(makeAmbCtx(), clock, 1*time.Millisecond, 0, 0, ranges)
s.AddQueues(q)
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
s.Start(stopper)
s.stopper = stop.NewStopper()
defer s.stopper.Stop(context.Background())
s.Start()

// Verify queue gets all ranges.
testutils.SucceedsSoon(t, func() error {
Expand Down Expand Up @@ -414,9 +414,9 @@ func TestScannerEmptyRangeSet(t *testing.T) {
clock := hlc.NewClock(mc.UnixNano, time.Nanosecond)
s := newReplicaScanner(makeAmbCtx(), clock, time.Hour, 0, 0, ranges)
s.AddQueues(q)
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
s.Start(stopper)
s.stopper = stop.NewStopper()
defer s.stopper.Stop(context.Background())
s.Start()
time.Sleep(time.Millisecond) // give it some time to (not) busy loop
if count := s.scanCount(); count > 1 {
t.Errorf("expected at most one loop, but got %d", count)
8 changes: 7 additions & 1 deletion pkg/kv/kvserver/store.go
@@ -1393,6 +1393,12 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
// depends on the scanner not having added its own log tag.
if s.scanner != nil {
s.scanner.AmbientContext.AddLogTag("s", s.StoreID())

// We have to set the stopper here to avoid races, since scanner.Start() is
// called async and the stopper is not available when the scanner is
// created. This is a hack, the scanner/queue construction should be
// refactored.
s.scanner.stopper = s.stopper
}

// If the nodeID is 0, it has not been assigned yet.
@@ -1566,7 +1572,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
_ = s.stopper.RunAsyncTask(ctx, "scanner", func(context.Context) {
select {
case <-s.cfg.Gossip.Connected:
s.scanner.Start(s.stopper)
s.scanner.Start()
case <-s.stopper.ShouldQuiesce():
return
}
