Skip to content

Commit

Permalink
Merge pull request #114818 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.2-114719

release-23.2: tenantcapabilitieswatcher: make the watcher react faster
  • Loading branch information
stevendanna authored Jan 22, 2024
2 parents 7564d50 + 05f294d commit ed3624c
Show file tree
Hide file tree
Showing 18 changed files with 144 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (s *spanConfigEventStream) Start(ctx context.Context, txn *kv.Txn) error {
defaultBatchSize,
roachpb.Spans{s.spec.Span},
true, // withPrevValue
true, // withRowTSInInitialScan
spanconfigkvsubscriber.NewSpanConfigDecoder().TranslateEvent,
s.handleUpdate,
rangefeedCacheKnobs,
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeedcache/cache_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ func NewCache(
// many rows.
const bufferSize = 1 << 20 // infinite?
const withPrevValue = false
const withRowTSInInitialScan = true
c := Cache{}
c.w = NewWatcher(
name, clock, f,
bufferSize,
spans,
withPrevValue,
withRowTSInInitialScan,
passThroughTranslation,
c.handleUpdate,
nil)
Expand Down
35 changes: 19 additions & 16 deletions pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,13 @@ import (
// react to the row-level events as they arrive can hijack the translateEvent
// function to trigger some non-blocking action.
type Watcher struct {
name redact.SafeString
clock *hlc.Clock
rangefeedFactory *rangefeed.Factory
spans []roachpb.Span
bufferSize int
withPrevValue bool
name redact.SafeString
clock *hlc.Clock
rangefeedFactory *rangefeed.Factory
spans []roachpb.Span
bufferSize int
withPrevValue bool
withRowTSInInitialScan bool

started int32 // accessed atomically

Expand Down Expand Up @@ -168,20 +169,22 @@ func NewWatcher(
bufferSize int,
spans []roachpb.Span,
withPrevValue bool,
withRowTSInInitialScan bool,
translateEvent TranslateEventFunc,
onUpdate OnUpdateFunc,
knobs *TestingKnobs,
) *Watcher {
w := &Watcher{
name: name,
clock: clock,
rangefeedFactory: rangeFeedFactory,
spans: spans,
bufferSize: bufferSize,
withPrevValue: withPrevValue,
translateEvent: translateEvent,
onUpdate: onUpdate,
restartErrCh: make(chan error),
name: name,
clock: clock,
rangefeedFactory: rangeFeedFactory,
spans: spans,
bufferSize: bufferSize,
withPrevValue: withPrevValue,
withRowTSInInitialScan: withRowTSInInitialScan,
translateEvent: translateEvent,
onUpdate: onUpdate,
restartErrCh: make(chan error),
}
if knobs != nil {
w.knobs = *knobs
Expand Down Expand Up @@ -310,7 +313,7 @@ func (s *Watcher) Run(ctx context.Context) error {
// where a higher admission-pri makes sense.
rangefeed.WithSystemTablePriority(),
rangefeed.WithDiff(s.withPrevValue),
rangefeed.WithRowTimestampInInitialScan(true),
rangefeed.WithRowTimestampInInitialScan(s.withRowTSInInitialScan),
rangefeed.WithOnInitialScanError(func(ctx context.Context, err error) (shouldFail bool) {
// TODO(irfansharif): Consider if there are other errors which we
// want to treat as permanent. This was cargo culted from the
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeedcache/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func TestWatchAuthErr(t *testing.T) {
tenant.RangeFeedFactory().(*rangefeed.Factory),
1024,
[]roachpb.Span{hostScratchSpan},
false, /*=withPrevValue*/
false, /* withPrevValue */
true, /* withRowTSInInitialScan */
func(ctx context.Context, kv *kvpb.RangeFeedValue) rangefeedbuffer.Event {
t.Fatalf("rangefeed should fail before producing results")
return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/multitenant/tenantcapabilities/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/multitenant/tenantcapabilities/tenantcapabilitiespb",
"//pkg/roachpb",
"//pkg/spanconfig/spanconfigbounds",
"//pkg/util/hlc",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_cockroachdb_redact//interfaces",
Expand Down
4 changes: 3 additions & 1 deletion pkg/multitenant/tenantcapabilities/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiespb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// Reader provides access to the global tenant capability state. The global
Expand Down Expand Up @@ -94,7 +95,8 @@ func (e Entry) Ready() bool {
// Update represents an update to the global tenant capability state.
type Update struct {
Entry
Deleted bool // whether the entry was deleted or not
Deleted bool // whether the entry was deleted or not
Timestamp hlc.Timestamp
}

func (u Update) String() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfo"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
Expand Down Expand Up @@ -96,7 +95,7 @@ func (d *decoder) decode(

func (d *decoder) translateEvent(
ctx context.Context, ev *kvpb.RangeFeedValue,
) rangefeedbuffer.Event {
) (hasEvent bool, event tenantcapabilities.Update) {
deleted := !ev.Value.IsPresent()
var value roachpb.Value
// The event corresponds to a deletion. The capabilities being deleted must
Expand All @@ -105,7 +104,7 @@ func (d *decoder) translateEvent(
// There's nothing for us to do if this event corresponds to a deletion
// tombstone being removed (GC).
if !ev.PrevValue.IsPresent() {
return nil
return false, event
}

value = ev.PrevValue
Expand All @@ -122,15 +121,12 @@ func (d *decoder) translateEvent(
// This should never happen: the rangefeed should only ever deliver valid SQL rows.
err = errors.NewAssertionErrorWithWrappedErrf(err, "failed to decode row %v", ev.Key)
logcrash.ReportOrPanic(ctx, &d.st.SV, "%w", err)
return nil
return false, event
}

return &bufferEvent{
update: tenantcapabilities.Update{
Entry: entry,
Deleted: deleted,
},
ts: ev.Value.Timestamp,
return true, tenantcapabilities.Update{
Entry: entry,
Deleted: deleted,
Timestamp: ev.Value.Timestamp,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/stretchr/testify/require"
)
Expand All @@ -32,6 +33,7 @@ import (
// TenantCapabilities stored in the system.tenants table.
func TestDecodeCapabilities(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
ts, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ ok

updates
----
Incremental Update
update: ten=10 name=foo state=add service=shared cap={can_admin_unsplit:true}
update: ten=11 name=bar state=ready service=none cap={default}

Expand All @@ -32,7 +31,6 @@ ok

updates
----
Incremental Update
update: ten=11 name= state=add service=none cap={can_admin_unsplit:true}

get-capabilities ten=11
Expand All @@ -45,7 +43,6 @@ ok

updates
----
Incremental Update
delete: ten=10

get-capabilities ten=10
Expand All @@ -72,7 +69,6 @@ ok

updates
----
Incremental Update
update: ten=15 name=bli state=add service=external cap={can_admin_unsplit:true}

# Ensure only the last update is applied, even when there are multiple updates
Expand All @@ -91,7 +87,6 @@ ok

updates
----
Incremental Update
delete: ten=11

get-capabilities ten=11
Expand All @@ -117,7 +112,6 @@ ok

updates
----
Incremental Update
update: ten=15 name=blax state=ready service=none cap={default}

flush-state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ ok

updates
----
Complete Update
update: ten=11 name=bar state=add service=external cap={default}
update: ten=15 name=woo state=add service=none cap={can_admin_unsplit:true}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ ok

updates
----
Incremental Update
update: ten=10 name=foo state=add service=shared cap={can_admin_unsplit:true}
update: ten=11 name=bar state=add service=none cap={default}
update: ten=12 name=baz state=ready service=none cap={default}
Expand Down Expand Up @@ -81,7 +80,6 @@ ok
# it is able to restart.
updates
----
Complete Update
update: ten=11 name=bar state=add service=none cap={default}
update: ten=12 name=bli state=add service=none cap={can_admin_unsplit:true}
update: ten=50 name=blax state=add service=none cap={default}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package tenantcapabilitieswatcher

import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
)

Expand All @@ -24,9 +23,7 @@ type TestingKnobs struct {

// WatcherUpdatesInterceptor, if set, is called each time the Watcher
// receives a set of updates.
WatcherUpdatesInterceptor func(
updateType rangefeedcache.UpdateType, updates []tenantcapabilities.Update,
)
WatcherUpdatesInterceptor func(update tenantcapabilities.Update)
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
Loading

0 comments on commit ed3624c

Please sign in to comment.