diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index f08a8fd0969a..852c6d7d324a 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -36,6 +36,7 @@ kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled
kv.replication_reports.interval duration 1m0s the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions
kv.transaction.max_refresh_spans_bytes integer 256000 maximum number of bytes used to track refresh spans in serializable transactions
+kv.transaction.reject_over_max_intents_budget.enabled boolean false if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed
security.ocsp.mode enumeration off "use OCSP to check whether TLS certificates are revoked. If the OCSP
server is unreachable, in strict mode all certificates will be rejected
and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2]"
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 4f5d93be44b8..6ec9c71bc637 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -40,6 +40,7 @@
kv.snapshot_recovery.max_rate | byte size | 8.0 MiB | the rate limit (bytes/sec) to use for recovery snapshots |
kv.transaction.max_intents_bytes | integer | 4194304 | maximum number of bytes used to track locks in transactions |
kv.transaction.max_refresh_spans_bytes | integer | 256000 | maximum number of bytes used to track refresh spans in serializable transactions |
+kv.transaction.reject_over_max_intents_budget.enabled | boolean | false | if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed |
security.ocsp.mode | enumeration | off | use OCSP to check whether TLS certificates are revoked. If the OCSP server is unreachable, in strict mode all certificates will be rejected and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2] |
security.ocsp.timeout | duration | 3s | timeout before considering the OCSP server unreachable |
server.auth_log.sql_connections.enabled | boolean | false | if set, log SQL client connect and disconnect events (note: may hinder performance on loaded nodes) |
diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel
index 6c9c1b3c8921..3c8dcee4c6c2 100644
--- a/pkg/kv/kvclient/kvcoord/BUILD.bazel
+++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -46,6 +46,8 @@ go_library(
"//pkg/server/telemetry",
"//pkg/settings",
"//pkg/settings/cluster",
+ "//pkg/sql/pgwire/pgcode",
+ "//pkg/sql/pgwire/pgerror",
"//pkg/storage/enginepb",
"//pkg/util",
"//pkg/util/contextutil",
@@ -64,8 +66,10 @@ go_library(
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
+ "@com_github_cockroachdb_errors//errorspb",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
+ "@com_github_gogo_protobuf//proto",
"@com_github_google_btree//:btree",
],
)
@@ -148,6 +152,8 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
+ "//pkg/sql/pgwire/pgcode",
+ "//pkg/sql/pgwire/pgerror",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/testutils",
diff --git a/pkg/kv/kvclient/kvcoord/condensable_span_set.go b/pkg/kv/kvclient/kvcoord/condensable_span_set.go
index 859c9dfaff48..d5e4a24b1d8c 100644
--- a/pkg/kv/kvclient/kvcoord/condensable_span_set.go
+++ b/pkg/kv/kvclient/kvcoord/condensable_span_set.go
@@ -51,7 +51,7 @@ func (s *condensableSpanSet) insert(spans ...roachpb.Span) {
// increase the overall bounds of the span set, but will eliminate duplicated
// spans and combine overlapping spans.
//
-// The method has the side effect of sorting the stable write set.
+// The method has the side effect of sorting the set.
func (s *condensableSpanSet) mergeAndSort() {
oldLen := len(s.s)
s.s, _ = roachpb.MergeSpans(&s.s)
@@ -175,6 +175,61 @@ func (s *condensableSpanSet) clear() {
*s = condensableSpanSet{}
}
+// estimateSize returns the size that the spanSet would grow to if spans were
+// added to the set. As a side-effect, the receiver might get its spans merged.
+//
+// The result doesn't take into consideration the effect of condensing the
+// spans, but might take into consideration the effects of merging the spans
+// (which is not a lossy operation): mergeThresholdBytes instructs the
+// simulation to perform merging and de-duping if the size grows over this
+// threshold.
+func (s *condensableSpanSet) estimateSize(spans []roachpb.Span, mergeThresholdBytes int64) int64 {
+ var bytes int64
+ for _, sp := range spans {
+ bytes += spanSize(sp)
+ }
+ {
+ estimate := s.bytes + bytes
+ if estimate <= mergeThresholdBytes {
+ return estimate
+ }
+ }
+
+ // Merge and de-dupe in the hope of saving some space.
+
+ // First, merge the existing spans in-place. Doing it in-place instead of
+ // operating on a copy avoids the risk of quadratic behavior over a series of
+ // estimateSize() calls, where each call has to repeatedly merge a copy in
+ // order to discover that the merger saves a lot of space.
+ s.mergeAndSort()
+
+ // See if merging s was enough.
+ estimate := s.bytes + bytes
+ if estimate <= mergeThresholdBytes {
+ return estimate
+ }
+
+ // Try harder - merge (a copy of) the existing spans with the new spans.
+ spans = append(spans, s.s...)
+ lenBeforeMerge := len(spans)
+ spans, _ = roachpb.MergeSpans(&spans)
+ if len(spans) == lenBeforeMerge {
+ // Nothing changed -i.e. we failed to merge any spans.
+ return estimate
+ }
+ // Recompute the size.
+ bytes = 0
+ for _, sp := range spans {
+ bytes += spanSize(sp)
+ }
+ return bytes
+}
+
+// bytesSize returns the size of the tracked spans.
+func (s *condensableSpanSet) bytesSize() int64 {
+ return s.bytes
+}
+
func spanSize(sp roachpb.Span) int64 {
return int64(len(sp.Key) + len(sp.EndKey))
}
diff --git a/pkg/kv/kvclient/kvcoord/condensable_span_set_test.go b/pkg/kv/kvclient/kvcoord/condensable_span_set_test.go
index b7f1e3cb069c..1d39a73ed9b4 100644
--- a/pkg/kv/kvclient/kvcoord/condensable_span_set_test.go
+++ b/pkg/kv/kvclient/kvcoord/condensable_span_set_test.go
@@ -31,3 +31,50 @@ func TestCondensableSpanSetMergeContiguousSpans(t *testing.T) {
s.mergeAndSort()
require.Equal(t, int64(2), s.bytes)
}
+
+func TestCondensableSpanSetEstimateSize(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ ab := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}
+ bc := roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}
+
+ tests := []struct {
+ name string
+ set []roachpb.Span
+ newSpans []roachpb.Span
+ mergeThreshold int64
+ expEstimate int64
+ }{
+ {
+ name: "new spans fit without merging",
+ set: []roachpb.Span{ab, bc},
+ newSpans: []roachpb.Span{ab},
+ mergeThreshold: 100,
+ expEstimate: 6,
+ },
+ {
+ // The set gets merged, the new spans don't.
+ name: "set needs merging",
+ set: []roachpb.Span{ab, bc},
+ newSpans: []roachpb.Span{ab},
+ mergeThreshold: 5,
+ expEstimate: 4,
+ },
+ {
+ // The set gets merged, and then it gets merged again with the newSpans.
+ name: "new spans fit without merging",
+ set: []roachpb.Span{ab, bc},
+ newSpans: []roachpb.Span{ab, bc},
+ mergeThreshold: 5,
+ expEstimate: 2,
+ },
+ }
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ s := condensableSpanSet{}
+ s.insert(tc.set...)
+ require.Equal(t, tc.expEstimate, s.estimateSize(tc.newSpans, tc.mergeThreshold))
+ })
+ }
+}
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
index 4d29034cae17..47220e6ef2d4 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
@@ -14,13 +14,19 @@ import (
"context"
"fmt"
"sort"
+ "strconv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
+ "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/errors"
+ "github.com/cockroachdb/errors/errorspb"
+ "github.com/gogo/protobuf/proto"
"github.com/google/btree"
)
@@ -76,6 +82,15 @@ var trackedWritesMaxSize = settings.RegisterIntSetting(
1<<22, /* 4 MB */
).WithPublic()
+// rejectTxnOverTrackedWritesBudget dictates what happens when a txn exceeds
+// kv.transaction.max_intents_bytes.
+var rejectTxnOverTrackedWritesBudget = settings.RegisterBoolSetting(
+ "kv.transaction.reject_over_max_intents_budget.enabled",
+ "if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) "+
+ "are rejected instead of having their lock spans imprecisely compressed",
+ false,
+).WithPublic()
+
// txnPipeliner is a txnInterceptor that pipelines transactional writes by using
// asynchronous consensus. The interceptor then tracks all writes that have been
// asynchronously proposed through Raft and ensures that all interfering
@@ -251,6 +266,20 @@ func (tp *txnPipeliner) SendLocked(
return nil, pErr
}
+ // If we're configured to reject txns over budget, we pre-emptively check
+ // whether this current batch is likely to push us over the edge and, if it
+ // does, we reject it. Note that this check is not precise because generally
+ // we can't know exactly the size of the locks that will be taken by a
+ // request (think ResumeSpan); even if the check passes, we might end up over
+ // budget.
+ rejectOverBudget := rejectTxnOverTrackedWritesBudget.Get(&tp.st.SV)
+ maxBytes := trackedWritesMaxSize.Get(&tp.st.SV)
+ if rejectOverBudget {
+ if err := tp.maybeRejectOverBudget(ba, maxBytes); err != nil {
+ return nil, roachpb.NewError(err)
+ }
+ }
+
ba.AsyncConsensus = tp.canUseAsyncConsensus(ctx, ba)
// Adjust the batch so that it doesn't miss any in-flight writes.
@@ -261,13 +290,64 @@ func (tp *txnPipeliner) SendLocked(
// Update the in-flight write set and the lock footprint with the results of
// the request.
- tp.updateLockTracking(ctx, ba, br)
+ //
+ // If we were configured to reject transaction when they go over budget, we
+ // don't want to condense the lock spans (even if it turns out that we did go
+ // over budget). The point of the rejection setting is to avoid condensing
+ // because of the possible performance cliff when doing so. As such, if we did
+ // go over budget despite the earlier pre-emptive check, then we stay over
+ // budget. Further requests will be rejected if they attempt to take more
+ // locks.
+ tp.updateLockTracking(ctx, ba, br, maxBytes, !rejectOverBudget /* condenseLocksIfOverBudget */)
if pErr != nil {
return nil, tp.adjustError(ctx, ba, pErr)
}
return tp.stripQueryIntents(br), nil
}
+// maybeRejectOverBudget checks the request against the memory limit for
+// tracking the txn's locks. If the request is estimated to exceed the lock
+// tracking budget, an error is returned.
+//
+// We can only estimate what spans this request would end up locking
+// (because of ResumeSpans, for example), so this is a best-effort check.
+//
+// NOTE: We could be more discriminate here: if the request contains an
+// EndTxn among other writes (for example, if it's a 1PC batch) then we
+// could allow the request through and hope that it succeeds. If if
+// succeeds, then its locks are never technically counted against the
+// transaction's budget because the client doesn't need to track them after
+// the transaction commits. If it fails, then we'd add the lock spans to our
+// tracking and exceed the budget. It's easier for this code and more
+// predictable for the user if we just reject this batch, though.
+func (tp *txnPipeliner) maybeRejectOverBudget(ba roachpb.BatchRequest, maxBytes int64) error {
+ // Bail early if the current request is not locking, even if we are already
+ // over budget. In particular, we definitely want to permit rollbacks. We also
+ // want to permit lone commits, since the damage in taking too much memory has
+ // already been done.
+ if !ba.IsLocking() {
+ return nil
+ }
+
+ var spans []roachpb.Span
+ ba.LockSpanIterate(nil /* br */, func(sp roachpb.Span, _ lock.Durability) {
+ spans = append(spans, sp)
+ })
+
+ // Compute how many bytes we can allocate for locks. We account for the
+ // inflight-writes conservatively, since these might turn into lock spans
+ // later.
+ locksBudget := maxBytes - tp.ifWrites.byteSize()
+
+ estimate := tp.lockFootprint.estimateSize(spans, locksBudget)
+ if estimate > locksBudget {
+ tp.txnMetrics.TxnsRejectedByLockSpanBudget.Inc(1)
+ bErr := newLockSpansOverBudgetError(estimate+tp.ifWrites.byteSize(), maxBytes, ba)
+ return pgerror.WithCandidateCode(bErr, pgcode.ConfigurationLimitExceeded)
+ }
+ return nil
+}
+
// attachLocksToEndTxn attaches the in-flight writes and the lock footprint that
// the interceptor has been tracking to any EndTxn requests present in the
// provided batch. It augments these sets with locking requests from the current
@@ -505,36 +585,54 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba roachpb.BatchRequest) roachpb.B
// 3. it moves all in-flight writes that the request proved to exist from
// the in-flight writes set to the lock footprint.
//
-// After updating the write sets, the lock footprint is condensed to ensure that
-// it remains under its memory limit.
+// The transaction's lock set is only allowed to go up to maxBytes. If it goes
+// over, the behavior depends on the condenseLocksIfOverBudget - we either
+// compress the spans with loss of fidelity (which can be a significant
+// performance problem at commit/rollback time) or we don't compress (and we
+// stay over budget).
//
// If no response is provided (indicating an error), all writes from the batch
// are added directly to the lock footprint to avoid leaking any locks when the
// transaction cleans up.
func (tp *txnPipeliner) updateLockTracking(
- ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
+ ctx context.Context,
+ ba roachpb.BatchRequest,
+ br *roachpb.BatchResponse,
+ maxBytes int64,
+ condenseLocksIfOverBudget bool,
) {
tp.updateLockTrackingInner(ctx, ba, br)
// Deal with compacting the lock spans.
- // After adding new writes to the lock footprint, check whether we need to
- // condense the set to stay below memory limits.
- alreadyCondensed := tp.lockFootprint.condensed
- // Compute how many bytes we can allocate for locks. We account for the
- // inflight-writes conservatively, since these might turn into lock spans
- // later.
- locksBudget := trackedWritesMaxSize.Get(&tp.st.SV) - tp.ifWrites.byteSize()
- condensed := tp.lockFootprint.maybeCondense(ctx, tp.riGen, locksBudget)
- if condensed && !alreadyCondensed {
- if tp.condensedIntentsEveryN.ShouldLog() || log.ExpensiveLogEnabled(ctx, 2) {
- log.Warningf(ctx,
- "a transaction has hit the intent tracking limit (kv.transaction.max_intents_bytes); "+
- "is it a bulk operation? Intent cleanup will be slower. txn: %s ba: %s",
- ba.Txn, ba.Summary())
+ // Compute how many bytes are left for locks after accounting for the
+ // in-flight writes.
+ locksBudget := maxBytes - tp.ifWrites.byteSize()
+ // If we're below budget, there's nothing more to do.
+ if tp.lockFootprint.bytesSize() <= locksBudget {
+ return
+ }
+
+ // We're over budget. Depending on rejectOverBudget, we either condense the
+ // lock spans, or just try to reduce the size without condensing.
+
+ if condenseLocksIfOverBudget {
+ // After adding new writes to the lock footprint, check whether we need to
+ // condense the set to stay below memory limits.
+ alreadyCondensed := tp.lockFootprint.condensed
+ condensed := tp.lockFootprint.maybeCondense(ctx, tp.riGen, locksBudget)
+ if condensed && !alreadyCondensed {
+ if tp.condensedIntentsEveryN.ShouldLog() || log.ExpensiveLogEnabled(ctx, 2) {
+ log.Warningf(ctx,
+ "a transaction has hit the intent tracking limit (kv.transaction.max_intents_bytes); "+
+ "is it a bulk operation? Intent cleanup will be slower. txn: %s ba: %s",
+ ba.Txn, ba.Summary())
+ }
+ tp.txnMetrics.TxnsWithCondensedIntents.Inc(1)
+ tp.txnMetrics.TxnsWithCondensedIntentsGauge.Inc(1)
}
- tp.txnMetrics.TxnsWithCondensedIntents.Inc(1)
- tp.txnMetrics.TxnsWithCondensedIntentsGauge.Inc(1)
+ } else {
+ tp.lockFootprint.mergeAndSort()
}
}
@@ -951,3 +1049,74 @@ func (a *inFlightWriteAlloc) clear() {
}
*a = (*a)[:0]
}
+
+type lockSpansOverBudgetError struct {
+ lockSpansBytes int64
+ limitBytes int64
+ baSummary string
+ txnDetails string
+}
+
+func newLockSpansOverBudgetError(
+ lockSpansBytes, limitBytes int64, ba roachpb.BatchRequest,
+) lockSpansOverBudgetError {
+ return lockSpansOverBudgetError{
+ lockSpansBytes: lockSpansBytes,
+ limitBytes: limitBytes,
+ baSummary: ba.Summary(),
+ txnDetails: ba.Txn.String(),
+ }
+}
+
+func (l lockSpansOverBudgetError) Error() string {
+ return fmt.Sprintf("the transaction is locking too many rows and exceeded its lock-tracking memory budget; "+
+ "lock spans: %d bytes > budget: %d bytes. Request pushing transaction over the edge: %s. "+
+ "Transaction details: %s.", l.lockSpansBytes, l.limitBytes, l.baSummary, l.txnDetails)
+}
+
+func encodeLockSpansOverBudgetError(
+ _ context.Context, err error,
+) (msgPrefix string, safe []string, details proto.Message) {
+ t := err.(lockSpansOverBudgetError)
+ details = &errorspb.StringsPayload{
+ Details: []string{
+ strconv.FormatInt(t.lockSpansBytes, 10), strconv.FormatInt(t.limitBytes, 10),
+ t.baSummary, t.txnDetails,
+ },
+ }
+ msgPrefix = "the transaction is locking too many rows"
+ return msgPrefix, nil, details
+}
+
+func decodeLockSpansOverBudgetError(
+ _ context.Context, msgPrefix string, safeDetails []string, payload proto.Message,
+) error {
+ m, ok := payload.(*errorspb.StringsPayload)
+ if !ok || len(m.Details) < 4 {
+ // If this ever happens, this means some version of the library
+ // (presumably future) changed the payload type, and we're
+ // receiving this here. In this case, give up and let
+ // DecodeError use the opaque type.
+ return nil
+ }
+ lockBytes, decodeErr := strconv.ParseInt(m.Details[0], 10, 64)
+ if decodeErr != nil {
+ return nil //nolint:returnerrcheck
+ }
+ limitBytes, decodeErr := strconv.ParseInt(m.Details[1], 10, 64)
+ if decodeErr != nil {
+ return nil //nolint:returnerrcheck
+ }
+ return lockSpansOverBudgetError{
+ lockSpansBytes: lockBytes,
+ limitBytes: limitBytes,
+ baSummary: m.Details[2],
+ txnDetails: m.Details[3],
+ }
+}
+
+func init() {
+ pKey := errors.GetTypeKey(lockSpansOverBudgetError{})
+ errors.RegisterLeafEncoder(pKey, encodeLockSpansOverBudgetError)
+ errors.RegisterLeafDecoder(pKey, decodeLockSpansOverBudgetError)
+}
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
index 2ca84f6cea07..00d358cb6d86 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
@@ -23,11 +23,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
+ "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
+ "github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
@@ -1593,7 +1596,7 @@ func TestTxnPipelinerCondenseLockSpans2(t *testing.T) {
lockSpans: []span{{"a1", "a2"}, {"a3", "a4"}, {"b1", "b2"}, {"b3", "b4"}},
ifWrites: []string{c30},
maxBytes: 20,
- req: putBatch(roachpb.Key("b"), nil, false /* asyncConsensus */),
+ req: putBatchNoAsyncConsensus(roachpb.Key("b"), nil),
// We expect the locks to be condensed as aggressively as possible, which
// means that they're completely condensed at the level of each range.
// Note that the "b" key from the request is included.
@@ -1608,7 +1611,7 @@ func TestTxnPipelinerCondenseLockSpans2(t *testing.T) {
name: "new large inflight-writes",
lockSpans: []span{{"a1", "a2"}, {"a3", "a4"}, {"b1", "b2"}, {"b3", "b4"}},
maxBytes: 20,
- req: putBatch(roachpb.Key(c30), nil, true /* asyncConsensus */),
+ req: putBatch(roachpb.Key(c30), nil),
expLockSpans: []span{{"a1", "a4"}, {"b1", "b4"}, {c30, ""}},
expIfWrites: nil, // The request was not allowed to perform async consensus.
},
@@ -1677,9 +1680,7 @@ func TestTxnPipelinerCondenseLockSpans2(t *testing.T) {
}
}
-// putArgs returns a PutRequest addressed to the default replica for the
-// specified key / value.
-func putBatch(key roachpb.Key, value []byte, asyncConsensus bool) roachpb.BatchRequest {
+func putBatch(key roachpb.Key, value []byte) roachpb.BatchRequest {
ba := roachpb.BatchRequest{}
ba.Add(&roachpb.PutRequest{
RequestHeader: roachpb.RequestHeader{
@@ -1687,14 +1688,19 @@ func putBatch(key roachpb.Key, value []byte, asyncConsensus bool) roachpb.BatchR
},
Value: roachpb.MakeValueFromBytes(value),
})
- // If we don't want async consensus, we pile on a read that inhibits it.
- if !asyncConsensus {
- ba.Add(&roachpb.GetRequest{
- RequestHeader: roachpb.RequestHeader{
- Key: key,
- },
- })
- }
+ return ba
+}
+
+// putBatchNoAsyncConsesnsus returns a PutRequest addressed to the default
+// replica for the specified key / value. The batch also contains a Get, which
+// inhibits the asyncConsensus flag.
+func putBatchNoAsyncConsensus(key roachpb.Key, value []byte) roachpb.BatchRequest {
+ ba := putBatch(key, value)
+ ba.Add(&roachpb.GetRequest{
+ RequestHeader: roachpb.RequestHeader{
+ Key: key,
+ },
+ })
return ba
}
@@ -1728,6 +1734,166 @@ func (s descriptorDBRangeIterator) Error() error {
return nil
}
+// Test that the pipeliner rejects requests when the lock span budget is
+// exceeded, if configured to do so.
+func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ ctx := context.Background()
+
+ largeAs := make([]byte, 11)
+ for i := 0; i < len(largeAs); i++ {
+ largeAs[i] = 'a'
+ }
+ largeWrite := putBatch(largeAs, nil)
+ mediumWrite := putBatch(largeAs[:5], nil)
+
+ delRange := roachpb.BatchRequest{}
+ delRange.Header.MaxSpanRequestKeys = 1
+ delRange.Add(&roachpb.DeleteRangeRequest{
+ RequestHeader: roachpb.RequestHeader{
+ Key: roachpb.Key("a"),
+ EndKey: roachpb.Key("b"),
+ },
+ })
+ delRangeResp := delRange.CreateReply()
+ delRangeResp.Responses[0].GetInner().(*roachpb.DeleteRangeResponse).ResumeSpan = &roachpb.Span{
+ Key: largeAs,
+ EndKey: roachpb.Key("b"),
+ }
+
+ testCases := []struct {
+ name string
+ // The requests to be sent one by one.
+ reqs []roachpb.BatchRequest
+ // The responses for reqs. If an entry is nil, a response is automatically
+ // generated for it. Requests past the end of the resp array are also
+ // generated automatically.
+ resp []*roachpb.BatchResponse
+ // The 0-based index of the request that's expected to be rejected. -1 if no
+ // request is expected to be rejected.
+ expRejectIdx int
+ }{
+ {name: "large request",
+ reqs: []roachpb.BatchRequest{largeWrite},
+ expRejectIdx: 0,
+ },
+ {name: "requests that add up",
+ reqs: []roachpb.BatchRequest{
+ putBatchNoAsyncConsensus(roachpb.Key("aaaa"), nil),
+ putBatchNoAsyncConsensus(roachpb.Key("bbbb"), nil),
+ putBatchNoAsyncConsensus(roachpb.Key("cccc"), nil)},
+ expRejectIdx: 2,
+ },
+ {name: "async requests that add up",
+ // Like the previous test, but this time the requests run with async
+ // consensus. Being tracked as in-flight writes, this test shows that
+ // in-flight writes count towards the budget.
+ reqs: []roachpb.BatchRequest{
+ putBatch(roachpb.Key("aaaa"), nil),
+ putBatch(roachpb.Key("bbbb"), nil),
+ putBatch(roachpb.Key("cccc"), nil)},
+ expRejectIdx: 2,
+ },
+ {
+ name: "response goes over budget, next request rejected",
+ // A request returns a response with a large resume span, which takes up
+ // the budget. Then the next request will be rejected.
+ reqs: []roachpb.BatchRequest{delRange, putBatch(roachpb.Key("a"), nil)},
+ resp: []*roachpb.BatchResponse{delRangeResp},
+ expRejectIdx: 1,
+ },
+ {
+ name: "response goes over budget",
+ // Like the previous test, except here we don't have a followup request
+ // once we're above budget. The test runner will commit the txn, and this
+ // test checks that committing is allowed.
+ reqs: []roachpb.BatchRequest{delRange},
+ resp: []*roachpb.BatchResponse{delRangeResp},
+ expRejectIdx: -1,
+ },
+ {
+ // Request keys overlap, so they don't count twice.
+ name: "overlapping requests",
+ reqs: []roachpb.BatchRequest{mediumWrite, mediumWrite, mediumWrite},
+ expRejectIdx: -1,
+ },
+ }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ if tc.expRejectIdx >= len(tc.reqs) {
+ t.Fatalf("invalid test")
+ }
+
+ tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
+ trackedWritesMaxSize.Override(ctx, &tp.st.SV, 10) /* reject when exceeding 10 bytes */
+ rejectTxnOverTrackedWritesBudget.Override(ctx, &tp.st.SV, true)
+
+ txn := makeTxnProto()
+
+ var respIdx int
+ mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
+ // Handle rollbacks and commits separately.
+ if ba.IsSingleAbortTxnRequest() || ba.IsSingleCommitRequest() {
+ br := ba.CreateReply()
+ br.Txn = ba.Txn
+ return br, nil
+ }
+
+ var resp *roachpb.BatchResponse
+ if respIdx < len(tc.resp) {
+ resp = tc.resp[respIdx]
+ }
+ respIdx++
+
+ if resp != nil {
+ resp.Txn = ba.Txn
+ return resp, nil
+ }
+ br := ba.CreateReply()
+ br.Txn = ba.Txn
+ return br, nil
+ })
+
+ for i, ba := range tc.reqs {
+ ba.Header = roachpb.Header{Txn: &txn}
+ _, pErr := tp.SendLocked(ctx, ba)
+ if i == tc.expRejectIdx {
+ require.NotNil(t, pErr, "expected rejection, but request succeeded")
+
+ budgetErr := (lockSpansOverBudgetError{})
+ if !errors.As(pErr.GoError(), &budgetErr) {
+ t.Fatalf("expected lockSpansOverBudgetError, got %+v", pErr.GoError())
+ }
+ require.Equal(t, pgcode.ConfigurationLimitExceeded, pgerror.GetPGCode(pErr.GoError()))
+ require.Equal(t, int64(1), tp.txnMetrics.TxnsRejectedByLockSpanBudget.Count())
+
+ // Make sure rolling back the txn works.
+ rollback := roachpb.BatchRequest{}
+ rollback.Add(&roachpb.EndTxnRequest{Commit: false})
+ rollback.Txn = &txn
+ _, pErr = tp.SendLocked(ctx, rollback)
+ require.Nil(t, pErr)
+ } else {
+ require.Nil(t, pErr)
+
+ // Make sure that committing works. This is particularly relevant for
+ // testcases where we ended up over budget but we didn't return an
+ // error (because we failed to pre-emptively detect that we're going
+ // to be over budget and the response surprised us with a large
+ // ResumeSpan). Committing in these situations is allowed, since the
+ // harm has already been done.
+ commit := roachpb.BatchRequest{}
+ commit.Add(&roachpb.EndTxnRequest{Commit: true})
+ commit.Txn = &txn
+ _, pErr = tp.SendLocked(ctx, commit)
+ require.Nil(t, pErr)
+ }
+ }
+ })
+ }
+}
+
func (s descriptorDBRangeIterator) Desc() *roachpb.RangeDescriptor {
return &s.curDesc
}
diff --git a/pkg/kv/kvclient/kvcoord/txn_metrics.go b/pkg/kv/kvclient/kvcoord/txn_metrics.go
index d3605c6408af..3802da50d937 100644
--- a/pkg/kv/kvclient/kvcoord/txn_metrics.go
+++ b/pkg/kv/kvclient/kvcoord/txn_metrics.go
@@ -35,6 +35,7 @@ type TxnMetrics struct {
TxnsWithCondensedIntents *metric.Counter
TxnsWithCondensedIntentsGauge *metric.Gauge
+ TxnsRejectedByLockSpanBudget *metric.Counter
// Restarts is the number of times we had to restart the transaction.
Restarts *metric.Histogram
@@ -148,6 +149,15 @@ var (
Measurement: "KV Transactions",
Unit: metric.Unit_COUNT,
}
+ metaTxnsRejectedByLockSpanBudget = metric.Metadata{
+ Name: "txn.condensed_intent_spans_rejected",
+ Help: "KV transactions that have been aborted because they exceeded their intent tracking " +
+ "memory budget (kv.transaction.max_intents_bytes). " +
+ "Rejection is caused by kv.transaction.reject_over_max_intents_budget.",
+ Measurement: "KV Transactions",
+ Unit: metric.Unit_COUNT,
+ }
+
metaRestartsHistogram = metric.Metadata{
Name: "txn.restarts",
Help: "Number of restarted KV transactions",
@@ -260,6 +270,7 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics {
Durations: metric.NewLatency(metaDurationsHistograms, histogramWindow),
TxnsWithCondensedIntents: metric.NewCounter(metaTxnsWithCondensedIntentSpans),
TxnsWithCondensedIntentsGauge: metric.NewGauge(metaTxnsWithCondensedIntentSpansGauge),
+ TxnsRejectedByLockSpanBudget: metric.NewCounter(metaTxnsRejectedByLockSpanBudget),
Restarts: metric.NewHistogram(metaRestartsHistogram, histogramWindow, 100, 3),
RestartsWriteTooOld: telemetry.NewCounterWithMetric(metaRestartsWriteTooOld),
RestartsWriteTooOldMulti: telemetry.NewCounterWithMetric(metaRestartsWriteTooOldMulti),
diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go
index b684f2c60b59..cfbf601e4a02 100644
--- a/pkg/roachpb/batch.go
+++ b/pkg/roachpb/batch.go
@@ -221,6 +221,15 @@ func (ba *BatchRequest) IsSingleAbortTxnRequest() bool {
return false
}
+// IsSingleCommitRequest returns true iff the batch contains a single request,
+// and that request is an EndTxnRequest(commit=true).
+func (ba *BatchRequest) IsSingleCommitRequest() bool {
+ if ba.isSingleRequestWithMethod(EndTxn) {
+ return ba.Requests[0].GetInner().(*EndTxnRequest).Commit
+ }
+ return false
+}
+
// IsSingleRefreshRequest returns true iff the batch contains a single request,
// and that request is a RefreshRequest.
func (ba *BatchRequest) IsSingleRefreshRequest() bool {
diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go
index cfd8c7fee502..14b5c7719931 100644
--- a/pkg/testutils/lint/lint_test.go
+++ b/pkg/testutils/lint/lint_test.go
@@ -1145,6 +1145,7 @@ func TestLint(t *testing.T) {
"*.go",
":!*.pb.go",
":!*.pb.gw.go",
+ ":!kv/kvclient/kvcoord/txn_interceptor_pipeliner.go",
":!sql/pgwire/pgerror/constraint_name.go",
":!sql/pgwire/pgerror/severity.go",
":!sql/pgwire/pgerror/with_candidate_code.go",
@@ -2102,6 +2103,8 @@ func TestLint(t *testing.T) {
stream.GrepNot(`pkg/roachpb/errors\.go:.*invalid direct cast on error object`),
// Cast in decode handler.
stream.GrepNot(`pkg/sql/pgwire/pgerror/constraint_name\.go:.*invalid direct cast on error object`),
+ // Cast in decode handler.
+ stream.GrepNot(`pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test\.go:.*invalid direct cast on error object`),
// pgerror's pgcode logic uses its own custom cause recursion
// algorithm and thus cannot use errors.If() which mandates a
// different recursion order.
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index 985a78e2ccac..80c5e81432ca 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -1047,6 +1047,13 @@ var charts = []sectionDescription{
"txn.condensed_intent_spans_gauge",
},
},
+ {
+ Title: "Intents condensing - transactions rejected",
+ Downsampler: DescribeAggregator_MAX,
+ Metrics: []string{
+ "txn.condensed_intent_spans_rejected",
+ },
+ },
},
},
{