diff --git a/pkg/kv/kvclient/rangefeed/rangefeedcache/BUILD.bazel b/pkg/kv/kvclient/rangefeed/rangefeedcache/BUILD.bazel index 443e776d9dc6..e12a14d5c618 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedcache/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/rangefeedcache/BUILD.bazel @@ -30,11 +30,13 @@ go_test( "cache_impl_test.go", "cache_test.go", "main_test.go", + "watcher_test.go", ], args = ["-test.timeout=295s"], embed = [":rangefeedcache"], deps = [ "//pkg/base", + "//pkg/ccl/kvccl/kvtenantccl", "//pkg/kv", "//pkg/kv/kvclient/rangecache", # keep "//pkg/kv/kvclient/rangefeed", @@ -49,6 +51,7 @@ go_test( "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/encoding", + "//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go index 9bfec6f0f3d3..d789a291b6a4 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go @@ -312,6 +312,13 @@ func (s *Watcher) Run(ctx context.Context) error { // settings watcher. if grpcutil.IsAuthError(err) || strings.Contains(err.Error(), "rpc error: code = Unauthenticated") { + select { + case <-ctx.Done(): + // The context is canceled when the rangefeed is closed by the + // main handler goroutine. It's closed after we stop listening + // to errCh. + case errCh <- err: + } return true } return false diff --git a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher_test.go b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher_test.go new file mode 100644 index 000000000000..cd16d98783fc --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher_test.go @@ -0,0 +1,69 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeedcache_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/grpcutil" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestWatchAuthErr(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + host, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer host.Stopper().Stop(ctx) + + var _ = kvtenantccl.Connector{} + tenant, _ := serverutils.StartTenant(t, host, base.TestTenantArgs{ + TenantID: serverutils.TestTenantID(), + }) + + hostScratchRange, err := host.ScratchRange() + require.NoError(t, err) + hostScratchSpan := roachpb.Span{ + Key: hostScratchRange, + EndKey: hostScratchRange.PrefixEnd(), + } + + w := rangefeedcache.NewWatcher( + "test", + tenant.Clock(), + tenant.RangeFeedFactory().(*rangefeed.Factory), + 1024, + []roachpb.Span{hostScratchSpan}, + false, /*=withPrevValue*/ + func(ctx context.Context, kv *kvpb.RangeFeedValue) rangefeedbuffer.Event { + t.Fatalf("rangefeed should fail before producing results") + return nil + }, + func(ctx context.Context, update rangefeedcache.Update) { + t.Fatalf("rangefeed should fail before producing results") + }, + &rangefeedcache.TestingKnobs{}) + + err = w.Run(ctx) + require.True(t, grpcutil.IsAuthError(err), "expected %v to be an auth error", err) +}