Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…db#85756

74337: kv: pool rangeCacheKey objects r=arulajmani a=nvanbenschoten

This commit introduces a `sync.Pool` for `rangeCacheKey` objects. This
is used to avoid heap allocations when querying and updating the `RangeCache`.

```
name                      old time/op    new time/op    delta
KV/Scan/Native/rows=1-10    14.8µs ± 2%    14.9µs ± 4%    ~     (p=0.356 n=9+10)
KV/Scan/SQL/rows=1-10       92.1µs ± 4%    94.3µs ± 5%    ~     (p=0.113 n=9+10)

name                      old alloc/op   new alloc/op   delta
KV/Scan/Native/rows=1-10    6.87kB ± 0%    6.85kB ± 0%  -0.35%  (p=0.000 n=10+10)
KV/Scan/SQL/rows=1-10       20.0kB ± 0%    20.0kB ± 0%  -0.25%  (p=0.012 n=10+10)

name                      old allocs/op  new allocs/op  delta
KV/Scan/Native/rows=1-10      52.0 ± 0%      51.0 ± 0%  -1.92%  (p=0.000 n=10+10)
KV/Scan/SQL/rows=1-10          244 ± 0%       242 ± 0%  -0.78%  (p=0.000 n=10+10)
```
----

This is part of a collection of assorted micro-optimizations:
- cockroachdb#74336
- cockroachdb#74337
- cockroachdb#74338
- cockroachdb#74339
- cockroachdb#74340
- cockroachdb#74341
- cockroachdb#74342
- cockroachdb#74343
- cockroachdb#74344
- cockroachdb#74345
- cockroachdb#74346
- cockroachdb#74347
- cockroachdb#74348

Combined, these changes have the following effect on end-to-end SQL query performance:
```
name                      old time/op    new time/op    delta
KV/Scan/SQL/rows=1-10       94.4µs ±10%    92.3µs ±11%   -2.20%  (p=0.000 n=93+93)
KV/Scan/SQL/rows=10-10       102µs ±10%      99µs ±10%   -2.16%  (p=0.000 n=94+94)
KV/Update/SQL/rows=10-10     378µs ±15%     370µs ±11%   -2.04%  (p=0.003 n=95+91)
KV/Insert/SQL/rows=1-10      133µs ±14%     132µs ±12%     ~     (p=0.738 n=95+93)
KV/Insert/SQL/rows=10-10     197µs ±14%     196µs ±13%     ~     (p=0.902 n=95+94)
KV/Update/SQL/rows=1-10      186µs ±14%     185µs ±14%     ~     (p=0.351 n=94+93)
KV/Delete/SQL/rows=1-10      132µs ±13%     132µs ±14%     ~     (p=0.473 n=94+94)
KV/Delete/SQL/rows=10-10     254µs ±16%     250µs ±16%     ~     (p=0.086 n=100+99)

name                      old alloc/op   new alloc/op   delta
KV/Scan/SQL/rows=1-10       20.1kB ± 0%    19.1kB ± 1%   -4.91%  (p=0.000 n=96+96)
KV/Scan/SQL/rows=10-10      21.7kB ± 0%    20.7kB ± 1%   -4.61%  (p=0.000 n=96+97)
KV/Delete/SQL/rows=10-10    64.0kB ± 3%    63.7kB ± 3%   -0.55%  (p=0.000 n=100+100)
KV/Update/SQL/rows=1-10     45.8kB ± 1%    45.5kB ± 1%   -0.55%  (p=0.000 n=97+98)
KV/Update/SQL/rows=10-10     105kB ± 1%     105kB ± 1%   -0.10%  (p=0.008 n=97+98)
KV/Delete/SQL/rows=1-10     40.8kB ± 0%    40.7kB ± 0%   -0.08%  (p=0.001 n=95+96)
KV/Insert/SQL/rows=1-10     37.4kB ± 1%    37.4kB ± 0%     ~     (p=0.698 n=97+96)
KV/Insert/SQL/rows=10-10    76.4kB ± 1%    76.4kB ± 0%     ~     (p=0.822 n=99+98)

name                      old allocs/op  new allocs/op  delta
KV/Scan/SQL/rows=1-10          245 ± 0%       217 ± 0%  -11.43%  (p=0.000 n=95+92)
KV/Scan/SQL/rows=10-10         280 ± 0%       252 ± 0%  -10.11%  (p=0.000 n=75+97)
KV/Delete/SQL/rows=10-10       478 ± 0%       459 ± 0%   -4.04%  (p=0.000 n=94+97)
KV/Delete/SQL/rows=1-10        297 ± 1%       287 ± 1%   -3.34%  (p=0.000 n=97+97)
KV/Update/SQL/rows=1-10        459 ± 0%       444 ± 0%   -3.27%  (p=0.000 n=97+97)
KV/Insert/SQL/rows=1-10        291 ± 0%       286 ± 0%   -1.72%  (p=0.000 n=82+86)
KV/Update/SQL/rows=10-10       763 ± 1%       750 ± 1%   -1.68%  (p=0.000 n=96+98)
KV/Insert/SQL/rows=10-10       489 ± 0%       484 ± 0%   -1.03%  (p=0.000 n=98+98)
```


