Skip to content

Commit

Permalink
Merge #98268 #98489
Browse files Browse the repository at this point in the history
98268: kvcoord: Close done context if init fails r=miretskiy a=miretskiy

When we close mux rangefeed init context due to an error, it is important to also close client done context so that this error can be propagated to the caller.

Epic: None

Release note: None

98489: upgradejob: initialize the kvdb on TenantDeps r=JeffSwenson a=JeffSwenson

The KVDB field was created as part of PR #93218, but it was never initialized.

Part of #94843

Release Note: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Jeff <[email protected]>
  • Loading branch information
3 people committed Mar 13, 2023
3 parents f44ffdb + fcb4370 + bcbf16e commit 9ed78bf
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
9 changes: 7 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,9 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
ms.initCtx.close(err)

if err == nil {
doneCtx.close(m.receiveEventsFromNode(ctx, ms))
err = m.receiveEventsFromNode(ctx, ms)
}
doneCtx.close(err)

// We propagated error to the caller via init/done context.
return nil //nolint:returnerrcheck
Expand Down Expand Up @@ -289,9 +290,13 @@ func (m *rangefeedMuxer) demuxLoop(ctx context.Context) (retErr error) {
case <-ctx.Done():
return ctx.Err()
case producer.eventCh <- &e.RangeFeedEvent:
case <-producer.callerCtx.Done():
if log.V(1) {
log.Infof(ctx, "received stray event, but caller exited: stream=%d: e=%v", e.StreamID, e)
}
case <-producer.muxClientCtx.Done():
if log.V(1) {
log.Infof(ctx, "received stray event stream %d: %v", e.StreamID, e)
log.Infof(ctx, "received stray event, but node mux exited: stream=%d: e=%v", e.StreamID, e)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/upgrade/upgradejob/upgrade_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (r resumer) Resume(ctx context.Context, execCtxI interface{}) error {
Codec: execCtx.ExecCfg().Codec,
Settings: execCtx.ExecCfg().Settings,
DB: execCtx.ExecCfg().InternalDB,
KVDB: execCtx.ExecCfg().DB,
LeaseManager: execCtx.ExecCfg().LeaseManager,
InternalExecutor: ex,
JobRegistry: execCtx.ExecCfg().JobRegistry,
Expand Down

0 comments on commit 9ed78bf

Please sign in to comment.