From cdafd14d854251afc852c6c87ab136ebed30032f Mon Sep 17 00:00:00 2001 From: Oleg Afanasyev Date: Wed, 20 Sep 2023 13:25:59 +0100 Subject: [PATCH] rangefeed: fix kv.rangefeed.registrations metric Previously when rangefeed processor was stopped by replica, registrations metric was not correctly decreased leading to metric creep on changefeed restarts or range splits. This commit fixes metric decrease for such cases. Epic: none Fixes: #106126 Release note: None --- pkg/kv/kvserver/rangefeed/processor.go | 4 ++-- pkg/kv/kvserver/rangefeed/registry.go | 12 +++++++++++ pkg/kv/kvserver/rangefeed/registry_test.go | 20 +++++++++++++++++++ .../kvserver/rangefeed/scheduled_processor.go | 18 ++++++++--------- 4 files changed, 43 insertions(+), 11 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 00587f161081..aaef5aa22f18 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -509,13 +509,13 @@ func (p *LegacyProcessor) run( // Close registrations and exit when signaled. case pErr := <-p.stopC: - p.reg.DisconnectWithErr(all, pErr) + p.reg.DisconnectAllOnShutdown(pErr) return // Exit on stopper. case <-stopper.ShouldQuiesce(): pErr := kvpb.NewError(&kvpb.NodeUnavailableError{}) - p.reg.DisconnectWithErr(all, pErr) + p.reg.DisconnectAllOnShutdown(pErr) return } } diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index 6dc9487b4692..55dac7775dd3 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -512,6 +512,18 @@ func (reg *registry) Unregister(ctx context.Context, r *registration) { r.drainAllocations(ctx) } +// DisconnectAllOnShutdown disconnectes all registrations on processor shutdown. +// This is different from normal disconnect as registrations won't be able to +// perform Unregister when processor's work loop is already terminated. +// This method will cleanup metrics controlled by registry itself beside posting +// errors to registrations. +// TODO: this should be revisited as part of +// https://github.com/cockroachdb/cockroach/issues/110634 +func (reg *registry) DisconnectAllOnShutdown(pErr *kvpb.Error) { + reg.metrics.RangeFeedRegistrations.Dec(int64(reg.tree.Len())) + reg.DisconnectWithErr(all, pErr) +} + // Disconnect disconnects all registrations that overlap the specified span with // a nil error. func (reg *registry) Disconnect(span roachpb.Span) { diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index cf618d16ef5e..118a617091c1 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -591,3 +591,23 @@ func TestRegistrationString(t *testing.T) { require.Equal(t, tc.exp, tc.r.String()) } } + +// TestRegistryShutdown test verifies that when we shutdown registry with +// existing registration, registration won't try to update any metrics +// implicitly. +func TestRegistryShutdownMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + reg := makeRegistry(NewMetrics()) + + regDoneC := make(chan interface{}) + r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, false) + go func() { + r.runOutputLoop(context.Background(), 0) + close(regDoneC) + }() + reg.Register(&r.registration) + + reg.DisconnectAllOnShutdown(nil) + <-regDoneC + require.Zero(t, reg.metrics.RangeFeedRegistrations.Value(), "metric is not zero on stop") +} diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index 112817a50d9b..45314c9825e9 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -215,16 +215,16 @@ func (p *ScheduledProcessor) processStop() { } func (p *ScheduledProcessor) cleanup() { - // Cleanup is called when all registrations were already disconnected prior to - // triggering processor stop (or before we accepted first registration if we - // failed to start). - // However, we want some defence in depth if lifecycle bug will allow shutdown - // before registrations are disconnected and drained. To handle that we will - // perform disconnect so that registrations have a chance to stop their work - // loop and terminate. This would at least trigger a warning that we are using - // memory budget already released by processor. + // Cleanup is normally called when all registrations are disconnected and + // unregistered or were not created yet (processor start failure). + // However, there's a case where processor is stopped by replica action while + // registrations are still active. In that case registrations won't have a + // chance to unregister themselves after their work loop terminates because + // processor is already disconnected from scheduler. + // To avoid leaking any registry resources and metrics, processor performs + // explicit registry termination in that case. pErr := kvpb.NewError(&kvpb.NodeUnavailableError{}) - p.reg.DisconnectWithErr(all, pErr) + p.reg.DisconnectAllOnShutdown(pErr) // Unregister callback from scheduler p.scheduler.Unregister()