From fcb4370033f11d3634b61e45a118eb0e4d78d6c8 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Wed, 8 Mar 2023 19:11:48 -0500 Subject: [PATCH 1/2] kvcoord: Close done context if init fails When we close mux rangefeed init context due to an error, it is important to also close client done context so that this error can be propagated to the caller. Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go index fb97943a36d0..38086a99c1ff 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -248,8 +248,9 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed( ms.initCtx.close(err) if err == nil { - doneCtx.close(m.receiveEventsFromNode(ctx, ms)) + err = m.receiveEventsFromNode(ctx, ms) } + doneCtx.close(err) // We propagated error to the caller via init/done context. return nil //nolint:returnerrcheck @@ -289,9 +290,13 @@ func (m *rangefeedMuxer) demuxLoop(ctx context.Context) (retErr error) { case <-ctx.Done(): return ctx.Err() case producer.eventCh <- &e.RangeFeedEvent: + case <-producer.callerCtx.Done(): + if log.V(1) { + log.Infof(ctx, "received stray event, but caller exited: stream=%d: e=%v", e.StreamID, e) + } case <-producer.muxClientCtx.Done(): if log.V(1) { - log.Infof(ctx, "received stray event stream %d: %v", e.StreamID, e) + log.Infof(ctx, "received stray event, but node mux exited: stream=%d: e=%v", e.StreamID, e) } } } From bcbf16e16a130ed2d6079812089f05a6039e904b Mon Sep 17 00:00:00 2001 From: Jeff Date: Sun, 12 Mar 2023 18:54:20 +0000 Subject: [PATCH 2/2] upgradejob: initialize the kvdb on TenantDeps The KVDB field was created as part of PR #93218, but it was never initialized. Part of #94843 Release Note: None --- pkg/upgrade/upgradejob/upgrade_job.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/upgrade/upgradejob/upgrade_job.go b/pkg/upgrade/upgradejob/upgrade_job.go index b1b199a6db52..a790b1c77e71 100644 --- a/pkg/upgrade/upgradejob/upgrade_job.go +++ b/pkg/upgrade/upgradejob/upgrade_job.go @@ -91,6 +91,7 @@ func (r resumer) Resume(ctx context.Context, execCtxI interface{}) error { Codec: execCtx.ExecCfg().Codec, Settings: execCtx.ExecCfg().Settings, DB: execCtx.ExecCfg().InternalDB, + KVDB: execCtx.ExecCfg().DB, LeaseManager: execCtx.ExecCfg().LeaseManager, InternalExecutor: ex, JobRegistry: execCtx.ExecCfg().JobRegistry,