85730: kvserver: also block LEARNER snaps to paused followers r=erikgrinaker a=tbg

We checked whether the snapshot recipient was paused only in the
raft log queue path. By pushing the check down into `sendSnapshot`,
it is now hit by any snapshot attempt, which includes the replicate
queue and store rebalancer. For best results, both of these should
avoid moving replicas to paused followers in the first place, which
they already do, at least partially, so this change shouldn't have
much of an impact in practice.

Fixes cockroachdb#85479.

Release note: None


85732:  kvserver: only pause followers when holding active lease r=erikgrinaker a=tbg

If the raft leader is not the leaseholder (which includes the case in
which we just transferred the lease away), leave all followers unpaused.
Otherwise, the leaseholder won't learn that the entries it submitted
were committed which effectively causes range unavailability.

Fixes cockroachdb#84884.

Release note: None


85756: builtins: add strptime/strftime builtins without experimental prefix r=rafiss a=rafiss

refs cockroachdb#52139

These are just an alias for the existing implementation.

Release note (sql change): The strptime and strftime builtin functions
were added as aliases for experimental_strptime and
experimental_strftime.

Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
4 people committed Aug 10, 2022
5 parents c1d005c + 7f713fb + 04481d8 + e4ae047 + dfc6123 commit 9be7cf5
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 169 deletions.
8 changes: 8 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,14 @@ has no relationship with the commit order of concurrent transactions.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="statement_timestamp"></a><code>statement_timestamp() &rarr; <a href="timestamp.html">timestamptz</a></code></td><td><span class="funcdesc"><p>Returns the start time of the current statement.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="strftime"></a><code>strftime(input: <a href="date.html">date</a>, extract_format: <a href="string.html">string</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>From <code>input</code>, extracts and formats the time as identified in <code>extract_format</code> using standard <code>strftime</code> notation (though not all formatting is supported).</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="strftime"></a><code>strftime(input: <a href="timestamp.html">timestamp</a>, extract_format: <a href="string.html">string</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>From <code>input</code>, extracts and formats the time as identified in <code>extract_format</code> using standard <code>strftime</code> notation (though not all formatting is supported).</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="strftime"></a><code>strftime(input: <a href="timestamp.html">timestamptz</a>, extract_format: <a href="string.html">string</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>From <code>input</code>, extracts and formats the time as identified in <code>extract_format</code> using standard <code>strftime</code> notation (though not all formatting is supported).</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="strptime"></a><code>strptime(input: <a href="string.html">string</a>, format: <a href="string.html">string</a>) &rarr; <a href="timestamp.html">timestamptz</a></code></td><td><span class="funcdesc"><p>Returns <code>input</code> as a timestamptz using <code>format</code> (which uses standard <code>strptime</code> formatting).</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="timeofday"></a><code>timeofday() &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>Returns the current system time on one of the cluster nodes as a string.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="timezone"></a><code>timezone(timezone: <a href="string.html">string</a>, time: <a href="time.html">time</a>) &rarr; timetz</code></td><td><span class="funcdesc"><p>Treat given time without time zone as located in the specified time zone.</p>
Expand Down
70 changes: 52 additions & 18 deletions pkg/kv/kvclient/rangecache/range_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/biogo/store/llrb"
Expand All @@ -41,16 +42,34 @@ import (
// RangeCache.
type rangeCacheKey roachpb.RKey

var minCacheKey interface{} = rangeCacheKey(roachpb.RKeyMin)
var minCacheKey = newRangeCacheKey(roachpb.RKeyMin)

func (a rangeCacheKey) String() string {
return roachpb.Key(a).String()
var rangeCacheKeyPool = sync.Pool{
New: func() interface{} { return &rangeCacheKey{} },
}

// Compare implements the llrb.Comparable interface for rangeCacheKey, so that
// newRangeCacheKey allocates a new rangeCacheKey using the supplied key. The
// objects escape to the heap because they are passed through an interface{}
// when handed to an OrderedCache, so the sync.Pool avoids a heap allocation.
func newRangeCacheKey(key roachpb.RKey) *rangeCacheKey {
k := rangeCacheKeyPool.Get().(*rangeCacheKey)
*k = rangeCacheKey(key)
return k
}

func (k *rangeCacheKey) release() {
*k = rangeCacheKey{}
rangeCacheKeyPool.Put(k)
}

func (k *rangeCacheKey) String() string {
return roachpb.Key(*k).String()
}

// Compare implements the llrb.Comparable interface for *rangeCacheKey, so that
// it can be used as a key for util.OrderedCache.
func (a rangeCacheKey) Compare(b llrb.Comparable) int {
return bytes.Compare(a, b.(rangeCacheKey))
func (k *rangeCacheKey) Compare(o llrb.Comparable) int {
return bytes.Compare(*k, *o.(*rangeCacheKey))
}

// RangeDescriptorDB is a type which can query range descriptors from an
Expand Down Expand Up @@ -201,7 +220,7 @@ func (rc *RangeCache) String() string {
func (rc *RangeCache) stringLocked() string {
var buf strings.Builder
rc.rangeCache.cache.Do(func(k, v interface{}) bool {
fmt.Fprintf(&buf, "key=%s desc=%+v\n", roachpb.Key(k.(rangeCacheKey)), v)
fmt.Fprintf(&buf, "key=%s desc=%+v\n", roachpb.Key(*k.(*rangeCacheKey)), v)
return false
})
return buf.String()
Expand Down Expand Up @@ -566,6 +585,8 @@ func (rc *RangeCache) GetCachedOverlapping(ctx context.Context, span roachpb.RSp
func (rc *RangeCache) getCachedOverlappingRLocked(
ctx context.Context, span roachpb.RSpan,
) []*cache.Entry {
from := newRangeCacheKey(span.EndKey)
defer from.release()
var res []*cache.Entry
rc.rangeCache.cache.DoRangeReverseEntry(func(e *cache.Entry) (exit bool) {
desc := rc.getValue(e).Desc()
Expand All @@ -579,7 +600,7 @@ func (rc *RangeCache) getCachedOverlappingRLocked(
}
res = append(res, e)
return false // continue iterating
}, rangeCacheKey(span.EndKey), minCacheKey)
}, from, minCacheKey)
// Invert the results so the get sorted ascendingly.
for i, j := 0, len(res)-1; i < j; i, j = i+1, j-1 {
res[i], res[j] = res[j], res[i]
Expand Down Expand Up @@ -847,7 +868,7 @@ func (rc *RangeCache) EvictByKey(ctx context.Context, descKey roachpb.RKey) bool
return false
}
log.VEventf(ctx, 2, "evict cached descriptor: %s", cachedDesc)
rc.rangeCache.cache.DelEntry(entry)
rc.delEntryLocked(entry)
return true
}

Expand All @@ -868,7 +889,7 @@ func (rc *RangeCache) evictDescLocked(ctx context.Context, desc *roachpb.RangeDe
// equal because the desc that the caller supplied also came from the cache
// and the cache is not expected to go backwards). Evict it.
log.VEventf(ctx, 2, "evict cached descriptor: desc=%s", cachedEntry)
rc.rangeCache.cache.DelEntry(rawEntry)
rc.delEntryLocked(rawEntry)
return true
}

Expand Down Expand Up @@ -897,14 +918,18 @@ func (rc *RangeCache) getCachedRLocked(
// key, in the direction indicated by inverted.
var rawEntry *cache.Entry
if !inverted {
k := newRangeCacheKey(key)
defer k.release()
var ok bool
rawEntry, ok = rc.rangeCache.cache.FloorEntry(rangeCacheKey(key))
rawEntry, ok = rc.rangeCache.cache.FloorEntry(k)
if !ok {
return nil, nil
}
} else {
from := newRangeCacheKey(key)
defer from.release()
rc.rangeCache.cache.DoRangeReverseEntry(func(e *cache.Entry) bool {
startKey := roachpb.RKey(e.Key.(rangeCacheKey))
startKey := roachpb.RKey(*e.Key.(*rangeCacheKey))
if key.Equal(startKey) {
// DoRangeReverseEntry is inclusive on the higher key. We're iterating
// backwards and we got a range that starts at key. We're not interested
Expand All @@ -914,7 +939,7 @@ func (rc *RangeCache) getCachedRLocked(
}
rawEntry = e
return true
}, rangeCacheKey(key), minCacheKey)
}, from, minCacheKey)
// DoRangeReverseEntry is exclusive on the "to" part, so we need to check
// manually if there's an entry for RKeyMin.
if rawEntry == nil {
Expand Down Expand Up @@ -998,11 +1023,10 @@ func (rc *RangeCache) insertLockedInner(ctx context.Context, rs []*CacheEntry) [
entries[i] = newerEntry
continue
}
rangeKey := ent.Desc().StartKey
if log.V(2) {
log.Infof(ctx, "adding cache entry: value=%s", ent)
}
rc.rangeCache.cache.Add(rangeCacheKey(rangeKey), ent)
rc.addEntryLocked(ent)
entries[i] = ent
}
return entries
Expand Down Expand Up @@ -1043,7 +1067,7 @@ func (rc *RangeCache) clearOlderOverlappingLocked(
if log.V(2) {
log.Infof(ctx, "clearing overlapping descriptor: key=%s entry=%s", e.Key, rc.getValue(e))
}
rc.rangeCache.cache.DelEntry(e)
rc.delEntryLocked(e)
} else {
newest = false
if descsCompatible(entry.Desc(), newEntry.Desc()) {
Expand Down Expand Up @@ -1074,13 +1098,23 @@ func (rc *RangeCache) swapEntryLocked(
}
}

rc.rangeCache.cache.DelEntry(oldEntry)
rc.delEntryLocked(oldEntry)
if newEntry != nil {
log.VEventf(ctx, 2, "caching new entry: %s", newEntry)
rc.rangeCache.cache.Add(oldEntry.Key, newEntry)
rc.addEntryLocked(newEntry)
}
}

func (rc *RangeCache) addEntryLocked(entry *CacheEntry) {
key := newRangeCacheKey(entry.Desc().StartKey)
rc.rangeCache.cache.Add(key, entry)
}

func (rc *RangeCache) delEntryLocked(entry *cache.Entry) {
rc.rangeCache.cache.DelEntry(entry)
entry.Key.(*rangeCacheKey).release()
}

// DB returns the descriptor database, for tests.
func (rc *RangeCache) DB() RangeDescriptorDB {
return rc.db
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvclient/rangecache/range_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, tr)
cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKeyMax)), &CacheEntry{desc: *defDesc})
cache.addEntryLocked(&CacheEntry{desc: *defDesc})

// Now, add a new, overlapping set of descriptors.
minToBDesc := &roachpb.RangeDescriptor{
Expand All @@ -1026,13 +1026,13 @@ func TestRangeCacheClearOverlapping(t *testing.T) {
}
curGeneration := roachpb.RangeGeneration(1)
require.True(t, clearOlderOverlapping(ctx, cache, minToBDesc))
cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKey("b"))), &CacheEntry{desc: *minToBDesc})
cache.addEntryLocked(&CacheEntry{desc: *minToBDesc})
if desc := cache.GetCached(ctx, roachpb.RKey("b"), false); desc != nil {
t.Errorf("descriptor unexpectedly non-nil: %s", desc)
}

require.True(t, clearOlderOverlapping(ctx, cache, bToMaxDesc))
cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKeyMax)), &CacheEntry{desc: *bToMaxDesc})
cache.addEntryLocked(&CacheEntry{desc: *bToMaxDesc})
ri := cache.GetCached(ctx, roachpb.RKey("b"), false)
require.Equal(t, bToMaxDesc, ri.Desc())

