Skip to content

Commit

Permalink
kvserver: add setting to reject overly large transactions
Browse files Browse the repository at this point in the history
Previously we had an ability to reject transactions that required too
much memory, however this was tied together with the condense limit. The
limits have too different purposes and should be set independently. The
condense size limit (kv.transaction.max_intents_bytes) is used to
protect the memory on the client side by using less precise tracking
once it passes a certain size. The new limit
(kv.transaction.reject_intents_bytes) is intended to protect the server
side by preventing a client from creating a transaction with too many
bytes in it. Large transactions with many intents can have a
negative impact especially in a multi-tenant system.

Epic: none

Fixes: #135841

Release note (ops change): Adds a new configurable parameter
kv.transaction.reject_intents_bytes which will prevent
transactions from creating too many intents.
  • Loading branch information
andrewbaptist committed Dec 3, 2024
1 parent 8bf8358 commit cd7512e
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 76 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled sys
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions application
kv.transaction.max_refresh_spans_bytes integer 4194304 maximum number of bytes used to track refresh spans in serializable transactions application
kv.transaction.randomized_anchor_key.enabled boolean false dictates whether a transactions anchor key is randomized or not application
kv.transaction.reject_intents_bytes integer 0 maximum number of intent bytes for a single transactions, 0 to disable application
kv.transaction.reject_over_max_intents_budget.enabled boolean false if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed application
kv.transaction.write_pipelining.locking_reads.enabled boolean true if enabled, transactional locking reads are pipelined through Raft consensus application
kv.transaction.write_pipelining.ranged_writes.enabled boolean true if enabled, transactional ranged writes are pipelined through Raft consensus application
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
<tr><td><div id="setting-kv-transaction-max-intents-bytes" class="anchored"><code>kv.transaction.max_intents_bytes</code></div></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track locks in transactions</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-max-refresh-spans-bytes" class="anchored"><code>kv.transaction.max_refresh_spans_bytes</code></div></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-randomized-anchor-key-enabled" class="anchored"><code>kv.transaction.randomized_anchor_key.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>dictates whether a transactions anchor key is randomized or not</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-reject-intents-bytes" class="anchored"><code>kv.transaction.reject_intents_bytes</code></div></td><td>integer</td><td><code>0</code></td><td>maximum number of intent bytes for a single transactions, 0 to disable</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-reject-over-max-intents-budget-enabled" class="anchored"><code>kv.transaction.reject_over_max_intents_budget.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-write-pipelining-locking-reads-enabled" class="anchored"><code>kv.transaction.write_pipelining.locking_reads.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional locking reads are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-write-pipelining-ranged-writes-enabled" class="anchored"><code>kv.transaction.write_pipelining.ranged_writes.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional ranged writes are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
56 changes: 41 additions & 15 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,19 @@ var rejectTxnOverTrackedWritesBudget = settings.RegisterBoolSetting(
false,
settings.WithPublic)

// rejectTxnMaxBytes will reject transactions if the maximum number of intent
// bytes exceeds this value. This is based on the uncondensed size of the
// intents. Typically it is preferable to use this setting instead of
// kv.transaction.reject_over_max_intents_budget.enabled because it allows
// separating the threshold where we condense intents to preserve memory from
// the limit where we reject overly large transactions.
var rejectTxnMaxBytes = settings.RegisterIntSetting(
settings.ApplicationLevel,
"kv.transaction.reject_intents_bytes",
"maximum number of intent bytes for a single transactions, 0 to disable",
0,
settings.WithPublic)

// txnPipeliner is a txnInterceptor that pipelines transactional writes by using
// asynchronous consensus. The interceptor then tracks all writes that have been
// asynchronously proposed through Raft and ensures that all interfering
Expand Down Expand Up @@ -297,18 +310,16 @@ func (tp *txnPipeliner) SendLocked(
return nil, pErr
}

