Skip to content

Commit

Permalink
kvserver: add setting to reject overly large transactions
Browse files Browse the repository at this point in the history
Previously we had an ability to reject transactions that required too
much memory, however this was tied together with the condense limit. The
limits have too different purposes and should be set independently. The
condense size limit (kv.transaction.max_intents_bytes) is used to
protect the memory on the client side by using less precise tracking
once it passes a certain size. The new limit
(kv.transaction.reject_intents_bytes) is intended to protect the server
side by preventing a client from creating a transaction with too many
bytes in it. Large transactions with many intents can have a
negative impact especially in a multi-tenant system.

Epic: none

Fixes: #135841

Release note (ops change): Adds a new configurable parameter
kv.transaction.reject_intents_bytes which will prevent
transactions from creating too many intents.
  • Loading branch information
andrewbaptist committed Dec 9, 2024
1 parent 2cf102d commit 3279071
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 77 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -1893,6 +1893,7 @@
<tr><td>APPLICATION</td><td>txn.restarts.writetooold</td><td>Number of restarts due to a concurrent writer committing first</td><td>Restarted Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.rollbacks.async.failed</td><td>Number of KV transaction that failed to send abort asynchronously which is not always retried</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.rollbacks.failed</td><td>Number of KV transaction that failed to send final abort</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.size_limit_rejected</td><td>KV transactions that have been aborted because they exceeded the size limit</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>SERVER</td><td>build.timestamp</td><td>Build information</td><td>Build Time</td><td>GAUGE</td><td>TIMESTAMP_SEC</td><td>AVG</td><td>NONE</td></tr>
<tr><td>SERVER</td><td>go.scheduler_latency</td><td>Go scheduling latency</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>SERVER</td><td>log.buffered.messages.dropped</td><td>Count of log messages that are dropped by buffered log sinks. When CRDB attempts to buffer a log message in a buffered log sink whose buffer is already full, it drops the oldest buffered messages to make space for the new message</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconcili
kv.rangefeed.client.stream_startup_rate integer 100 controls the rate per second the client will initiate new rangefeed stream for a single range; 0 implies unlimited application
kv.rangefeed.closed_timestamp_refresh_interval duration 3s the interval at which closed-timestamp updatesare delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval system-visible
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled system-visible
kv.transaction.max_intent_and_lock_bytes integer 0 maximum number of bytes for a single transactions, 0 to disable application
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions application
kv.transaction.max_refresh_spans_bytes integer 4194304 maximum number of bytes used to track refresh spans in serializable transactions application
kv.transaction.randomized_anchor_key.enabled boolean false dictates whether a transactions anchor key is randomized or not application
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
<tr><td><div id="setting-kv-replication-reports-interval" class="anchored"><code>kv.replication_reports.interval</code></div></td><td>duration</td><td><code>1m0s</code></td><td>the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-snapshot-rebalance-max-rate" class="anchored"><code>kv.snapshot_rebalance.max_rate</code></div></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-snapshot-receiver-excise-enabled" class="anchored"><code>kv.snapshot_receiver.excise.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to false to disable excises in place of range deletions for KV snapshots</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-max-intent-and-lock-bytes" class="anchored"><code>kv.transaction.max_intent_and_lock_bytes</code></div></td><td>integer</td><td><code>0</code></td><td>maximum number of bytes for a single transactions, 0 to disable</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-max-intents-bytes" class="anchored"><code>kv.transaction.max_intents_bytes</code></div></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track locks in transactions</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-max-refresh-spans-bytes" class="anchored"><code>kv.transaction.max_refresh_spans_bytes</code></div></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-randomized-anchor-key-enabled" class="anchored"><code>kv.transaction.randomized_anchor_key.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>dictates whether a transactions anchor key is randomized or not</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
59 changes: 43 additions & 16 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,19 @@ var rejectTxnOverTrackedWritesBudget = settings.RegisterBoolSetting(
false,
settings.WithPublic)

// rejectTxnMaxBytes will reject transactions if the sum of intent bytes and
// locks exceeds this value. This is based on the uncondensed size of the
// intents. Typically it is preferable to use this setting instead of
// kv.transaction.reject_over_max_intents_budget.enabled because it allows
// separating the threshold where we condense intents to preserve memory from
// the limit where we reject overly large transactions.
var rejectTxnMaxBytes = settings.RegisterIntSetting(
settings.ApplicationLevel,
"kv.transaction.max_intent_and_lock_bytes",
"maximum number of bytes for a single transactions, 0 to disable",
0,
settings.WithPublic)

