105401: roachtest: address c2c roachtest goroutine leaks r=renatolabs a=msbutler

Previously in the c2c roachtest driver, the monitor which managed the
goroutine running the foreground workload was initiated with the wrong
context. This allowed the workload goroutine to continue running even after
node deaths and after the run() function returned. The monitor is now
initiated with the context the driver later uses to cleanly cancel the
workload.

Additionally, several other goroutines in the c2c driver and in two other
roachtests (mixed_version_c2c.go and disk_stall.go) leaked after the caller
returned. All goroutine callers now wait for their goroutines to complete.

Epic: none

Release note: None
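
For illustration, a minimal, self-contained sketch of the corrected pattern described above, using a plain sync.WaitGroup and context.WithCancel as stand-ins for the roachtest monitor (hypothetical names; the real change is in the cluster_to_cluster.go diff below):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runWorkload stands in for the driver's foreground workload: it runs until
// its context is cancelled.
func runWorkload(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	ctx := context.Background()

	// The fix in spirit: the workload goroutine is tied to a cancellable
	// context, and the caller both cancels and waits before returning, so the
	// goroutine cannot outlive the test.
	workloadCtx, workloadCancel := context.WithCancel(ctx)
	var wg sync.WaitGroup // stand-in for the monitor's Wait()

	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := runWorkload(workloadCtx); err != nil && workloadCtx.Err() == nil {
			// Only surface errors that were not caused by our own cancellation.
			fmt.Println("workload failed:", err)
		}
	}()

	time.Sleep(100 * time.Millisecond) // the rest of the test would run here

	workloadCancel()
	wg.Wait()
}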

105557: sql: fix logic test for showing cluster setting defaults and origin r=zachlite a=xinhaoz

A logic test verifying the results of `ALTER TENANT ALL SET CLUSTER SETTING`
was failing under stress runs because the effects of this statement can take
some time to propagate to tenants. Adding a retry to the SELECT statement
fixes the issue by ensuring we wait for the setting to change.

Fixes: #105485

Release note: None
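
Conceptually, the logictest retry directive used in the fix below just re-runs the query until its output matches, giving the ALTER time to propagate to tenants. A rough, hypothetical sketch of that polling behavior (not the actual logictest framework):

package main

import (
	"errors"
	"fmt"
	"time"
)

// retrySoon re-runs check until it succeeds or the deadline passes, which is
// roughly what a `query ... retry` directive does for a logic test.
func retrySoon(timeout time.Duration, check func() error) error {
	deadline := time.Now().Add(timeout)
	var err error
	for time.Now().Before(deadline) {
		if err = check(); err == nil {
			return nil
		}
		time.Sleep(100 * time.Millisecond)
	}
	return fmt.Errorf("condition never met: %w", err)
}

func main() {
	start := time.Now()
	err := retrySoon(5*time.Second, func() error {
		// Stand-in for "SELECT ... FROM [SHOW ALL CLUSTER SETTINGS]" returning
		// the expected default_value and origin once the setting propagates.
		if time.Since(start) < 300*time.Millisecond {
			return errors.New("setting not yet propagated")
		}
		return nil
	})
	fmt.Println("result:", err)
}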

Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Xin Hao Zhang <[email protected]>
3 people committed Jun 26, 2023
3 parents d03d3e6 + f797955 + e6803eb commit b2e9215
Showing 4 changed files with 25 additions and 11 deletions.
4 changes: 4 additions & 0 deletions pkg/cmd/roachtest/cluster/monitor_interface.go
@@ -24,6 +24,10 @@ type Monitor interface {
ExpectDeath()
ExpectDeaths(count int32)
ResetDeaths()

// Go spawns a goroutine whose fatal errors will be handled gracefully leading to a
// clean roachtest shutdown. To prevent leaky goroutines, the caller must call
// Wait() or WaitE() before returning.
Go(fn func(context.Context) error)
GoWithCancel(fn func(context.Context) error) func()
WaitE() error
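
A hedged usage sketch of the Go/GoWithCancel/Wait contract documented above, with a hypothetical in-memory stand-in for the Monitor (assuming, per the signature, that the func returned by GoWithCancel cancels only that goroutine's context):

package main

import (
	"context"
	"fmt"
	"sync"
)

// fakeMonitor is a hypothetical stand-in that mimics the documented contract:
// every spawned goroutine must be waited on before the caller returns.
type fakeMonitor struct {
	ctx context.Context
	wg  sync.WaitGroup
}

func (m *fakeMonitor) Go(fn func(context.Context) error) {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		_ = fn(m.ctx)
	}()
}

// GoWithCancel spawns fn and returns a func that cancels only fn's context,
// leaving any other monitored goroutines untouched.
func (m *fakeMonitor) GoWithCancel(fn func(context.Context) error) func() {
	ctx, cancel := context.WithCancel(m.ctx)
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		_ = fn(ctx)
	}()
	return cancel
}

func (m *fakeMonitor) Wait() { m.wg.Wait() }

func main() {
	m := &fakeMonitor{ctx: context.Background()}
	stopPoller := m.GoWithCancel(func(ctx context.Context) error {
		<-ctx.Done() // e.g. a latency poller that runs until cancelled
		fmt.Println("poller stopped")
		return ctx.Err()
	})
	stopPoller() // stop just this goroutine...
	m.Wait()     // ...and still wait for it before returning.
}
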
29 changes: 19 additions & 10 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -634,18 +634,23 @@ func (rd *replicationDriver) main(ctx context.Context) {
}

rd.t.L().Printf("begin workload on src cluster")
m := rd.newMonitor(ctx)
// The roachtest driver can use the workloadCtx to cancel the workload.

// Pass a cancellable context to the workload monitor so the driver can cleanly cancel the
// workload goroutine.
workloadCtx, workloadCancel := context.WithCancel(ctx)
defer workloadCancel()
workloadMonitor := rd.newMonitor(workloadCtx)
defer func() {
workloadCancel()
workloadMonitor.Wait()
}()

workloadDoneCh := make(chan struct{})
m.Go(func(ctx context.Context) error {
workloadMonitor.Go(func(ctx context.Context) error {
defer close(workloadDoneCh)
err := rd.runWorkload(workloadCtx)
err := rd.runWorkload(ctx)
// The workload should only return an error if the roachtest driver cancels the
// workloadCtx after rd.additionalDuration has elapsed after the initial scan completes.
if err != nil && workloadCtx.Err() == nil {
// ctx after the rd.additionalDuration has elapsed after the initial scan completes.
if err != nil && ctx.Err() == nil {
// Implies the workload context was not cancelled and the workload cmd returned a
// different error.
return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
@@ -669,9 +674,11 @@ func (rd *replicationDriver) main(ctx context.Context) {
getStreamIngestionJobInfo, rd.t.Status, rd.rs.expectedNodeDeaths > 0)
defer lv.maybeLogLatencyHist()

m.Go(func(ctx context.Context) error {
latencyMonitor := rd.newMonitor(ctx)
latencyMonitor.Go(func(ctx context.Context) error {
return lv.pollLatency(ctx, rd.setup.dst.db, ingestionJobID, time.Second, workloadDoneCh)
})
defer latencyMonitor.Wait()

rd.t.L().Printf("waiting for replication stream to finish ingesting initial scan")
rd.waitForReplicatedTime(ingestionJobID, rd.rs.timeout/2)
@@ -684,7 +691,7 @@ func (rd *replicationDriver) main(ctx context.Context) {
rd.t.L().Printf("workload finished on its own")
case <-time.After(rd.getWorkloadTimeout()):
workloadCancel()
rd.t.L().Printf("workload has cancelled after %s", rd.rs.additionalDuration)
rd.t.L().Printf("workload was cancelled after %s", rd.rs.additionalDuration)
case <-ctx.Done():
rd.t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`)
return
@@ -848,6 +855,7 @@ func registerClusterToCluster(r registry.Registry) {
defer hc.Done()

rd.main(ctx)
m.Wait()
})
}
}
@@ -1123,6 +1131,7 @@ func registerClusterReplicationResilience(r registry.Registry) {
rrd.main(ctx)
return nil
})
defer m.Wait()

// Don't begin shutdown process until c2c job is set up.
<-shutdownSetupDone
@@ -1193,6 +1202,7 @@ func registerClusterReplicationDisconnect(r registry.Registry) {
rd.main(ctx)
return nil
})
defer m.Wait()

// Don't begin node disconnection until the c2c job is set up.
<-shutdownSetupDone
@@ -1224,7 +1234,6 @@ func registerClusterReplicationDisconnect(r registry.Registry) {
getSrcDestNodePairs(rd, ingestionProgressUpdate)
blackholeFailer.Cleanup(ctx)
rd.t.L().Printf("Nodes reconnected. C2C Job should eventually complete")
m.Wait()
})
}

1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/disk_stall.go
@@ -136,6 +136,7 @@ func runDiskStalledDetection(
`{pgurl:1-3}`)
return nil
})
defer m.Wait()

// Wait between [3m,6m) before stalling the disk.
pauseDur := 3*time.Minute + time.Duration(rand.Intn(3))*time.Minute
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/cluster_settings
@@ -110,7 +110,7 @@ ALTER TENANT ALL SET CLUSTER SETTING sql.index_recommendation.drop_unused_durati
user root

onlyif config 3node-tenant-default-configs
query TTTT
query TTTT retry
SELECT variable, value, default_value, origin FROM [SHOW ALL CLUSTER SETTINGS]
WHERE variable IN ('sql.index_recommendation.drop_unused_duration')
----
