From 6fcbdfcd9ac29420751b9e9f96350092028d23dc Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 24 Jan 2024 10:09:33 +0000 Subject: [PATCH 1/5] rangefeed: use log.Fatalf instead of panic This will include the log tags from the context, aiding with debugging. Involves a bunch of context plumbing, and removing some assertion tests. A couple of panics were left in, where it didn't appear natural to plumb through a context. Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/budget.go | 5 +- pkg/kv/kvserver/rangefeed/processor.go | 10 +- pkg/kv/kvserver/rangefeed/registry.go | 83 ++++++------ pkg/kv/kvserver/rangefeed/registry_test.go | 126 +++++------------- .../kvserver/rangefeed/resolved_timestamp.go | 43 +++--- .../rangefeed/resolved_timestamp_test.go | 69 +++++----- .../kvserver/rangefeed/scheduled_processor.go | 26 ++-- 7 files changed, 151 insertions(+), 211 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/budget.go b/pkg/kv/kvserver/rangefeed/budget.go index 26ea58bbe8fe..03b2a4f9db1c 100644 --- a/pkg/kv/kvserver/rangefeed/budget.go +++ b/pkg/kv/kvserver/rangefeed/budget.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/envutil" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" @@ -249,10 +250,10 @@ type SharedBudgetAllocation struct { // Use increases usage count for the allocation. It should be called by each // new consumer that plans to retain allocation after returning to a caller // that passed this allocation. -func (a *SharedBudgetAllocation) Use() { +func (a *SharedBudgetAllocation) Use(ctx context.Context) { if a != nil { if atomic.AddInt32(&a.refCount, 1) == 1 { - panic("unexpected shared memory allocation usage increase after free") + log.Fatalf(ctx, "unexpected shared memory allocation usage increase after free") } } } diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 58f95cd1b97e..cc4e408a9c05 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -112,14 +112,8 @@ type Config struct { // SetDefaults initializes unset fields in Config to values // suitable for use by a Processor. func (sc *Config) SetDefaults() { - if sc.TxnPusher == nil { - if sc.PushTxnsAge != 0 { - panic("nil TxnPusher with non-zero PushTxnsAge") - } - } else { - if sc.PushTxnsAge == 0 { - sc.PushTxnsAge = defaultPushTxnsAge - } + if sc.PushTxnsAge == 0 { + sc.PushTxnsAge = defaultPushTxnsAge } } diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index 005ba4ec66d1..eb605cf94151 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -152,15 +152,15 @@ func newRegistration( func (r *registration) publish( ctx context.Context, event *kvpb.RangeFeedEvent, alloc *SharedBudgetAllocation, ) { - r.validateEvent(event) - e := getPooledSharedEvent(sharedEvent{event: r.maybeStripEvent(event), alloc: alloc}) + r.assertEvent(ctx, event) + e := getPooledSharedEvent(sharedEvent{event: r.maybeStripEvent(ctx, event), alloc: alloc}) r.mu.Lock() defer r.mu.Unlock() if r.mu.overflowed { return } - alloc.Use() + alloc.Use(ctx) select { case r.buf <- e: r.mu.caughtUp = false @@ -188,43 +188,42 @@ func (r *registration) publish( } } -// validateEvent checks that the event contains enough information for the -// registation. 
-func (r *registration) validateEvent(event *kvpb.RangeFeedEvent) { +// assertEvent asserts that the event contains the necessary data. +func (r *registration) assertEvent(ctx context.Context, event *kvpb.RangeFeedEvent) { switch t := event.GetValue().(type) { case *kvpb.RangeFeedValue: if t.Key == nil { - panic(fmt.Sprintf("unexpected empty RangeFeedValue.Key: %v", t)) + log.Fatalf(ctx, "unexpected empty RangeFeedValue.Key: %v", t) } if t.Value.RawBytes == nil { - panic(fmt.Sprintf("unexpected empty RangeFeedValue.Value.RawBytes: %v", t)) + log.Fatalf(ctx, "unexpected empty RangeFeedValue.Value.RawBytes: %v", t) } if t.Value.Timestamp.IsEmpty() { - panic(fmt.Sprintf("unexpected empty RangeFeedValue.Value.Timestamp: %v", t)) + log.Fatalf(ctx, "unexpected empty RangeFeedValue.Value.Timestamp: %v", t) } case *kvpb.RangeFeedCheckpoint: if t.Span.Key == nil { - panic(fmt.Sprintf("unexpected empty RangeFeedCheckpoint.Span.Key: %v", t)) + log.Fatalf(ctx, "unexpected empty RangeFeedCheckpoint.Span.Key: %v", t) } case *kvpb.RangeFeedSSTable: if len(t.Data) == 0 { - panic(fmt.Sprintf("unexpected empty RangeFeedSSTable.Data: %v", t)) + log.Fatalf(ctx, "unexpected empty RangeFeedSSTable.Data: %v", t) } if len(t.Span.Key) == 0 { - panic(fmt.Sprintf("unexpected empty RangeFeedSSTable.Span: %v", t)) + log.Fatalf(ctx, "unexpected empty RangeFeedSSTable.Span: %v", t) } if t.WriteTS.IsEmpty() { - panic(fmt.Sprintf("unexpected empty RangeFeedSSTable.Timestamp: %v", t)) + log.Fatalf(ctx, "unexpected empty RangeFeedSSTable.Timestamp: %v", t) } case *kvpb.RangeFeedDeleteRange: if len(t.Span.Key) == 0 || len(t.Span.EndKey) == 0 { - panic(fmt.Sprintf("unexpected empty key in RangeFeedDeleteRange.Span: %v", t)) + log.Fatalf(ctx, "unexpected empty key in RangeFeedDeleteRange.Span: %v", t) } if t.Timestamp.IsEmpty() { - panic(fmt.Sprintf("unexpected empty RangeFeedDeleteRange.Timestamp: %v", t)) + log.Fatalf(ctx, "unexpected empty RangeFeedDeleteRange.Timestamp: %v", t) } default: - panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", t)) + log.Fatalf(ctx, "unexpected RangeFeedEvent variant: %v", t) } } @@ -232,7 +231,9 @@ func (r *registration) validateEvent(event *kvpb.RangeFeedEvent) { // applicable to the current registration. If so, it makes a copy of the event // and strips the incompatible information to match only what the registration // requested. -func (r *registration) maybeStripEvent(event *kvpb.RangeFeedEvent) *kvpb.RangeFeedEvent { +func (r *registration) maybeStripEvent( + ctx context.Context, event *kvpb.RangeFeedEvent, +) *kvpb.RangeFeedEvent { ret := event copyOnWrite := func() interface{} { if ret == event { @@ -264,7 +265,7 @@ func (r *registration) maybeStripEvent(event *kvpb.RangeFeedEvent) *kvpb.RangeFe // observed all values up to the checkpoint timestamp over a given // key span if any updates to that span have been filtered out. if !t.Span.Contains(r.span) { - panic(fmt.Sprintf("registration span %v larger than checkpoint span %v", r.span, t.Span)) + log.Fatalf(ctx, "registration span %v larger than checkpoint span %v", r.span, t.Span) } t = copyOnWrite().(*kvpb.RangeFeedCheckpoint) t.Span = r.span @@ -279,7 +280,7 @@ func (r *registration) maybeStripEvent(event *kvpb.RangeFeedEvent) *kvpb.RangeFe // SSTs are always sent in their entirety, it is up to the caller to // filter out irrelevant entries. 
default: - panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", t)) + log.Fatalf(ctx, "unexpected RangeFeedEvent variant: %v", t) } return ret } @@ -443,12 +444,13 @@ func (reg *registry) NewFilter() *Filter { } // Register adds the provided registration to the registry. -func (reg *registry) Register(r *registration) { +func (reg *registry) Register(ctx context.Context, r *registration) { reg.metrics.RangeFeedRegistrations.Inc(1) r.id = reg.nextID() r.keys = r.span.AsRange() if err := reg.tree.Insert(r, false /* fast */); err != nil { - panic(err) + // TODO(erikgrinaker): these errors should arguably be returned. + log.Fatalf(ctx, "%v", err) } } @@ -484,10 +486,10 @@ func (reg *registry) PublishToOverlapping( // surprising. Revisit this once RangeFeed has more users. minTS = hlc.MaxTimestamp default: - panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", t)) + log.Fatalf(ctx, "unexpected RangeFeedEvent variant: %v", t) } - reg.forOverlappingRegs(span, func(r *registration) (bool, *kvpb.Error) { + reg.forOverlappingRegs(ctx, span, func(r *registration) (bool, *kvpb.Error) { // Don't publish events if they: // 1. are equal to or less than the registration's starting timestamp, or // 2. have OmitInRangefeeds = true and this registration has opted into filtering. @@ -507,7 +509,7 @@ func (reg *registry) PublishToOverlapping( func (reg *registry) Unregister(ctx context.Context, r *registration) { reg.metrics.RangeFeedRegistrations.Dec(1) if err := reg.tree.Delete(r, false /* fast */); err != nil { - panic(err) + log.Fatalf(ctx, "%v", err) } r.drainAllocations(ctx) } @@ -519,21 +521,21 @@ func (reg *registry) Unregister(ctx context.Context, r *registration) { // errors to registrations. // TODO: this should be revisited as part of // https://github.com/cockroachdb/cockroach/issues/110634 -func (reg *registry) DisconnectAllOnShutdown(pErr *kvpb.Error) { +func (reg *registry) DisconnectAllOnShutdown(ctx context.Context, pErr *kvpb.Error) { reg.metrics.RangeFeedRegistrations.Dec(int64(reg.tree.Len())) - reg.DisconnectWithErr(all, pErr) + reg.DisconnectWithErr(ctx, all, pErr) } // Disconnect disconnects all registrations that overlap the specified span with // a nil error. -func (reg *registry) Disconnect(span roachpb.Span) { - reg.DisconnectWithErr(span, nil /* pErr */) +func (reg *registry) Disconnect(ctx context.Context, span roachpb.Span) { + reg.DisconnectWithErr(ctx, span, nil /* pErr */) } // DisconnectWithErr disconnects all registrations that overlap the specified // span with the provided error. -func (reg *registry) DisconnectWithErr(span roachpb.Span, pErr *kvpb.Error) { - reg.forOverlappingRegs(span, func(r *registration) (bool, *kvpb.Error) { +func (reg *registry) DisconnectWithErr(ctx context.Context, span roachpb.Span, pErr *kvpb.Error) { + reg.forOverlappingRegs(ctx, span, func(r *registration) (bool, *kvpb.Error) { return true /* disconned */, pErr }) } @@ -546,7 +548,9 @@ var all = roachpb.Span{Key: roachpb.KeyMin, EndKey: roachpb.KeyMax} // then that registration is unregistered and the error returned by the // function is send on its corresponding error channel. 
func (reg *registry) forOverlappingRegs( - span roachpb.Span, fn func(*registration) (disconnect bool, pErr *kvpb.Error), + ctx context.Context, + span roachpb.Span, + fn func(*registration) (disconnect bool, pErr *kvpb.Error), ) { var toDelete []interval.Interface matchFn := func(i interval.Interface) (done bool) { @@ -568,12 +572,12 @@ func (reg *registry) forOverlappingRegs( reg.tree.Clear() } else if len(toDelete) == 1 { if err := reg.tree.Delete(toDelete[0], false /* fast */); err != nil { - panic(err) + log.Fatalf(ctx, "%v", err) } } else if len(toDelete) > 1 { for _, i := range toDelete { if err := reg.tree.Delete(i, true /* fast */); err != nil { - panic(err) + log.Fatalf(ctx, "%v", err) } } reg.tree.AdjustRanges() @@ -581,14 +585,14 @@ func (reg *registry) forOverlappingRegs( } // Wait for this registration to completely process its internal buffer. -func (r *registration) waitForCaughtUp() error { +func (r *registration) waitForCaughtUp(ctx context.Context) error { opts := retry.Options{ InitialBackoff: 5 * time.Millisecond, Multiplier: 2, MaxBackoff: 10 * time.Second, MaxRetries: 50, } - for re := retry.Start(opts); re.Next(); { + for re := retry.StartWithCtx(ctx, opts); re.Next(); { r.mu.Lock() caughtUp := len(r.buf) == 0 && r.mu.caughtUp r.mu.Unlock() @@ -596,6 +600,9 @@ func (r *registration) waitForCaughtUp() error { return nil } } + if err := ctx.Err(); err != nil { + return err + } return errors.Errorf("registration %v failed to empty in time", r.Range()) } @@ -610,11 +617,11 @@ func (r *registration) detachCatchUpIter() *CatchUpIterator { // waitForCaughtUp waits for all registrations overlapping the given span to // completely process their internal buffers. -func (reg *registry) waitForCaughtUp(span roachpb.Span) error { +func (reg *registry) waitForCaughtUp(ctx context.Context, span roachpb.Span) error { var outerErr error - reg.forOverlappingRegs(span, func(r *registration) (bool, *kvpb.Error) { + reg.forOverlappingRegs(ctx, span, func(r *registration) (bool, *kvpb.Error) { if outerErr == nil { - outerErr = r.waitForCaughtUp() + outerErr = r.waitForCaughtUp(ctx) } return false, nil }) diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index fe4b2db1c28d..c61093112577 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -170,8 +170,8 @@ func TestRegistrationBasic(t *testing.T) { noCatchupReg.publish(ctx, ev1, nil /* alloc */) noCatchupReg.publish(ctx, ev2, nil /* alloc */) require.Equal(t, len(noCatchupReg.buf), 2) - go noCatchupReg.runOutputLoop(context.Background(), 0) - require.NoError(t, noCatchupReg.waitForCaughtUp()) + go noCatchupReg.runOutputLoop(ctx, 0) + require.NoError(t, noCatchupReg.waitForCaughtUp(ctx)) require.Equal(t, []*kvpb.RangeFeedEvent{ev1, ev2}, noCatchupReg.stream.Events()) noCatchupReg.disconnect(nil) @@ -186,8 +186,8 @@ func TestRegistrationBasic(t *testing.T) { catchupReg.publish(ctx, ev1, nil /* alloc */) catchupReg.publish(ctx, ev2, nil /* alloc */) require.Equal(t, len(catchupReg.buf), 2) - go catchupReg.runOutputLoop(context.Background(), 0) - require.NoError(t, catchupReg.waitForCaughtUp()) + go catchupReg.runOutputLoop(ctx, 0) + require.NoError(t, catchupReg.waitForCaughtUp(ctx)) events := catchupReg.stream.Events() require.Equal(t, 5, len(events)) require.Equal(t, []*kvpb.RangeFeedEvent{ev1, ev2}, events[3:]) @@ -199,8 +199,8 @@ func TestRegistrationBasic(t *testing.T) { false /* withDiff */, false /* withFiltering */) 
disconnectReg.publish(ctx, ev1, nil /* alloc */) disconnectReg.publish(ctx, ev2, nil /* alloc */) - go disconnectReg.runOutputLoop(context.Background(), 0) - require.NoError(t, disconnectReg.waitForCaughtUp()) + go disconnectReg.runOutputLoop(ctx, 0) + require.NoError(t, disconnectReg.waitForCaughtUp(ctx)) discErr := kvpb.NewError(fmt.Errorf("disconnection error")) disconnectReg.disconnect(discErr) require.Equal(t, discErr.GoError(), disconnectReg.Err()) @@ -212,7 +212,7 @@ func TestRegistrationBasic(t *testing.T) { disconnectEarlyReg.publish(ctx, ev1, nil /* alloc */) disconnectEarlyReg.publish(ctx, ev2, nil /* alloc */) disconnectEarlyReg.disconnect(discErr) - go disconnectEarlyReg.runOutputLoop(context.Background(), 0) + go disconnectEarlyReg.runOutputLoop(ctx, 0) require.Equal(t, discErr.GoError(), disconnectEarlyReg.Err()) require.Equal(t, 0, len(disconnectEarlyReg.stream.Events())) @@ -222,7 +222,7 @@ func TestRegistrationBasic(t *testing.T) { for i := 0; i < cap(overflowReg.buf)+3; i++ { overflowReg.publish(ctx, ev1, nil /* alloc */) } - go overflowReg.runOutputLoop(context.Background(), 0) + go overflowReg.runOutputLoop(ctx, 0) require.Equal(t, newErrBufferCapacityExceeded().GoError(), overflowReg.Err()) require.Equal(t, cap(overflowReg.buf), len(overflowReg.Events())) @@ -231,7 +231,7 @@ func TestRegistrationBasic(t *testing.T) { false /* withDiff */, false /* withFiltering */) streamErr := fmt.Errorf("stream error") streamErrReg.stream.SetSendErr(streamErr) - go streamErrReg.runOutputLoop(context.Background(), 0) + go streamErrReg.runOutputLoop(ctx, 0) streamErrReg.publish(ctx, ev1, nil /* alloc */) require.Equal(t, streamErr.Error(), streamErrReg.Err().Error()) @@ -239,8 +239,8 @@ func TestRegistrationBasic(t *testing.T) { streamCancelReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, /* catchup */ false /* withDiff */, false /* withFiltering */) streamCancelReg.stream.Cancel() - go streamCancelReg.runOutputLoop(context.Background(), 0) - require.NoError(t, streamCancelReg.waitForCaughtUp()) + go streamCancelReg.runOutputLoop(ctx, 0) + require.NoError(t, streamCancelReg.waitForCaughtUp(ctx)) require.Equal(t, streamCancelReg.stream.Context().Err(), streamCancelReg.Err()) } @@ -387,20 +387,20 @@ func TestRegistryBasic(t *testing.T) { reg := makeRegistry(NewMetrics()) require.Equal(t, 0, reg.Len()) - require.NotPanics(t, func() { reg.PublishToOverlapping(ctx, spAB, ev1, false /* omitInRangefeeds */, nil /* alloc */) }) - require.NotPanics(t, func() { reg.Disconnect(spAB) }) - require.NotPanics(t, func() { reg.DisconnectWithErr(spAB, err1) }) + reg.PublishToOverlapping(ctx, spAB, ev1, false /* omitInRangefeeds */, nil /* alloc */) + reg.Disconnect(ctx, spAB) + reg.DisconnectWithErr(ctx, spAB, err1) rAB := newTestRegistration(spAB, hlc.Timestamp{}, nil, false /* withDiff */, false /* withFiltering */) rBC := newTestRegistration(spBC, hlc.Timestamp{}, nil, true /* withDiff */, false /* withFiltering */) rCD := newTestRegistration(spCD, hlc.Timestamp{}, nil, true /* withDiff */, false /* withFiltering */) rAC := newTestRegistration(spAC, hlc.Timestamp{}, nil, false /* withDiff */, false /* withFiltering */) rACFiltering := newTestRegistration(spAC, hlc.Timestamp{}, nil, false /* withDiff */, true /* withFiltering */) - go rAB.runOutputLoop(context.Background(), 0) - go rBC.runOutputLoop(context.Background(), 0) - go rCD.runOutputLoop(context.Background(), 0) - go rAC.runOutputLoop(context.Background(), 0) - go rACFiltering.runOutputLoop(context.Background(), 0) + go 
rAB.runOutputLoop(ctx, 0) + go rBC.runOutputLoop(ctx, 0) + go rCD.runOutputLoop(ctx, 0) + go rAC.runOutputLoop(ctx, 0) + go rACFiltering.runOutputLoop(ctx, 0) defer rAB.disconnect(nil) defer rBC.disconnect(nil) defer rCD.disconnect(nil) @@ -408,15 +408,15 @@ func TestRegistryBasic(t *testing.T) { defer rACFiltering.disconnect(nil) // Register 4 registrations. - reg.Register(&rAB.registration) + reg.Register(ctx, &rAB.registration) require.Equal(t, 1, reg.Len()) - reg.Register(&rBC.registration) + reg.Register(ctx, &rBC.registration) require.Equal(t, 2, reg.Len()) - reg.Register(&rCD.registration) + reg.Register(ctx, &rCD.registration) require.Equal(t, 3, reg.Len()) - reg.Register(&rAC.registration) + reg.Register(ctx, &rAC.registration) require.Equal(t, 4, reg.Len()) - reg.Register(&rACFiltering.registration) + reg.Register(ctx, &rACFiltering.registration) require.Equal(t, 5, reg.Len()) // Publish to different spans. @@ -425,7 +425,7 @@ func TestRegistryBasic(t *testing.T) { reg.PublishToOverlapping(ctx, spCD, ev3, false /* omitInRangefeeds */, nil /* alloc */) reg.PublishToOverlapping(ctx, spAC, ev4, false /* omitInRangefeeds */, nil /* alloc */) reg.PublishToOverlapping(ctx, spAC, ev5, true /* omitInRangefeeds */, nil /* alloc */) - require.NoError(t, reg.waitForCaughtUp(all)) + require.NoError(t, reg.waitForCaughtUp(ctx, all)) require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1), noPrev(ev4), noPrev(ev5)}, rAB.Events()) require.Equal(t, []*kvpb.RangeFeedEvent{ev2, ev4, ev5}, rBC.Events()) require.Equal(t, []*kvpb.RangeFeedEvent{ev3}, rCD.Events()) @@ -463,7 +463,7 @@ func TestRegistryBasic(t *testing.T) { require.False(t, f.NeedPrevVal(roachpb.Span{Key: keyX})) // Disconnect span that overlaps with rCD. - reg.DisconnectWithErr(spCD, err1) + reg.DisconnectWithErr(ctx, spCD, err1) require.Equal(t, 4, reg.Len()) require.Equal(t, err1.GoError(), rCD.Err()) @@ -472,11 +472,11 @@ func TestRegistryBasic(t *testing.T) { reg.PublishToOverlapping(ctx, spBC, ev3, false /* omitInRangefeeds */, nil /* alloc */) reg.PublishToOverlapping(ctx, spCD, ev2, false /* omitInRangefeeds */, nil /* alloc */) reg.PublishToOverlapping(ctx, spAC, ev1, false /* omitInRangefeeds */, nil /* alloc */) - require.NoError(t, reg.waitForCaughtUp(all)) + require.NoError(t, reg.waitForCaughtUp(ctx, all)) require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev4), noPrev(ev1)}, rAB.Events()) // Disconnect from rAB without error. - reg.Disconnect(spAB) + reg.Disconnect(ctx, spAB) require.Nil(t, rAC.Err()) require.Nil(t, rAB.Err()) require.Equal(t, 1, reg.Len()) @@ -509,59 +509,6 @@ func TestRegistryBasic(t *testing.T) { require.Equal(t, 0, reg.Len()) } -func TestRegistryPublishAssertsPopulatedInformation(t *testing.T) { - defer leaktest.AfterTest(t)() - ctx := context.Background() - reg := makeRegistry(NewMetrics()) - - rNoDiff := newTestRegistration(spAB, hlc.Timestamp{}, nil, false /* withDiff */, false /* withFiltering */) - go rNoDiff.runOutputLoop(context.Background(), 0) - reg.Register(&rNoDiff.registration) - - rWithDiff := newTestRegistration(spCD, hlc.Timestamp{}, nil, true /* withDiff */, false /* withFiltering */) - go rWithDiff.runOutputLoop(context.Background(), 0) - reg.Register(&rWithDiff.registration) - - key := roachpb.Key("a") - val := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} - noVal := roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 1}} - ev := new(kvpb.RangeFeedEvent) - - // Both registrations require RangeFeedValue events to have a Key. 
- ev.MustSetValue(&kvpb.RangeFeedValue{ - Key: nil, - Value: val, - PrevValue: val, - }) - require.Panics(t, func() { reg.PublishToOverlapping(ctx, spAB, ev, false /* omitInRangefeeds */, nil /* alloc */) }) - require.Panics(t, func() { reg.PublishToOverlapping(ctx, spCD, ev, false /* omitInRangefeeds */, nil /* alloc */) }) - require.NoError(t, reg.waitForCaughtUp(all)) - - // Both registrations require RangeFeedValue events to have a Value. - ev.MustSetValue(&kvpb.RangeFeedValue{ - Key: key, - Value: noVal, - PrevValue: val, - }) - require.Panics(t, func() { reg.PublishToOverlapping(ctx, spAB, ev, false /* omitInRangefeeds */, nil /* alloc */) }) - require.Panics(t, func() { reg.PublishToOverlapping(ctx, spCD, ev, false /* omitInRangefeeds */, nil /* alloc */) }) - require.NoError(t, reg.waitForCaughtUp(all)) - - // Neither registrations require RangeFeedValue events to have a PrevValue. - // Even when they are requested, the previous value can always be nil. - ev.MustSetValue(&kvpb.RangeFeedValue{ - Key: key, - Value: val, - PrevValue: roachpb.Value{}, - }) - require.NotPanics(t, func() { reg.PublishToOverlapping(ctx, spAB, ev, false /* omitInRangefeeds */, nil /* alloc */) }) - require.NotPanics(t, func() { reg.PublishToOverlapping(ctx, spCD, ev, false /* omitInRangefeeds */, nil /* alloc */) }) - require.NoError(t, reg.waitForCaughtUp(all)) - - rNoDiff.disconnect(nil) - rWithDiff.disconnect(nil) -} - func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() @@ -569,8 +516,8 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, /* catchup */ false /* withDiff */, false /* withFiltering */) - go r.runOutputLoop(context.Background(), 0) - reg.Register(&r.registration) + go r.runOutputLoop(ctx, 0) + reg.Register(ctx, &r.registration) // Publish a value with a timestamp beneath the registration's start // timestamp. Should be ignored. @@ -579,7 +526,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { Value: roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 5}}, }) reg.PublishToOverlapping(ctx, spAB, ev, false /* omitInRangefeeds */, nil /* alloc */) - require.NoError(t, reg.waitForCaughtUp(all)) + require.NoError(t, reg.waitForCaughtUp(ctx, all)) require.Nil(t, r.Events()) // Publish a value with a timestamp equal to the registration's start @@ -588,7 +535,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { Value: roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 10}}, }) reg.PublishToOverlapping(ctx, spAB, ev, false /* omitInRangefeeds */, nil /* alloc */) - require.NoError(t, reg.waitForCaughtUp(all)) + require.NoError(t, reg.waitForCaughtUp(ctx, all)) require.Nil(t, r.Events()) // Publish a checkpoint with a timestamp beneath the registration's. Should @@ -597,7 +544,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { Span: spAB, ResolvedTS: hlc.Timestamp{WallTime: 5}, }) reg.PublishToOverlapping(ctx, spAB, ev, false /* omitInRangefeeds */, nil /* alloc */) - require.NoError(t, reg.waitForCaughtUp(all)) + require.NoError(t, reg.waitForCaughtUp(ctx, all)) require.Equal(t, []*kvpb.RangeFeedEvent{ev}, r.Events()) r.disconnect(nil) @@ -645,18 +592,19 @@ func TestRegistrationString(t *testing.T) { // implicitly. 
func TestRegistryShutdownMetrics(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() reg := makeRegistry(NewMetrics()) regDoneC := make(chan interface{}) r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, /*catchup */ false /* withDiff */, false /* withFiltering */) go func() { - r.runOutputLoop(context.Background(), 0) + r.runOutputLoop(ctx, 0) close(regDoneC) }() - reg.Register(&r.registration) + reg.Register(ctx, &r.registration) - reg.DisconnectAllOnShutdown(nil) + reg.DisconnectAllOnShutdown(ctx, nil) <-regDoneC require.Zero(t, reg.metrics.RangeFeedRegistrations.Value(), "metric is not zero on stop") } diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go index eab7118558c6..19af3aa6e5af 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go @@ -110,14 +110,14 @@ func (rts *resolvedTimestamp) Get() hlc.Timestamp { // closed timestamp. Once initialized, the resolvedTimestamp can begin operating // in its steady state. The method returns whether this caused the resolved // timestamp to move forward. -func (rts *resolvedTimestamp) Init() bool { +func (rts *resolvedTimestamp) Init(ctx context.Context) bool { rts.init = true // Once the resolvedTimestamp is initialized, all prior written intents // should be accounted for, so reference counts for transactions that // would drop below zero will all be due to aborted transactions. These // can all be ignored. rts.intentQ.AllowNegRefCount(false) - return rts.recompute() + return rts.recompute(ctx) } // IsInit returns whether the resolved timestamp is initialized. @@ -128,11 +128,11 @@ func (rts *resolvedTimestamp) IsInit() bool { // ForwardClosedTS indicates that the closed timestamp that serves as the basis // for the resolved timestamp has advanced. The method returns whether this // caused the resolved timestamp to move forward. -func (rts *resolvedTimestamp) ForwardClosedTS(newClosedTS hlc.Timestamp) bool { +func (rts *resolvedTimestamp) ForwardClosedTS(ctx context.Context, newClosedTS hlc.Timestamp) bool { if rts.closedTS.Forward(newClosedTS) { - return rts.recompute() + return rts.recompute(ctx) } - rts.assertNoChange() + rts.assertNoChange(ctx) return false } @@ -144,9 +144,9 @@ func (rts *resolvedTimestamp) ConsumeLogicalOp( ctx context.Context, op enginepb.MVCCLogicalOp, ) bool { if rts.consumeLogicalOp(ctx, op) { - return rts.recompute() + return rts.recompute(ctx) } - rts.assertNoChange() + rts.assertNoChange(ctx) return false } @@ -234,20 +234,21 @@ func (rts *resolvedTimestamp) consumeLogicalOp( return rts.intentQ.Del(t.TxnID) default: - panic(errors.AssertionFailedf("unknown logical op %T", t)) + log.Fatalf(ctx, "unknown logical op %T", t) + return false } } // recompute computes the resolved timestamp based on its respective closed // timestamp and the in-flight intents that it is tracking. The method returns // whether this caused the resolved timestamp to move forward. 
-func (rts *resolvedTimestamp) recompute() bool { +func (rts *resolvedTimestamp) recompute(ctx context.Context) bool { if !rts.IsInit() { return false } if rts.closedTS.Less(rts.resolvedTS) { - panic(fmt.Sprintf("closed timestamp below resolved timestamp: %s < %s", - rts.closedTS, rts.resolvedTS)) + log.Fatalf(ctx, "closed timestamp below resolved timestamp: %s < %s", + rts.closedTS, rts.resolvedTS) } newTS := rts.closedTS @@ -255,8 +256,8 @@ func (rts *resolvedTimestamp) recompute() bool { // timestamps cannot be resolved yet. if txn := rts.intentQ.Oldest(); txn != nil { if txn.timestamp.LessEq(rts.resolvedTS) { - panic(fmt.Sprintf("unresolved txn equal to or below resolved timestamp: %s <= %s", - txn.timestamp, rts.resolvedTS)) + log.Fatalf(ctx, "unresolved txn equal to or below resolved timestamp: %s <= %s", + txn.timestamp, rts.resolvedTS) } // txn.timestamp cannot be resolved, so the resolved timestamp must be Prev. txnTS := txn.timestamp.Prev() @@ -267,8 +268,8 @@ func (rts *resolvedTimestamp) recompute() bool { newTS.Logical = 0 if newTS.Less(rts.resolvedTS) { - panic(fmt.Sprintf("resolved timestamp regression, was %s, recomputed as %s", - rts.resolvedTS, newTS)) + log.Fatalf(ctx, "resolved timestamp regression, was %s, recomputed as %s", + rts.resolvedTS, newTS) } return rts.resolvedTS.Forward(newTS) } @@ -276,12 +277,12 @@ func (rts *resolvedTimestamp) recompute() bool { // assertNoChange asserts that a recomputation of the resolved timestamp does // not change its value. A violation of this assertion would indicate a logic // error in the resolvedTimestamp implementation. -func (rts *resolvedTimestamp) assertNoChange() { +func (rts *resolvedTimestamp) assertNoChange(ctx context.Context) { before := rts.resolvedTS - changed := rts.recompute() + changed := rts.recompute(ctx) if changed || !before.EqOrdering(rts.resolvedTS) { - panic(fmt.Sprintf("unexpected resolved timestamp change on recomputation, "+ - "was %s, recomputed as %s", before, rts.resolvedTS)) + log.Fatalf(ctx, "unexpected resolved timestamp change on recomputation, "+ + "was %s, recomputed as %s", before, rts.resolvedTS) } } @@ -295,9 +296,7 @@ func (rts *resolvedTimestamp) assertOpAboveRTS( err := errors.AssertionFailedf( "resolved timestamp %s equal to or above timestamp of operation %v", rts.resolvedTS, op) if fatal { - // TODO(erikgrinaker): use log.Fatalf. Panic for now, since tests expect - // it and to minimize code churn for backports. - panic(err) + log.Fatalf(ctx, "%v", err) } else { log.Errorf(ctx, "%v", err) } diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go index b13f54c17a64..466f9df6bfa2 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go @@ -183,7 +183,7 @@ func TestResolvedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) - rts.Init() + rts.Init(ctx) // Test empty resolved timestamp. require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -201,20 +201,10 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{}, rts.Get()) // Set a closed timestamp. Resolved timestamp advances. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 5}) + fwd = rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 5}) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) - // Write intent at earlier timestamp. Assertion failure. 
- require.Panics(t, func() { - rts.ConsumeLogicalOp(ctx, writeIntentOp(uuid.MakeV4(), hlc.Timestamp{WallTime: 3})) - }) - - // Write value at earlier timestamp. Assertion failure. - require.Panics(t, func() { - rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 4})) - }) - // Write value at later timestamp. No effect on resolved timestamp. fwd = rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 6})) require.False(t, fwd) @@ -222,7 +212,7 @@ func TestResolvedTimestamp(t *testing.T) { // Forward closed timestamp. Resolved timestamp advances to the timestamp of // the earliest intent. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 15}) + fwd = rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 15}) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) @@ -237,7 +227,7 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Forward closed timestamp to same time as earliest intent. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 18}) + fwd = rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 18}) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 17}, rts.Get()) @@ -253,7 +243,7 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 18}, rts.Get()) // Forward closed timestamp. Resolved timestamp moves to earliest intent. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 30}) + fwd = rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 30}) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 19}, rts.Get()) @@ -304,7 +294,7 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 30}, rts.Get()) // Forward closed timestamp. Resolved timestamp moves to earliest intent. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 40}) + fwd = rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 40}) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 34}, rts.Get()) @@ -330,7 +320,7 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) // Forward closed timestamp. Resolved timestamp moves to earliest intent. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 50}) + fwd = rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 50}) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 45}, rts.Get()) @@ -356,7 +346,7 @@ func TestResolvedTimestampNoClosedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) - rts.Init() + rts.Init(ctx) // Add a value. No closed timestamp so no resolved timestamp. fwd := rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 1})) @@ -393,31 +383,32 @@ func TestResolvedTimestampNoClosedTimestamp(t *testing.T) { func TestResolvedTimestampNoIntents(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) - rts.Init() + rts.Init(ctx) // Set a closed timestamp. Resolved timestamp advances. - fwd := rts.ForwardClosedTS(hlc.Timestamp{WallTime: 1}) + fwd := rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 1}) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 1}, rts.Get()) // Forward closed timestamp. Resolved timestamp advances. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 3}) + fwd = rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 3}) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 3}, rts.Get()) // Smaller closed timestamp. 
Resolved timestamp does not advance. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 2}) + fwd = rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 2}) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 3}, rts.Get()) // Equal closed timestamp. Resolved timestamp does not advance. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 3}) + fwd = rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 3}) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 3}, rts.Get()) // Forward closed timestamp. Resolved timestamp advances. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 4}) + fwd = rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 4}) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 4}, rts.Get()) } @@ -431,12 +422,12 @@ func TestResolvedTimestampInit(t *testing.T) { rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) // Set a closed timestamp. Not initialized so no resolved timestamp. - fwd := rts.ForwardClosedTS(hlc.Timestamp{WallTime: 5}) + fwd := rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 5}) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Init. Resolved timestamp moves to closed timestamp. - fwd = rts.Init() + fwd = rts.Init(ctx) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) }) @@ -450,7 +441,7 @@ func TestResolvedTimestampInit(t *testing.T) { require.Equal(t, hlc.Timestamp{}, rts.Get()) // Init. Resolved timestamp undefined. - fwd = rts.Init() + fwd = rts.Init(ctx) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) }) @@ -464,12 +455,12 @@ func TestResolvedTimestampInit(t *testing.T) { require.Equal(t, hlc.Timestamp{}, rts.Get()) // Set a closed timestamp. Not initialized so no resolved timestamp. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 5}) + fwd = rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 5}) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Init. Resolved timestamp moves below first unresolved intent. - fwd = rts.Init() + fwd = rts.Init(ctx) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 2}, rts.Get()) }) @@ -496,12 +487,12 @@ func TestResolvedTimestampInit(t *testing.T) { require.Equal(t, hlc.Timestamp{}, rts.Get()) // Set a closed timestamp. Not initialized so no resolved timestamp. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 5}) + fwd = rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 5}) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Init. Resolved timestamp moves to closed timestamp. - fwd = rts.Init() + fwd = rts.Init(ctx) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) }) @@ -516,7 +507,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Init. Negative txn ref count causes panic. Init should not have // been called because an intent must not have been accounted for. - require.Panics(t, func() { rts.Init() }) + require.Panics(t, func() { rts.Init(ctx) }) }) } @@ -524,10 +515,10 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) - rts.Init() + rts.Init(ctx) // Set a closed timestamp. Resolved timestamp advances. 
- fwd := rts.ForwardClosedTS(hlc.Timestamp{WallTime: 5}) + fwd := rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 5}) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) @@ -538,7 +529,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) // Set a new closed timestamp. Resolved timestamp advances. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 15}) + fwd = rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 15}) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) @@ -565,7 +556,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { // Set a new closed timestamp. Resolved timestamp advances, but only up to // the timestamp of txn1's intent, which we fail remember is uncommittable. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 25}) + fwd = rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 25}) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 19}, rts.Get()) @@ -581,10 +572,10 @@ func TestClosedTimestampLogicalPart(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) - rts.Init() + rts.Init(ctx) // Set a new closed timestamp. Resolved timestamp advances. - fwd := rts.ForwardClosedTS(hlc.Timestamp{WallTime: 10, Logical: 2}) + fwd := rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 10, Logical: 2}) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) @@ -597,7 +588,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { // Set a new closed timestamp. Resolved timestamp doesn't advance, since it // could only theoretically advance up to 10.4, and it doesn't do logical // parts. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 11, Logical: 6}) + fwd = rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 11, Logical: 6}) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index 3754f410e01f..f4a295d0ca8c 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -12,7 +12,6 @@ package rangefeed import ( "context" - "fmt" "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -228,6 +227,7 @@ func (p *ScheduledProcessor) processStop() { } func (p *ScheduledProcessor) cleanup() { + ctx := p.AmbientContext.AnnotateCtx(context.Background()) // Cleanup is normally called when all registrations are disconnected and // unregistered or were not created yet (processor start failure). // However, there's a case where processor is stopped by replica action while @@ -237,14 +237,14 @@ func (p *ScheduledProcessor) cleanup() { // To avoid leaking any registry resources and metrics, processor performs // explicit registry termination in that case. pErr := kvpb.NewError(&kvpb.NodeUnavailableError{}) - p.reg.DisconnectAllOnShutdown(pErr) + p.reg.DisconnectAllOnShutdown(ctx, pErr) // Unregister callback from scheduler p.scheduler.Unregister() p.taskCancel() close(p.stoppedC) - p.MemBudget.Close(context.Background()) + p.MemBudget.Close(ctx) } // Stop shuts down the processor and closes all registrations. 
Safe to call on @@ -271,13 +271,13 @@ func (p *ScheduledProcessor) DisconnectSpanWithErr(span roachpb.Span, pErr *kvpb return } p.enqueueRequest(func(ctx context.Context) { - p.reg.DisconnectWithErr(span, pErr) + p.reg.DisconnectWithErr(ctx, span, pErr) }) } func (p *ScheduledProcessor) sendStop(pErr *kvpb.Error) { p.enqueueRequest(func(ctx context.Context) { - p.reg.DisconnectWithErr(all, pErr) + p.reg.DisconnectWithErr(ctx, all, pErr) // First set stopping flag to ensure that once all registrations are removed // processor should stop. p.stopping = true @@ -331,7 +331,7 @@ func (p *ScheduledProcessor) Register( } // Add the new registration to the registry. - p.reg.Register(&r) + p.reg.Register(ctx, &r) // Prep response with filter that includes the new registration. f := p.reg.NewFilter() @@ -645,7 +645,7 @@ func (p *ScheduledProcessor) consumeEvent(ctx context.Context, e *event) { p.consumeSSTable(ctx, e.sst.data, e.sst.span, e.sst.ts, e.alloc) case e.sync != nil: if e.sync.testRegCatchupSpan != nil { - if err := p.reg.waitForCaughtUp(*e.sync.testRegCatchupSpan); err != nil { + if err := p.reg.waitForCaughtUp(ctx, *e.sync.testRegCatchupSpan); err != nil { log.Errorf( ctx, "error waiting for registries to catch up during test, results might be impacted: %s", @@ -655,7 +655,7 @@ func (p *ScheduledProcessor) consumeEvent(ctx context.Context, e *event) { } close(e.sync.c) default: - panic(fmt.Sprintf("missing event variant: %+v", e)) + log.Fatalf(ctx, "missing event variant: %+v", e) } } @@ -693,7 +693,7 @@ func (p *ScheduledProcessor) consumeLogicalOps( // No updates to publish. default: - panic(errors.AssertionFailedf("unknown logical op %T", t)) + log.Fatalf(ctx, "unknown logical op %T", t) } // Determine whether the operation caused the resolved timestamp to @@ -715,13 +715,13 @@ func (p *ScheduledProcessor) consumeSSTable( } func (p *ScheduledProcessor) forwardClosedTS(ctx context.Context, newClosedTS hlc.Timestamp) { - if p.rts.ForwardClosedTS(newClosedTS) { + if p.rts.ForwardClosedTS(ctx, newClosedTS) { p.publishCheckpoint(ctx) } } func (p *ScheduledProcessor) initResolvedTS(ctx context.Context) { - if p.rts.Init() { + if p.rts.Init(ctx) { p.publishCheckpoint(ctx) } } @@ -781,10 +781,10 @@ func (p *ScheduledProcessor) publishSSTable( alloc *SharedBudgetAllocation, ) { if sstSpan.Equal(roachpb.Span{}) { - panic(errors.AssertionFailedf("received SSTable without span")) + log.Fatalf(ctx, "received SSTable without span") } if sstWTS.IsEmpty() { - panic(errors.AssertionFailedf("received SSTable without write timestamp")) + log.Fatalf(ctx, "received SSTable without write timestamp") } p.reg.PublishToOverlapping(ctx, sstSpan, &kvpb.RangeFeedEvent{ SST: &kvpb.RangeFeedSSTable{ From 401d70bf93bf51a7cb7658f8757de80640fca2d7 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 24 Jan 2024 11:08:35 +0000 Subject: [PATCH 2/5] rangefeed: use pointer receiver in MVCCLogicalOp assertion `String()` is only implemented for a pointer receiver, so the formatting breaks otherwise. 
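For illustration, a minimal standalone Go sketch of the underlying behavior
(not CockroachDB code): with a pointer-receiver `String()`, only the pointer
satisfies `fmt.Stringer`, so formatting the value falls back to the default
struct representation.

```go
package main

import "fmt"

type op struct{ name string }

// String is only implemented on the pointer receiver.
func (o *op) String() string { return "op(" + o.name + ")" }

func main() {
	o := op{name: "write_intent"}
	fmt.Printf("%v\n", o)  // {write_intent} -- value does not implement fmt.Stringer
	fmt.Printf("%v\n", &o) // op(write_intent) -- pointer does
}
```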
Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/resolved_timestamp.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go index 19af3aa6e5af..ad9689a8721e 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go @@ -293,8 +293,11 @@ func (rts *resolvedTimestamp) assertOpAboveRTS( ctx context.Context, op enginepb.MVCCLogicalOp, opTS hlc.Timestamp, fatal bool, ) { if opTS.LessEq(rts.resolvedTS) { + // NB: MVCCLogicalOp.String() is only implemented for pointer receiver. + // We shadow the variable to avoid it escaping to the heap. + op := op err := errors.AssertionFailedf( - "resolved timestamp %s equal to or above timestamp of operation %v", rts.resolvedTS, op) + "resolved timestamp %s equal to or above timestamp of operation %v", rts.resolvedTS, &op) if fatal { log.Fatalf(ctx, "%v", err) } else { From 051191c2dbae7867122fbf20ad01d55bd0ba6619 Mon Sep 17 00:00:00 2001 From: Michael Erickson Date: Wed, 31 Jan 2024 16:56:58 -0800 Subject: [PATCH 3/5] opt/exec: try deflaking distsql_tenant_locality again In #118168 we tried a retry, but that didn't work. Let's check whether the sql instances are alive. I think this is what `crdb_internal.sql_liveness_is_alive` is for.` Fixes: #117005 Epic: None Release note: None --- .../execbuilder/testdata/distsql_tenant_locality | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_tenant_locality b/pkg/sql/opt/exec/execbuilder/testdata/distsql_tenant_locality index 369922c60587..2d54e2edc43a 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/distsql_tenant_locality +++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_tenant_locality @@ -40,19 +40,19 @@ statement ok SELECT * FROM t # Check sql instance locality in the secondary tenant. -query IT -SELECT id, locality FROM system.sql_instances WHERE locality IS NOT NULL ORDER BY id +query ITB retry +SELECT id, locality, crdb_internal.sql_liveness_is_alive(session_id) FROM system.sql_instances WHERE locality IS NOT NULL ORDER BY id ---- -1 {"Tiers": "region=test"} -2 {"Tiers": "region=test1"} -3 {"Tiers": "region=test2"} +1 {"Tiers": "region=test"} true +2 {"Tiers": "region=test1"} true +3 {"Tiers": "region=test2"} true # Ensure that we plan TableReaders in the regions according to the leaseholder # of each range, namely we want # - TableReader on SQL Instance 2 to scan Span /106/1/1/0 # - TableReader on SQL Instance 3 to scan Span /106/1/3/0 # - TableReader on SQL Instance 1 to scan Span /106/1/5/0. -query T retry +query T EXPLAIN (DISTSQL) SELECT * FROM t WHERE k IN (1, 3, 5) ---- distribution: full From a05283d955d836c197e3dca4094dffc2cfce67cd Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Wed, 22 Nov 2023 15:31:50 -0500 Subject: [PATCH 4/5] mixedversion: add public API to disable individual mutators With this API, tests are now able to disable individual mutators if they are incompatible with the test (e.g., setting a cluster setting that conflicts with what the test is trying to do). Tests should be able to disable mutators with something like the following: ```go mvt := mixedversion.NewTest(..., mixedversion.DisableMutators(mixedversion.ClusterSetting("foo")), ) ``` Note: this is just an example -- individual mutator implementatios are not yet available. 
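For reference, the same mechanism can also be used to force a mutator on (or
otherwise adjust its likelihood) via the `WithMutatorProbability` option added
in this patch. A hypothetical sketch, with an illustrative mutator name:

```go
mvt := mixedversion.NewTest(...,
  // Always apply this (hypothetical) mutator in this test run.
  mixedversion.WithMutatorProbability("cluster_setting", 1.0),
)
```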
Epic: none Release note: None --- .../mixedversion/mixedversion.go | 45 +++++++--- .../roachtestutil/mixedversion/planner.go | 15 +++- .../mixedversion/planner_test.go | 83 ++++++++++++++++++- .../testdata/planner/conflicting_mutators | 47 +++++++++++ .../testdata/planner/mutator_probabilities | 49 +++++++++++ 5 files changed, 222 insertions(+), 17 deletions(-) create mode 100644 pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/conflicting_mutators create mode 100644 pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/mutator_probabilities diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go index e8ae61004cd8..9171b559884a 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go @@ -154,12 +154,13 @@ var ( defaultTestOptions = testOptions{ // We use fixtures more often than not as they are more likely to // detect bugs, especially in migrations. - useFixturesProbability: 0.7, - upgradeTimeout: clusterupgrade.DefaultUpgradeTimeout, - minUpgrades: 1, - maxUpgrades: 4, - minimumSupportedVersion: OldestSupportedVersion, - predecessorFunc: randomPredecessorHistory, + useFixturesProbability: 0.7, + upgradeTimeout: clusterupgrade.DefaultUpgradeTimeout, + minUpgrades: 1, + maxUpgrades: 4, + minimumSupportedVersion: OldestSupportedVersion, + predecessorFunc: randomPredecessorHistory, + overriddenMutatorProbabilities: make(map[string]float64), } // OldestSupportedVersion is the oldest cockroachdb version @@ -249,13 +250,14 @@ type ( // testOptions contains some options that can be changed by the user // that expose some control over the generated test plan and behaviour. testOptions struct { - useFixturesProbability float64 - upgradeTimeout time.Duration - minUpgrades int - maxUpgrades int - minimumSupportedVersion *clusterupgrade.Version - predecessorFunc predecessorFunc - settings []install.ClusterSettingOption + useFixturesProbability float64 + upgradeTimeout time.Duration + minUpgrades int + maxUpgrades int + minimumSupportedVersion *clusterupgrade.Version + predecessorFunc predecessorFunc + settings []install.ClusterSettingOption + overriddenMutatorProbabilities map[string]float64 } CustomOption func(*testOptions) @@ -392,6 +394,23 @@ func AlwaysUseLatestPredecessors(opts *testOptions) { opts.predecessorFunc = latestPredecessorHistory } +// WithMutatorProbability allows tests to override the default +// probability that a mutator will be applied to a test plan. +func WithMutatorProbability(name string, probability float64) CustomOption { + return func(opts *testOptions) { + opts.overriddenMutatorProbabilities[name] = probability + } +} + +// DisableMutators disables all mutators with the names passed. +func DisableMutators(names ...string) CustomOption { + return func(opts *testOptions) { + for _, name := range names { + WithMutatorProbability(name, 0)(opts) + } + } +} + // NewTest creates a Test struct that users can use to create and run // a mixed-version roachtest. func NewTest( diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go index 04e02720b393..6aa43b5f08a0 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go @@ -98,7 +98,9 @@ type ( // Probability returns the probability that this mutator will run // in any given test run. 
Every mutator should run under some // probability. Making this an explicit part of the interface - // makes that more prominent. + // makes that more prominent. Note that tests are able to override + // this probability for specific mutator implementations as + // needed. Probability() float64 // Generate takes a test plan and a RNG and returns the list of // mutations that should be applied to the plan. @@ -242,7 +244,7 @@ func (p *testPlanner) Plan() *TestPlan { // Probabilistically enable some of of the mutators on the base test // plan generated above. for _, mut := range planMutators { - if p.prng.Float64() < mut.Probability() { + if p.mutatorEnabled(mut) { mutations := mut.Generate(p.prng, testPlan) testPlan.applyMutations(p.prng, mutations) testPlan.enabledMutators = append(testPlan.enabledMutators, mut) @@ -460,6 +462,15 @@ func (p *testPlanner) newRNG() *rand.Rand { return rngFromRNG(p.prng) } +func (p *testPlanner) mutatorEnabled(mut mutator) bool { + probability := mut.Probability() + if p, ok := p.options.overriddenMutatorProbabilities[mut.Name()]; ok { + probability = p + } + + return p.prng.Float64() < probability +} + func newUpgradePlan(from, to *clusterupgrade.Version) *upgradePlan { return &upgradePlan{ from: from, diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go index d29ed0e8bf97..a2021e27c81e 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go @@ -57,11 +57,22 @@ var ( const seed = 12345 // expectations are based on this seed func TestTestPlanner(t *testing.T) { - reset := setBuildVersion() - defer reset() + // Tests run from an empty list of mutators; the only way to add + // mutators is by using the `add-mutators` directive in the + // test. This allows the output to remain stable when new mutators + // are added, and also allows us to test mutators explicitly and in + // isolation. 
+ resetMutators := func() { planMutators = nil } + resetBuildVersion := setBuildVersion() + defer func() { + resetBuildVersion() + resetMutators() + }() datadriven.Walk(t, datapathutils.TestDataPath(t, "planner"), func(t *testing.T, path string) { + resetMutators() mvt := newTest() + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { if d.Cmd == "plan" { plan, err := mvt.plan() @@ -76,6 +87,20 @@ func TestTestPlanner(t *testing.T) { } switch d.Cmd { + case "add-mutators": + for _, arg := range d.CmdArgs { + var mut mutator + switch mutatorName := arg.Key; mutatorName { + case "concurrent_user_hooks_mutator": + mut = concurrentUserHooksMutator{} + case "remove_user_hooks_mutator": + mut = removeUserHooksMutator{} + default: + t.Fatalf("unknown mutator: %s", mutatorName) + } + + planMutators = append(planMutators, mut) + } case "mixed-version-test": mvt = createDataDrivenMixedVersionTest(t, d.CmdArgs) case "on-startup": @@ -353,6 +378,24 @@ func createDataDrivenMixedVersionTest(t *testing.T, args []datadriven.CmdArg) *T require.NoError(t, err) isLocal = boolP(b) + case "mutator_probabilities": + if len(arg.Vals)%2 != 0 { + t.Fatalf("even number of values required for %s directive", arg.Key) + } + + var j int + for j < len(arg.Vals) { + name, probStr := arg.Vals[j], arg.Vals[j+1] + prob, err := strconv.ParseFloat(probStr, 64) + require.NoError(t, err) + opts = append(opts, WithMutatorProbability(name, prob)) + + j += 2 + } + + case "disable_mutator": + opts = append(opts, DisableMutators(arg.Vals[0])) + default: t.Errorf("unknown mixed-version-test option: %s", arg.Key) } @@ -684,6 +727,42 @@ NEXT_STEP: return fmt.Errorf("no concurrent step that includes: %#v", names) } +// concurrentUserHooksMutator is a test mutator that inserts a step +// concurrently with every user-provided hook. +type concurrentUserHooksMutator struct{} + +func (concurrentUserHooksMutator) Name() string { return "concurrent_user_hooks_mutator" } +func (concurrentUserHooksMutator) Probability() float64 { return 0.5 } + +func (concurrentUserHooksMutator) Generate(rng *rand.Rand, plan *TestPlan) []mutation { + // Insert our `testSingleStep` implementation concurrently with every + // user-provided function. + return plan. + newStepSelector(). + Filter(func(s *singleStep) bool { + _, ok := s.impl.(runHookStep) + return ok + }). + InsertConcurrent(&testSingleStep{}) +} + +// removeUserHooksMutator is a test mutator that removes every +// user-provided hook from the plan. +type removeUserHooksMutator struct{} + +func (removeUserHooksMutator) Name() string { return "remove_user_hooks_mutator" } +func (removeUserHooksMutator) Probability() float64 { return 0.5 } + +func (removeUserHooksMutator) Generate(rng *rand.Rand, plan *TestPlan) []mutation { + return plan. + newStepSelector(). + Filter(func(s *singleStep) bool { + _, ok := s.impl.(runHookStep) + return ok + }). + Remove() +} + func dummyHook(context.Context, *logger.Logger, *rand.Rand, *Helper) error { return nil } diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/conflicting_mutators b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/conflicting_mutators new file mode 100644 index 000000000000..e4ac2ce90b44 --- /dev/null +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/conflicting_mutators @@ -0,0 +1,47 @@ +# Tests that we are able to deal with mutators that insert relative to +# steps that are later removed by subsequent mutations. 
The initial +# insertion should create a concurrent step with user-hooks (see +# `mutator_probabilities` test) and the second mutator removes user +# hooks, flattening that concurrent run. + +add-mutators concurrent_user_hooks_mutator remove_user_hooks_mutator +---- +ok + +# ensure both mutators are always applied +mixed-version-test num_upgrades=1 mutator_probabilities=(concurrent_user_hooks_mutator, 1, remove_user_hooks_mutator, 1) +---- +ok + +in-mixed-version name=(my mixed version feature) +---- +ok + +plan debug=true +---- +mixed-version test plan for upgrading from "v22.2.8" to "" with mutators {concurrent_user_hooks_mutator, remove_user_hooks_mutator}: +├── install fixtures for version "v22.2.8" (1) [stage=cluster-setup] +├── start cluster at version "v22.2.8" (2) [stage=cluster-setup] +├── wait for nodes :1-4 to reach cluster version '22.2' (3) [stage=cluster-setup] +└── upgrade cluster from "v22.2.8" to "" + ├── prevent auto-upgrades by setting `preserve_downgrade_option` (4) [stage=init] + ├── upgrade nodes :1-4 from "v22.2.8" to "" + │ ├── restart node 4 with binary version (5) [stage=temporary-upgrade] + │ ├── restart node 2 with binary version (6) [stage=temporary-upgrade] + │ ├── restart node 3 with binary version (7) [stage=temporary-upgrade] + │ ├── testSingleStep (8) [stage=temporary-upgrade] + │ └── restart node 1 with binary version (9) [stage=temporary-upgrade] + ├── downgrade nodes :1-4 from "" to "v22.2.8" + │ ├── restart node 2 with binary version v22.2.8 (10) [stage=rollback-upgrade] + │ ├── restart node 4 with binary version v22.2.8 (11) [stage=rollback-upgrade] + │ ├── testSingleStep (12) [stage=rollback-upgrade] + │ ├── restart node 1 with binary version v22.2.8 (13) [stage=rollback-upgrade] + │ └── restart node 3 with binary version v22.2.8 (14) [stage=rollback-upgrade] + ├── upgrade nodes :1-4 from "v22.2.8" to "" + │ ├── restart node 2 with binary version (15) [stage=last-upgrade] + │ ├── restart node 4 with binary version (16) [stage=last-upgrade] + │ ├── testSingleStep (17) [stage=last-upgrade] + │ ├── restart node 1 with binary version (18) [stage=last-upgrade] + │ └── restart node 3 with binary version (19) [stage=last-upgrade] + ├── finalize upgrade by resetting `preserve_downgrade_option` (20) [stage=running-upgrade-migrations] + └── wait for nodes :1-4 to reach cluster version (21) [stage=running-upgrade-migrations] diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/mutator_probabilities b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/mutator_probabilities new file mode 100644 index 000000000000..8406139e2439 --- /dev/null +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/mutator_probabilities @@ -0,0 +1,49 @@ +# Tests that we are able to override probabilities for specific mutators. 
+
+add-mutators concurrent_user_hooks_mutator
+----
+ok
+
+# ensure our `concurrent_user_hooks_mutator` is always applied
+mixed-version-test num_upgrades=1 mutator_probabilities=(concurrent_user_hooks_mutator, 1)
+----
+ok
+
+in-mixed-version name=(my mixed version feature)
+----
+ok
+
+plan debug=true
+----
+mixed-version test plan for upgrading from "v22.2.8" to "" with mutators {concurrent_user_hooks_mutator}:
+├── install fixtures for version "v22.2.8" (1) [stage=cluster-setup]
+├── start cluster at version "v22.2.8" (2) [stage=cluster-setup]
+├── wait for nodes :1-4 to reach cluster version '22.2' (3) [stage=cluster-setup]
+└── upgrade cluster from "v22.2.8" to ""
+   ├── prevent auto-upgrades by setting `preserve_downgrade_option` (4) [stage=init]
+   ├── upgrade nodes :1-4 from "v22.2.8" to ""
+   │   ├── restart node 4 with binary version (5) [stage=temporary-upgrade]
+   │   ├── restart node 2 with binary version (6) [stage=temporary-upgrade]
+   │   ├── restart node 3 with binary version (7) [stage=temporary-upgrade]
+   │   ├── run following steps concurrently
+   │   │   ├── run "my mixed version feature", after 500ms delay (8) [stage=temporary-upgrade]
+   │   │   └── testSingleStep, after 5s delay (9) [stage=temporary-upgrade]
+   │   └── restart node 1 with binary version (10) [stage=temporary-upgrade]
+   ├── downgrade nodes :1-4 from "" to "v22.2.8"
+   │   ├── restart node 2 with binary version v22.2.8 (11) [stage=rollback-upgrade]
+   │   ├── restart node 4 with binary version v22.2.8 (12) [stage=rollback-upgrade]
+   │   ├── run following steps concurrently
+   │   │   ├── run "my mixed version feature", after 0s delay (13) [stage=rollback-upgrade]
+   │   │   └── testSingleStep, after 30s delay (14) [stage=rollback-upgrade]
+   │   ├── restart node 1 with binary version v22.2.8 (15) [stage=rollback-upgrade]
+   │   └── restart node 3 with binary version v22.2.8 (16) [stage=rollback-upgrade]
+   ├── upgrade nodes :1-4 from "v22.2.8" to ""
+   │   ├── restart node 2 with binary version (17) [stage=last-upgrade]
+   │   ├── restart node 4 with binary version (18) [stage=last-upgrade]
+   │   ├── run following steps concurrently
+   │   │   ├── run "my mixed version feature", after 500ms delay (19) [stage=last-upgrade]
+   │   │   └── testSingleStep, after 30s delay (20) [stage=last-upgrade]
+   │   ├── restart node 1 with binary version (21) [stage=last-upgrade]
+   │   └── restart node 3 with binary version (22) [stage=last-upgrade]
+   ├── finalize upgrade by resetting `preserve_downgrade_option` (23) [stage=running-upgrade-migrations]
+   └── wait for nodes :1-4 to reach cluster version (24) [stage=running-upgrade-migrations]

From 9647c5448ce3db55009fd8ec633c98e45f766e4a Mon Sep 17 00:00:00 2001
From: Renato Costa
Date: Thu, 23 Nov 2023 13:17:09 -0500
Subject: [PATCH 5/5] mixedversion: rename `finalizeUpgrade` step to `allowUpgrade`

In the near future, we will randomly change the timing of this step so
that it is not immediately followed by migrations, as the previous name
suggested.
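The behavior itself is unchanged: the step still resets the
`preserve_downgrade_option` cluster setting, which is what allows the
upgrade migrations to run and the cluster version to advance. As a
minimal, illustrative sketch of that reset (the `resetPreserveDowngrade`
helper below is hypothetical; the real step presumably issues the
equivalent statement through the `h.RandomDB` connection shown in the
diff below):

    package sketch

    import (
        "context"
        gosql "database/sql"
    )

    // resetPreserveDowngrade clears the cluster setting that blocks automatic
    // upgrade finalization; once it is reset, the cluster version can advance
    // to the binary version running on the nodes.
    func resetPreserveDowngrade(ctx context.Context, db *gosql.DB) error {
        _, err := db.ExecContext(ctx,
            "RESET CLUSTER SETTING cluster.preserve_downgrade_option")
        return err
    }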
Epic: none Release note: None --- .../roachtestutil/mixedversion/mixedversion.go | 12 ++++++------ .../roachtest/roachtestutil/mixedversion/planner.go | 2 +- .../testdata/planner/basic_test_mixed_version_hooks | 2 +- .../testdata/planner/conflicting_mutators | 2 +- .../testdata/planner/local_runs_reduced_wait_time | 6 +++--- .../testdata/planner/minimum_supported_version | 10 +++++----- .../mixedversion/testdata/planner/multiple_upgrades | 6 +++--- .../testdata/planner/mutator_probabilities | 2 +- .../mixedversion/testdata/planner/step_stages | 10 +++++----- 9 files changed, 26 insertions(+), 26 deletions(-) diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go index 9171b559884a..2582344fd4fc 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go @@ -860,21 +860,21 @@ func (s restartWithNewBinaryStep) Run( ) } -// finalizeUpgradeStep resets the `preserve_downgrade_option` cluster +// allowUpgradeStep resets the `preserve_downgrade_option` cluster // setting, allowing the upgrade migrations to run and the cluster // version to eventually reach the binary version on the nodes. -type finalizeUpgradeStep struct { +type allowUpgradeStep struct { crdbNodes option.NodeListOption prng *rand.Rand } -func (s finalizeUpgradeStep) Background() shouldStop { return nil } +func (s allowUpgradeStep) Background() shouldStop { return nil } -func (s finalizeUpgradeStep) Description() string { - return "finalize upgrade by resetting `preserve_downgrade_option`" +func (s allowUpgradeStep) Description() string { + return "allow upgrade to happen by resetting `preserve_downgrade_option`" } -func (s finalizeUpgradeStep) Run( +func (s allowUpgradeStep) Run( ctx context.Context, l *logger.Logger, c cluster.Cluster, h *Helper, ) error { node, db := h.RandomDB(s.prng, s.crdbNodes) diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go index 6aa43b5f08a0..98fdee901db4 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go @@ -410,7 +410,7 @@ func (p *testPlanner) finalizeUpgradeSteps( p.currentContext.Finalizing = true p.currentContext.Stage = RunningUpgradeMigrationsStage runMigrations := p.newSingleStep( - finalizeUpgradeStep{prng: p.newRNG(), crdbNodes: p.crdbNodes}, + allowUpgradeStep{prng: p.newRNG(), crdbNodes: p.crdbNodes}, ) var mixedVersionStepsDuringMigrations []testStep if scheduleHooks { diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/basic_test_mixed_version_hooks b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/basic_test_mixed_version_hooks index 125c5d534c8d..718e5840a71e 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/basic_test_mixed_version_hooks +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/basic_test_mixed_version_hooks @@ -43,6 +43,6 @@ mixed-version test plan for upgrading from "v22.2.8" to "": │ ├── restart node 1 with binary version (13) │ ├── run "mixed-version 1" (14) │ └── restart node 3 with binary version (15) - ├── finalize upgrade by resetting `preserve_downgrade_option` (16) + ├── allow upgrade to happen by resetting `preserve_downgrade_option` (16) ├── run "mixed-version 2" (17) └── wait for nodes :1-4 to reach cluster version (18) diff --git 
a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/conflicting_mutators b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/conflicting_mutators index e4ac2ce90b44..b6406b11a3a7 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/conflicting_mutators +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/conflicting_mutators @@ -43,5 +43,5 @@ mixed-version test plan for upgrading from "v22.2.8" to "" with mutator │ ├── testSingleStep (17) [stage=last-upgrade] │ ├── restart node 1 with binary version (18) [stage=last-upgrade] │ └── restart node 3 with binary version (19) [stage=last-upgrade] - ├── finalize upgrade by resetting `preserve_downgrade_option` (20) [stage=running-upgrade-migrations] + ├── allow upgrade to happen by resetting `preserve_downgrade_option` (20) [stage=running-upgrade-migrations] └── wait for nodes :1-4 to reach cluster version (21) [stage=running-upgrade-migrations] diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/local_runs_reduced_wait_time b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/local_runs_reduced_wait_time index d3dbea49e5ec..103264e1d21f 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/local_runs_reduced_wait_time +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/local_runs_reduced_wait_time @@ -49,7 +49,7 @@ mixed-version test plan for upgrading from "v22.2.3" to "v23.1.4" to "v23.2.0" t │ │ ├── wait for 30s (15) │ │ ├── restart node 4 with binary version v23.1.4 (16) │ │ └── restart node 2 with binary version v23.1.4 (17) -│ ├── finalize upgrade by resetting `preserve_downgrade_option` (18) +│ ├── allow upgrade to happen by resetting `preserve_downgrade_option` (18) │ └── wait for nodes :1-4 to reach cluster version '23.1' (19) ├── run "initialize bank workload" (20) ├── start background hooks concurrently @@ -65,7 +65,7 @@ mixed-version test plan for upgrading from "v22.2.3" to "v23.1.4" to "v23.2.0" t │ │ ├── restart node 1 with binary version v23.2.0 (27) │ │ ├── restart node 2 with binary version v23.2.0 (28) │ │ └── restart node 3 with binary version v23.2.0 (29) -│ ├── finalize upgrade by resetting `preserve_downgrade_option` (30) +│ ├── allow upgrade to happen by resetting `preserve_downgrade_option` (30) │ ├── wait for nodes :1-4 to reach cluster version '23.2' (31) │ └── run "validate upgrade" (32) └── upgrade cluster from "v23.2.0" to "" @@ -91,7 +91,7 @@ mixed-version test plan for upgrading from "v22.2.3" to "v23.1.4" to "v23.2.0" t │ ├── run "mixed-version 2" (49) │ ├── restart node 2 with binary version (50) │ └── restart node 3 with binary version (51) - ├── finalize upgrade by resetting `preserve_downgrade_option` (52) + ├── allow upgrade to happen by resetting `preserve_downgrade_option` (52) ├── run "mixed-version 2" (53) ├── wait for nodes :1-4 to reach cluster version (54) └── run "validate upgrade" (55) diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/minimum_supported_version b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/minimum_supported_version index f94f89c848ee..9a9f13c5f38b 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/minimum_supported_version +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/minimum_supported_version @@ -49,7 +49,7 @@ mixed-version test plan for upgrading from "v21.2.11" to "v22.1.8" to "v22.2.3" │ │ ├── wait for 5m0s (15) │ │ ├── restart node 4 with binary version v22.1.8 
(16) │ │ └── restart node 2 with binary version v22.1.8 (17) -│ ├── finalize upgrade by resetting `preserve_downgrade_option` (18) +│ ├── allow upgrade to happen by resetting `preserve_downgrade_option` (18) │ └── wait for nodes :1-4 to reach cluster version '22.1' (19) ├── upgrade cluster from "v22.1.8" to "v22.2.3" │ ├── prevent auto-upgrades by setting `preserve_downgrade_option` (20) @@ -59,7 +59,7 @@ mixed-version test plan for upgrading from "v21.2.11" to "v22.1.8" to "v22.2.3" │ │ ├── restart node 1 with binary version v22.2.3 (23) │ │ ├── restart node 2 with binary version v22.2.3 (24) │ │ └── restart node 3 with binary version v22.2.3 (25) -│ ├── finalize upgrade by resetting `preserve_downgrade_option` (26) +│ ├── allow upgrade to happen by resetting `preserve_downgrade_option` (26) │ └── wait for nodes :1-4 to reach cluster version '22.2' (27) ├── upgrade cluster from "v22.2.3" to "v23.1.4" │ ├── prevent auto-upgrades by setting `preserve_downgrade_option` (28) @@ -69,7 +69,7 @@ mixed-version test plan for upgrading from "v21.2.11" to "v22.1.8" to "v22.2.3" │ │ ├── restart node 3 with binary version v23.1.4 (31) │ │ ├── restart node 4 with binary version v23.1.4 (32) │ │ └── wait for 10m0s (33) -│ ├── finalize upgrade by resetting `preserve_downgrade_option` (34) +│ ├── allow upgrade to happen by resetting `preserve_downgrade_option` (34) │ └── wait for nodes :1-4 to reach cluster version '23.1' (35) ├── run "initialize bank workload" (36) ├── start background hooks concurrently @@ -84,7 +84,7 @@ mixed-version test plan for upgrading from "v21.2.11" to "v22.1.8" to "v22.2.3" │ │ ├── restart node 2 with binary version v23.2.0 (43) │ │ ├── run "mixed-version 2" (44) │ │ └── restart node 3 with binary version v23.2.0 (45) -│ ├── finalize upgrade by resetting `preserve_downgrade_option` (46) +│ ├── allow upgrade to happen by resetting `preserve_downgrade_option` (46) │ ├── wait for nodes :1-4 to reach cluster version '23.2' (47) │ └── run "validate upgrade" (48) └── upgrade cluster from "v23.2.0" to "" @@ -111,7 +111,7 @@ mixed-version test plan for upgrading from "v21.2.11" to "v22.1.8" to "v22.2.3" │ ├── run "mixed-version 1" (65) │ ├── restart node 4 with binary version (66) │ └── restart node 1 with binary version (67) - ├── finalize upgrade by resetting `preserve_downgrade_option` (68) + ├── allow upgrade to happen by resetting `preserve_downgrade_option` (68) ├── run "mixed-version 1" (69) ├── wait for nodes :1-4 to reach cluster version (70) └── run "validate upgrade" (71) diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/multiple_upgrades b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/multiple_upgrades index d3b3b2b86569..a8b66f4ec3f8 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/multiple_upgrades +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/multiple_upgrades @@ -37,7 +37,7 @@ mixed-version test plan for upgrading from "v22.1.8" to "v22.2.3" to "v23.1.4" t │ │ ├── wait for 5m0s (15) │ │ ├── restart node 4 with binary version v22.2.3 (16) │ │ └── restart node 2 with binary version v22.2.3 (17) -│ ├── finalize upgrade by resetting `preserve_downgrade_option` (18) +│ ├── allow upgrade to happen by resetting `preserve_downgrade_option` (18) │ └── wait for nodes :1-4 to reach cluster version '22.2' (19) ├── run "initialize bank workload" (20) ├── run "bank workload" (21) @@ -49,7 +49,7 @@ mixed-version test plan for upgrading from "v22.1.8" to "v22.2.3" to "v23.1.4" t │ │ ├── restart 
node 1 with binary version v23.1.4 (25) │ │ ├── restart node 2 with binary version v23.1.4 (26) │ │ └── restart node 3 with binary version v23.1.4 (27) -│ ├── finalize upgrade by resetting `preserve_downgrade_option` (28) +│ ├── allow upgrade to happen by resetting `preserve_downgrade_option` (28) │ ├── run "mixed-version 1" (29) │ └── wait for nodes :1-4 to reach cluster version '23.1' (30) └── upgrade cluster from "v23.1.4" to "" @@ -71,6 +71,6 @@ mixed-version test plan for upgrading from "v22.1.8" to "v22.2.3" to "v23.1.4" t │ ├── restart node 4 with binary version (43) │ ├── run "mixed-version 1" (44) │ └── restart node 2 with binary version (45) - ├── finalize upgrade by resetting `preserve_downgrade_option` (46) + ├── allow upgrade to happen by resetting `preserve_downgrade_option` (46) ├── run "mixed-version 1" (47) └── wait for nodes :1-4 to reach cluster version (48) diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/mutator_probabilities b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/mutator_probabilities index 8406139e2439..ff067bfb1f4d 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/mutator_probabilities +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/mutator_probabilities @@ -45,5 +45,5 @@ mixed-version test plan for upgrading from "v22.2.8" to "" with mutator │ │ └── testSingleStep, after 30s delay (20) [stage=last-upgrade] │ ├── restart node 1 with binary version (21) [stage=last-upgrade] │ └── restart node 3 with binary version (22) [stage=last-upgrade] - ├── finalize upgrade by resetting `preserve_downgrade_option` (23) [stage=running-upgrade-migrations] + ├── allow upgrade to happen by resetting `preserve_downgrade_option` (23) [stage=running-upgrade-migrations] └── wait for nodes :1-4 to reach cluster version (24) [stage=running-upgrade-migrations] diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/step_stages b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/step_stages index 3a9fd1e5a5f6..79a5dbc31fc0 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/step_stages +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/testdata/planner/step_stages @@ -49,7 +49,7 @@ mixed-version test plan for upgrading from "v21.2.11" to "v22.1.8" to "v22.2.3" │ │ ├── wait for 5m0s (15) [stage=last-upgrade] │ │ ├── restart node 4 with binary version v22.1.8 (16) [stage=last-upgrade] │ │ └── restart node 2 with binary version v22.1.8 (17) [stage=last-upgrade] -│ ├── finalize upgrade by resetting `preserve_downgrade_option` (18) [stage=running-upgrade-migrations] +│ ├── allow upgrade to happen by resetting `preserve_downgrade_option` (18) [stage=running-upgrade-migrations] │ └── wait for nodes :1-4 to reach cluster version '22.1' (19) [stage=running-upgrade-migrations] ├── upgrade cluster from "v22.1.8" to "v22.2.3" │ ├── prevent auto-upgrades by setting `preserve_downgrade_option` (20) [stage=init] @@ -59,7 +59,7 @@ mixed-version test plan for upgrading from "v21.2.11" to "v22.1.8" to "v22.2.3" │ │ ├── restart node 1 with binary version v22.2.3 (23) [stage=last-upgrade] │ │ ├── restart node 2 with binary version v22.2.3 (24) [stage=last-upgrade] │ │ └── restart node 3 with binary version v22.2.3 (25) [stage=last-upgrade] -│ ├── finalize upgrade by resetting `preserve_downgrade_option` (26) [stage=running-upgrade-migrations] +│ ├── allow upgrade to happen by resetting `preserve_downgrade_option` (26) [stage=running-upgrade-migrations] │ └── wait 
for nodes :1-4 to reach cluster version '22.2' (27) [stage=running-upgrade-migrations] ├── run "initialize bank workload" (28) [stage=on-startup] ├── start background hooks concurrently @@ -74,7 +74,7 @@ mixed-version test plan for upgrading from "v21.2.11" to "v22.1.8" to "v22.2.3" │ │ ├── run "mixed-version 1" (35) [stage=last-upgrade] │ │ ├── restart node 4 with binary version v23.1.4 (36) [stage=last-upgrade] │ │ └── run "mixed-version 2" (37) [stage=last-upgrade] -│ ├── finalize upgrade by resetting `preserve_downgrade_option` (38) [stage=running-upgrade-migrations] +│ ├── allow upgrade to happen by resetting `preserve_downgrade_option` (38) [stage=running-upgrade-migrations] │ ├── run "mixed-version 1" (39) [stage=running-upgrade-migrations] │ ├── wait for nodes :1-4 to reach cluster version '23.1' (40) [stage=running-upgrade-migrations] │ └── run "validate upgrade" (41) [stage=after-upgrade-finished] @@ -101,7 +101,7 @@ mixed-version test plan for upgrading from "v21.2.11" to "v22.1.8" to "v22.2.3" │ │ ├── restart node 1 with binary version v23.2.0 (58) [stage=last-upgrade] │ │ ├── run "mixed-version 1" (59) [stage=last-upgrade] │ │ └── restart node 4 with binary version v23.2.0 (60) [stage=last-upgrade] -│ ├── finalize upgrade by resetting `preserve_downgrade_option` (61) [stage=running-upgrade-migrations] +│ ├── allow upgrade to happen by resetting `preserve_downgrade_option` (61) [stage=running-upgrade-migrations] │ ├── wait for nodes :1-4 to reach cluster version '23.2' (62) [stage=running-upgrade-migrations] │ └── run "validate upgrade" (63) [stage=after-upgrade-finished] └── upgrade cluster from "v23.2.0" to "" @@ -127,7 +127,7 @@ mixed-version test plan for upgrading from "v21.2.11" to "v22.1.8" to "v22.2.3" │ ├── restart node 1 with binary version (80) [stage=last-upgrade] │ ├── restart node 2 with binary version (81) [stage=last-upgrade] │ └── run "mixed-version 1" (82) [stage=last-upgrade] - ├── finalize upgrade by resetting `preserve_downgrade_option` (83) [stage=running-upgrade-migrations] + ├── allow upgrade to happen by resetting `preserve_downgrade_option` (83) [stage=running-upgrade-migrations] ├── run "mixed-version 2" (84) [stage=running-upgrade-migrations] ├── wait for nodes :1-4 to reach cluster version (85) [stage=running-upgrade-migrations] └── run "validate upgrade" (86) [stage=after-upgrade-finished]
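
The test mutators added earlier in this series implement exactly three
methods (`Name`, `Probability`, and `Generate`), which appears to be the
full contract the planner consumes, and the `mutator_probabilities`
directive pins a mutator's probability to 1 so that the generated plans
stay deterministic. The sketch below restates that contract and the
assumed probability gate; `applyMutators`, the stand-in types, and the
overrides map are illustrative, not the planner's actual implementation.

    package sketch

    import "math/rand"

    // Stand-ins for the planner types referenced below; the real definitions
    // live in the mixedversion package.
    type TestPlan struct{}
    type mutation struct{}

    // mutator mirrors the methods implemented by the test mutators above.
    type mutator interface {
        Name() string
        Probability() float64
        Generate(rng *rand.Rand, plan *TestPlan) []mutation
    }

    // applyMutators sketches the assumed probability gate: a mutator's
    // mutations are generated when a random draw falls below its probability,
    // which a per-name override (as set via WithMutatorProbability or the
    // mutator_probabilities directive) can replace. Pinning the probability
    // to 1 therefore guarantees the mutator always applies.
    func applyMutators(
        rng *rand.Rand, plan *TestPlan, mutators []mutator, overrides map[string]float64,
    ) []mutation {
        var muts []mutation
        for _, m := range mutators {
            p := m.Probability()
            if o, ok := overrides[m.Name()]; ok {
                p = o
            }
            if rng.Float64() < p {
                muts = append(muts, m.Generate(rng, plan)...)
            }
        }
        return muts
    }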