diff --git a/pkg/cmd/roachtest/import.go b/pkg/cmd/roachtest/import.go
index b1b135ea4cdc..5fa5fdecbe17 100644
--- a/pkg/cmd/roachtest/import.go
+++ b/pkg/cmd/roachtest/import.go
@@ -147,7 +147,6 @@ func registerImportTPCC(r *testRegistry) {
 	const geoWarehouses = 4000
 	const geoZones = "europe-west2-b,europe-west4-b,asia-northeast1-b,us-west1-b"
 	r.Add(TestSpec{
-		Skip:    "#37349 - OOMing",
 		Name:    fmt.Sprintf("import/tpcc/warehouses=%d/geo", geoWarehouses),
 		Owner:   OwnerBulkIO,
 		Cluster: r.makeClusterSpec(8, spec.CPU(16), spec.Geo(), spec.Zones(geoZones)),
diff --git a/pkg/kv/kvserver/intent_resolver_integration_test.go b/pkg/kv/kvserver/intent_resolver_integration_test.go
index 0d5bca6e1ff8..1f8476d1575f 100644
--- a/pkg/kv/kvserver/intent_resolver_integration_test.go
+++ b/pkg/kv/kvserver/intent_resolver_integration_test.go
@@ -14,7 +14,6 @@ import (
 	"bytes"
 	"context"
 	"encoding/binary"
-	"fmt"
 	"math"
 	"math/rand"
 	"sync"
@@ -183,44 +182,12 @@ func TestContendedIntentWithDependencyCycle(t *testing.T) {
 	}
 }
 
-// Regression test for https://github.com/cockroachdb/cockroach/issues/64092
-// which makes sure that synchronous ranged intent resolution during rollback
-// completes in a reasonable time.
-func TestRollbackSyncRangedIntentResolution(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-	skip.UnderRace(t, "timing-sensitive test")
-
-	ctx := context.Background()
-	srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
-		Knobs: base.TestingKnobs{
-			Store: &StoreTestingKnobs{
-				DisableLoadBasedSplitting: true,
-				IntentResolverKnobs: kvserverbase.IntentResolverTestingKnobs{
-					ForceSyncIntentResolution: true,
-				},
-			},
-		},
-	})
-	defer srv.Stopper().Stop(ctx)
-
-	txn := srv.DB().NewTxn(ctx, "test")
-	batch := txn.NewBatch()
-	for i := 0; i < 100000; i++ {
-		batch.Put([]byte(fmt.Sprintf("key%v", i)), []byte("value"))
-	}
-	require.NoError(t, txn.Run(ctx, batch))
-	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
-	defer cancel()
-	require.NoError(t, txn.Rollback(ctx))
-	require.NoError(t, ctx.Err())
-}
-
 // Tests that intents and transaction records are cleaned up within a reasonable
 // timeframe in various scenarios.
 func TestReliableIntentCleanup(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
+	skip.WithIssue(t, 66895, "Flaky due to non-deterministic txn cleanup")
 	skip.UnderShort(t) // takes 294s
 	skip.UnderRace(t, "timing-sensitive test")
 	skip.UnderStress(t, "memory-hungry test")
diff --git a/pkg/kv/kvserver/scanner.go b/pkg/kv/kvserver/scanner.go
index 03053ccf4f8c..f2522e2bab62 100644
--- a/pkg/kv/kvserver/scanner.go
+++ b/pkg/kv/kvserver/scanner.go
@@ -121,10 +121,9 @@ func (rs *replicaScanner) AddQueues(queues ...replicaQueue) {
 }
 
 // Start spins up the scanning loop.
-func (rs *replicaScanner) Start(stopper *stop.Stopper) {
-	rs.stopper = stopper
+func (rs *replicaScanner) Start() {
 	for _, queue := range rs.queues {
-		queue.Start(stopper)
+		queue.Start(rs.stopper)
 	}
 	rs.scanLoop()
 }
diff --git a/pkg/kv/kvserver/scanner_test.go b/pkg/kv/kvserver/scanner_test.go
index a755222d57d6..37b55c579927 100644
--- a/pkg/kv/kvserver/scanner_test.go
+++ b/pkg/kv/kvserver/scanner_test.go
@@ -212,10 +212,10 @@ func TestScannerAddToQueues(t *testing.T) {
 	clock := hlc.NewClock(mc.UnixNano, time.Nanosecond)
 	s := newReplicaScanner(makeAmbCtx(), clock, 1*time.Millisecond, 0, 0, ranges)
 	s.AddQueues(q1, q2)
-	stopper := stop.NewStopper()
+	s.stopper = stop.NewStopper()
 
 	// Start scanner and verify that all ranges are added to both queues.
-	s.Start(stopper)
+	s.Start()
 	testutils.SucceedsSoon(t, func() error {
 		if q1.count() != count || q2.count() != count {
 			return errors.Errorf("q1 or q2 count != %d; got %d, %d", count, q1.count(), q2.count())
@@ -239,7 +239,7 @@ func TestScannerAddToQueues(t *testing.T) {
 	})
 
 	// Stop scanner and verify both queues are stopped.
-	stopper.Stop(context.Background())
+	s.stopper.Stop(context.Background())
 	if !q1.isDone() || !q2.isDone() {
 		t.Errorf("expected all queues to stop; got %t, %t", q1.isDone(), q2.isDone())
 	}
@@ -265,10 +265,10 @@ func TestScannerTiming(t *testing.T) {
 			clock := hlc.NewClock(mc.UnixNano, time.Nanosecond)
 			s := newReplicaScanner(makeAmbCtx(), clock, duration, 0, 0, ranges)
 			s.AddQueues(q)
-			stopper := stop.NewStopper()
-			s.Start(stopper)
+			s.stopper = stop.NewStopper()
+			s.Start()
 			time.Sleep(runTime)
-			stopper.Stop(context.Background())
+			s.stopper.Stop(context.Background())
 
 			avg := s.avgScan()
 			log.Infof(context.Background(), "%d: average scan: %s", i, avg)
@@ -349,9 +349,9 @@ func TestScannerDisabled(t *testing.T) {
 	clock := hlc.NewClock(mc.UnixNano, time.Nanosecond)
 	s := newReplicaScanner(makeAmbCtx(), clock, 1*time.Millisecond, 0, 0, ranges)
 	s.AddQueues(q)
-	stopper := stop.NewStopper()
-	defer stopper.Stop(context.Background())
-	s.Start(stopper)
+	s.stopper = stop.NewStopper()
+	defer s.stopper.Stop(context.Background())
+	s.Start()
 
 	// Verify queue gets all ranges.
 	testutils.SucceedsSoon(t, func() error {
@@ -414,9 +414,9 @@ func TestScannerEmptyRangeSet(t *testing.T) {
 	clock := hlc.NewClock(mc.UnixNano, time.Nanosecond)
 	s := newReplicaScanner(makeAmbCtx(), clock, time.Hour, 0, 0, ranges)
 	s.AddQueues(q)
-	stopper := stop.NewStopper()
-	defer stopper.Stop(context.Background())
-	s.Start(stopper)
+	s.stopper = stop.NewStopper()
+	defer s.stopper.Stop(context.Background())
+	s.Start()
 	time.Sleep(time.Millisecond) // give it some time to (not) busy loop
 	if count := s.scanCount(); count > 1 {
 		t.Errorf("expected at most one loop, but got %d", count)
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 25e49965d7e7..b0dc01daa456 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -1393,6 +1393,12 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
 	// depends on the scanner not having added its own log tag.
 	if s.scanner != nil {
 		s.scanner.AmbientContext.AddLogTag("s", s.StoreID())
+
+		// We have to set the stopper here to avoid races, since scanner.Start() is
+		// called async and the stopper is not available when the scanner is
+		// created. This is a hack, the scanner/queue construction should be
+		// refactored.
+		s.scanner.stopper = s.stopper
 	}
 
 	// If the nodeID is 0, it has not be assigned yet.
@@ -1566,7 +1572,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
 	_ = s.stopper.RunAsyncTask(ctx, "scanner", func(context.Context) {
 		select {
 		case <-s.cfg.Gossip.Connected:
-			s.scanner.Start(s.stopper)
+			s.scanner.Start()
 		case <-s.stopper.ShouldQuiesce():
 			return
 		}