diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go index fb97943a36d0..38086a99c1ff 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -248,8 +248,9 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed( ms.initCtx.close(err) if err == nil { - doneCtx.close(m.receiveEventsFromNode(ctx, ms)) + err = m.receiveEventsFromNode(ctx, ms) } + doneCtx.close(err) // We propagated error to the caller via init/done context. return nil //nolint:returnerrcheck @@ -289,9 +290,13 @@ func (m *rangefeedMuxer) demuxLoop(ctx context.Context) (retErr error) { case <-ctx.Done(): return ctx.Err() case producer.eventCh <- &e.RangeFeedEvent: + case <-producer.callerCtx.Done(): + if log.V(1) { + log.Infof(ctx, "received stray event, but caller exited: stream=%d: e=%v", e.StreamID, e) + } case <-producer.muxClientCtx.Done(): if log.V(1) { - log.Infof(ctx, "received stray event stream %d: %v", e.StreamID, e) + log.Infof(ctx, "received stray event, but node mux exited: stream=%d: e=%v", e.StreamID, e) } } } diff --git a/pkg/upgrade/upgradejob/upgrade_job.go b/pkg/upgrade/upgradejob/upgrade_job.go index b1b199a6db52..a790b1c77e71 100644 --- a/pkg/upgrade/upgradejob/upgrade_job.go +++ b/pkg/upgrade/upgradejob/upgrade_job.go @@ -91,6 +91,7 @@ func (r resumer) Resume(ctx context.Context, execCtxI interface{}) error { Codec: execCtx.ExecCfg().Codec, Settings: execCtx.ExecCfg().Settings, DB: execCtx.ExecCfg().InternalDB, + KVDB: execCtx.ExecCfg().DB, LeaseManager: execCtx.ExecCfg().LeaseManager, InternalExecutor: ex, JobRegistry: execCtx.ExecCfg().JobRegistry,