storage: rate-limit AddSST requests
We've been seeing extremely high latency for foreground traffic during bulk
index backfills, because AddSST requests into non-empty ranges can be
expensive, and write requests queued behind an AddSST request for an
overlapping span can get stuck waiting for multiple seconds. This PR limits the
number of concurrent AddSST requests on a single store, controlled by a new
cluster setting, `kv.bulk_io_write.concurrent_addsst_requests`, to decrease the
impact of index backfills on foreground writes. (It also decreases the risk of
writing too many L0 files to RocksDB at once, which causes write stalls.)

Release note (general change): Add a new cluster setting,
`kv.bulk_io_write.concurrent_addsst_requests`, which limits the number of
SSTables that can be added concurrently during bulk operations.
lucy-zhang committed Apr 2, 2019
1 parent cf19139 commit fff2cc9
Showing 4 changed files with 34 additions and 0 deletions.
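
Before the file diffs, here is a minimal, hedged sketch of the admission pattern the commit message describes, with a buffered channel standing in for `limit.ConcurrentRequestLimiter`; the `concurrentLimiter` type, `newConcurrentLimiter`, and the AddSST hand-off in `main` are illustrative names, not part of the change. At most N requests are admitted at once: `Begin` blocks until a slot frees up (or the context is canceled), and a deferred `Finish` releases the slot, mirroring how `Store.Send` gates single-request AddSSTable batches in the diff below.

// Minimal sketch of a concurrent-request limiter; NOT the CockroachDB implementation.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// concurrentLimiter admits at most cap(sem) requests at a time.
type concurrentLimiter struct {
	sem chan struct{}
}

func newConcurrentLimiter(limit int) *concurrentLimiter {
	return &concurrentLimiter{sem: make(chan struct{}, limit)}
}

// Begin blocks until a slot is available or ctx is canceled.
func (l *concurrentLimiter) Begin(ctx context.Context) error {
	select {
	case l.sem <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Finish releases the slot acquired by Begin.
func (l *concurrentLimiter) Finish() {
	<-l.sem
}

func main() {
	// A limit of 1 mirrors the new cluster setting's default.
	limiter := newConcurrentLimiter(1)

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if err := limiter.Begin(context.Background()); err != nil {
				fmt.Printf("AddSST %d rejected: %v\n", id, err)
				return
			}
			defer limiter.Finish()
			fmt.Printf("AddSST %d admitted\n", id)
			time.Sleep(50 * time.Millisecond) // stand-in for an expensive AddSST
		}(i)
	}
	wg.Wait()
}

With the default limit of 1, the three simulated requests are admitted one at a time; raising the limit admits more in parallel.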
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -28,6 +28,7 @@
<tr><td><code>kv.allocator.load_based_rebalancing</code></td><td>enumeration</td><td><code>2</code></td><td>whether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
<tr><td><code>kv.allocator.qps_rebalance_threshold</code></td><td>float</td><td><code>0.25</code></td><td>minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull</td></tr>
<tr><td><code>kv.allocator.range_rebalance_threshold</code></td><td>float</td><td><code>0.05</code></td><td>minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull</td></tr>
<tr><td><code>kv.bulk_io_write.concurrent_addsst_requests</code></td><td>integer</td><td><code>1</code></td><td>number of AddSST requests a store will handle concurrently before queuing</td></tr>
<tr><td><code>kv.bulk_io_write.concurrent_export_requests</code></td><td>integer</td><td><code>3</code></td><td>number of export requests a store will handle concurrently before queuing</td></tr>
<tr><td><code>kv.bulk_io_write.concurrent_import_requests</code></td><td>integer</td><td><code>1</code></td><td>number of import requests a store will handle concurrently before queuing</td></tr>
<tr><td><code>kv.bulk_io_write.max_rate</code></td><td>byte size</td><td><code>8.0 EiB</code></td><td>the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops</td></tr>
10 changes: 10 additions & 0 deletions pkg/roachpb/batch.go
@@ -218,6 +218,16 @@ func (ba *BatchRequest) IsSingleComputeChecksumRequest() bool {
	return false
}

// IsSingleAddSSTableRequest returns true iff the batch contains a single
// request, and that request is an AddSSTableRequest.
func (ba *BatchRequest) IsSingleAddSSTableRequest() bool {
	if ba.IsSingleRequest() {
		_, ok := ba.Requests[0].GetInner().(*AddSSTableRequest)
		return ok
	}
	return false
}

// IsCompleteTransaction determines whether a batch contains every write in a
// transaction.
func (ba *BatchRequest) IsCompleteTransaction() bool {
1 change: 1 addition & 0 deletions pkg/storage/batcheval/eval_context.go
@@ -37,6 +37,7 @@ type Limiters struct {
	BulkIOWriteRate *rate.Limiter
	ConcurrentImports limit.ConcurrentRequestLimiter
	ConcurrentExports limit.ConcurrentRequestLimiter
	ConcurrentAddSSTs limit.ConcurrentRequestLimiter
	// concurrentRangefeedIters is a semaphore used to limit the number of
	// rangefeeds in the "catch-up" state across the store. The "catch-up" state
	// is a temporary state at the beginning of a rangefeed which is expensive
22 changes: 22 additions & 0 deletions pkg/storage/store.go
@@ -124,6 +124,13 @@ var importRequestsLimit = settings.RegisterPositiveIntSetting(
	1,
)

// addSSTRequestLimit limits concurrent AddSST requests.
var addSSTRequestLimit = settings.RegisterPositiveIntSetting(
	"kv.bulk_io_write.concurrent_addsst_requests",
	"number of AddSST requests a store will handle concurrently before queuing",
	1,
)

// concurrentRangefeedItersLimit limits concurrent rangefeed catchup iterators.
var concurrentRangefeedItersLimit = settings.RegisterPositiveIntSetting(
	"kv.rangefeed.concurrent_catchup_iterators",
@@ -852,6 +859,12 @@ func NewStore(
		}
		s.limiters.ConcurrentExports.SetLimit(limit)
	})
	s.limiters.ConcurrentAddSSTs = limit.MakeConcurrentRequestLimiter(
		"addSSTRequestLimiter", int(addSSTRequestLimit.Get(&cfg.Settings.SV)),
	)
	addSSTRequestLimit.SetOnChange(&cfg.Settings.SV, func() {
		s.limiters.ConcurrentAddSSTs.SetLimit(int(addSSTRequestLimit.Get(&cfg.Settings.SV)))
	})
	s.limiters.ConcurrentRangefeedIters = limit.MakeConcurrentRequestLimiter(
		"rangefeedIterLimiter", int(concurrentRangefeedItersLimit.Get(&cfg.Settings.SV)),
	)
@@ -2743,6 +2756,15 @@ func (s *Store) Send(
		}
	}

	// Limit the number of concurrent AddSST requests, since they're expensive
	// and block all other writes to the same span.
	if ba.IsSingleAddSSTableRequest() {
		if err := s.limiters.ConcurrentAddSSTs.Begin(ctx); err != nil {
			return nil, roachpb.NewError(err)
		}
		defer s.limiters.ConcurrentAddSSTs.Finish()
	}

	if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {
		return nil, roachpb.NewError(err)
	}