// txnPipeliner is a txnInterceptor that pipelines transactional writes by using
// asynchronous consensus. The interceptor then tracks all writes that have been
// asynchronously proposed through Raft and ensures that all interfering
Expand Down Expand Up @@ -297,18 +310,17 @@ func (tp *txnPipeliner) SendLocked(
return nil, pErr
}

// If we're configured to reject txns over budget, we pre-emptively check
// If we're configured to reject txns over budget, we preemptively check
// whether this current batch is likely to push us over the edge and, if it
// does, we reject it. Note that this check is not precise because generally
// we can't know exactly the size of the locks that will be taken by a
// request (think ResumeSpan); even if the check passes, we might end up over
// budget.
rejectOverBudget := rejectTxnOverTrackedWritesBudget.Get(&tp.st.SV)
maxBytes := TrackedWritesMaxSize.Get(&tp.st.SV)
if rejectOverBudget {
if err := tp.maybeRejectOverBudget(ba, maxBytes); err != nil {
return nil, kvpb.NewError(err)
}
condenseThreshold := TrackedWritesMaxSize.Get(&tp.st.SV)
rejectTxnMaxBytes := rejectTxnMaxBytes.Get(&tp.st.SV)
if err := tp.maybeRejectOverBudget(ba, condenseThreshold, rejectOverBudget, rejectTxnMaxBytes); err != nil {
return nil, kvpb.NewError(err)
}

ba.AsyncConsensus = tp.canUseAsyncConsensus(ctx, ba)
Expand All @@ -330,7 +342,7 @@ func (tp *txnPipeliner) SendLocked(
// budget. Further requests will be rejected if they attempt to take more
// locks.
if err := tp.updateLockTracking(
ctx, ba, br, pErr, maxBytes, !rejectOverBudget, /* condenseLocksIfOverBudget */
ctx, ba, br, pErr, condenseThreshold, !rejectOverBudget, /* condenseLocksIfOverBudget */
); err != nil {
return nil, kvpb.NewError(err)
}
Expand All @@ -355,7 +367,12 @@ func (tp *txnPipeliner) SendLocked(
// the transaction commits. If it fails, then we'd add the lock spans to our
// tracking and exceed the budget. It's easier for this code and more
// predictable for the user if we just reject this batch, though.
func (tp *txnPipeliner) maybeRejectOverBudget(ba *kvpb.BatchRequest, maxBytes int64) error {
func (tp *txnPipeliner) maybeRejectOverBudget(
ba *kvpb.BatchRequest,
condenseThreshold int64,
rejectIfWouldCondense bool,
rejectTxnMaxBytes int64,
) error {
// Bail early if the current request is not locking, even if we are already
// over budget. In particular, we definitely want to permit rollbacks. We also
// want to permit lone commits, since the damage in taking too much memory has
Expand All @@ -374,12 +391,22 @@ func (tp *txnPipeliner) maybeRejectOverBudget(ba *kvpb.BatchRequest, maxBytes in
// Compute how many bytes we can allocate for locks. We account for the
// inflight-writes conservatively, since these might turn into lock spans
// later.
locksBudget := maxBytes - tp.ifWrites.byteSize()

estimate := tp.lockFootprint.estimateSize(spans, locksBudget)
if estimate > locksBudget {
ifLockBytes := tp.ifWrites.byteSize()
estimate := tp.lockFootprint.estimateSize(spans, condenseThreshold) + ifLockBytes
if rejectIfWouldCondense && estimate > condenseThreshold {
tp.txnMetrics.TxnsRejectedByLockSpanBudget.Inc(1)
bErr := newLockSpansOverBudgetError(estimate+tp.ifWrites.byteSize(), maxBytes, ba)
bErr := newLockSpansOverBudgetError(estimate, condenseThreshold, ba)
return pgerror.WithCandidateCode(bErr, pgcode.ConfigurationLimitExceeded)
}

// NB: We use the same error message as the one above, to avoid adding
// additional encoding and decoding for a backport. We could consider
// splitting this error message in the future. Also we need to add the
// totalUncondensedBytes because we want to make sure we are accounting for
// the full intent size on the server.
if rejectTxnMaxBytes > 0 && estimate+tp.lockFootprint.totalUncondensedBytes > rejectTxnMaxBytes {
tp.txnMetrics.TxnsRejectedByLockSizeLimit.Inc(1)
bErr := newLockSpansOverBudgetError(estimate+tp.lockFootprint.totalUncondensedBytes, rejectTxnMaxBytes, ba)
return pgerror.WithCandidateCode(bErr, pgcode.ConfigurationLimitExceeded)
}
return nil
Expand Down Expand Up @@ -671,7 +698,7 @@ func (tp *txnPipeliner) updateLockTracking(
ba *kvpb.BatchRequest,
br *kvpb.BatchResponse,
pErr *kvpb.Error,
maxBytes int64,
condenseThreshold int64,
condenseLocksIfOverBudget bool,
) error {
if err := tp.updateLockTrackingInner(ctx, ba, br, pErr); err != nil {
Expand All @@ -682,7 +709,7 @@ func (tp *txnPipeliner) updateLockTracking(
// don't estimate the size of the locks accurately for ranged locking reads,
// it is possible that ifWrites have exceeded the maxBytes threshold. That's
// fine for now, but we add some observability to be aware of this happening.
if tp.ifWrites.byteSize() > maxBytes {
if tp.ifWrites.byteSize() > condenseThreshold {
if tp.inflightOverBudgetEveryN.ShouldLog() || log.ExpensiveLogEnabled(ctx, 2) {
log.Warningf(ctx, "a transaction's in-flight writes and locking reads have "+
"exceeded the intent tracking limit (kv.transaction.max_intents_bytes). "+
Expand All @@ -698,7 +725,7 @@ func (tp *txnPipeliner) updateLockTracking(
// in-flight writes. It's possible that locksBudget is negative, but the
// remainder of this function and maybeCondense handle this case (each span
// will be maximally condensed).
locksBudget := maxBytes - tp.ifWrites.byteSize()
locksBudget := condenseThreshold - tp.ifWrites.byteSize()
// If we're below budget, there's nothing more to do.
if tp.lockFootprint.bytesSize() <= locksBudget {
return nil
Expand Down
132 changes: 71 additions & 61 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2592,75 +2592,85 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.expRejectIdx >= len(tc.reqs) {
t.Fatalf("invalid test")
}

tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
TrackedWritesMaxSize.Override(ctx, &tp.st.SV, tc.maxSize)
rejectTxnOverTrackedWritesBudget.Override(ctx, &tp.st.SV, true)

txn := makeTxnProto()

var respIdx int
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
// Handle rollbacks and commits separately.
if ba.IsSingleAbortTxnRequest() || ba.IsSingleCommitRequest() {
br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
testutils.RunTrueAndFalse(t, "reject-using-txn-max-bytes", func(t *testing.T, txnMaxBytes bool) {
if tc.expRejectIdx >= len(tc.reqs) {
t.Fatalf("invalid test")
}

var resp *kvpb.BatchResponse
if respIdx < len(tc.resp) {
resp = tc.resp[respIdx]
tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
if txnMaxBytes {
rejectTxnMaxBytes.Override(ctx, &tp.st.SV, tc.maxSize)
} else {
TrackedWritesMaxSize.Override(ctx, &tp.st.SV, tc.maxSize)
rejectTxnOverTrackedWritesBudget.Override(ctx, &tp.st.SV, true)
}
respIdx++

if resp != nil {
resp.Txn = ba.Txn
return resp, nil
}
br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})
txn := makeTxnProto()

var respIdx int
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
// Handle rollbacks and commits separately.
if ba.IsSingleAbortTxnRequest() || ba.IsSingleCommitRequest() {
br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
}

for i, ba := range tc.reqs {
ba.Header = kvpb.Header{Txn: &txn}
_, pErr := tp.SendLocked(ctx, ba)
if i == tc.expRejectIdx {
require.NotNil(t, pErr, "expected rejection, but request succeeded")
var resp *kvpb.BatchResponse
if respIdx < len(tc.resp) {
resp = tc.resp[respIdx]
}
respIdx++

budgetErr := (lockSpansOverBudgetError{})
if !errors.As(pErr.GoError(), &budgetErr) {
t.Fatalf("expected lockSpansOverBudgetError, got %+v", pErr.GoError())
if resp != nil {
resp.Txn = ba.Txn
return resp, nil
}
br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

for i, ba := range tc.reqs {
ba.Header = kvpb.Header{Txn: &txn}
_, pErr := tp.SendLocked(ctx, ba)
if i == tc.expRejectIdx {
require.NotNil(t, pErr, "expected rejection, but request succeeded")

budgetErr := (lockSpansOverBudgetError{})
if !errors.As(pErr.GoError(), &budgetErr) {
t.Fatalf("expected lockSpansOverBudgetError, got %+v", pErr.GoError())
}
require.Equal(t, pgcode.ConfigurationLimitExceeded, pgerror.GetPGCode(pErr.GoError()))
if txnMaxBytes {
require.Equal(t, int64(1), tp.txnMetrics.TxnsRejectedByLockSizeLimit.Count())
} else {
require.Equal(t, int64(1), tp.txnMetrics.TxnsRejectedByLockSpanBudget.Count())
}

// Make sure rolling back the txn works.
rollback := &kvpb.BatchRequest{}
rollback.Add(&kvpb.EndTxnRequest{Commit: false})
rollback.Txn = &txn
_, pErr = tp.SendLocked(ctx, rollback)
require.Nil(t, pErr)
} else {
require.Nil(t, pErr)

// Make sure that committing works. This is particularly relevant for
// testcases where we ended up over budget but we didn't return an
// error (because we failed to pre-emptively detect that we're going
// to be over budget and the response surprised us with a large
// ResumeSpan). Committing in these situations is allowed, since the
// harm has already been done.
commit := &kvpb.BatchRequest{}
commit.Add(&kvpb.EndTxnRequest{Commit: true})
commit.Txn = &txn
_, pErr = tp.SendLocked(ctx, commit)
require.Nil(t, pErr)
}
require.Equal(t, pgcode.ConfigurationLimitExceeded, pgerror.GetPGCode(pErr.GoError()))
require.Equal(t, int64(1), tp.txnMetrics.TxnsRejectedByLockSpanBudget.Count())

// Make sure rolling back the txn works.
rollback := &kvpb.BatchRequest{}
rollback.Add(&kvpb.EndTxnRequest{Commit: false})
rollback.Txn = &txn
_, pErr = tp.SendLocked(ctx, rollback)
require.Nil(t, pErr)
} else {
require.Nil(t, pErr)

// Make sure that committing works. This is particularly relevant for
// testcases where we ended up over budget but we didn't return an
// error (because we failed to pre-emptively detect that we're going
// to be over budget and the response surprised us with a large
// ResumeSpan). Committing in these situations is allowed, since the
// harm has already been done.
commit := &kvpb.BatchRequest{}
commit.Add(&kvpb.EndTxnRequest{Commit: true})
commit.Txn = &txn
_, pErr = tp.SendLocked(ctx, commit)
require.Nil(t, pErr)
}
}
})
})
}
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type TxnMetrics struct {
TxnsWithCondensedIntents *metric.Counter
TxnsWithCondensedIntentsGauge *metric.Gauge
TxnsRejectedByLockSpanBudget *metric.Counter
TxnsRejectedByLockSizeLimit *metric.Counter
TxnsInFlightLocksOverTrackingBudget *metric.Counter

// Restarts is the number of times we had to restart the transaction.
Expand Down Expand Up @@ -174,6 +175,12 @@ var (
Measurement: "KV Transactions",
Unit: metric.Unit_COUNT,
}
metaTxnsRejectedByLockSizeLimit = metric.Metadata{
Name: "txn.size_limit_rejected",
Help: "KV transactions that have been aborted because they exceeded the size limit",
Measurement: "KV Transactions",
Unit: metric.Unit_COUNT,
}
metaTxnsInflightLocksOverTrackingBudget = metric.Metadata{
Name: "txn.inflight_locks_over_tracking_budget",
Help: "KV transactions whose in-flight writes and locking reads have exceeded " +
Expand Down Expand Up @@ -289,6 +296,7 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics {
TxnsWithCondensedIntents: metric.NewCounter(metaTxnsWithCondensedIntentSpans),
TxnsWithCondensedIntentsGauge: metric.NewGauge(metaTxnsWithCondensedIntentSpansGauge),
TxnsRejectedByLockSpanBudget: metric.NewCounter(metaTxnsRejectedByLockSpanBudget),
TxnsRejectedByLockSizeLimit: metric.NewCounter(metaTxnsRejectedByLockSizeLimit),
TxnsInFlightLocksOverTrackingBudget: metric.NewCounter(metaTxnsInflightLocksOverTrackingBudget),
Restarts: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaRestartsHistogram,
Expand Down

0 comments on commit 3279071

Please sign in to comment.