timeutil: stack-allocate Timer, remove pooling
Informs cockroachdb#119593.
Closes cockroachdb#38055.

This commit removes the pooling of timeutil.Timer structs and, in doing so,
permits the structs to be stack allocated so that no pooling is necessary. This
superfluous (and in hindsight, harmful) memory pooling was introduced in
f11ec1c, which also added very necessary pooling for the internal time.Timer
structs.

The pooling was harmful because it mandated a contract where Timer structs could
not be used after their Stop method was called. This was surprising (time.Timer
has no such limitation) and led to subtle use-after-free bugs over time (cockroachdb#61373
and cockroachdb#119595). It was also unnecessary because the outer Timer structs can be
stack allocated. Ironically, the only thing that caused them to escape to the
heap was the pooling mechanism itself, so removing the pooling solves the issue.
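
To make the change concrete, here is a minimal sketch of the call-site pattern the
rest of this diff converts to. The pollEvery helper and its parameters are
illustrative only (they are not code from this commit), and the import path for
timeutil is assumed to be pkg/util/timeutil; the Timer API itself (Reset, Stop, C,
Read) is unchanged.

```go
package example

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// pollEvery calls tick once per interval until the context is canceled.
// The Timer is now declared as a plain value on the stack; before this
// commit the same loop would have started with timer := timeutil.NewTimer(),
// and the pooled Timer could not be touched again after Stop.
func pollEvery(ctx context.Context, interval time.Duration, tick func()) error {
	var timer timeutil.Timer // stack-allocated, no pool involved
	defer timer.Stop()
	for {
		timer.Reset(interval)
		select {
		case <-timer.C:
			// As at the existing call sites, mark the channel as read
			// before the next Reset.
			timer.Read = true
			tick()
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```

The converted call sites below largely follow this shape.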

```
name      old time/op    new time/op    delta
Timer-10     153µs ± 1%     152µs ± 1%   ~     (p=0.589 n=10+9)

name      old alloc/op   new alloc/op   delta
Timer-10      200B ± 0%      200B ± 0%   ~     (all equal)

name      old allocs/op  new allocs/op  delta
Timer-10      3.00 ± 0%      3.00 ± 0%   ~     (all equal)
```

Epic: None
Release note: None
nvanbenschoten committed Mar 5, 2024
1 parent 7ce3e6b · commit e4318c3
Showing 47 changed files with 108 additions and 199 deletions.
5 changes: 2 additions & 3 deletions pkg/ccl/backupccl/backup_processor.go
@@ -148,7 +148,7 @@ type backupDataProcessor struct {
// Aggregator that aggregates StructuredEvents emitted in the
// backupDataProcessors' trace recording.
agg *bulk.TracingAggregator
aggTimer *timeutil.Timer
aggTimer timeutil.Timer

// completedSpans tracks how many spans have been successfully backed up by
// the backup processor.
@@ -206,7 +206,6 @@ func (bp *backupDataProcessor) Start(ctx context.Context) {
// Construct an Aggregator to aggregate and render AggregatorEvents emitted in
// bps' trace recording.
bp.agg = bulk.TracingAggregatorForContext(ctx)
bp.aggTimer = timeutil.NewTimer()
// If the aggregator is nil, we do not want the timer to fire.
if bp.agg != nil {
bp.aggTimer.Reset(15 * time.Second)
@@ -471,7 +470,7 @@ func runBackupProcessor(
// priority becomes true when we're sending re-attempts of reads far enough
// in the past that we want to run them with priority.
var priority bool
timer := timeutil.NewTimer()
var timer timeutil.Timer
defer timer.Stop()

ctxDone := ctx.Done()
3 changes: 1 addition & 2 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -83,7 +83,7 @@ type restoreDataProcessor struct {
// Aggregator that aggregates StructuredEvents emitted in the
// restoreDataProcessors' trace recording.
agg *bulkutil.TracingAggregator
aggTimer *timeutil.Timer
aggTimer timeutil.Timer

// qp is a MemoryBackedQuotaPool that restricts the amount of memory that
// can be used by this processor to open iterators on SSTs.
@@ -229,7 +229,6 @@ func newRestoreDataProcessor(
func (rd *restoreDataProcessor) Start(ctx context.Context) {
ctx = logtags.AddTag(ctx, "job", rd.spec.JobID)
rd.agg = bulkutil.TracingAggregatorForContext(ctx)
rd.aggTimer = timeutil.NewTimer()
// If the aggregator is nil, we do not want the timer to fire.
if rd.agg != nil {
rd.aggTimer.Reset(15 * time.Second)
16 changes: 6 additions & 10 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -215,7 +215,7 @@ type streamIngestionProcessor struct {
batcher *bulk.SSTBatcher
// rangeBatcher is used to flush range KVs into SST to the storage layer.
rangeBatcher *rangeKeyBatcher
maxFlushRateTimer *timeutil.Timer
maxFlushRateTimer timeutil.Timer

// client is a streaming client which provides a stream of events from a given
// address.
@@ -266,7 +266,7 @@ type streamIngestionProcessor struct {
// Aggregator that aggregates StructuredEvents emitted in the
// backupDataProcessors' trace recording.
agg *bulkutil.TracingAggregator
aggTimer *timeutil.Timer
aggTimer timeutil.Timer
}

// partitionEvent augments a normal event with the partition it came from.
@@ -311,10 +311,9 @@ func newStreamIngestionDataProcessor(
}

sip := &streamIngestionProcessor{
flowCtx: flowCtx,
spec: spec,
frontier: frontier,
maxFlushRateTimer: timeutil.NewTimer(),
flowCtx: flowCtx,
spec: spec,
frontier: frontier,
cutoverProvider: &cutoverFromJobProgress{
jobID: jobspb.JobID(spec.JobID),
db: flowCtx.Cfg.DB,
@@ -373,7 +372,6 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
ctx = logtags.AddTag(ctx, "job", sip.spec.JobID)
log.Infof(ctx, "starting ingest proc")
sip.agg = bulkutil.TracingAggregatorForContext(ctx)
sip.aggTimer = timeutil.NewTimer()

// If the aggregator is nil, we do not want the timer to fire.
if sip.agg != nil {
@@ -578,9 +576,7 @@ func (sip *streamIngestionProcessor) close() {
if sip.batcher != nil {
sip.batcher.Close(sip.Ctx())
}
if sip.maxFlushRateTimer != nil {
sip.maxFlushRateTimer.Stop()
}
sip.maxFlushRateTimer.Stop()
sip.aggTimer.Stop()

sip.InternalClose()
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/jobs.go
@@ -91,7 +91,7 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) {

m.Go(func(ctx context.Context) error {
defer close(done)
testTimer := timeutil.NewTimer()
var testTimer timeutil.Timer
testTimer.Reset(workloadDuration)
select {
case <-earlyExit:
@@ -104,7 +104,7 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) {
randomPoller := func(f func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error) func(ctx context.Context) error {

return func(ctx context.Context) error {
pTimer := timeutil.NewTimer()
var pTimer timeutil.Timer
defer pTimer.Stop()
for {
waitTime := time.Duration(rng.Intn(pollerMinFrequencySeconds)+1) * time.Second
3 changes: 1 addition & 2 deletions pkg/gossip/gossip.go
@@ -1229,8 +1229,7 @@ func (g *Gossip) bootstrap(rpcContext *rpc.Context) {
func (g *Gossip) manage(rpcContext *rpc.Context) {
ctx := g.AnnotateCtx(context.Background())
_ = g.server.stopper.RunAsyncTask(ctx, "gossip-manage", func(ctx context.Context) {
cullTimer := timeutil.NewTimer()
stallTimer := timeutil.NewTimer()
var cullTimer, stallTimer timeutil.Timer
defer cullTimer.Stop()
defer stallTimer.Stop()

8 changes: 2 additions & 6 deletions pkg/internal/client/requestbatcher/batcher.go
@@ -518,12 +518,9 @@ func (b *RequestBatcher) run(ctx context.Context) {
}
}
deadline time.Time
timer = timeutil.NewTimer()
timer timeutil.Timer
)
// In any case, stop the timer when the function returns.
// We can't defer timer.Stop directly because we re-assign
// timer in maybeSetTimer below.
defer func() { timer.Stop() }()
defer timer.Stop()

maybeSetTimer := func() {
var nextDeadline time.Time
@@ -538,7 +535,6 @@ func (b *RequestBatcher) run(ctx context.Context) {
// Clear the current timer due to a sole batch already sent before
// the timer fired.
timer.Stop()
timer = timeutil.NewTimer()
}
}
}
20 changes: 12 additions & 8 deletions pkg/jobs/config.go
@@ -200,8 +200,8 @@ func jitter(dur time.Duration) time.Duration {
//
// Common usage pattern:
//
// lc, cleanup := makeLoopController(...)
// defer cleanup()
// lc := makeLoopController(...)
// defer lc.cleanup()
// for {
// select {
// case <- lc.update:
@@ -212,21 +212,20 @@ func jitter(dur time.Duration) time.Duration {
// }
// }
type loopController struct {
timer *timeutil.Timer
timer timeutil.Timer
lastRun time.Time
updated chan struct{}
// getInterval returns the value of the associated cluster setting.
getInterval func() time.Duration
}

// makeLoopController returns a structure that controls the execution of a job
// at regular intervals. Moreover, it returns a cleanup function that should be
// deferred to execute before destroying the instantiated structure.
// at regular intervals. The structure's cleanup method should be deferred to
// execute before destroying the instantiated structure.
func makeLoopController(
st *cluster.Settings, s *settings.DurationSetting, overrideKnob *time.Duration,
) (loopController, func()) {
) loopController {
lc := loopController{
timer: timeutil.NewTimer(),
lastRun: timeutil.Now(),
updated: make(chan struct{}, 1),
// getInterval returns the value of the associated cluster setting. If
@@ -253,7 +252,7 @@ func makeLoopController(
intervalBaseSetting.SetOnChange(&st.SV, onChange)

lc.timer.Reset(jitter(lc.getInterval()))
return lc, func() { lc.timer.Stop() }
return lc
}

// onUpdate is called when the associated interval setting gets updated.
@@ -266,3 +265,8 @@ func (lc *loopController) onExecute() {
lc.lastRun = timeutil.Now()
lc.timer.Reset(jitter(lc.getInterval()))
}

// cleanup stops the loop controller's timer.
func (lc *loopController) cleanup() {
lc.timer.Stop()
}
2 changes: 1 addition & 1 deletion pkg/jobs/metricspoller/poller.go
@@ -61,7 +61,7 @@ func (mp *metricsPoller) Resume(ctx context.Context, execCtx interface{}) error
exec := execCtx.(sql.JobExecContext)
metrics := exec.ExecCfg().JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypePollJobsStats].(pollerMetrics)

t := timeutil.NewTimer()
var t timeutil.Timer
defer t.Stop()

runTask := func(name string, task func(ctx context.Context, execCtx sql.JobExecContext) error) error {
12 changes: 6 additions & 6 deletions pkg/jobs/registry.go
@@ -1021,8 +1021,8 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
defer cancel()

cancelLoopTask(ctx)
lc, cleanup := makeLoopController(r.settings, cancelIntervalSetting, r.knobs.IntervalOverrides.Cancel)
defer cleanup()
lc := makeLoopController(r.settings, cancelIntervalSetting, r.knobs.IntervalOverrides.Cancel)
defer lc.cleanup()
for {
select {
case <-lc.updated:
@@ -1053,8 +1053,8 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
defer cancel()

lc, cleanup := makeLoopController(r.settings, gcIntervalSetting, r.knobs.IntervalOverrides.Gc)
defer cleanup()
lc := makeLoopController(r.settings, gcIntervalSetting, r.knobs.IntervalOverrides.Gc)
defer lc.cleanup()

// Retention duration of terminal job records.
retentionDuration := func() time.Duration {
@@ -1092,8 +1092,8 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {

ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
defer cancel()
lc, cleanup := makeLoopController(r.settings, adoptIntervalSetting, r.knobs.IntervalOverrides.Adopt)
defer cleanup()
lc := makeLoopController(r.settings, adoptIntervalSetting, r.knobs.IntervalOverrides.Adopt)
defer lc.cleanup()
for {
select {
case <-lc.updated:
3 changes: 2 additions & 1 deletion pkg/keyvisualizer/spanstatscollector/span_stats_collector.go
@@ -87,7 +87,8 @@ func (s *SpanStatsCollector) Start(ctx context.Context, stopper *stop.Stopper) {
if err := stopper.RunAsyncTask(ctx, "span-stats-collector",
func(ctx context.Context) {
s.reset()
t := timeutil.NewTimer()
var t timeutil.Timer
defer t.Stop()
for {
samplePeriod := keyvissettings.SampleInterval.Get(&s.settings.SV)
now := timeutil.Now()
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go
@@ -195,7 +195,7 @@ func (d *DistSenderCircuitBreakers) probeStallLoop(ctx context.Context) {

// We use the probe interval as the scan interval, since we can sort of
// consider this to be probing the replicas for a stall.
timer := timeutil.NewTimer()
var timer timeutil.Timer
defer timer.Stop()
timer.Reset(CircuitBreakerProbeInterval.Get(&d.settings.SV))

@@ -670,7 +670,7 @@ func (r *ReplicaCircuitBreaker) launchProbe(report func(error), done func()) {

// Continually probe the replica until it succeeds. We probe immediately
// since we only trip the breaker on probe failure.
timer := timeutil.NewTimer()
var timer timeutil.Timer
defer timer.Stop()
timer.Reset(CircuitBreakerProbeInterval.Get(&r.d.settings.SV))

2 changes: 1 addition & 1 deletion pkg/kv/kvprober/kvprober.go
@@ -318,7 +318,7 @@ func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
d := func() time.Duration {
return withJitter(interval.Get(&p.settings.SV), rnd)
}
t := timeutil.NewTimer()
var t timeutil.Timer
defer t.Stop()
t.Reset(d())

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
@@ -5304,7 +5304,7 @@ func TestOptimisticEvalRetry(t *testing.T) {
})
}()
removedLocks := false
timer := timeutil.NewTimer()
var timer timeutil.Timer
timer.Reset(time.Second * 2)
defer timer.Stop()
done := false
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/closedts/sidetransport/sender.go
@@ -242,7 +242,7 @@ func (s *Sender) Run(ctx context.Context, nodeID roachpb.NodeID) {
s.buf.Close()
}()

timer := timeutil.NewTimer()
var timer timeutil.Timer
defer timer.Stop()
for {
interval := closedts.SideTransportCloseInterval.Get(&s.st.SV)
@@ -251,7 +251,6 @@
} else {
// Disable the side-transport.
timer.Stop()
timer = timeutil.NewTimer()
}
select {
case <-timer.C:
11 changes: 3 additions & 8 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
@@ -123,7 +123,8 @@ func (w *lockTableWaiterImpl) WaitOn(
ctxDoneC := ctx.Done()
shouldQuiesceC := w.stopper.ShouldQuiesce()
// Used to delay liveness and deadlock detection pushes.
var timer *timeutil.Timer
var timer timeutil.Timer
defer timer.Stop()
var timerC <-chan time.Time
var timerWaitingState waitingState
// Used to enforce lock timeouts.
@@ -227,10 +228,6 @@
delay, deadlockOrLivenessPush, timeoutPush, priorityPush, waitPolicyPush)

if delay > 0 {
if timer == nil {
timer = timeutil.NewTimer()
defer timer.Stop()
}
timer.Reset(delay)
timerC = timer.C
} else {
@@ -309,10 +306,8 @@
// it should push. It may be the case that the transaction is part
// of a dependency cycle or that the lock holder's coordinator node
// has crashed.
timer.Read = true
timerC = nil
if timer != nil {
timer.Read = true
}
if w.onPushTimer != nil {
w.onPushTimer()
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/protectedts/ptcache/cache.go
@@ -169,7 +169,7 @@ func (c *Cache) periodicallyRefreshProtectedtsCache(ctx context.Context) {
default:
}
})
timer := timeutil.NewTimer()
var timer timeutil.Timer
defer timer.Stop()
timer.Reset(0) // Read immediately upon startup
var lastReset time.Time
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go
@@ -97,7 +97,7 @@ func (r *Reconciler) run(ctx context.Context, stopper *stop.Stopper) {
const jitterFrac = .1
return time.Duration(float64(interval) * (1 + (rand.Float64()-.5)*jitterFrac))
}
timer := timeutil.NewTimer()
var timer timeutil.Timer
defer timer.Stop()
for {
timer.Reset(timeutil.Until(lastReconciled.Add(getInterval())))
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/raft_transport.go
@@ -1043,7 +1043,7 @@ func (t *RaftTransport) startDroppingFlowTokensForDisconnectedNodes(ctx context.
}
})

timer := timeutil.NewTimer()
var timer timeutil.Timer
defer timer.Stop()

for {
@@ -1053,7 +1053,6 @@
} else {
// Disable the mechanism.
timer.Stop()
timer = timeutil.NewTimer()
}
select {
case <-timer.C:
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_consistency.go
@@ -394,7 +394,7 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (CollectChecksu

// Wait for the checksum computation to start.
dur := r.checksumInitialWait(ctx)
t := timeutil.NewTimer()
var t timeutil.Timer
t.Reset(dur)
defer t.Stop()
var taskCancel context.CancelFunc
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_range_lease.go
@@ -1406,7 +1406,7 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
// against this in checkRequestTimeRLocked). So instead of assuming
// anything, we iterate and check again.
pErr = func() (pErr *kvpb.Error) {
slowTimer := timeutil.NewTimer()
var slowTimer timeutil.Timer
defer slowTimer.Stop()
slowTimer.Reset(base.SlowRequestThreshold)
tBegin := timeutil.Now()