Skip to content

Commit

Permalink
kv: initialize consistencyLimiter during Store construction, before S…
Browse files Browse the repository at this point in the history
…tart

Fixes #96231.

This commit attempts to fix #96231. It moves the initialization of
`Store.consistencyLimiter` up from the bottom of `Store.Start` into
`NewStore`. It is unsafe to initialize this variable after the call to
`Store.processRaft`, which starts Raft processing. Beyond that point,
incoming Raft traffic is permitted and calls to `computeChecksumPostApply`
are possible.

The two stacktraces we have in that issue conclusively point to the
`Store.consistencyLimiter` being nil during a call to
`(*Replica).computeChecksumPostApply`. This startup-time race is the
only possible explanation I could come up with.

Release note (bug fix): Fixed a rare startup race that could cause a
`invalid memory address or nil pointer dereference` error.
  • Loading branch information
nvanbenschoten committed Apr 4, 2023
1 parent e1c8325 commit 35c9283
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,17 @@ func NewStore(
s.renewableLeasesSignal = make(chan struct{}, 1)
}

s.consistencyLimiter = quotapool.NewRateLimiter(
"ConsistencyQueue",
quotapool.Limit(consistencyCheckRate.Get(&cfg.Settings.SV)),
consistencyCheckRate.Get(&cfg.Settings.SV)*consistencyCheckRateBurstFactor,
quotapool.WithMinimumWait(consistencyCheckRateMinWait))

consistencyCheckRate.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
rate := consistencyCheckRate.Get(&cfg.Settings.SV)
s.consistencyLimiter.UpdateLimit(quotapool.Limit(rate), rate*consistencyCheckRateBurstFactor)
})

s.limiters.BulkIOWriteRate = rate.NewLimiter(rate.Limit(bulkIOWriteLimit.Get(&cfg.Settings.SV)), kvserverbase.BulkIOWriteBurst)
bulkIOWriteLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
s.limiters.BulkIOWriteRate.SetLimit(rate.Limit(bulkIOWriteLimit.Get(&cfg.Settings.SV)))
Expand Down Expand Up @@ -2057,17 +2068,6 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
s.storeRebalancer.Start(ctx, s.stopper)
}

s.consistencyLimiter = quotapool.NewRateLimiter(
"ConsistencyQueue",
quotapool.Limit(consistencyCheckRate.Get(&s.ClusterSettings().SV)),
consistencyCheckRate.Get(&s.ClusterSettings().SV)*consistencyCheckRateBurstFactor,
quotapool.WithMinimumWait(consistencyCheckRateMinWait))

consistencyCheckRate.SetOnChange(&s.ClusterSettings().SV, func(ctx context.Context) {
rate := consistencyCheckRate.Get(&s.ClusterSettings().SV)
s.consistencyLimiter.UpdateLimit(quotapool.Limit(rate), rate*consistencyCheckRateBurstFactor)
})

// Set the started flag (for unittests).
atomic.StoreInt32(&s.started, 1)

Expand Down

0 comments on commit 35c9283

Please sign in to comment.