From 3aea858404477b1c25eab87969ef99f78709cf86 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 1 Mar 2021 12:18:05 -0500 Subject: [PATCH] kvserver: plug a tracing span leak Fixes #60677, removing a stop-gap introduced in #59992. We were previously leaking "async consensus" spans, which was possible when a given proposal was never flushed out of the replica's proposal buffer. On server shut down, this buffered proposal was never finished, and thus the embedded span never closed. We now add a closer to clean up after ourselves. Release justification: bug fixes and low-risk updates to new functionality. Release note: None --- pkg/kv/kvserver/replica_raft.go | 7 ------- pkg/kv/kvserver/store_raft.go | 20 ++++++++++++++++++++ pkg/testutils/testcluster/testcluster.go | 2 +- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index e1ffbafea801..b4efebf06e09 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -135,13 +135,6 @@ func (r *Replica) evalAndPropose( // Fork the proposal's context span so that the proposal's context // can outlive the original proposer's context. proposal.ctx, proposal.sp = tracing.ForkSpan(ctx, "async consensus") - { - // This span sometimes leaks. Disable it for the time being. - // - // Tracked in: https://github.com/cockroachdb/cockroach/issues/60677 - proposal.sp.Finish() - proposal.sp = nil - } // Signal the proposal's response channel immediately. reply := *proposal.Local.Reply diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 9c50aa81c8f6..659521311465 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -617,6 +617,26 @@ func (s *Store) processRaft(ctx context.Context) { s.stopper.AddCloser(stop.CloserFn(func() { s.cfg.Transport.Stop(s.StoreID()) })) + + // We'll want to cancel all in-flight proposals. Proposals embed tracing + // spans in them, and we don't want to be leaking any. + s.stopper.AddCloser(stop.CloserFn(func() { + s.VisitReplicas(func(r *Replica) (more bool) { + r.mu.proposalBuf.FlushLockedWithoutProposing(ctx) + r.mu.Lock() + for k, prop := range r.mu.proposals { + delete(r.mu.proposals, k) + prop.finishApplication( + context.Background(), + proposalResult{ + Err: roachpb.NewError(roachpb.NewAmbiguousResultError("store is stopping")), + }, + ) + } + r.mu.Unlock() + return true + }) + })) } func (s *Store) raftTickLoop(ctx context.Context) { diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index 90d618462ae6..5b5e551b3968 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -142,7 +142,7 @@ func (tc *TestCluster) stopServers(ctx context.Context) { return nil } var buf strings.Builder - buf.WriteString("unexpectedly found active spans:\n") + fmt.Fprintf(&buf, "unexpectedly found %d active spans:\n", len(sps)) for _, sp := range sps { fmt.Fprintln(&buf, sp.GetRecording()) fmt.Fprintln(&buf)