Expand All @@ -1041,7 +1041,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) {
curGeneration++
defDescCpy.Generation = curGeneration
require.True(t, clearOlderOverlapping(ctx, cache, &defDescCpy))
cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKeyMax)), &CacheEntry{desc: defDescCpy})
cache.addEntryLocked(&CacheEntry{desc: defDescCpy})
for _, key := range []roachpb.RKey{roachpb.RKey("a"), roachpb.RKey("b")} {
ri = cache.GetCached(ctx, key, false)
require.Equal(t, &defDescCpy, ri.Desc())
Expand All @@ -1055,7 +1055,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) {
Generation: curGeneration,
}
require.True(t, clearOlderOverlapping(ctx, cache, bToCDesc))
cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKey("c"))), &CacheEntry{desc: *bToCDesc})
cache.addEntryLocked(&CacheEntry{desc: *bToCDesc})
ri = cache.GetCached(ctx, roachpb.RKey("c"), true)
require.Equal(t, bToCDesc, ri.Desc())

Expand All @@ -1066,7 +1066,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) {
Generation: curGeneration,
}
require.True(t, clearOlderOverlapping(ctx, cache, aToBDesc))
cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKey("b"))), ri)
cache.addEntryLocked(ri)
ri = cache.GetCached(ctx, roachpb.RKey("c"), true)
require.Equal(t, bToCDesc, ri.Desc())
}
Expand Down
12 changes: 0 additions & 12 deletions pkg/kv/kvserver/raft_snapshot_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,18 +139,6 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
return false, nil
}
}
repl.mu.RLock()
_, destPaused := repl.mu.pausedFollowers[id]
repl.mu.RUnlock()
if ioThresh := repl.store.ioOverloadedStores.Load()[repDesc.StoreID]; ioThresh != nil && destPaused {
// If the destination is paused, be more hesitant to send snapshots. The destination being
// paused implies that we have recently checked that it's not required for quorum, and that
// we wish to conserve I/O on that store, which sending a snapshot counteracts. So hold back on
// the snapshot as well.
err := errors.Errorf("skipping snapshot; %s is overloaded: %s", repDesc, ioThresh)
repl.reportSnapshotStatus(ctx, repDesc.ReplicaID, err)
return false, err
}