// If we're configured to reject txns over budget, we pre-emptively check
// If we're configured to reject txns over budget, we preemptively check
// whether this current batch is likely to push us over the edge and, if it
// does, we reject it. Note that this check is not precise because generally
// we can't know exactly the size of the locks that will be taken by a
// request (think ResumeSpan); even if the check passes, we might end up over
// budget.
rejectOverBudget := rejectTxnOverTrackedWritesBudget.Get(&tp.st.SV)
maxBytes := TrackedWritesMaxSize.Get(&tp.st.SV)
if rejectOverBudget {
if err := tp.maybeRejectOverBudget(ba, maxBytes); err != nil {
return nil, kvpb.NewError(err)
}
condenseThreshold := TrackedWritesMaxSize.Get(&tp.st.SV)
if err := tp.maybeRejectOverBudget(ba, condenseThreshold, rejectOverBudget, rejectTxnMaxBytes.Get(&tp.st.SV)); err != nil {
return nil, kvpb.NewError(err)
}

ba.AsyncConsensus = tp.canUseAsyncConsensus(ctx, ba)
Expand All @@ -330,7 +341,7 @@ func (tp *txnPipeliner) SendLocked(
// budget. Further requests will be rejected if they attempt to take more
// locks.
if err := tp.updateLockTracking(
ctx, ba, br, pErr, maxBytes, !rejectOverBudget, /* condenseLocksIfOverBudget */
ctx, ba, br, pErr, condenseThreshold, !rejectOverBudget, /* condenseLocksIfOverBudget */
); err != nil {
return nil, kvpb.NewError(err)
}
Expand All @@ -355,7 +366,12 @@ func (tp *txnPipeliner) SendLocked(
// the transaction commits. If it fails, then we'd add the lock spans to our
// tracking and exceed the budget. It's easier for this code and more
// predictable for the user if we just reject this batch, though.
func (tp *txnPipeliner) maybeRejectOverBudget(ba *kvpb.BatchRequest, maxBytes int64) error {
func (tp *txnPipeliner) maybeRejectOverBudget(
ba *kvpb.BatchRequest,
condenseThreshold int64,
rejectIfWouldCondense bool,
rejectTxnMaxBytes int64,
) error {
// Bail early if the current request is not locking, even if we are already
// over budget. In particular, we definitely want to permit rollbacks. We also
// want to permit lone commits, since the damage in taking too much memory has
Expand All @@ -374,12 +390,22 @@ func (tp *txnPipeliner) maybeRejectOverBudget(ba *kvpb.BatchRequest, maxBytes in
// Compute how many bytes we can allocate for locks. We account for the
// inflight-writes conservatively, since these might turn into lock spans
// later.
locksBudget := maxBytes - tp.ifWrites.byteSize()
ifLockBytes := tp.ifWrites.byteSize()
estimate := tp.lockFootprint.estimateSize(spans, condenseThreshold) + ifLockBytes
if rejectIfWouldCondense && estimate > condenseThreshold {
tp.txnMetrics.TxnsRejectedByLockSpanBudget.Inc(1)
bErr := newLockSpansOverBudgetError(estimate, condenseThreshold, ba)
return pgerror.WithCandidateCode(bErr, pgcode.ConfigurationLimitExceeded)
}

estimate := tp.lockFootprint.estimateSize(spans, locksBudget)
if estimate > locksBudget {
// NB: We use the same error message as the one above, to avoid adding
// additional encoding and decoding for a backport. We could consider
// splitting this error message in the future. Also we need to add the
// condensedBytes because we want to make sure we are accounting for the
// full intent size on the server.
if estimate+tp.lockFootprint.condensedBytes > rejectTxnMaxBytes {
tp.txnMetrics.TxnsRejectedByLockSpanBudget.Inc(1)
bErr := newLockSpansOverBudgetError(estimate+tp.ifWrites.byteSize(), maxBytes, ba)
bErr := newLockSpansOverBudgetError(estimate+tp.lockFootprint.condensedBytes, rejectTxnMaxBytes, ba)
return pgerror.WithCandidateCode(bErr, pgcode.ConfigurationLimitExceeded)
}
return nil
Expand Down Expand Up @@ -671,7 +697,7 @@ func (tp *txnPipeliner) updateLockTracking(
ba *kvpb.BatchRequest,
br *kvpb.BatchResponse,
pErr *kvpb.Error,
maxBytes int64,
condenseThreshold int64,
condenseLocksIfOverBudget bool,
) error {
if err := tp.updateLockTrackingInner(ctx, ba, br, pErr); err != nil {
Expand All @@ -682,7 +708,7 @@ func (tp *txnPipeliner) updateLockTracking(
// don't estimate the size of the locks accurately for ranged locking reads,
// it is possible that ifWrites have exceeded the maxBytes threshold. That's
// fine for now, but we add some observability to be aware of this happening.
if tp.ifWrites.byteSize() > maxBytes {
if tp.ifWrites.byteSize() > condenseThreshold {
if tp.inflightOverBudgetEveryN.ShouldLog() || log.ExpensiveLogEnabled(ctx, 2) {
log.Warningf(ctx, "a transaction's in-flight writes and locking reads have "+
"exceeded the intent tracking limit (kv.transaction.max_intents_bytes). "+
Expand All @@ -698,7 +724,7 @@ func (tp *txnPipeliner) updateLockTracking(
// in-flight writes. It's possible that locksBudget is negative, but the
// remainder of this function and maybeCondense handle this case (each span
// will be maximally condensed).
locksBudget := maxBytes - tp.ifWrites.byteSize()
locksBudget := condenseThreshold - tp.ifWrites.byteSize()
// If we're below budget, there's nothing more to do.
if tp.lockFootprint.bytesSize() <= locksBudget {
return nil
Expand Down
128 changes: 67 additions & 61 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2592,75 +2592,81 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.expRejectIdx >= len(tc.reqs) {
t.Fatalf("invalid test")
}

tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
TrackedWritesMaxSize.Override(ctx, &tp.st.SV, tc.maxSize)
rejectTxnOverTrackedWritesBudget.Override(ctx, &tp.st.SV, true)

txn := makeTxnProto()

var respIdx int
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
// Handle rollbacks and commits separately.
if ba.IsSingleAbortTxnRequest() || ba.IsSingleCommitRequest() {
br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
testutils.RunTrueAndFalse(t, "reject-threshold", func(t *testing.T, reject bool) {
if tc.expRejectIdx >= len(tc.reqs) {
t.Fatalf("invalid test")
}

var resp *kvpb.BatchResponse
if respIdx < len(tc.resp) {
resp = tc.resp[respIdx]
tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
if reject {
rejectTxnMaxBytes.Override(ctx, &tp.st.SV, tc.maxSize)
} else {
TrackedWritesMaxSize.Override(ctx, &tp.st.SV, tc.maxSize)
rejectTxnOverTrackedWritesBudget.Override(ctx, &tp.st.SV, true)
}
respIdx++

if resp != nil {
resp.Txn = ba.Txn
return resp, nil
}
br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})
txn := makeTxnProto()

var respIdx int
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
// Handle rollbacks and commits separately.
if ba.IsSingleAbortTxnRequest() || ba.IsSingleCommitRequest() {
br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
}

for i, ba := range tc.reqs {
ba.Header = kvpb.Header{Txn: &txn}
_, pErr := tp.SendLocked(ctx, ba)
if i == tc.expRejectIdx {
require.NotNil(t, pErr, "expected rejection, but request succeeded")
var resp *kvpb.BatchResponse
if respIdx < len(tc.resp) {
resp = tc.resp[respIdx]
}
respIdx++

budgetErr := (lockSpansOverBudgetError{})
if !errors.As(pErr.GoError(), &budgetErr) {
t.Fatalf("expected lockSpansOverBudgetError, got %+v", pErr.GoError())
if resp != nil {
resp.Txn = ba.Txn
return resp, nil
}
br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

for i, ba := range tc.reqs {
ba.Header = kvpb.Header{Txn: &txn}
_, pErr := tp.SendLocked(ctx, ba)
if i == tc.expRejectIdx {
require.NotNil(t, pErr, "expected rejection, but request succeeded")

budgetErr := (lockSpansOverBudgetError{})
if !errors.As(pErr.GoError(), &budgetErr) {
t.Fatalf("expected lockSpansOverBudgetError, got %+v", pErr.GoError())
}
require.Equal(t, pgcode.ConfigurationLimitExceeded, pgerror.GetPGCode(pErr.GoError()))
require.Equal(t, int64(1), tp.txnMetrics.TxnsRejectedByLockSpanBudget.Count())

// Make sure rolling back the txn works.
rollback := &kvpb.BatchRequest{}
rollback.Add(&kvpb.EndTxnRequest{Commit: false})
rollback.Txn = &txn
_, pErr = tp.SendLocked(ctx, rollback)
require.Nil(t, pErr)
} else {
require.Nil(t, pErr)

// Make sure that committing works. This is particularly relevant for
// testcases where we ended up over budget but we didn't return an
// error (because we failed to pre-emptively detect that we're going
// to be over budget and the response surprised us with a large
// ResumeSpan). Committing in these situations is allowed, since the
// harm has already been done.
commit := &kvpb.BatchRequest{}
commit.Add(&kvpb.EndTxnRequest{Commit: true})
commit.Txn = &txn
_, pErr = tp.SendLocked(ctx, commit)
require.Nil(t, pErr)
}
require.Equal(t, pgcode.ConfigurationLimitExceeded, pgerror.GetPGCode(pErr.GoError()))
require.Equal(t, int64(1), tp.txnMetrics.TxnsRejectedByLockSpanBudget.Count())

// Make sure rolling back the txn works.
rollback := &kvpb.BatchRequest{}
rollback.Add(&kvpb.EndTxnRequest{Commit: false})
rollback.Txn = &txn
_, pErr = tp.SendLocked(ctx, rollback)
require.Nil(t, pErr)
} else {
require.Nil(t, pErr)

// Make sure that committing works. This is particularly relevant for
// testcases where we ended up over budget but we didn't return an
// error (because we failed to pre-emptively detect that we're going
// to be over budget and the response surprised us with a large
// ResumeSpan). Committing in these situations is allowed, since the
// harm has already been done.
commit := &kvpb.BatchRequest{}
commit.Add(&kvpb.EndTxnRequest{Commit: true})
commit.Txn = &txn
_, pErr = tp.SendLocked(ctx, commit)
require.Nil(t, pErr)
}
}
})
})
}
}
Expand Down

0 comments on commit cd7512e

Please sign in to comment.