Skip to content

Commit

Permalink
Merge #85286
Browse files Browse the repository at this point in the history
85286: kvcoord: subject refresh spans to the limit when importing from leaves r=yuzefovich a=yuzefovich

Whenever we're importing the refresh spans from the LeafTxnFinalState,
we're inserting those refresh spans into the refresh footprint set and
attempt to condense the set to stay under the maximum (determined by
a setting). However, the condensing is performed on a best-effort basis,
so previously it was possible to significantly over-shoot the limit.
Furthermore, this refresh footprint is not accounted for. This commit
improves things a bit by giving up on the ability to refresh (i.e.
invalidating the refresh footprint) once the limit is exceeded when
importing the refresh spans from the leaves - it is similar to what we
do after `Send`ing the BatchRequest.

Addresses: #64906.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Aug 3, 2022
2 parents 7326bbb + 4e8b0bc commit 47d34c9
Showing 1 changed file with 34 additions and 29 deletions.
63 changes: 34 additions & 29 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,39 +171,43 @@ func (sr *txnSpanRefresher) SendLocked(
return nil, roachpb.NewError(err)
}
// Check whether we should condense the refresh spans.
maxBytes := MaxTxnRefreshSpansBytes.Get(&sr.st.SV)
if sr.refreshFootprint.bytes >= maxBytes {
condensedBefore := sr.refreshFootprint.condensed
condensedSufficient := sr.tryCondenseRefreshSpans(ctx, maxBytes)
if condensedSufficient {
log.VEventf(ctx, 2, "condensed refresh spans for txn %s to %d bytes",
br.Txn, sr.refreshFootprint.bytes)
} else {
// Condensing was not enough. Giving up on tracking reads. Refreshed
// will not be possible.
log.VEventf(ctx, 2, "condensed refresh spans didn't save enough memory. txn %s. "+
"refresh spans after condense: %d bytes",
br.Txn, sr.refreshFootprint.bytes)
sr.refreshInvalid = true
sr.refreshFootprint.clear()
}

if sr.refreshFootprint.condensed && !condensedBefore {
sr.refreshMemoryLimitExceeded.Inc(1)
}
}
sr.maybeCondenseRefreshSpans(ctx, br.Txn)
}
return br, nil
}

// tryCondenseRefreshSpans attempts to condense the refresh spans in order to
// save memory. Returns true if we managed to condense them below maxBytes.
func (sr *txnSpanRefresher) tryCondenseRefreshSpans(ctx context.Context, maxBytes int64) bool {
if sr.knobs.CondenseRefreshSpansFilter != nil && !sr.knobs.CondenseRefreshSpansFilter() {
return false
// maybeCondenseRefreshSpans checks whether the refresh footprint exceeds the
// maximum size (as determined by the MaxTxnRefreshSpansBytes cluster setting)
// and attempts to condense it if so. If condensing is insufficient, then the
// refresh is invalidated. It is assumed that the refresh is valid when this
// method is called.
func (sr *txnSpanRefresher) maybeCondenseRefreshSpans(
ctx context.Context, txn *roachpb.Transaction,
) {
maxBytes := MaxTxnRefreshSpansBytes.Get(&sr.st.SV)
if sr.refreshFootprint.bytes >= maxBytes {
condensedBefore := sr.refreshFootprint.condensed
var condensedSufficient bool
if sr.knobs.CondenseRefreshSpansFilter == nil || sr.knobs.CondenseRefreshSpansFilter() {
sr.refreshFootprint.maybeCondense(ctx, sr.riGen, maxBytes)
condensedSufficient = sr.refreshFootprint.bytes < maxBytes
}
if condensedSufficient {
log.VEventf(ctx, 2, "condensed refresh spans for txn %s to %d bytes",
txn, sr.refreshFootprint.bytes)
} else {
// Condensing was not enough. Giving up on tracking reads. Refreshes
// will not be possible.
log.VEventf(ctx, 2, "condensed refresh spans didn't save enough memory. txn %s. "+
"refresh spans after condense: %d bytes",
txn, sr.refreshFootprint.bytes)
sr.refreshInvalid = true
sr.refreshFootprint.clear()
}
if sr.refreshFootprint.condensed && !condensedBefore {
sr.refreshMemoryLimitExceeded.Inc(1)
}
}
sr.refreshFootprint.maybeCondense(ctx, sr.riGen, maxBytes)
return sr.refreshFootprint.bytes < maxBytes
}

// sendLockedWithRefreshAttempts sends the batch through the wrapped sender. It
Expand Down Expand Up @@ -679,7 +683,8 @@ func (sr *txnSpanRefresher) importLeafFinalState(
sr.refreshFootprint.clear()
} else if !sr.refreshInvalid {
sr.refreshFootprint.insert(tfs.RefreshSpans...)
sr.refreshFootprint.maybeCondense(ctx, sr.riGen, MaxTxnRefreshSpansBytes.Get(&sr.st.SV))
// Check whether we should condense the refresh spans.
sr.maybeCondenseRefreshSpans(ctx, &tfs.Txn)
}
}

Expand Down

0 comments on commit 47d34c9

Please sign in to comment.