Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rangefeed: improve assertions #118265

Merged
merged 2 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/rangefeed/budget.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -249,10 +250,10 @@ type SharedBudgetAllocation struct {
// Use increases usage count for the allocation. It should be called by each
// new consumer that plans to retain allocation after returning to a caller
// that passed this allocation.
func (a *SharedBudgetAllocation) Use() {
func (a *SharedBudgetAllocation) Use(ctx context.Context) {
if a != nil {
if atomic.AddInt32(&a.refCount, 1) == 1 {
panic("unexpected shared memory allocation usage increase after free")
log.Fatalf(ctx, "unexpected shared memory allocation usage increase after free")
}
}
}
Expand Down
10 changes: 2 additions & 8 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,8 @@ type Config struct {
// SetDefaults initializes unset fields in Config to values
// suitable for use by a Processor.
func (sc *Config) SetDefaults() {
if sc.TxnPusher == nil {
if sc.PushTxnsAge != 0 {
panic("nil TxnPusher with non-zero PushTxnsAge")
}
} else {
if sc.PushTxnsAge == 0 {
sc.PushTxnsAge = defaultPushTxnsAge
}
if sc.PushTxnsAge == 0 {
sc.PushTxnsAge = defaultPushTxnsAge
}
}

Expand Down
83 changes: 45 additions & 38 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,15 @@ func newRegistration(
func (r *registration) publish(
ctx context.Context, event *kvpb.RangeFeedEvent, alloc *SharedBudgetAllocation,
) {
r.validateEvent(event)
e := getPooledSharedEvent(sharedEvent{event: r.maybeStripEvent(event), alloc: alloc})
r.assertEvent(ctx, event)
e := getPooledSharedEvent(sharedEvent{event: r.maybeStripEvent(ctx, event), alloc: alloc})

r.mu.Lock()
defer r.mu.Unlock()
if r.mu.overflowed {
return
}
alloc.Use()
alloc.Use(ctx)
select {
case r.buf <- e:
r.mu.caughtUp = false
Expand Down Expand Up @@ -188,51 +188,52 @@ func (r *registration) publish(
}
}

// validateEvent checks that the event contains enough information for the
// registation.
func (r *registration) validateEvent(event *kvpb.RangeFeedEvent) {
// assertEvent asserts that the event contains the necessary data.
func (r *registration) assertEvent(ctx context.Context, event *kvpb.RangeFeedEvent) {
switch t := event.GetValue().(type) {
case *kvpb.RangeFeedValue:
if t.Key == nil {
panic(fmt.Sprintf("unexpected empty RangeFeedValue.Key: %v", t))
log.Fatalf(ctx, "unexpected empty RangeFeedValue.Key: %v", t)
}
if t.Value.RawBytes == nil {
panic(fmt.Sprintf("unexpected empty RangeFeedValue.Value.RawBytes: %v", t))
log.Fatalf(ctx, "unexpected empty RangeFeedValue.Value.RawBytes: %v", t)
}
if t.Value.Timestamp.IsEmpty() {
panic(fmt.Sprintf("unexpected empty RangeFeedValue.Value.Timestamp: %v", t))
log.Fatalf(ctx, "unexpected empty RangeFeedValue.Value.Timestamp: %v", t)
}
case *kvpb.RangeFeedCheckpoint:
if t.Span.Key == nil {
panic(fmt.Sprintf("unexpected empty RangeFeedCheckpoint.Span.Key: %v", t))
log.Fatalf(ctx, "unexpected empty RangeFeedCheckpoint.Span.Key: %v", t)
}
case *kvpb.RangeFeedSSTable:
if len(t.Data) == 0 {
panic(fmt.Sprintf("unexpected empty RangeFeedSSTable.Data: %v", t))
log.Fatalf(ctx, "unexpected empty RangeFeedSSTable.Data: %v", t)
}
if len(t.Span.Key) == 0 {
panic(fmt.Sprintf("unexpected empty RangeFeedSSTable.Span: %v", t))
log.Fatalf(ctx, "unexpected empty RangeFeedSSTable.Span: %v", t)
}
if t.WriteTS.IsEmpty() {
panic(fmt.Sprintf("unexpected empty RangeFeedSSTable.Timestamp: %v", t))
log.Fatalf(ctx, "unexpected empty RangeFeedSSTable.Timestamp: %v", t)
}
case *kvpb.RangeFeedDeleteRange:
if len(t.Span.Key) == 0 || len(t.Span.EndKey) == 0 {
panic(fmt.Sprintf("unexpected empty key in RangeFeedDeleteRange.Span: %v", t))
log.Fatalf(ctx, "unexpected empty key in RangeFeedDeleteRange.Span: %v", t)
}
if t.Timestamp.IsEmpty() {
panic(fmt.Sprintf("unexpected empty RangeFeedDeleteRange.Timestamp: %v", t))
log.Fatalf(ctx, "unexpected empty RangeFeedDeleteRange.Timestamp: %v", t)
}
default:
panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", t))
log.Fatalf(ctx, "unexpected RangeFeedEvent variant: %v", t)
}
}

// maybeStripEvent determines whether the event contains excess information not
// applicable to the current registration. If so, it makes a copy of the event
// and strips the incompatible information to match only what the registration
// requested.
func (r *registration) maybeStripEvent(event *kvpb.RangeFeedEvent) *kvpb.RangeFeedEvent {
func (r *registration) maybeStripEvent(
ctx context.Context, event *kvpb.RangeFeedEvent,
) *kvpb.RangeFeedEvent {
ret := event
copyOnWrite := func() interface{} {
if ret == event {
Expand Down Expand Up @@ -264,7 +265,7 @@ func (r *registration) maybeStripEvent(event *kvpb.RangeFeedEvent) *kvpb.RangeFe
// observed all values up to the checkpoint timestamp over a given
// key span if any updates to that span have been filtered out.
if !t.Span.Contains(r.span) {
panic(fmt.Sprintf("registration span %v larger than checkpoint span %v", r.span, t.Span))
log.Fatalf(ctx, "registration span %v larger than checkpoint span %v", r.span, t.Span)
}
t = copyOnWrite().(*kvpb.RangeFeedCheckpoint)
t.Span = r.span
Expand All @@ -279,7 +280,7 @@ func (r *registration) maybeStripEvent(event *kvpb.RangeFeedEvent) *kvpb.RangeFe
// SSTs are always sent in their entirety, it is up to the caller to
// filter out irrelevant entries.
default:
panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", t))
log.Fatalf(ctx, "unexpected RangeFeedEvent variant: %v", t)
}
return ret
}
Expand Down Expand Up @@ -443,12 +444,13 @@ func (reg *registry) NewFilter() *Filter {
}

// Register adds the provided registration to the registry.
func (reg *registry) Register(r *registration) {
func (reg *registry) Register(ctx context.Context, r *registration) {
reg.metrics.RangeFeedRegistrations.Inc(1)
r.id = reg.nextID()
r.keys = r.span.AsRange()
if err := reg.tree.Insert(r, false /* fast */); err != nil {
panic(err)
// TODO(erikgrinaker): these errors should arguably be returned.
log.Fatalf(ctx, "%v", err)
}
}

Expand Down Expand Up @@ -484,10 +486,10 @@ func (reg *registry) PublishToOverlapping(
// surprising. Revisit this once RangeFeed has more users.
minTS = hlc.MaxTimestamp
default:
panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", t))
log.Fatalf(ctx, "unexpected RangeFeedEvent variant: %v", t)
}

reg.forOverlappingRegs(span, func(r *registration) (bool, *kvpb.Error) {
reg.forOverlappingRegs(ctx, span, func(r *registration) (bool, *kvpb.Error) {
// Don't publish events if they:
// 1. are equal to or less than the registration's starting timestamp, or
// 2. have OmitInRangefeeds = true and this registration has opted into filtering.
Expand All @@ -507,7 +509,7 @@ func (reg *registry) PublishToOverlapping(
func (reg *registry) Unregister(ctx context.Context, r *registration) {
reg.metrics.RangeFeedRegistrations.Dec(1)
if err := reg.tree.Delete(r, false /* fast */); err != nil {
panic(err)
log.Fatalf(ctx, "%v", err)
}
r.drainAllocations(ctx)
}
Expand All @@ -519,21 +521,21 @@ func (reg *registry) Unregister(ctx context.Context, r *registration) {
// errors to registrations.
// TODO: this should be revisited as part of
// https://github.com/cockroachdb/cockroach/issues/110634
func (reg *registry) DisconnectAllOnShutdown(pErr *kvpb.Error) {
func (reg *registry) DisconnectAllOnShutdown(ctx context.Context, pErr *kvpb.Error) {
reg.metrics.RangeFeedRegistrations.Dec(int64(reg.tree.Len()))
reg.DisconnectWithErr(all, pErr)
reg.DisconnectWithErr(ctx, all, pErr)
}

// Disconnect disconnects all registrations that overlap the specified span with
// a nil error.
func (reg *registry) Disconnect(span roachpb.Span) {
reg.DisconnectWithErr(span, nil /* pErr */)
func (reg *registry) Disconnect(ctx context.Context, span roachpb.Span) {
reg.DisconnectWithErr(ctx, span, nil /* pErr */)
}

// DisconnectWithErr disconnects all registrations that overlap the specified
// span with the provided error.
func (reg *registry) DisconnectWithErr(span roachpb.Span, pErr *kvpb.Error) {
reg.forOverlappingRegs(span, func(r *registration) (bool, *kvpb.Error) {
func (reg *registry) DisconnectWithErr(ctx context.Context, span roachpb.Span, pErr *kvpb.Error) {
reg.forOverlappingRegs(ctx, span, func(r *registration) (bool, *kvpb.Error) {
return true /* disconned */, pErr
})
}
Expand All @@ -546,7 +548,9 @@ var all = roachpb.Span{Key: roachpb.KeyMin, EndKey: roachpb.KeyMax}
// then that registration is unregistered and the error returned by the
// function is send on its corresponding error channel.
func (reg *registry) forOverlappingRegs(
span roachpb.Span, fn func(*registration) (disconnect bool, pErr *kvpb.Error),
ctx context.Context,
span roachpb.Span,
fn func(*registration) (disconnect bool, pErr *kvpb.Error),
) {
var toDelete []interval.Interface
matchFn := func(i interval.Interface) (done bool) {
Expand All @@ -568,34 +572,37 @@ func (reg *registry) forOverlappingRegs(
reg.tree.Clear()
} else if len(toDelete) == 1 {
if err := reg.tree.Delete(toDelete[0], false /* fast */); err != nil {
panic(err)
log.Fatalf(ctx, "%v", err)
}
} else if len(toDelete) > 1 {
for _, i := range toDelete {
if err := reg.tree.Delete(i, true /* fast */); err != nil {
panic(err)
log.Fatalf(ctx, "%v", err)
}
}
reg.tree.AdjustRanges()
}
}

// Wait for this registration to completely process its internal buffer.
func (r *registration) waitForCaughtUp() error {
func (r *registration) waitForCaughtUp(ctx context.Context) error {
opts := retry.Options{
InitialBackoff: 5 * time.Millisecond,
Multiplier: 2,
MaxBackoff: 10 * time.Second,
MaxRetries: 50,
}
for re := retry.Start(opts); re.Next(); {
for re := retry.StartWithCtx(ctx, opts); re.Next(); {
r.mu.Lock()
caughtUp := len(r.buf) == 0 && r.mu.caughtUp
r.mu.Unlock()
if caughtUp {
return nil
}
}
if err := ctx.Err(); err != nil {
return err
}
return errors.Errorf("registration %v failed to empty in time", r.Range())
}

Expand All @@ -610,11 +617,11 @@ func (r *registration) detachCatchUpIter() *CatchUpIterator {

// waitForCaughtUp waits for all registrations overlapping the given span to
// completely process their internal buffers.
func (reg *registry) waitForCaughtUp(span roachpb.Span) error {
func (reg *registry) waitForCaughtUp(ctx context.Context, span roachpb.Span) error {
var outerErr error
reg.forOverlappingRegs(span, func(r *registration) (bool, *kvpb.Error) {
reg.forOverlappingRegs(ctx, span, func(r *registration) (bool, *kvpb.Error) {
if outerErr == nil {
outerErr = r.waitForCaughtUp()
outerErr = r.waitForCaughtUp(ctx)
}
return false, nil
})
Expand Down
Loading
Loading