135945: kvserver: add setting to reject overly large transactions r=arulajmani a=andrewbaptist

Previously, we had the ability to reject transactions that required too much memory, but it was tied to the condense limit. The two limits serve different purposes and should be set independently. The condense size limit (kv.transaction.max_intents_bytes) protects client-side memory by switching to less precise lock-span tracking once a transaction passes a certain size. The new limit `kv.transaction.max_intents_and_locks` protects the server side by preventing a client from creating a transaction with too many operations in it. Large transactions with many intents can have a negative impact, especially in a multi-tenant system.
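
To make the distinction concrete, here is a minimal, hypothetical Go sketch (not the actual txnPipeliner code; all names and constants below are illustrative) of how the two limits behave differently: the byte budget by default only degrades the precision of client-side lock tracking, while the new count limit rejects the transaction outright.

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-ins for the two cluster settings; real values come from
// kv.transaction.max_intents_bytes and kv.transaction.max_intents_and_locks.
const (
	maxIntentBytes     int64 = 4 << 20 // condense lock-span tracking past this many bytes
	maxIntentsAndLocks int64 = 1000    // reject past this many intents/locks; 0 disables
)

// txnState is a toy stand-in for per-transaction tracking state.
type txnState struct {
	lockSpanBytes int64 // client-side memory used to track lock spans
	writeCount    int64 // intents and replicated locks written so far
}

// checkLimits sketches the two independent checks applied before a batch.
func (t *txnState) checkLimits(reqSpanBytes, reqWrites int64) error {
	// Count limit: hard rejection, protects the server from huge transactions.
	if maxIntentsAndLocks > 0 && t.writeCount+reqWrites > maxIntentsAndLocks {
		return errors.New("transaction exceeds kv.transaction.max_intents_and_locks")
	}
	// Byte budget: by default only degrades tracking precision (condensing);
	// rejection requires kv.transaction.reject_over_max_intents_budget.enabled.
	if t.lockSpanBytes+reqSpanBytes > maxIntentBytes {
		fmt.Println("over byte budget: condense lock spans to coarser ranges")
	}
	t.lockSpanBytes += reqSpanBytes
	t.writeCount += reqWrites
	return nil
}

func main() {
	t := &txnState{}
	if err := t.checkLimits(128 /* bytes */, 1 /* writes */); err != nil {
		fmt.Println("rejected:", err)
	}
}
```

The actual enforcement lives in txnPipeliner.maybeRejectOverBudget and updateLockTracking, shown in the diff below.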

Epic: none

Fixes: cockroachdb#135841

Release note (ops change): Adds a new cluster setting, kv.transaction.max_intents_and_locks, which limits the number of intents and replicated locks a single transaction can create; the default of 0 leaves the limit disabled.

Co-authored-by: Andrew Baptist <[email protected]>
craig[bot] and andrewbaptist committed Dec 18, 2024
2 parents 3d95da9 + 7ef9f24 commit f61084f
Showing 6 changed files with 173 additions and 15 deletions.
2 changes: 2 additions & 0 deletions docs/generated/metrics/metrics.html
@@ -1883,6 +1883,8 @@
<tr><td>APPLICATION</td><td>txn.condensed_intent_spans</td><td>KV transactions that have exceeded their intent tracking memory budget (kv.transaction.max_intents_bytes). See also txn.condensed_intent_spans_gauge for a gauge of such transactions currently running.</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.condensed_intent_spans_gauge</td><td>KV transactions currently running that have exceeded their intent tracking memory budget (kv.transaction.max_intents_bytes). See also txn.condensed_intent_spans for a perpetual counter/rate.</td><td>KV Transactions</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>txn.condensed_intent_spans_rejected</td><td>KV transactions that have been aborted because they exceeded their intent tracking memory budget (kv.transaction.max_intents_bytes). Rejection is caused by kv.transaction.reject_over_max_intents_budget.</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.count_limit_on_response</td><td>KV transactions that have exceeded the count limit on a response</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.count_limit_rejected</td><td>KV transactions that have been aborted because they exceeded the max number of writes and locking reads allowed</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.durations</td><td>KV transaction durations</td><td>KV Txn Duration</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>txn.inflight_locks_over_tracking_budget</td><td>KV transactions whose in-flight writes and locking reads have exceeded the intent tracking memory budget (kv.transaction.max_intents_bytes).</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.parallelcommits</td><td>Number of KV transaction parallel commits</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -91,6 +91,7 @@ kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconcili
kv.rangefeed.client.stream_startup_rate integer 100 controls the rate per second at which the client will initiate new rangefeed streams for a single range; 0 implies unlimited application
kv.rangefeed.closed_timestamp_refresh_interval duration 3s the interval at which closed-timestamp updates are delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval system-visible
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled system-visible
kv.transaction.max_intents_and_locks integer 0 maximum count of intents or durable locks for a single transaction, 0 to disable application
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions application
kv.transaction.max_refresh_spans_bytes integer 4194304 maximum number of bytes used to track refresh spans in serializable transactions application
kv.transaction.randomized_anchor_key.enabled boolean false dictates whether a transaction's anchor key is randomized or not application
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -120,6 +120,7 @@
<tr><td><div id="setting-kv-replication-reports-interval" class="anchored"><code>kv.replication_reports.interval</code></div></td><td>duration</td><td><code>1m0s</code></td><td>the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-snapshot-rebalance-max-rate" class="anchored"><code>kv.snapshot_rebalance.max_rate</code></div></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-snapshot-receiver-excise-enabled" class="anchored"><code>kv.snapshot_receiver.excise.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to false to disable excises in place of range deletions for KV snapshots</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-max-intents-and-locks" class="anchored"><code>kv.transaction.max_intents_and_locks</code></div></td><td>integer</td><td><code>0</code></td><td>maximum count of inserts or durable locks for a single transactions, 0 to disable</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-max-intents-bytes" class="anchored"><code>kv.transaction.max_intents_bytes</code></div></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track locks in transactions</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-max-refresh-spans-bytes" class="anchored"><code>kv.transaction.max_refresh_spans_bytes</code></div></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-randomized-anchor-key-enabled" class="anchored"><code>kv.transaction.randomized_anchor_key.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>dictates whether a transactions anchor key is randomized or not</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
79 changes: 68 additions & 11 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
@@ -111,6 +111,16 @@ var rejectTxnOverTrackedWritesBudget = settings.RegisterBoolSetting(
false,
settings.WithPublic)

// rejectTxnMaxCount rejects transactions if the number of intents or durable
// locks exceeds this value. It is preferable to use this setting instead of
// kv.transaction.reject_over_max_intents_budget.enabled.
var rejectTxnMaxCount = settings.RegisterIntSetting(
settings.ApplicationLevel,
"kv.transaction.max_intents_and_locks",
"maximum count of inserts or durable locks for a single transactions, 0 to disable",
0,
settings.WithPublic)

// txnPipeliner is a txnInterceptor that pipelines transactional writes by using
// asynchronous consensus. The interceptor then tracks all writes that have been
// asynchronously proposed through Raft and ensures that all interfering
@@ -252,6 +262,11 @@ type txnPipeliner struct {
// contains all keys spans that the transaction will need to eventually
// clean up upon its completion.
lockFootprint condensableSpanSet

// writeCount counts the number of replicated lock acquisitions and intents
// written by this txnPipeliner. This includes both in-flight and successful
// operations.
writeCount int64
}

// condensableSpanSetRangeIterator describes the interface of RangeIterator
@@ -297,18 +312,17 @@ func (tp *txnPipeliner) SendLocked(
return nil, pErr
}

// If we're configured to reject txns over budget, we pre-emptively check
// If we're configured to reject txns over budget, we preemptively check
// whether this current batch is likely to push us over the edge and, if it
// does, we reject it. Note that this check is not precise because generally
// we can't know exactly the size of the locks that will be taken by a
// request (think ResumeSpan); even if the check passes, we might end up over
// budget.
rejectOverBudget := rejectTxnOverTrackedWritesBudget.Get(&tp.st.SV)
maxBytes := TrackedWritesMaxSize.Get(&tp.st.SV)
if rejectOverBudget {
if err := tp.maybeRejectOverBudget(ba, maxBytes); err != nil {
return nil, kvpb.NewError(err)
}
rejectTxnMaxCount := rejectTxnMaxCount.Get(&tp.st.SV)
if err := tp.maybeRejectOverBudget(ba, maxBytes, rejectOverBudget, rejectTxnMaxCount); err != nil {
return nil, kvpb.NewError(err)
}

ba.AsyncConsensus = tp.canUseAsyncConsensus(ctx, ba)
@@ -330,7 +344,7 @@ func (tp *txnPipeliner) SendLocked(
// budget. Further requests will be rejected if they attempt to take more
// locks.
if err := tp.updateLockTracking(
ctx, ba, br, pErr, maxBytes, !rejectOverBudget, /* condenseLocksIfOverBudget */
ctx, ba, br, pErr, maxBytes, !rejectOverBudget /* condenseLocksIfOverBudget */, rejectTxnMaxCount,
); err != nil {
return nil, kvpb.NewError(err)
}
@@ -355,7 +369,9 @@ func (tp *txnPipeliner) SendLocked(
// the transaction commits. If it fails, then we'd add the lock spans to our
// tracking and exceed the budget. It's easier for this code and more
// predictable for the user if we just reject this batch, though.
func (tp *txnPipeliner) maybeRejectOverBudget(ba *kvpb.BatchRequest, maxBytes int64) error {
func (tp *txnPipeliner) maybeRejectOverBudget(
ba *kvpb.BatchRequest, maxBytes int64, rejectIfWouldCondense bool, rejectTxnMaxCount int64,
) error {
// Bail early if the current request is not locking, even if we are already
// over budget. In particular, we definitely want to permit rollbacks. We also
// want to permit lone commits, since the damage in taking too much memory has
@@ -364,9 +380,20 @@ func (tp *txnPipeliner) maybeRejectOverBudget(ba *kvpb.BatchRequest, maxBytes in
return nil
}

// NB: reqEstimate counts the number of spans in this request with
// replicated durability. This is an estimate since accurate accounting
// requires the response as well. For point requests this will be accurate,
// but for scans, we will count 1 for every span. In reality for scans, it
// could be 0 or many replicated locks. When we receive the response we will
// get the actual counts in `updateLockTracking` and update
// `txnPipeliner.writeCount`.
var reqEstimate int64
var spans []roachpb.Span
if err := ba.LockSpanIterate(nil /* br */, func(sp roachpb.Span, _ lock.Durability) {
if err := ba.LockSpanIterate(nil /* br */, func(sp roachpb.Span, durability lock.Durability) {
spans = append(spans, sp)
if durability == lock.Replicated {
reqEstimate++
}
}); err != nil {
return errors.Wrap(err, "iterating lock spans")
}
@@ -377,11 +404,23 @@ func (tp *txnPipeliner) maybeRejectOverBudget(ba *kvpb.BatchRequest, maxBytes in
locksBudget := maxBytes - tp.ifWrites.byteSize()

estimate := tp.lockFootprint.estimateSize(spans, locksBudget)
if estimate > locksBudget {
if rejectIfWouldCondense && estimate > locksBudget {
tp.txnMetrics.TxnsRejectedByLockSpanBudget.Inc(1)
bErr := newLockSpansOverBudgetError(estimate+tp.ifWrites.byteSize(), maxBytes, ba)
return pgerror.WithCandidateCode(bErr, pgcode.ConfigurationLimitExceeded)
}

// estimateCount combines counts from three sources: in-flight writes and
// completed writes (both already included in tp.writeCount), plus the
// estimate for this request.
estimateCount := tp.writeCount + reqEstimate
// TODO(baptist): We use the same error message as the one above, to avoid
// adding additional encoding and decoding for a backport. We could consider
// splitting this error message in the future.
if rejectTxnMaxCount > 0 && estimateCount > rejectTxnMaxCount {
tp.txnMetrics.TxnsRejectedByCountLimit.Inc(1)
bErr := newLockSpansOverBudgetError(estimateCount, rejectTxnMaxCount, ba)
return pgerror.WithCandidateCode(bErr, pgcode.ConfigurationLimitExceeded)
}
return nil
}

@@ -673,6 +712,7 @@ func (tp *txnPipeliner) updateLockTracking(
pErr *kvpb.Error,
maxBytes int64,
condenseLocksIfOverBudget bool,
rejectTxnMaxCount int64,
) error {
if err := tp.updateLockTrackingInner(ctx, ba, br, pErr); err != nil {
return err
@@ -691,6 +731,17 @@ }
}
tp.txnMetrics.TxnsInFlightLocksOverTrackingBudget.Inc(1)
}
// Similar to the in-flight writes case above, we may have gone over the
// rejectTxnMaxCount threshold because we don't accurately estimate the
// number of ranged locking reads before sending the request.
if tp.writeCount > rejectTxnMaxCount {
if tp.inflightOverBudgetEveryN.ShouldLog() || log.ExpensiveLogEnabled(ctx, 2) {
log.Warningf(ctx, "a transaction has exceeded the maximum number of writes "+
"allowed by kv.transaction.max_intents_and_locks: "+
"count: %d, txn: %s, ba: %s", tp.writeCount, ba.Txn, ba.Summary())
}
tp.txnMetrics.TxnsResponseOverCountLimit.Inc(1)
}

// Deal with compacting the lock spans.

@@ -814,7 +865,7 @@ func (tp *txnPipeliner) updateLockTrackingInner(
if readOnlyReq, ok := req.(kvpb.LockingReadRequest); ok {
str, _ = readOnlyReq.KeyLocking()
}
trackLocks := func(span roachpb.Span, _ lock.Durability) {
trackLocks := func(span roachpb.Span, durability lock.Durability) {
if ba.AsyncConsensus {
// Record any writes that were performed asynchronously. We'll
// need to prove that these succeeded sometime before we commit.
@@ -827,6 +878,9 @@
// then add them directly to our lock footprint.
tp.lockFootprint.insert(span)
}
if durability == lock.Replicated {
tp.writeCount++
}
}
if err := kvpb.LockSpanIterate(req, resp, trackLocks); err != nil {
return errors.Wrap(err, "iterating lock spans")
@@ -836,8 +890,11 @@
return nil
}

func (tp *txnPipeliner) trackLocks(s roachpb.Span, _ lock.Durability) {
func (tp *txnPipeliner) trackLocks(s roachpb.Span, durability lock.Durability) {
tp.lockFootprint.insert(s)
if durability == lock.Replicated {
tp.writeCount++
}
}

// stripQueryIntents adjusts the BatchResponse to hide the fact that this
89 changes: 85 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
@@ -2524,11 +2524,13 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
// request is expected to be rejected.
expRejectIdx int
maxSize int64
maxCount int64
}{
{name: "large request",
reqs: []*kvpb.BatchRequest{largeWrite},
expRejectIdx: 0,
maxSize: int64(len(largeAs)) - 1 + roachpb.SpanOverhead,
maxCount: 0,
},
{name: "requests that add up",
reqs: []*kvpb.BatchRequest{
@@ -2538,7 +2540,17 @@
expRejectIdx: 2,
// maxSize is such that first two requests fit and the third one
// goes above the limit.
maxSize: 9 + 2*roachpb.SpanOverhead,
maxSize: 9 + 2*roachpb.SpanOverhead,
maxCount: 0,
},
{name: "requests that count up",
reqs: []*kvpb.BatchRequest{
putBatchNoAsyncConsensus(roachpb.Key("aaaa"), nil),
putBatchNoAsyncConsensus(roachpb.Key("bbbb"), nil),
putBatchNoAsyncConsensus(roachpb.Key("cccc"), nil)},
expRejectIdx: 2,
maxSize: 0,
maxCount: 2,
},
{name: "async requests that add up",
// Like the previous test, but this time the requests run with async
@@ -2550,6 +2562,19 @@
putBatch(roachpb.Key("cccc"), nil)},
expRejectIdx: 2,
maxSize: 10 + roachpb.SpanOverhead,
maxCount: 0,
},
{name: "async requests that count up",
// Like the previous test, but this time the requests run with async
// consensus. Being tracked as in-flight writes, this test shows that
// in-flight writes count towards the budget.
reqs: []*kvpb.BatchRequest{
putBatch(roachpb.Key("aaaa"), nil),
putBatch(roachpb.Key("bbbb"), nil),
putBatch(roachpb.Key("cccc"), nil)},
expRejectIdx: 2,
maxSize: 0,
maxCount: 2,
},
{
name: "scan response goes over budget, next request rejected",
@@ -2559,6 +2584,17 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
resp: []*kvpb.BatchResponse{lockingScanResp},
expRejectIdx: 1,
maxSize: 10 + roachpb.SpanOverhead,
maxCount: 0,
},
{
name: "scan response goes over count, next request rejected",
// A request returns a response with many locked keys. Then the next
// request will be rejected.
reqs: []*kvpb.BatchRequest{lockingScanRequest, putBatch(roachpb.Key("a"), nil)},
resp: []*kvpb.BatchResponse{lockingScanResp},
expRejectIdx: 1,
maxSize: 0,
maxCount: 1,
},
{
name: "scan response goes over budget",
@@ -2569,6 +2605,18 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
resp: []*kvpb.BatchResponse{lockingScanResp},
expRejectIdx: -1,
maxSize: 10 + roachpb.SpanOverhead,
maxCount: 0,
},
{
name: "scan response goes over count",
// Like the previous test, except here we don't have a followup request
// once we're above budget. The test runner will commit the txn, and this
// test checks that committing is allowed.
reqs: []*kvpb.BatchRequest{lockingScanRequest},
resp: []*kvpb.BatchResponse{lockingScanResp},
expRejectIdx: -1,
maxSize: 0,
maxCount: 1,
},
{
name: "del range response goes over budget, next request rejected",
@@ -2578,6 +2626,17 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
resp: []*kvpb.BatchResponse{delRangeResp},
expRejectIdx: 1,
maxSize: 10 + roachpb.SpanOverhead,
maxCount: 0,
},
{
name: "del range response goes over count, next request rejected",
// A request returns a response with a large set of locked keys, which
// takes up the budget. Then the next request will be rejected.
reqs: []*kvpb.BatchRequest{delRange, putBatch(roachpb.Key("a"), nil)},
resp: []*kvpb.BatchResponse{delRangeResp},
expRejectIdx: 1,
maxSize: 0,
maxCount: 1,
},
{
name: "del range response goes over budget",
@@ -2588,6 +2647,18 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
resp: []*kvpb.BatchResponse{delRangeResp},
expRejectIdx: -1,
maxSize: 10 + roachpb.SpanOverhead,
maxCount: 0,
},
{
name: "del range response goes over count",
// Like the previous test, except here we don't have a followup request
// once we're above budget. The test runner will commit the txn, and this
// test checks that committing is allowed.
reqs: []*kvpb.BatchRequest{delRange},
resp: []*kvpb.BatchResponse{delRangeResp},
expRejectIdx: -1,
maxSize: 0,
maxCount: 1,
},
}
for _, tc := range testCases {
@@ -2597,8 +2668,13 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
}

tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
TrackedWritesMaxSize.Override(ctx, &tp.st.SV, tc.maxSize)
rejectTxnOverTrackedWritesBudget.Override(ctx, &tp.st.SV, true)
if tc.maxCount > 0 {
rejectTxnMaxCount.Override(ctx, &tp.st.SV, tc.maxCount)
}
if tc.maxSize > 0 {
TrackedWritesMaxSize.Override(ctx, &tp.st.SV, tc.maxSize)
rejectTxnOverTrackedWritesBudget.Override(ctx, &tp.st.SV, true)
}

txn := makeTxnProto()

@@ -2637,7 +2713,12 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
t.Fatalf("expected lockSpansOverBudgetError, got %+v", pErr.GoError())
}
require.Equal(t, pgcode.ConfigurationLimitExceeded, pgerror.GetPGCode(pErr.GoError()))
require.Equal(t, int64(1), tp.txnMetrics.TxnsRejectedByLockSpanBudget.Count())
if tc.maxSize > 0 {
require.Equal(t, int64(1), tp.txnMetrics.TxnsRejectedByLockSpanBudget.Count())
}
if tc.maxCount > 0 {
require.Equal(t, int64(1), tp.txnMetrics.TxnsRejectedByCountLimit.Count())
}

// Make sure rolling back the txn works.
rollback := &kvpb.BatchRequest{}