Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvcoord: setting to reject txns above lock span limit #66927

Merged
merged 2 commits into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled
kv.replication_reports.interval duration 1m0s the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions
kv.transaction.max_refresh_spans_bytes integer 256000 maximum number of bytes used to track refresh spans in serializable transactions
kv.transaction.reject_over_max_intents_budget.enabled boolean false if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed
security.ocsp.mode enumeration off "use OCSP to check whether TLS certificates are revoked. If the OCSP
server is unreachable, in strict mode all certificates will be rejected
and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2]"
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track locks in transactions</td></tr>
<tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
<tr><td><code>kv.transaction.reject_over_max_intents_budget.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed</td></tr>
<tr><td><code>security.ocsp.mode</code></td><td>enumeration</td><td><code>off</code></td><td>use OCSP to check whether TLS certificates are revoked. If the OCSP<br/>server is unreachable, in strict mode all certificates will be rejected<br/>and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2]</td></tr>
<tr><td><code>security.ocsp.timeout</code></td><td>duration</td><td><code>3s</code></td><td>timeout before considering the OCSP server unreachable</td></tr>
<tr><td><code>server.auth_log.sql_connections.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, log SQL client connect and disconnect events (note: may hinder performance on loaded nodes)</td></tr>
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"dist_sender_rangefeed.go",
"doc.go",
"local_test_cluster_util.go",
"lock_spans_over_budget_error.go",
"node_store.go",
"range_iter.go",
"replica_slice.go",
Expand Down Expand Up @@ -46,6 +47,8 @@ go_library(
"//pkg/server/telemetry",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/storage/enginepb",
"//pkg/util",
"//pkg/util/contextutil",
Expand All @@ -64,8 +67,10 @@ go_library(
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//errorspb",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@com_github_google_btree//:btree",
],
)
Expand Down Expand Up @@ -148,6 +153,8 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/testutils",
Expand Down
62 changes: 59 additions & 3 deletions pkg/kv/kvclient/kvcoord/condensable_span_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *condensableSpanSet) insert(spans ...roachpb.Span) {
// increase the overall bounds of the span set, but will eliminate duplicated
// spans and combine overlapping spans.
//
// The method has the side effect of sorting the stable write set.
// The method has the side effect of sorting the set.
func (s *condensableSpanSet) mergeAndSort() {
oldLen := len(s.s)
s.s, _ = roachpb.MergeSpans(&s.s)
Expand Down Expand Up @@ -80,7 +80,7 @@ func (s *condensableSpanSet) mergeAndSort() {
func (s *condensableSpanSet) maybeCondense(
ctx context.Context, riGen rangeIteratorFactory, maxBytes int64,
) bool {
if s.bytes < maxBytes {
if s.bytes <= maxBytes {
return false
}

Expand All @@ -89,7 +89,7 @@ func (s *condensableSpanSet) maybeCondense(
// nice property that it sorts the spans by start key, which we rely on
// lower in this method.
s.mergeAndSort()
if s.bytes < maxBytes {
if s.bytes <= maxBytes {
return false
}

Expand Down Expand Up @@ -175,6 +175,62 @@ func (s *condensableSpanSet) clear() {
*s = condensableSpanSet{}
}

// estimateSize returns the size that the spanSet would grow to if spans were
// added to the set. As a side-effect, the receiver might get its spans merged.
//
// The result doesn't take into consideration the effect of condensing the
// spans, but might take into consideration the effects of merging the spans
// (which is not a lossy operation): mergeThresholdBytes instructs the
// simulation to perform merging and de-duping if the size grows over this
// threshold.
func (s *condensableSpanSet) estimateSize(spans []roachpb.Span, mergeThresholdBytes int64) int64 {
var bytes int64
for _, sp := range spans {
bytes += spanSize(sp)
}
{
estimate := s.bytes + bytes
if estimate <= mergeThresholdBytes {
return estimate
}
}

// Merge and de-dupe in the hope of saving some space.

// First, merge the existing spans in-place. Doing it in-place instead of
// operating on a copy avoids the risk of quadratic behavior over a series of
// estimateSize() calls, where each call has to repeatedly merge a copy in
// order to discover that the merge saves enough space to stay under the
// threshold.
s.mergeAndSort()

// See if merging s was enough.
estimate := s.bytes + bytes
if estimate <= mergeThresholdBytes {
return estimate
}

// Try harder - merge (a copy of) the existing spans with the new spans.
spans = append(spans, s.s...)
lenBeforeMerge := len(spans)
spans, _ = roachpb.MergeSpans(&spans)
if len(spans) == lenBeforeMerge {
// Nothing changed -i.e. we failed to merge any spans.
return estimate
}
// Recompute the size.
bytes = 0
for _, sp := range spans {
bytes += spanSize(sp)
}
return bytes
}

// bytesSize returns the size of the tracked spans.
func (s *condensableSpanSet) bytesSize() int64 {
return s.bytes
}

func spanSize(sp roachpb.Span) int64 {
return int64(len(sp.Key) + len(sp.EndKey))
}
Expand Down
56 changes: 56 additions & 0 deletions pkg/kv/kvclient/kvcoord/condensable_span_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,59 @@ func TestCondensableSpanSetMergeContiguousSpans(t *testing.T) {
s.mergeAndSort()
require.Equal(t, int64(2), s.bytes)
}

func TestCondensableSpanSetEstimateSize(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ab := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}
bc := roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}
largeSpan := roachpb.Span{Key: roachpb.Key("ccccc"), EndKey: roachpb.Key("ddddd")}

tests := []struct {
name string
set []roachpb.Span
newSpans []roachpb.Span
mergeThreshold int64
expEstimate int64
}{
{
name: "new spans fit without merging",
set: []roachpb.Span{ab, bc},
newSpans: []roachpb.Span{ab},
mergeThreshold: 100,
expEstimate: 6,
},
{
// The set gets merged, the new spans don't.
name: "set needs merging",
set: []roachpb.Span{ab, bc},
newSpans: []roachpb.Span{ab},
mergeThreshold: 5,
expEstimate: 4,
},
{
// The set gets merged, and then it gets merged again with the newSpans.
name: "new spans fit without merging",
set: []roachpb.Span{ab, bc},
newSpans: []roachpb.Span{ab, bc},
mergeThreshold: 5,
expEstimate: 2,
},
{
// Everything gets merged, but it still doesn't fit.
name: "new spans dont fit",
set: []roachpb.Span{ab, bc},
newSpans: []roachpb.Span{ab, bc, largeSpan},
mergeThreshold: 5,
expEstimate: 12,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := condensableSpanSet{}
s.insert(tc.set...)
require.Equal(t, tc.expEstimate, s.estimateSize(tc.newSpans, tc.mergeThreshold))
})
}
}
95 changes: 95 additions & 0 deletions pkg/kv/kvclient/kvcoord/lock_spans_over_budget_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvcoord

import (
"context"
"fmt"
"strconv"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/errorspb"
"github.com/gogo/protobuf/proto"
)

// lockSpansOverBudgetError signals that a txn is being rejected because lock
// spans do not fit in their memory budget.
type lockSpansOverBudgetError struct {
lockSpansBytes int64
limitBytes int64
baSummary string
txnDetails string
}

func newLockSpansOverBudgetError(
lockSpansBytes, limitBytes int64, ba roachpb.BatchRequest,
) lockSpansOverBudgetError {
return lockSpansOverBudgetError{
lockSpansBytes: lockSpansBytes,
limitBytes: limitBytes,
baSummary: ba.Summary(),
txnDetails: ba.Txn.String(),
}
}

func (l lockSpansOverBudgetError) Error() string {
return fmt.Sprintf("the transaction is locking too many rows and exceeded its lock-tracking memory budget; "+
"lock spans: %d bytes > budget: %d bytes. Request pushing transaction over the edge: %s. "+
"Transaction details: %s.", l.lockSpansBytes, l.limitBytes, l.baSummary, l.txnDetails)
}

func encodeLockSpansOverBudgetError(
_ context.Context, err error,
) (msgPrefix string, safe []string, details proto.Message) {
t := err.(lockSpansOverBudgetError)
details = &errorspb.StringsPayload{
Details: []string{
strconv.FormatInt(t.lockSpansBytes, 10), strconv.FormatInt(t.limitBytes, 10),
t.baSummary, t.txnDetails,
},
}
msgPrefix = "the transaction is locking too many rows"
return msgPrefix, nil, details
}

func decodeLockSpansOverBudgetError(
_ context.Context, msgPrefix string, safeDetails []string, payload proto.Message,
) error {
m, ok := payload.(*errorspb.StringsPayload)
if !ok || len(m.Details) < 4 {
// If this ever happens, this means some version of the library
// (presumably future) changed the payload type, and we're
// receiving this here. In this case, give up and let
// DecodeError use the opaque type.
return nil
}
lockBytes, decodeErr := strconv.ParseInt(m.Details[0], 10, 64)
if decodeErr != nil {
return nil //nolint:returnerrcheck
}
limitBytes, decodeErr := strconv.ParseInt(m.Details[1], 10, 64)
if decodeErr != nil {
return nil //nolint:returnerrcheck
}
return lockSpansOverBudgetError{
lockSpansBytes: lockBytes,
limitBytes: limitBytes,
baSummary: m.Details[2],
txnDetails: m.Details[3],
}
}

func init() {
pKey := errors.GetTypeKey(lockSpansOverBudgetError{})
errors.RegisterLeafEncoder(pKey, encodeLockSpansOverBudgetError)
errors.RegisterLeafDecoder(pKey, decodeLockSpansOverBudgetError)
}
Loading