Skip to content

Commit

Permalink
rangefeed: add kv.rangefeed.push_txns.enabled
Browse files Browse the repository at this point in the history
This patch adds `kv.rangefeed.push_txns.enabled`, which can be used to
disable rangefeed txn pushes, e.g. if they cause excessive contention.
The cluster setting is meant to be used temporarily, since disabling txn
pushes can cause the rangefeed resolved timestamp to fall arbitrarily
far behind.

Epic: none
Release note: None
  • Loading branch information
erikgrinaker committed Oct 30, 2023
1 parent 52d6cbe commit 9640975
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 11 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/admission",
Expand Down
26 changes: 19 additions & 7 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/future"
Expand All @@ -38,6 +40,15 @@ var (
// consider a transaction old enough to push.
defaultPushTxnsAge = envutil.EnvOrDefaultDuration(
"COCKROACH_RANGEFEED_PUSH_TXNS_AGE", 10*time.Second)

// PushTxnsEnabled can be used to disable rangefeed txn pushes, typically to
// temporarily alleviate contention.
PushTxnsEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.rangefeed.push_txns.enabled",
"periodically push txn write timestamps to advance rangefeed resolved timestamps",
true,
)
)

// newErrBufferCapacityExceeded creates an error that is returned to subscribers
Expand All @@ -52,10 +63,11 @@ func newErrBufferCapacityExceeded() *kvpb.Error {
// Config encompasses the configuration required to create a Processor.
type Config struct {
log.AmbientContext
Clock *hlc.Clock
Stopper *stop.Stopper
RangeID roachpb.RangeID
Span roachpb.RSpan
Clock *hlc.Clock
Stopper *stop.Stopper
Settings *cluster.Settings
RangeID roachpb.RangeID
Span roachpb.RSpan

TxnPusher TxnPusher
// PushTxnsInterval specifies the interval at which a Processor will push
Expand Down Expand Up @@ -463,9 +475,9 @@ func (p *LegacyProcessor) run(

// Check whether any unresolved intents need a push.
case <-txnPushTickerC:
// Don't perform transaction push attempts until the resolved timestamp
// has been initialized, or if we're not tracking any intents.
if !p.rts.IsInit() || p.rts.intentQ.Len() == 0 {
// Don't perform transaction push attempts if disabled, until the resolved
// timestamp has been initialized, or if we're not tracking any intents.
if !PushTxnsEnabled.Get(&p.Settings.SV) || !p.rts.IsInit() || p.rts.intentQ.Len() == 0 {
continue
}

Expand Down
82 changes: 78 additions & 4 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func writeIntentOpWithDetails(
})
}

func writeIntentOpFromMeta(txn enginepb.TxnMeta) enginepb.MVCCLogicalOp {
return writeIntentOpWithDetails(
txn.ID, txn.Key, txn.IsoLevel, txn.MinTimestamp, txn.WriteTimestamp)
}

func writeIntentOpWithKey(
txnID uuid.UUID, key []byte, iso isolation.Level, ts hlc.Timestamp,
) enginepb.MVCCLogicalOp {
Expand Down Expand Up @@ -291,6 +296,19 @@ func withSpan(span roachpb.RSpan) option {
}
}

func withSettings(st *cluster.Settings) option {
return func(config *testConfig) {
config.Settings = st
}
}

func withPushTxnsIntervalAge(interval, age time.Duration) option {
return func(config *testConfig) {
config.PushTxnsInterval = interval
config.PushTxnsAge = age
}
}

// blockingScanner is a test intent scanner that allows test to track lifecycle
// of tasks.
// 1. it will always block on startup and will wait for block to be closed to
Expand Down Expand Up @@ -344,11 +362,13 @@ func newTestProcessor(
) (Processor, *processorTestHelper, *stop.Stopper) {
t.Helper()
stopper := stop.NewStopper()
st := cluster.MakeTestingClusterSettings()

cfg := testConfig{
Config: Config{
RangeID: 2,
Stopper: stopper,
Settings: st,
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
Clock: hlc.NewClockForTesting(nil),
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
Expand Down Expand Up @@ -1096,10 +1116,6 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
defer stopper.Stop(ctx)

// Add a few intents and move the closed timestamp forward.
writeIntentOpFromMeta := func(txn enginepb.TxnMeta) enginepb.MVCCLogicalOp {
return writeIntentOpWithDetails(txn.ID, txn.Key, txn.IsoLevel, txn.MinTimestamp,
txn.WriteTimestamp)
}
p.ConsumeLogicalOps(ctx,
writeIntentOpFromMeta(txn1Meta),
writeIntentOpFromMeta(txn2Meta),
Expand Down Expand Up @@ -1171,6 +1187,64 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
})
}

// TestProcessorTxnPushDisabled tests that processors don't attempt txn pushes
// when disabled.
func TestProcessorTxnPushDisabled(t *testing.T) {
defer leaktest.AfterTest(t)()

const pushInterval = 10 * time.Millisecond

// Set up a txn to write intents.
ts := hlc.Timestamp{WallTime: 10}
txnID := uuid.MakeV4()
txnMeta := enginepb.TxnMeta{
ID: txnID,
Key: keyA,
IsoLevel: isolation.Serializable,
WriteTimestamp: ts,
MinTimestamp: ts,
}

// Disable txn pushes.
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
PushTxnsEnabled.Override(ctx, &st.SV, false)

// Set up a txn pusher and processor that errors on any pushes.
//
// TODO(kv): We don't test the scheduled processor here, since the setting
// instead controls the Store.startRangefeedTxnPushNotifier() loop which sits
// outside of the processor and can't be tested with this test harness. Write
// a new test when the legacy processor is removed and the scheduled processor
// is used by default.
var tp testTxnPusher
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) {
err := errors.Errorf("unexpected txn push for txns=%v ts=%s", txns, ts)
t.Errorf("%v", err)
return nil, err
})

p, h, stopper := newTestProcessor(t, withSettings(st), withPusher(&tp),
withPushTxnsIntervalAge(pushInterval, time.Millisecond))
defer stopper.Stop(ctx)

// Move the resolved ts forward to just before the txn timestamp.
rts := ts.Add(-1, 0)
require.True(t, p.ForwardClosedTS(ctx, rts))
h.syncEventC()
require.Equal(t, rts, h.rts.Get())

// Add a few intents and move the closed timestamp forward.
p.ConsumeLogicalOps(ctx, writeIntentOpFromMeta(txnMeta))
p.ForwardClosedTS(ctx, ts)
h.syncEventC()
require.Equal(t, rts, h.rts.Get())

// Wait for 10x the push txns interval, to make sure pushes are disabled.
// Waiting for something to not happen is a bit smelly, but gets the job done.
time.Sleep(10 * pushInterval)
}

// TestProcessorConcurrentStop tests that all methods in Processor's API
// correctly handle the processor concurrently shutting down. If they did
// not then it would be possible for them to deadlock.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
AmbientContext: r.AmbientContext,
Clock: r.Clock(),
Stopper: r.store.stopper,
Settings: r.store.ClusterSettings(),
RangeID: r.RangeID,
Span: desc.RSpan(),
TxnPusher: &tp,
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2450,6 +2450,9 @@ func (s *Store) startRangefeedTxnPushNotifier(ctx context.Context) {
for {
select {
case <-ticker.C:
if !rangefeed.PushTxnsEnabled.Get(&s.ClusterSettings().SV) {
continue
}
batch := makeSchedulerBatch()
s.rangefeedScheduler.EnqueueBatch(batch, rangefeed.PushTxnQueued)
batch.Close()
Expand Down

0 comments on commit 9640975

Please sign in to comment.