Skip to content

Commit

Permalink
tenantcapabilitieswatcher: make the watcher react faster
Browse files Browse the repository at this point in the history
Prior to this patch, the tenant info watcher would only react to
changes to `system.tenants` upon rangefeed cache flushes, which could
be (in default config) up to 3 seconds after the change is committed.

This commit accelerates the behavior by processing updates as soon as
the rangefeed observes the change.

This new behavior is similar to the way that cluster settings changes
are processed immediately in the settings
watcher (pkg/settingswatcher).

In order to handle deletions that occur during errors that aren't
automatically retried inside the rangefeed library (and are instead
retried by the watcher resulting in a new full scan), we emit any
scan-generated rangefeed events at their scan timestamp, allowing us a
means of clearing any stale data from the cache.

Release note: None

Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
stevendanna and knz committed Nov 20, 2023
1 parent 938224a commit 05f294d
Show file tree
Hide file tree
Showing 17 changed files with 137 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (s *spanConfigEventStream) Start(ctx context.Context, txn *kv.Txn) error {
defaultBatchSize,
roachpb.Spans{s.spec.Span},
true, // withPrevValue
true, // withRowTSInInitialScan
spanconfigkvsubscriber.NewSpanConfigDecoder().TranslateEvent,
s.handleUpdate,
rangefeedCacheKnobs,
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeedcache/cache_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ func NewCache(
// many rows.
const bufferSize = 1 << 20 // infinite?
const withPrevValue = false
const withRowTSInInitialScan = true
c := Cache{}
c.w = NewWatcher(
name, clock, f,
bufferSize,
spans,
withPrevValue,
withRowTSInInitialScan,
passThroughTranslation,
c.handleUpdate,
nil)
Expand Down
35 changes: 19 additions & 16 deletions pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,13 @@ import (
// react to the row-level events as they arrive can hijack the translateEvent
// function to trigger some non-blocking action.
type Watcher struct {
name redact.SafeString
clock *hlc.Clock
rangefeedFactory *rangefeed.Factory
spans []roachpb.Span
bufferSize int
withPrevValue bool
name redact.SafeString
clock *hlc.Clock
rangefeedFactory *rangefeed.Factory
spans []roachpb.Span
bufferSize int
withPrevValue bool
withRowTSInInitialScan bool

started int32 // accessed atomically

Expand Down Expand Up @@ -168,20 +169,22 @@ func NewWatcher(
bufferSize int,
spans []roachpb.Span,
withPrevValue bool,
withRowTSInInitialScan bool,
translateEvent TranslateEventFunc,
onUpdate OnUpdateFunc,
knobs *TestingKnobs,
) *Watcher {
w := &Watcher{
name: name,
clock: clock,
rangefeedFactory: rangeFeedFactory,
spans: spans,
bufferSize: bufferSize,
withPrevValue: withPrevValue,
translateEvent: translateEvent,
onUpdate: onUpdate,
restartErrCh: make(chan error),
name: name,
clock: clock,
rangefeedFactory: rangeFeedFactory,
spans: spans,
bufferSize: bufferSize,
withPrevValue: withPrevValue,
withRowTSInInitialScan: withRowTSInInitialScan,
translateEvent: translateEvent,
onUpdate: onUpdate,
restartErrCh: make(chan error),
}
if knobs != nil {
w.knobs = *knobs
Expand Down Expand Up @@ -310,7 +313,7 @@ func (s *Watcher) Run(ctx context.Context) error {
// where a higher admission-pri makes sense.
rangefeed.WithSystemTablePriority(),
rangefeed.WithDiff(s.withPrevValue),
rangefeed.WithRowTimestampInInitialScan(true),
rangefeed.WithRowTimestampInInitialScan(s.withRowTSInInitialScan),
rangefeed.WithOnInitialScanError(func(ctx context.Context, err error) (shouldFail bool) {
// TODO(irfansharif): Consider if there are other errors which we
// want to treat as permanent. This was cargo culted from the
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeedcache/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func TestWatchAuthErr(t *testing.T) {
tenant.RangeFeedFactory().(*rangefeed.Factory),
1024,
[]roachpb.Span{hostScratchSpan},
false, /*=withPrevValue*/
false, /* withPrevValue */
true, /* withRowTSInInitialScan */
func(ctx context.Context, kv *kvpb.RangeFeedValue) rangefeedbuffer.Event {
t.Fatalf("rangefeed should fail before producing results")
return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/multitenant/tenantcapabilities/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/multitenant/tenantcapabilities/tenantcapabilitiespb",
"//pkg/roachpb",
"//pkg/spanconfig/spanconfigbounds",
"//pkg/util/hlc",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_cockroachdb_redact//interfaces",
Expand Down
4 changes: 3 additions & 1 deletion pkg/multitenant/tenantcapabilities/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiespb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// Reader provides access to the global tenant capability state. The global
Expand Down Expand Up @@ -94,7 +95,8 @@ func (e Entry) Ready() bool {
// Update represents an update to the global tenant capability state.
type Update struct {
Entry
Deleted bool // whether the entry was deleted or not
Deleted bool // whether the entry was deleted or not
Timestamp hlc.Timestamp
}

func (u Update) String() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfo"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
Expand Down Expand Up @@ -96,7 +95,7 @@ func (d *decoder) decode(

func (d *decoder) translateEvent(
ctx context.Context, ev *kvpb.RangeFeedValue,
) rangefeedbuffer.Event {
) (hasEvent bool, event tenantcapabilities.Update) {
deleted := !ev.Value.IsPresent()
var value roachpb.Value
// The event corresponds to a deletion. The capabilities being deleted must
Expand All @@ -105,7 +104,7 @@ func (d *decoder) translateEvent(
// There's nothing for us to do if this event corresponds to a deletion
// tombstone being removed (GC).
if !ev.PrevValue.IsPresent() {
return nil
return false, event
}

value = ev.PrevValue
Expand All @@ -122,15 +121,12 @@ func (d *decoder) translateEvent(
// This should never happen: the rangefeed should only ever deliver valid SQL rows.
err = errors.NewAssertionErrorWithWrappedErrf(err, "failed to decode row %v", ev.Key)
logcrash.ReportOrPanic(ctx, &d.st.SV, "%w", err)
return nil
return false, event
}

return &bufferEvent{
update: tenantcapabilities.Update{
Entry: entry,
Deleted: deleted,
},
ts: ev.Value.Timestamp,
return true, tenantcapabilities.Update{
Entry: entry,
Deleted: deleted,
Timestamp: ev.Value.Timestamp,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ ok

updates
----
Incremental Update
update: ten=10 name=foo state=add service=shared cap={can_admin_unsplit:true}
update: ten=11 name=bar state=ready service=none cap={default}

Expand All @@ -32,7 +31,6 @@ ok

updates
----
Incremental Update
update: ten=11 name= state=add service=none cap={can_admin_unsplit:true}

get-capabilities ten=11
Expand All @@ -45,7 +43,6 @@ ok

updates
----
Incremental Update
delete: ten=10

get-capabilities ten=10
Expand All @@ -72,7 +69,6 @@ ok

updates
----
Incremental Update
update: ten=15 name=bli state=add service=external cap={can_admin_unsplit:true}

# Ensure only the last update is applied, even when there are multiple updates
Expand All @@ -91,7 +87,6 @@ ok

updates
----
Incremental Update
delete: ten=11

get-capabilities ten=11
Expand All @@ -117,7 +112,6 @@ ok

updates
----
Incremental Update
update: ten=15 name=blax state=ready service=none cap={default}

flush-state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ ok

updates
----
Complete Update
update: ten=11 name=bar state=add service=external cap={default}
update: ten=15 name=woo state=add service=none cap={can_admin_unsplit:true}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ ok

updates
----
Incremental Update
update: ten=10 name=foo state=add service=shared cap={can_admin_unsplit:true}
update: ten=11 name=bar state=add service=none cap={default}
update: ten=12 name=baz state=ready service=none cap={default}
Expand Down Expand Up @@ -81,7 +80,6 @@ ok
# it is able to restart.
updates
----
Complete Update
update: ten=11 name=bar state=add service=none cap={default}
update: ten=12 name=bli state=add service=none cap={can_admin_unsplit:true}
update: ten=50 name=blax state=add service=none cap={default}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package tenantcapabilitieswatcher

import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
)

Expand All @@ -24,9 +23,7 @@ type TestingKnobs struct {

// WatcherUpdatesInterceptor, if set, is called each time the Watcher
// receives a set of updates.
WatcherUpdatesInterceptor func(
updateType rangefeedcache.UpdateType, updates []tenantcapabilities.Update,
)
WatcherUpdatesInterceptor func(update tenantcapabilities.Update)
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
Loading

0 comments on commit 05f294d

Please sign in to comment.