Merge #62537 #62540
62537: sql/sem/builtins: reword follower_read_timestamp notices r=andreimatei a=andreimatei

follower_read_timestamp() emits notices when either the binary is
non-CCL or when it is CCL but no enterprise license is activated. The
notices were confusing: the non-CCL one had bad grammar and talked
about the returned timestamp being "less likely" to result in a
follower read. The no-license one passed its message arguments in the
wrong order, producing a mangled message. Both tried to narrowly
describe what the function returns and missed the bigger point: routing
to followers is disabled when there's no license.

Now they read:

NOTICE: follower reads disabled because you are running a non-CCL distribution

NOTICE: follower reads disabled: use of follower reads requires an enterprise license. see https://cockroachlabs.com/pricing?cluster=5a077685-de30-496e-9804-77fa9cd60eb9 for details on how to enable enterprise features

Release note: None

62540: kvclient: add metrics for txns with condensed intents r=andreimatei a=andreimatei

Two new metrics - txn.condensed_intent_spans and
txn.condensed_intent_spans_gauge - track transactions whose intent
spans have been collapsed with a loss of fidelity because the
transaction exceeded kv.transaction.max_intents_bytes. Such
transactions are a potential source of instability because resolving
their intents can cost significant CPU (and also latency, and thus a
larger contention footprint).

Also, new logging aims to provide the IDs of some such transactions.

Release note: None
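
An aside on the counter/gauge pairing described above: the counter
(txn.condensed_intent_spans) only ever increases, while the gauge
(txn.condensed_intent_spans_gauge) is decremented when a condensed
transaction finishes, so it reflects currently open condensed
transactions. Below is a minimal sketch of those semantics; the names
and types are illustrative assumptions, not the real TxnMetrics
declarations.

package main

import (
	"fmt"
	"sync/atomic"
)

// txnMetrics pairs a monotonic counter with a gauge, as the commit does
// for condensed-intent transactions. Names here are illustrative.
type txnMetrics struct {
	condensedTotal int64 // counter: every txn that ever condensed
	condensedOpen  int64 // gauge: condensed txns still running
}

// onFirstCondense is called once, when a txn first condenses its spans.
func (m *txnMetrics) onFirstCondense() {
	atomic.AddInt64(&m.condensedTotal, 1)
	atomic.AddInt64(&m.condensedOpen, 1)
}

// onClose is called when a condensed txn finishes.
func (m *txnMetrics) onClose() {
	atomic.AddInt64(&m.condensedOpen, -1)
}

func main() {
	var m txnMetrics
	m.onFirstCondense()
	m.onClose()
	fmt.Println(m.condensedTotal, m.condensedOpen) // 1 0
}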

Co-authored-by: Andrei Matei <[email protected]>
craig[bot] and andreimatei committed Apr 3, 2021
3 parents df07592 + 8f95a6d + 1bfd8cc commit 521d568
Showing 9 changed files with 228 additions and 174 deletions.
8 changes: 5 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -244,7 +244,7 @@ func newRootTxnCoordSender(
// Various interceptors below rely on sequence number allocation,
// so the sequence number allocator is near the top of the stack.
&tcs.interceptorAlloc.txnSeqNumAllocator,
// The pipelinger sits above the span refresher because it will
// The pipeliner sits above the span refresher because it will
// never generate transaction retry errors that could be avoided
// with a refresh.
&tcs.interceptorAlloc.txnPipeliner,
@@ -280,8 +280,10 @@ func (tc *TxnCoordSender) initCommonInterceptors(
riGen.ds = ds
}
tc.interceptorAlloc.txnPipeliner = txnPipeliner{
st: tcf.st,
riGen: riGen,
st: tcf.st,
riGen: riGen,
txnMetrics: &tc.metrics,
condensedIntentsEveryN: &tc.TxnCoordSenderFactory.condensedIntentsEveryN,
}
tc.interceptorAlloc.txnSpanRefresher = txnSpanRefresher{
st: tcf.st,
34 changes: 18 additions & 16 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_factory.go
@@ -27,13 +27,14 @@ import (
type TxnCoordSenderFactory struct {
log.AmbientContext

st *cluster.Settings
wrapped kv.Sender
clock *hlc.Clock
heartbeatInterval time.Duration
linearizable bool // enables linearizable behavior
stopper *stop.Stopper
metrics TxnMetrics
st *cluster.Settings
wrapped kv.Sender
clock *hlc.Clock
heartbeatInterval time.Duration
linearizable bool // enables linearizable behavior
stopper *stop.Stopper
metrics TxnMetrics
condensedIntentsEveryN log.EveryN

testingKnobs ClientTestingKnobs
}
@@ -62,15 +63,16 @@ func NewTxnCoordSenderFactory(
cfg TxnCoordSenderFactoryConfig, wrapped kv.Sender,
) *TxnCoordSenderFactory {
tcf := &TxnCoordSenderFactory{
AmbientContext: cfg.AmbientCtx,
st: cfg.Settings,
wrapped: wrapped,
clock: cfg.Clock,
stopper: cfg.Stopper,
linearizable: cfg.Linearizable,
heartbeatInterval: cfg.HeartbeatInterval,
metrics: cfg.Metrics,
testingKnobs: cfg.TestingKnobs,
AmbientContext: cfg.AmbientCtx,
st: cfg.Settings,
wrapped: wrapped,
clock: cfg.Clock,
stopper: cfg.Stopper,
linearizable: cfg.Linearizable,
heartbeatInterval: cfg.HeartbeatInterval,
metrics: cfg.Metrics,
condensedIntentsEveryN: log.Every(time.Second),
testingKnobs: cfg.TestingKnobs,
}
if tcf.st == nil {
tcf.st = cluster.MakeTestingClusterSettings()
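
The factory seeds condensedIntentsEveryN with log.Every(time.Second),
which rate-limits the new warning to at most one line per second. The
following is a minimal, self-contained sketch of that every-N pattern;
it is an illustrative reimplementation, not CockroachDB's actual
util/log code.

package main

import (
	"fmt"
	"sync"
	"time"
)

// everyN permits an action at most once per interval, mirroring the
// behavior of log.Every / ShouldLog in the diff above.
type everyN struct {
	mu      sync.Mutex
	n       time.Duration
	lastLog time.Time
}

func every(n time.Duration) *everyN { return &everyN{n: n} }

// shouldLog reports whether the interval has elapsed since the last
// permitted call, and if so resets the timer.
func (e *everyN) shouldLog() bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	now := time.Now()
	if now.Sub(e.lastLog) >= e.n {
		e.lastLog = now
		return true
	}
	return false
}

func main() {
	limiter := every(time.Second)
	for i := 0; i < 3; i++ {
		if limiter.shouldLog() {
			fmt.Println("warning: txn condensed its intent spans; iteration", i)
		}
	}
	// Only the first iteration prints; the next two are suppressed.
}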
129 changes: 0 additions & 129 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
@@ -147,135 +147,6 @@ func TestTxnCoordSenderKeyRanges(t *testing.T) {
}
}

// TestTxnCoordSenderCondenseLockSpans verifies that lock spans are condensed
// along range boundaries when they exceed the maximum intent bytes threshold.
func TestTxnCoordSenderCondenseLockSpans(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
a := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key(nil)}
b := roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key(nil)}
c := roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key(nil)}
d := roachpb.Span{Key: roachpb.Key("dddddd"), EndKey: roachpb.Key(nil)}
e := roachpb.Span{Key: roachpb.Key("e"), EndKey: roachpb.Key(nil)}
aToBClosed := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b").Next()}
cToEClosed := roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("e").Next()}
fTof0 := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("f0")}
g := roachpb.Span{Key: roachpb.Key("g"), EndKey: roachpb.Key(nil)}
g0Tog1 := roachpb.Span{Key: roachpb.Key("g0"), EndKey: roachpb.Key("g1")}
fTog1Closed := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("g1")}
testCases := []struct {
span roachpb.Span
expLocks []roachpb.Span
expLocksSize int64
}{
{span: a, expLocks: []roachpb.Span{a}, expLocksSize: 1},
{span: b, expLocks: []roachpb.Span{a, b}, expLocksSize: 2},
{span: c, expLocks: []roachpb.Span{a, b, c}, expLocksSize: 3},
{span: d, expLocks: []roachpb.Span{a, b, c, d}, expLocksSize: 9},
// Note that c-e condenses and then lists first.
{span: e, expLocks: []roachpb.Span{cToEClosed, a, b}, expLocksSize: 5},
{span: fTof0, expLocks: []roachpb.Span{cToEClosed, a, b, fTof0}, expLocksSize: 8},
{span: g, expLocks: []roachpb.Span{cToEClosed, a, b, fTof0, g}, expLocksSize: 9},
{span: g0Tog1, expLocks: []roachpb.Span{fTog1Closed, cToEClosed, aToBClosed}, expLocksSize: 9},
// Add a key in the middle of a span, which will get merged on commit.
{span: c, expLocks: []roachpb.Span{aToBClosed, cToEClosed, fTog1Closed}, expLocksSize: 9},
}
splits := []roachpb.Span{
{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
{Key: roachpb.Key("c"), EndKey: roachpb.Key("f")},
{Key: roachpb.Key("f"), EndKey: roachpb.Key("j")},
}
descs := []roachpb.RangeDescriptor{testMetaRangeDescriptor}
for i, s := range splits {
descs = append(descs, roachpb.RangeDescriptor{
RangeID: roachpb.RangeID(2 + i),
StartKey: roachpb.RKey(s.Key),
EndKey: roachpb.RKey(s.EndKey),
InternalReplicas: []roachpb.ReplicaDescriptor{{NodeID: 1, StoreID: 1}},
})
}
descDB := mockRangeDescriptorDBForDescs(descs...)
s := createTestDB(t)
st := s.Store.ClusterSettings()
trackedWritesMaxSize.Override(&st.SV, 10) /* 10 bytes and it will condense */
defer s.Stop()

// Check end transaction locks, which should be condensed and split
// at range boundaries.
expLocks := []roachpb.Span{aToBClosed, cToEClosed, fTog1Closed}
sendFn := func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) {
resp := ba.CreateReply()
resp.Txn = ba.Txn
if req, ok := ba.GetArg(roachpb.EndTxn); ok {
if !req.(*roachpb.EndTxnRequest).Commit {
t.Errorf("expected commit to be true")
}
et := req.(*roachpb.EndTxnRequest)
if a, e := et.LockSpans, expLocks; !reflect.DeepEqual(a, e) {
t.Errorf("expected end transaction to have locks %+v; got %+v", e, a)
}
resp.Txn.Status = roachpb.COMMITTED
}
return resp, nil
}
ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
ds := NewDistSender(DistSenderConfig{
AmbientCtx: ambient,
Clock: s.Clock,
NodeDescs: s.Gossip,
RPCContext: s.Cfg.RPCContext,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(sendFn),
},
RangeDescriptorDB: descDB,
Settings: cluster.MakeTestingClusterSettings(),
})
tsf := NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
Settings: st,
Clock: s.Clock,
Stopper: s.Stopper(),
},
ds,
)
db := kv.NewDB(ambient, tsf, s.Clock, s.Stopper())
ctx := context.Background()

txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)
// Disable txn pipelining so that all write spans are immediately
// added to the transaction's lock footprint.
if err := txn.DisablePipelining(); err != nil {
t.Fatal(err)
}
for i, tc := range testCases {
if tc.span.EndKey != nil {
if err := txn.DelRange(ctx, tc.span.Key, tc.span.EndKey); err != nil {
t.Fatal(err)
}
} else {
if err := txn.Put(ctx, tc.span.Key, []byte("value")); err != nil {
t.Fatal(err)
}
}
tcs := txn.Sender().(*TxnCoordSender)
locks := tcs.interceptorAlloc.txnPipeliner.lockFootprint.asSlice()
if a, e := locks, tc.expLocks; !reflect.DeepEqual(a, e) {
t.Errorf("%d: expected keys %+v; got %+v", i, e, a)
}
locksSize := int64(0)
for _, i := range locks {
locksSize += int64(len(i.Key) + len(i.EndKey))
}
if a, e := locksSize, tc.expLocksSize; a != e {
t.Errorf("%d: keys size expected %d; got %d", i, e, a)
}
}
if err := txn.Commit(ctx); err != nil {
t.Fatal(err)
}
}

// Test that the heartbeat loop detects aborted transactions and stops.
func TestTxnCoordSenderHeartbeat(t *testing.T) {
defer leaktest.AfterTest(t)()
31 changes: 25 additions & 6 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
@@ -177,10 +177,12 @@ var trackedWritesMaxSize = settings.RegisterIntSetting(
// attached to any end transaction request that is passed through the pipeliner
// to ensure that the locks within them are released.
type txnPipeliner struct {
st *cluster.Settings
riGen rangeIteratorFactory // used to condense lock spans, if provided
wrapped lockedSender
disabled bool
st *cluster.Settings
riGen rangeIteratorFactory // used to condense lock spans, if provided
wrapped lockedSender
disabled bool
txnMetrics *TxnMetrics
condensedIntentsEveryN *log.EveryN

// In-flight writes are intent point writes that have not yet been proved
// to have succeeded. They will need to be proven before the transaction
@@ -488,7 +490,20 @@ func (tp *txnPipeliner) updateLockTracking(
) {
// After adding new writes to the lock footprint, check whether we need to
// condense the set to stay below memory limits.
defer tp.lockFootprint.maybeCondense(ctx, tp.riGen, trackedWritesMaxSize.Get(&tp.st.SV))
defer func() {
alreadyCondensed := tp.lockFootprint.condensed
condensed := tp.lockFootprint.maybeCondense(ctx, tp.riGen, trackedWritesMaxSize.Get(&tp.st.SV))
if condensed && !alreadyCondensed {
if tp.condensedIntentsEveryN.ShouldLog() || log.ExpensiveLogEnabled(ctx, 2) {
log.Warningf(ctx,
"a transaction has hit the intent tracking limit (kv.transaction.max_intents_bytes); "+
"is it a bulk operation? Intent cleanup will be slower. txn: %s ba: %s",
ba.Txn, ba.Summary())
}
tp.txnMetrics.TxnsWithCondensedIntents.Inc(1)
tp.txnMetrics.TxnsWithCondensedIntentsGauge.Inc(1)
}
}()

// If the request failed, add all lock acquisitions attempts directly to the
// lock footprint. This reduces the likelihood of dangling locks blocking
@@ -682,7 +697,11 @@ func (tp *txnPipeliner) rollbackToSavepointLocked(ctx context.Context, s savepoi
}

// closeLocked implements the txnInterceptor interface.
func (tp *txnPipeliner) closeLocked() {}
func (tp *txnPipeliner) closeLocked() {
if tp.lockFootprint.condensed {
tp.txnMetrics.TxnsWithCondensedIntentsGauge.Dec(1)
}
}

// hasAcquiredLocks returns whether the interceptor has made an attempt to
// acquire any locks, whether doing so was known to be successful or not.
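
The deferred block in updateLockTracking above fires the metrics only
on the edge where the footprint first becomes condensed, and
closeLocked decrements the gauge exactly once per such transaction.
Here is a condensed sketch of that transition-detection pattern; the
footprint and pipeliner types are simplified assumptions, not the real
kvcoord declarations.

package main

import "fmt"

// footprint is a simplified stand-in for the pipeliner's lock footprint.
type footprint struct {
	spans     []string
	condensed bool
}

// maybeCondense collapses the span list once it exceeds maxSpans and
// reports whether the footprint is (now) condensed.
func (f *footprint) maybeCondense(maxSpans int) bool {
	if len(f.spans) > maxSpans {
		f.spans = f.spans[:1] // lossy collapse, like merging adjacent spans
		f.condensed = true
	}
	return f.condensed
}

type pipeliner struct {
	fp       footprint
	maxSpans int
}

// updateLockTracking mirrors the defer in the real method: the condense
// check runs after the footprint is updated, and the callback fires
// only on the false -> true transition.
func (p *pipeliner) updateLockTracking(spans []string, onFirstCondense func()) {
	defer func() {
		alreadyCondensed := p.fp.condensed
		condensed := p.fp.maybeCondense(p.maxSpans)
		if condensed && !alreadyCondensed {
			onFirstCondense()
		}
	}()
	p.fp.spans = append(p.fp.spans, spans...)
}

func main() {
	p := pipeliner{maxSpans: 2}
	fired := 0
	cb := func() { fired++ }
	p.updateLockTracking([]string{"a", "b"}, cb) // under budget
	p.updateLockTracking([]string{"c"}, cb)      // condenses: fires once
	p.updateLockTracking([]string{"d", "e"}, cb) // already condensed: no refire
	fmt.Println(fired) // 1
}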