Skip to content

Commit

Permalink
rangefeedcache: handle auth error in initial scan
Browse files Browse the repository at this point in the history
Previously, if the rangfeedcache.Watcher ran into an auth error, it was
never reported to the caller. This was detected as part of cockroachdb#97991.
Moving the settingswatcher to be the first component initialized in the
sql server caused the TestTenantUnauthenticatedAccess test to fail. Now,
the error is reported to the caller and rebasing cockroachdb#97991 on top of this
change allows the test to pass.

Part of cockroachdb#94843

Release note: None
  • Loading branch information
jeffswenson committed Mar 7, 2023
1 parent cd80204 commit 8166c88
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeedcache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ go_test(
"cache_impl_test.go",
"cache_test.go",
"main_test.go",
"watcher_test.go",
],
args = ["-test.timeout=295s"],
embed = [":rangefeedcache"],
deps = [
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/kv",
"//pkg/kv/kvclient/rangecache", # keep
"//pkg/kv/kvclient/rangefeed",
Expand All @@ -49,6 +51,7 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/encoding",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,13 @@ func (s *Watcher) Run(ctx context.Context) error {
// settings watcher.
if grpcutil.IsAuthError(err) ||
strings.Contains(err.Error(), "rpc error: code = Unauthenticated") {
select {
case <-ctx.Done():
// The context is canceled when the rangefeed is closed by the
// main handler goroutine. It's closed after we stop listening
// to errCh.
case errCh <- err:
}
return true
}
return false
Expand Down
69 changes: 69 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeedcache/watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeedcache_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestWatchAuthErr(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
host, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer host.Stopper().Stop(ctx)

var _ = kvtenantccl.Connector{}
tenant, _ := serverutils.StartTenant(t, host, base.TestTenantArgs{
TenantID: serverutils.TestTenantID(),
})

hostScratchRange, err := host.ScratchRange()
require.NoError(t, err)
hostScratchSpan := roachpb.Span{
Key: hostScratchRange,
EndKey: hostScratchRange.PrefixEnd(),
}

w := rangefeedcache.NewWatcher(
"test",
tenant.Clock(),
tenant.RangeFeedFactory().(*rangefeed.Factory),
1024,
[]roachpb.Span{hostScratchSpan},
false, /*=withPrevValue*/
func(ctx context.Context, kv *kvpb.RangeFeedValue) rangefeedbuffer.Event {
t.Fatalf("rangefeed should fail before producing results")
return nil
},
func(ctx context.Context, update rangefeedcache.Update) {
t.Fatalf("rangefeed should fail before producing results")
},
&rangefeedcache.TestingKnobs{})

err = w.Run(ctx)
require.True(t, grpcutil.IsAuthError(err), "expected %v to be an auth error", err)
}

0 comments on commit 8166c88

Please sign in to comment.