err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY)

Expand Down
21 changes: 15 additions & 6 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2592,10 +2592,23 @@ func (r *Replica) sendSnapshot(
r.reportSnapshotStatus(ctx, recipient.ReplicaID, retErr)
}()

sender, err := r.GetReplicaDescriptor()
r.mu.RLock()
sender, err := r.getReplicaDescriptorRLocked()
_, destPaused := r.mu.pausedFollowers[recipient.ReplicaID]
r.mu.RUnlock()

if err != nil {
return err
}

if ioThresh := r.store.ioOverloadedStores.Load()[recipient.StoreID]; ioThresh != nil && destPaused {
// If the destination is paused, be more hesitant to send snapshots. The destination being
// paused implies that we have recently checked that it's not required for quorum, and that
// we wish to conserve I/O on that store, which sending a snapshot counteracts. So hold back on
// the snapshot as well.
return errors.Errorf("skipping snapshot; %s is overloaded: %s", recipient, ioThresh)
}

// Check follower snapshots cluster setting.
if followerSnapshotsEnabled.Get(&r.ClusterSettings().SV) {
sender, err = r.getSenderReplica(ctx)
Expand All @@ -2607,10 +2620,6 @@ func (r *Replica) sendSnapshot(
log.VEventf(
ctx, 2, "delegating snapshot transmission for %v to %v", recipient, sender,
)
desc, err := r.GetReplicaDescriptor()
if err != nil {
return err
}
status := r.RaftStatus()
if status == nil {
// This code path is sometimes hit during scatter for replicas that
Expand All @@ -2621,7 +2630,7 @@ func (r *Replica) sendSnapshot(
// Create new delegate snapshot request with only required metadata.
delegateRequest := &kvserverpb.DelegateSnapshotRequest{
RangeID: r.RangeID,
CoordinatorReplica: desc,
CoordinatorReplica: sender,
RecipientReplica: recipient,
Priority: priority,
Type: snapType,
Expand Down
48 changes: 1 addition & 47 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,53 +1191,7 @@ func (r *Replica) tick(
return false, nil
}

if r.replicaID == r.mu.leaderID && len(ioOverloadMap) > 0 && quotaPoolEnabledForRange(*r.descRLocked()) {
// When multiple followers are overloaded, we may not be able to exclude all
// of them from replication traffic due to quorum constraints. We would like
// a given Range to deterministically exclude the same store (chosen
// randomly), so that across multiple Ranges we have a chance of removing
// load from all overloaded Stores in the cluster. (It would be a bad idea
// to roll a per-Range dice here on every tick, since that would rapidly
// include and exclude individual followers from replication traffic, which
// would be akin to a high rate of packet loss. Once we've decided to ignore
// a follower, this decision should be somewhat stable for at least a few
// seconds).
//
// Note that we don't enable this mechanism for the liveness range (see
// quotaPoolEnabledForRange), simply to play it safe, as we know that the
// liveness range is unlikely to be a major contributor to any follower's
// I/O and wish to reduce the likelihood of a problem in replication pausing
// contributing to an outage of that critical range.
seed := int64(r.RangeID)
now := r.store.Clock().Now().GoTime()
d := computeExpendableOverloadedFollowersInput{
replDescs: r.descRLocked().Replicas(),
ioOverloadMap: ioOverloadMap,
getProgressMap: func(_ context.Context) map[uint64]tracker.Progress {
prs := r.mu.internalRaftGroup.Status().Progress
updateRaftProgressFromActivity(ctx, prs, r.descRLocked().Replicas().AsProto(), func(id roachpb.ReplicaID) bool {
return r.mu.lastUpdateTimes.isFollowerActiveSince(ctx, id, now, r.store.cfg.RangeLeaseActiveDuration())
})
return prs
},
minLiveMatchIndex: r.mu.proposalQuotaBaseIndex,
seed: seed,
}
r.mu.pausedFollowers, _ = computeExpendableOverloadedFollowers(ctx, d)
for replicaID := range r.mu.pausedFollowers {
// We're dropping messages to those followers (see handleRaftReady) but
// it's a good idea to tell raft not to even bother sending in the first
// place. Raft will react to this by moving the follower to probing state
// where it will be contacted only sporadically until it responds to an
// MsgApp (which it can only do once we stop dropping messages). Something
// similar would result naturally if we didn't report as unreachable, but
// with more wasted work.
r.mu.internalRaftGroup.ReportUnreachable(uint64(replicaID))
}
} else if len(r.mu.pausedFollowers) > 0 {
// No store in the cluster is overloaded, or this replica is not raft leader.
r.mu.pausedFollowers = nil
}
r.updatePausedFollowersLocked(ctx, ioOverloadMap)

now := r.store.Clock().NowAsClockTimestamp()
if r.maybeQuiesceRaftMuLockedReplicaMuLocked(ctx, now, livenessMap) {
Expand Down
Loading

0 comments on commit 9be7cf5

Please sign in to comment.