Merge pull request #118411 from erikgrinaker/backport23.2-117859
release-23.2: rangefeed: cancel async tasks on processor stop
erikgrinaker authored Feb 1, 2024
2 parents 1ca40c4 + de2a001 commit 15202c8
Showing 4 changed files with 122 additions and 39 deletions.
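The core of the change, in miniature: the processor now derives a dedicated task context, cancelled both by Processor.Stop() and by stopper quiescence, and spawns its async work (intent scans, txn pushes) on that context so in-flight tasks unblock promptly at shutdown. Below is a minimal, stdlib-only sketch of that pattern; the real code uses CockroachDB's stop.Stopper and its WithCancelOnQuiesce (visible in the scheduled_processor.go hunks), and all names here are illustrative.

package main

import (
	"context"
	"fmt"
)

// processor owns a context shared by its async tasks. Stopping the
// processor cancels it, which unblocks any task waiting in ctx.Done().
type processor struct {
	taskCtx    context.Context
	taskCancel context.CancelFunc
}

func newProcessor() *processor {
	ctx, cancel := context.WithCancel(context.Background())
	return &processor{taskCtx: ctx, taskCancel: cancel}
}

// runAsyncTask spawns a task on the shared task context.
func (p *processor) runAsyncTask(run func(context.Context)) {
	go run(p.taskCtx)
}

// stop cancels all outstanding async tasks.
func (p *processor) stop() { p.taskCancel() }

func main() {
	p := newProcessor()
	done := make(chan struct{})
	p.runAsyncTask(func(ctx context.Context) {
		<-ctx.Done() // e.g. a txn push blocked on a slow remote call
		close(done)
	})
	p.stop()
	<-done
	fmt.Println("async task exited:", p.taskCtx.Err()) // context.Canceled
}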
111 changes: 88 additions & 23 deletions pkg/kv/kvserver/rangefeed/processor_test.go
@@ -191,25 +191,26 @@ func (h *processorTestHelper) syncEventAndRegistrationsSpan(span roachpb.Span) {
h.sendSpanSync(&span)
}

func (h *processorTestHelper) triggerTxnPushUntilPushed(
t *testing.T, ackC chan struct{}, timeout time.Duration,
) {
if h.scheduler != nil {
deadline := time.After(timeout)
for {
// triggerTxnPushUntilPushed will schedule PushTxnQueued events until pushedC
// indicates that a transaction push attempt has started by posting an event.
// If a push does not happen in 10 seconds, the attempt fails.
func (h *processorTestHelper) triggerTxnPushUntilPushed(t *testing.T, pushedC <-chan struct{}) {
timeoutC := time.After(10 * time.Second)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
if h.scheduler != nil {
h.scheduler.Enqueue(PushTxnQueued)
select {
case <-deadline:
t.Fatal("failed to get txn push notification")
case ackC <- struct{}{}:
return
case <-time.After(100 * time.Millisecond):
// We keep sending events to avoid the situation where an event arrives
// but the flag indicating that a push is still running has not been reset.
}
}
} else {
ackC <- struct{}{}
select {
case <-pushedC:
return
case <-ticker.C:
// We keep sending events to avoid the situation where an event arrives
// but the flag indicating that a push is still running has not been reset.
case <-timeoutC:
t.Fatal("failed to get txn push notification")
}
}
}

@@ -1081,7 +1082,9 @@ func TestProcessorTxnPushAttempt(t *testing.T) {

// Create a TxnPusher that performs assertions during the first 3 uses.
var tp testTxnPusher
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) {
tp.mockPushTxns(func(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
// The txns are not in a sorted order. Enforce one.
sort.Slice(txns, func(i, j int) bool {
return bytes.Compare(txns[i].Key, txns[j].Key) < 0
@@ -1134,7 +1137,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
return nil
}

<-pausePushAttemptsC
pausePushAttemptsC <- struct{}{}
<-resumePushAttemptsC
return nil
})
@@ -1155,7 +1158,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
require.Equal(t, hlc.Timestamp{WallTime: 9}, h.rts.Get())

// Wait for the first txn push attempt to complete.
h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second)
h.triggerTxnPushUntilPushed(t, pausePushAttemptsC)

// The resolved timestamp hasn't moved.
h.syncEventC()
@@ -1169,7 +1172,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) {

// Unblock the second txn push attempt and wait for it to complete.
resumePushAttemptsC <- struct{}{}
h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second)
h.triggerTxnPushUntilPushed(t, pausePushAttemptsC)

// The resolved timestamp should have moved forwards to the closed
// timestamp.
@@ -1193,7 +1196,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) {

// Unblock the third txn push attempt and wait for it to complete.
resumePushAttemptsC <- struct{}{}
h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second)
h.triggerTxnPushUntilPushed(t, pausePushAttemptsC)

// The resolved timestamp should have moved forwards to the closed
// timestamp.
@@ -1246,7 +1249,9 @@ func TestProcessorTxnPushDisabled(t *testing.T) {
// a new test when the legacy processor is removed and the scheduled processor
// is used by default.
var tp testTxnPusher
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) {
tp.mockPushTxns(func(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
err := errors.Errorf("unexpected txn push for txns=%v ts=%s", txns, ts)
t.Errorf("%v", err)
return nil, err
@@ -1797,3 +1802,63 @@ func TestProcessorBackpressure(t *testing.T) {
},
}, events[len(events)-1])
}

// TestProcessorContextCancellation tests that the processor cancels the
// contexts of async tasks when stopped. It does not, however, cancel the
// process() context -- it probably should, but this should arguably also be
// handled by the scheduler.
func TestProcessorContextCancellation(t *testing.T) {
defer leaktest.AfterTest(t)()

testutils.RunValues(t, "proc type", testTypes, func(t *testing.T, pt procType) {
// Try stopping both via the stopper and via Processor.Stop().
testutils.RunTrueAndFalse(t, "stopper", func(t *testing.T, useStopper bool) {

// Set up a transaction to push.
txnTS := hlc.Timestamp{WallTime: 10} // after resolved timestamp
txnMeta := enginepb.TxnMeta{
ID: uuid.MakeV4(), Key: keyA, WriteTimestamp: txnTS, MinTimestamp: txnTS}

// Set up a transaction pusher that will block until the context cancels.
pushReadyC := make(chan struct{})
pushDoneC := make(chan struct{})

var pusher testTxnPusher
pusher.mockPushTxns(func(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
pushReadyC <- struct{}{}
<-ctx.Done()
close(pushDoneC)
return nil, ctx.Err()
})
pusher.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error {
return nil
})

// Start a test processor.
p, h, stopper := newTestProcessor(t, withPusher(&pusher), withProcType(pt))
ctx := context.Background()
defer stopper.Stop(ctx)

// Add an intent and move the closed timestamp past it. This should trigger a
// txn push attempt; wait for that to happen.
p.ConsumeLogicalOps(ctx, writeIntentOpFromMeta(txnMeta))
p.ForwardClosedTS(ctx, txnTS.Add(1, 0))
h.syncEventC()
h.triggerTxnPushUntilPushed(t, pushReadyC)

// Now, stop the processor, and wait for the txn pusher to exit.
if useStopper {
stopper.Stop(ctx)
} else {
p.Stop()
}
select {
case <-pushDoneC:
case <-time.After(3 * time.Second):
t.Fatal("txn pusher did not exit")
}
})
})
}
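The reworked triggerTxnPushUntilPushed above follows a reusable testing idiom: keep nudging the system under test on a ticker until it signals progress, under a single overall deadline. A generic sketch of that idiom, assuming the usual testing and time imports (pollUntilSignaled and its parameters are illustrative, not part of this package):

func pollUntilSignaled(t *testing.T, nudge func(), readyC <-chan struct{}) {
	t.Helper()
	timeoutC := time.After(10 * time.Second)
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		nudge() // e.g. enqueue another PushTxnQueued event
		select {
		case <-readyC:
			return // the system under test made the progress we wanted
		case <-ticker.C:
			// Nudge again: an earlier event may have raced with a push
			// that was already running and been dropped or coalesced.
		case <-timeoutC:
			t.Fatal("system under test never signaled readiness")
		}
	}
}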
32 changes: 22 additions & 10 deletions pkg/kv/kvserver/rangefeed/scheduled_processor.go
@@ -47,17 +47,27 @@ type ScheduledProcessor struct {

// processCtx is the annotated background context used for process(). It is
// stored here to avoid reconstructing it on every call.
processCtx context.Context
processCtx context.Context
// taskCtx is the context used to spawn async tasks (e.g. the txn pusher),
// along with its cancel function which is called when the processor stops or
// the stopper quiesces. It is independent of processCtx, and constructed
// during Start().
//
// TODO(erikgrinaker): the context handling here should be cleaned up.
// processCtx should be passed in from the scheduler and propagate stopper
// quiescence, and the async tasks should probably be run on scheduler
// threads or at least a separate bounded worker pool. But this will do for
// now.
taskCtx context.Context
taskCancel func()

requestQueue chan request
eventC chan *event
// If true, the processor is no longer processing data and is waiting for
// registrations to complete.
stopping bool
stoppedC chan struct{}

// Processor startup runs background tasks to scan intents. If processor is
// stopped early, this task needs to be terminated to avoid resource waste.
startupCancel func()
// stopper passed in at Start(), used to fire up async work from the scheduler.
stopper *stop.Stopper
txnPushActive bool
@@ -94,9 +104,9 @@ func NewScheduledProcessor(cfg Config) *ScheduledProcessor {
func (p *ScheduledProcessor) Start(
stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor,
) error {
ctx := p.Config.AmbientContext.AnnotateCtx(context.Background())
ctx, p.startupCancel = context.WithCancel(ctx)
p.stopper = stopper
p.taskCtx, p.taskCancel = p.stopper.WithCancelOnQuiesce(
p.Config.AmbientContext.AnnotateCtx(context.Background()))

// Note that callback registration must be performed before starting resolved
// timestamp init because resolution posts resolvedTS event when it is done.
@@ -112,13 +122,14 @@ func (p *ScheduledProcessor) Start(
initScan := newInitResolvedTSScan(p.Span, p, rtsIter)
// TODO(oleg): we need to cap number of tasks that we can fire up across
// all feeds as they could potentially generate O(n) tasks during start.
if err := stopper.RunAsyncTask(ctx, "rangefeed: init resolved ts", initScan.Run); err != nil {
err := stopper.RunAsyncTask(p.taskCtx, "rangefeed: init resolved ts", initScan.Run)
if err != nil {
initScan.Cancel()
p.scheduler.StopProcessor()
return err
}
} else {
p.initResolvedTS(ctx)
p.initResolvedTS(p.taskCtx)
}

p.Metrics.RangeFeedProcessorsScheduler.Inc(1)
@@ -203,7 +214,7 @@ func (p *ScheduledProcessor) processPushTxn(ctx context.Context) {
p.txnPushActive = true
// TODO(oleg): we need to cap number of tasks that we can fire up across
// all feeds as they could potentially generate O(n) tasks for push.
err := p.stopper.RunAsyncTask(ctx, "rangefeed: pushing old txns", pushTxns.Run)
err := p.stopper.RunAsyncTask(p.taskCtx, "rangefeed: pushing old txns", pushTxns.Run)
if err != nil {
pushTxns.Cancel()
}
@@ -231,7 +242,7 @@ func (p *ScheduledProcessor) cleanup() {
// Unregister callback from scheduler
p.scheduler.Unregister()

p.startupCancel()
p.taskCancel()
close(p.stoppedC)
p.MemBudget.Close(context.Background())
}
@@ -343,6 +354,7 @@ func (p *ScheduledProcessor) Register(
}
}
}
// NB: use ctx, not p.taskCtx, as the registry handles teardown itself.
if err := p.Stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil {
// If we can't schedule internally, processor is already stopped which
// could only happen on shutdown. Disconnect stream and just remove
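After this change, scheduled_processor.go juggles three context lifetimes: processCtx for scheduler callbacks, taskCtx (cancelled on Stop() or stopper quiescence) for async tasks, and the per-registration ctx for output loops, which the registry tears down itself. For readers without the CockroachDB tree at hand, the semantics the diff relies on from stop.Stopper's WithCancelOnQuiesce can be approximated with the stdlib as in this sketch (quiesceC is an assumed stand-in for the stopper's quiescence signal):

// withCancelOnQuiesce returns a context that is cancelled either
// explicitly (via the returned cancel func) or when quiesceC closes.
// A stdlib-only sketch; CockroachDB's stop.Stopper provides the real
// WithCancelOnQuiesce.
func withCancelOnQuiesce(
	parent context.Context, quiesceC <-chan struct{},
) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parent)
	go func() {
		select {
		case <-quiesceC:
			cancel() // quiescence cancels all derived task contexts
		case <-ctx.Done():
			// Explicit cancel; nothing more to do.
		}
	}()
	return ctx, cancel
}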
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/rangefeed/task.go
@@ -61,7 +61,9 @@ func (s *initResolvedTSScan) Run(ctx context.Context) {
defer s.Cancel()
if err := s.iterateAndConsume(ctx); err != nil {
err = errors.Wrap(err, "initial resolved timestamp scan failed")
log.Errorf(ctx, "%v", err)
if ctx.Err() == nil { // cancellation probably caused the error
log.Errorf(ctx, "%v", err)
}
s.p.StopWithErr(kvpb.NewError(err))
} else {
// Inform the processor that its resolved timestamp can be initialized.
@@ -227,7 +229,9 @@ func newTxnPushAttempt(
func (a *txnPushAttempt) Run(ctx context.Context) {
defer a.Cancel()
if err := a.pushOldTxns(ctx); err != nil {
log.Errorf(ctx, "pushing old intents failed: %v", err)
if ctx.Err() == nil { // cancellation probably caused the error
log.Errorf(ctx, "pushing old intents failed: %v", err)
}
}
}

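Both task.go hunks apply the same idiom: after a task fails, consult ctx.Err() and skip logging when the context was cancelled, since the failure is then almost certainly shutdown noise rather than a real bug. A minimal sketch of the idiom (runTask and the stdlib log package are illustrative; the real code uses CockroachDB's log.Errorf):

// runTask logs a task failure only when it was not caused by shutdown.
func runTask(ctx context.Context, work func(context.Context) error) {
	if err := work(ctx); err != nil {
		if ctx.Err() != nil {
			// Cancellation probably caused the error; suppress the log.
			return
		}
		log.Printf("task failed: %v", err)
	}
}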
10 changes: 6 additions & 4 deletions pkg/kv/kvserver/rangefeed/task_test.go
@@ -358,22 +358,22 @@ func TestInitResolvedTSScan(t *testing.T) {
}

type testTxnPusher struct {
pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error
}

func (tp *testTxnPusher) PushTxns(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
return tp.pushTxnsFn(txns, ts)
return tp.pushTxnsFn(ctx, txns, ts)
}

func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error {
return tp.resolveIntentsFn(ctx, intents)
}

func (tp *testTxnPusher) mockPushTxns(
fn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error),
fn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error),
) {
tp.pushTxnsFn = fn
}
@@ -430,7 +430,9 @@ func TestTxnPushAttempt(t *testing.T) {

// Run a txnPushAttempt.
var tp testTxnPusher
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) {
tp.mockPushTxns(func(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
require.Equal(t, 4, len(txns))
require.Equal(t, txn1Meta, txns[0])
require.Equal(t, txn2Meta, txns[1])
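Threading ctx through testTxnPusher's function fields is what makes TestProcessorContextCancellation possible: with the old func([]enginepb.TxnMeta, hlc.Timestamp) signature, a test had no handle on the push's context and could not observe cancellation. A condensed sketch of the new usage, assuming the declarations from TestProcessorContextCancellation above (channel names are illustrative):

pushDoneC := make(chan struct{})
pusher.mockPushTxns(func(
	ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
	<-ctx.Done()     // park here until processor stop cancels taskCtx
	close(pushDoneC) // tell the test that cancellation propagated
	return nil, ctx.Err()
})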
