diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index c0454f2eb70d..d9605d1500f7 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -94,6 +94,7 @@ kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled sys
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions application
kv.transaction.max_refresh_spans_bytes integer 4194304 maximum number of bytes used to track refresh spans in serializable transactions application
kv.transaction.randomized_anchor_key.enabled boolean false dictates whether a transactions anchor key is randomized or not application
+kv.transaction.reject_intents_bytes integer 0 maximum number of intent bytes for a single transactions, 0 to disable application
kv.transaction.reject_over_max_intents_budget.enabled boolean false if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed application
kv.transaction.write_pipelining.locking_reads.enabled boolean true if enabled, transactional locking reads are pipelined through Raft consensus application
kv.transaction.write_pipelining.ranged_writes.enabled boolean true if enabled, transactional ranged writes are pipelined through Raft consensus application
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index de02c9020ce1..77b5c4139016 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -123,6 +123,7 @@
kv.transaction.max_intents_bytes
| integer | 4194304 | maximum number of bytes used to track locks in transactions | Serverless/Dedicated/Self-Hosted |
kv.transaction.max_refresh_spans_bytes
| integer | 4194304 | maximum number of bytes used to track refresh spans in serializable transactions | Serverless/Dedicated/Self-Hosted |
kv.transaction.randomized_anchor_key.enabled
| boolean | false | dictates whether a transactions anchor key is randomized or not | Serverless/Dedicated/Self-Hosted |
+kv.transaction.reject_intents_bytes
| integer | 0 | maximum number of intent bytes for a single transactions, 0 to disable | Serverless/Dedicated/Self-Hosted |
kv.transaction.reject_over_max_intents_budget.enabled
| boolean | false | if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed | Serverless/Dedicated/Self-Hosted |
kv.transaction.write_pipelining.locking_reads.enabled
| boolean | true | if enabled, transactional locking reads are pipelined through Raft consensus | Serverless/Dedicated/Self-Hosted |
kv.transaction.write_pipelining.ranged_writes.enabled
| boolean | true | if enabled, transactional ranged writes are pipelined through Raft consensus | Serverless/Dedicated/Self-Hosted |
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
index dd81238dfa95..555fcfe98ba3 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
@@ -111,6 +111,19 @@ var rejectTxnOverTrackedWritesBudget = settings.RegisterBoolSetting(
false,
settings.WithPublic)
+// rejectTxnMaxBytes will reject transactions if the maximum number of intent
+// bytes exceeds this value. This is based on the uncondensed size of the
+// intents. Typically it is preferable to use this setting instead of
+// kv.transaction.reject_over_max_intents_budget.enabled because it allows
+// separating the threshold where we condense intents to preserve memory from
+// the limit where we reject overly large transactions.
+var rejectTxnMaxBytes = settings.RegisterIntSetting(
+ settings.ApplicationLevel,
+ "kv.transaction.reject_intents_bytes",
+ "maximum number of intent bytes for a single transactions, 0 to disable",
+ 0,
+ settings.WithPublic)
+
// txnPipeliner is a txnInterceptor that pipelines transactional writes by using
// asynchronous consensus. The interceptor then tracks all writes that have been
// asynchronously proposed through Raft and ensures that all interfering
@@ -297,18 +310,16 @@ func (tp *txnPipeliner) SendLocked(
return nil, pErr
}
- // If we're configured to reject txns over budget, we pre-emptively check
+ // If we're configured to reject txns over budget, we preemptively check
// whether this current batch is likely to push us over the edge and, if it
// does, we reject it. Note that this check is not precise because generally
// we can't know exactly the size of the locks that will be taken by a
// request (think ResumeSpan); even if the check passes, we might end up over
// budget.
rejectOverBudget := rejectTxnOverTrackedWritesBudget.Get(&tp.st.SV)
- maxBytes := TrackedWritesMaxSize.Get(&tp.st.SV)
- if rejectOverBudget {
- if err := tp.maybeRejectOverBudget(ba, maxBytes); err != nil {
- return nil, kvpb.NewError(err)
- }
+ condenseThreshold := TrackedWritesMaxSize.Get(&tp.st.SV)
+ if err := tp.maybeRejectOverBudget(ba, condenseThreshold, rejectOverBudget, rejectTxnMaxBytes.Get(&tp.st.SV)); err != nil {
+ return nil, kvpb.NewError(err)
}
ba.AsyncConsensus = tp.canUseAsyncConsensus(ctx, ba)
@@ -330,7 +341,7 @@ func (tp *txnPipeliner) SendLocked(
// budget. Further requests will be rejected if they attempt to take more
// locks.
if err := tp.updateLockTracking(
- ctx, ba, br, pErr, maxBytes, !rejectOverBudget, /* condenseLocksIfOverBudget */
+ ctx, ba, br, pErr, condenseThreshold, !rejectOverBudget, /* condenseLocksIfOverBudget */
); err != nil {
return nil, kvpb.NewError(err)
}
@@ -355,7 +366,12 @@ func (tp *txnPipeliner) SendLocked(
// the transaction commits. If it fails, then we'd add the lock spans to our
// tracking and exceed the budget. It's easier for this code and more
// predictable for the user if we just reject this batch, though.
-func (tp *txnPipeliner) maybeRejectOverBudget(ba *kvpb.BatchRequest, maxBytes int64) error {
+func (tp *txnPipeliner) maybeRejectOverBudget(
+ ba *kvpb.BatchRequest,
+ condenseThreshold int64,
+ rejectIfWouldCondense bool,
+ rejectTxnMaxBytes int64,
+) error {
// Bail early if the current request is not locking, even if we are already
// over budget. In particular, we definitely want to permit rollbacks. We also
// want to permit lone commits, since the damage in taking too much memory has
@@ -374,12 +390,22 @@ func (tp *txnPipeliner) maybeRejectOverBudget(ba *kvpb.BatchRequest, maxBytes in
// Compute how many bytes we can allocate for locks. We account for the
// inflight-writes conservatively, since these might turn into lock spans
// later.
- locksBudget := maxBytes - tp.ifWrites.byteSize()
+ ifLockBytes := tp.ifWrites.byteSize()
+ estimate := tp.lockFootprint.estimateSize(spans, condenseThreshold) + ifLockBytes
+ if rejectIfWouldCondense && estimate > condenseThreshold {
+ tp.txnMetrics.TxnsRejectedByLockSpanBudget.Inc(1)
+ bErr := newLockSpansOverBudgetError(estimate, condenseThreshold, ba)
+ return pgerror.WithCandidateCode(bErr, pgcode.ConfigurationLimitExceeded)
+ }
- estimate := tp.lockFootprint.estimateSize(spans, locksBudget)
- if estimate > locksBudget {
+ // NB: We use the same error message as the one above, to avoid adding
+ // additional encoding and decoding for a backport. We could consider
+ // splitting this error message in the future. Also we need to add the
+ // condensedBytes because we want to make sure we are accounting for the
+ // full intent size on the server.
+ if estimate+tp.lockFootprint.condensedBytes > rejectTxnMaxBytes {
tp.txnMetrics.TxnsRejectedByLockSpanBudget.Inc(1)
- bErr := newLockSpansOverBudgetError(estimate+tp.ifWrites.byteSize(), maxBytes, ba)
+ bErr := newLockSpansOverBudgetError(estimate+tp.lockFootprint.condensedBytes, rejectTxnMaxBytes, ba)
return pgerror.WithCandidateCode(bErr, pgcode.ConfigurationLimitExceeded)
}
return nil
@@ -671,7 +697,7 @@ func (tp *txnPipeliner) updateLockTracking(
ba *kvpb.BatchRequest,
br *kvpb.BatchResponse,
pErr *kvpb.Error,
- maxBytes int64,
+ condenseThreshold int64,
condenseLocksIfOverBudget bool,
) error {
if err := tp.updateLockTrackingInner(ctx, ba, br, pErr); err != nil {
@@ -682,7 +708,7 @@ func (tp *txnPipeliner) updateLockTracking(
// don't estimate the size of the locks accurately for ranged locking reads,
// it is possible that ifWrites have exceeded the maxBytes threshold. That's
// fine for now, but we add some observability to be aware of this happening.
- if tp.ifWrites.byteSize() > maxBytes {
+ if tp.ifWrites.byteSize() > condenseThreshold {
if tp.inflightOverBudgetEveryN.ShouldLog() || log.ExpensiveLogEnabled(ctx, 2) {
log.Warningf(ctx, "a transaction's in-flight writes and locking reads have "+
"exceeded the intent tracking limit (kv.transaction.max_intents_bytes). "+
@@ -698,7 +724,7 @@ func (tp *txnPipeliner) updateLockTracking(
// in-flight writes. It's possible that locksBudget is negative, but the
// remainder of this function and maybeCondense handle this case (each span
// will be maximally condensed).
- locksBudget := maxBytes - tp.ifWrites.byteSize()
+ locksBudget := condenseThreshold - tp.ifWrites.byteSize()
// If we're below budget, there's nothing more to do.
if tp.lockFootprint.bytesSize() <= locksBudget {
return nil
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
index c93c6405d2d4..1b4203ccc1a8 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
@@ -2592,75 +2592,81 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
- if tc.expRejectIdx >= len(tc.reqs) {
- t.Fatalf("invalid test")
- }
-
- tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
- TrackedWritesMaxSize.Override(ctx, &tp.st.SV, tc.maxSize)
- rejectTxnOverTrackedWritesBudget.Override(ctx, &tp.st.SV, true)
-
- txn := makeTxnProto()
-
- var respIdx int
- mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
- // Handle rollbacks and commits separately.
- if ba.IsSingleAbortTxnRequest() || ba.IsSingleCommitRequest() {
- br := ba.CreateReply()
- br.Txn = ba.Txn
- return br, nil
+ testutils.RunTrueAndFalse(t, "reject-threshold", func(t *testing.T, reject bool) {
+ if tc.expRejectIdx >= len(tc.reqs) {
+ t.Fatalf("invalid test")
}
- var resp *kvpb.BatchResponse
- if respIdx < len(tc.resp) {
- resp = tc.resp[respIdx]
+ tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
+ if reject {
+ rejectTxnMaxBytes.Override(ctx, &tp.st.SV, tc.maxSize)
+ } else {
+ TrackedWritesMaxSize.Override(ctx, &tp.st.SV, tc.maxSize)
+ rejectTxnOverTrackedWritesBudget.Override(ctx, &tp.st.SV, true)
}
- respIdx++
- if resp != nil {
- resp.Txn = ba.Txn
- return resp, nil
- }
- br := ba.CreateReply()
- br.Txn = ba.Txn
- return br, nil
- })
+ txn := makeTxnProto()
+
+ var respIdx int
+ mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+ // Handle rollbacks and commits separately.
+ if ba.IsSingleAbortTxnRequest() || ba.IsSingleCommitRequest() {
+ br := ba.CreateReply()
+ br.Txn = ba.Txn
+ return br, nil
+ }
- for i, ba := range tc.reqs {
- ba.Header = kvpb.Header{Txn: &txn}
- _, pErr := tp.SendLocked(ctx, ba)
- if i == tc.expRejectIdx {
- require.NotNil(t, pErr, "expected rejection, but request succeeded")
+ var resp *kvpb.BatchResponse
+ if respIdx < len(tc.resp) {
+ resp = tc.resp[respIdx]
+ }
+ respIdx++
- budgetErr := (lockSpansOverBudgetError{})
- if !errors.As(pErr.GoError(), &budgetErr) {
- t.Fatalf("expected lockSpansOverBudgetError, got %+v", pErr.GoError())
+ if resp != nil {
+ resp.Txn = ba.Txn
+ return resp, nil
+ }
+ br := ba.CreateReply()
+ br.Txn = ba.Txn
+ return br, nil
+ })
+
+ for i, ba := range tc.reqs {
+ ba.Header = kvpb.Header{Txn: &txn}
+ _, pErr := tp.SendLocked(ctx, ba)
+ if i == tc.expRejectIdx {
+ require.NotNil(t, pErr, "expected rejection, but request succeeded")
+
+ budgetErr := (lockSpansOverBudgetError{})
+ if !errors.As(pErr.GoError(), &budgetErr) {
+ t.Fatalf("expected lockSpansOverBudgetError, got %+v", pErr.GoError())
+ }
+ require.Equal(t, pgcode.ConfigurationLimitExceeded, pgerror.GetPGCode(pErr.GoError()))
+ require.Equal(t, int64(1), tp.txnMetrics.TxnsRejectedByLockSpanBudget.Count())
+
+ // Make sure rolling back the txn works.
+ rollback := &kvpb.BatchRequest{}
+ rollback.Add(&kvpb.EndTxnRequest{Commit: false})
+ rollback.Txn = &txn
+ _, pErr = tp.SendLocked(ctx, rollback)
+ require.Nil(t, pErr)
+ } else {
+ require.Nil(t, pErr)
+
+ // Make sure that committing works. This is particularly relevant for
+ // testcases where we ended up over budget but we didn't return an
+ // error (because we failed to pre-emptively detect that we're going
+ // to be over budget and the response surprised us with a large
+ // ResumeSpan). Committing in these situations is allowed, since the
+ // harm has already been done.
+ commit := &kvpb.BatchRequest{}
+ commit.Add(&kvpb.EndTxnRequest{Commit: true})
+ commit.Txn = &txn
+ _, pErr = tp.SendLocked(ctx, commit)
+ require.Nil(t, pErr)
}
- require.Equal(t, pgcode.ConfigurationLimitExceeded, pgerror.GetPGCode(pErr.GoError()))
- require.Equal(t, int64(1), tp.txnMetrics.TxnsRejectedByLockSpanBudget.Count())
-
- // Make sure rolling back the txn works.
- rollback := &kvpb.BatchRequest{}
- rollback.Add(&kvpb.EndTxnRequest{Commit: false})
- rollback.Txn = &txn
- _, pErr = tp.SendLocked(ctx, rollback)
- require.Nil(t, pErr)
- } else {
- require.Nil(t, pErr)
-
- // Make sure that committing works. This is particularly relevant for
- // testcases where we ended up over budget but we didn't return an
- // error (because we failed to pre-emptively detect that we're going
- // to be over budget and the response surprised us with a large
- // ResumeSpan). Committing in these situations is allowed, since the
- // harm has already been done.
- commit := &kvpb.BatchRequest{}
- commit.Add(&kvpb.EndTxnRequest{Commit: true})
- commit.Txn = &txn
- _, pErr = tp.SendLocked(ctx, commit)
- require.Nil(t, pErr)
}
- }
+ })
})
}
}