Skip to content

Commit

Permalink
rangefeed: fix kv.rangefeed.registrations metric
Browse files Browse the repository at this point in the history
Previously when rangefeed processor was stopped by replica,
registrations metric was not correctly decreased leading to
metric creep on changefeed restarts or range splits.
This commit fixes metric decrease for such cases.

Epic: none
Fixes: #106126

Release note: None
  • Loading branch information
aliher1911 committed Sep 20, 2023
1 parent a499165 commit cdafd14
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 11 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,13 +509,13 @@ func (p *LegacyProcessor) run(

// Close registrations and exit when signaled.
case pErr := <-p.stopC:
p.reg.DisconnectWithErr(all, pErr)
p.reg.DisconnectAllOnShutdown(pErr)
return

// Exit on stopper.
case <-stopper.ShouldQuiesce():
pErr := kvpb.NewError(&kvpb.NodeUnavailableError{})
p.reg.DisconnectWithErr(all, pErr)
p.reg.DisconnectAllOnShutdown(pErr)
return
}
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,18 @@ func (reg *registry) Unregister(ctx context.Context, r *registration) {
r.drainAllocations(ctx)
}

// DisconnectAllOnShutdown disconnectes all registrations on processor shutdown.
// This is different from normal disconnect as registrations won't be able to
// perform Unregister when processor's work loop is already terminated.
// This method will cleanup metrics controlled by registry itself beside posting
// errors to registrations.
// TODO: this should be revisited as part of
// https://github.com/cockroachdb/cockroach/issues/110634
func (reg *registry) DisconnectAllOnShutdown(pErr *kvpb.Error) {
reg.metrics.RangeFeedRegistrations.Dec(int64(reg.tree.Len()))
reg.DisconnectWithErr(all, pErr)
}

// Disconnect disconnects all registrations that overlap the specified span with
// a nil error.
func (reg *registry) Disconnect(span roachpb.Span) {
Expand Down
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,3 +591,23 @@ func TestRegistrationString(t *testing.T) {
require.Equal(t, tc.exp, tc.r.String())
}
}

// TestRegistryShutdown test verifies that when we shutdown registry with
// existing registration, registration won't try to update any metrics
// implicitly.
func TestRegistryShutdownMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
reg := makeRegistry(NewMetrics())

regDoneC := make(chan interface{})
r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, false)
go func() {
r.runOutputLoop(context.Background(), 0)
close(regDoneC)
}()
reg.Register(&r.registration)

reg.DisconnectAllOnShutdown(nil)
<-regDoneC
require.Zero(t, reg.metrics.RangeFeedRegistrations.Value(), "metric is not zero on stop")
}
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/rangefeed/scheduled_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,16 +215,16 @@ func (p *ScheduledProcessor) processStop() {
}

func (p *ScheduledProcessor) cleanup() {
// Cleanup is called when all registrations were already disconnected prior to
// triggering processor stop (or before we accepted first registration if we
// failed to start).
// However, we want some defence in depth if lifecycle bug will allow shutdown
// before registrations are disconnected and drained. To handle that we will
// perform disconnect so that registrations have a chance to stop their work
// loop and terminate. This would at least trigger a warning that we are using
// memory budget already released by processor.
// Cleanup is normally called when all registrations are disconnected and
// unregistered or were not created yet (processor start failure).
// However, there's a case where processor is stopped by replica action while
// registrations are still active. In that case registrations won't have a
// chance to unregister themselves after their work loop terminates because
// processor is already disconnected from scheduler.
// To avoid leaking any registry resources and metrics, processor performs
// explicit registry termination in that case.
pErr := kvpb.NewError(&kvpb.NodeUnavailableError{})
p.reg.DisconnectWithErr(all, pErr)
p.reg.DisconnectAllOnShutdown(pErr)

// Unregister callback from scheduler
p.scheduler.Unregister()
Expand Down

0 comments on commit cdafd14

Please sign in to comment.