diff --git a/pkg/kv/kvclient/kvtenant/BUILD.bazel b/pkg/kv/kvclient/kvtenant/BUILD.bazel index f65b7f3327cb..df761a5ff276 100644 --- a/pkg/kv/kvclient/kvtenant/BUILD.bazel +++ b/pkg/kv/kvclient/kvtenant/BUILD.bazel @@ -20,6 +20,9 @@ go_library( "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/rangecache", "//pkg/kv/kvpb", + "//pkg/multitenant/mtinfopb", + "//pkg/multitenant/tenantcapabilities", + "//pkg/multitenant/tenantcapabilities/tenantcapabilitiespb", "//pkg/roachpb", "//pkg/rpc", "//pkg/rpc/nodedialer", @@ -73,6 +76,8 @@ go_test( "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/load", + "//pkg/multitenant/mtinfopb", + "//pkg/multitenant/tenantcapabilities/tenantcapabilitiespb", "//pkg/roachpb", "//pkg/rpc", "//pkg/security", diff --git a/pkg/kv/kvclient/kvtenant/connector.go b/pkg/kv/kvclient/kvtenant/connector.go index 367da62bbfc8..311c5388df5c 100644 --- a/pkg/kv/kvclient/kvtenant/connector.go +++ b/pkg/kv/kvclient/kvtenant/connector.go @@ -26,6 +26,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" + "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiespb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -190,6 +192,27 @@ type connector struct { // notifyCh is closed when there are changes to overrides. notifyCh chan struct{} } + + // testingEmulateOldVersionSettingsClient is set to true when the + // connector should emulate the version where it processed all + // events as settings events. Used only for testing. + testingEmulateOldVersionSettingsClient bool + + metadataMu struct { + syncutil.Mutex + + // receivedFirstMetadata is set to true when the first batch of + // metadata bits has been received. + receivedFirstMetadata bool + + tenantName roachpb.TenantName + dataState mtinfopb.TenantDataState + serviceMode mtinfopb.TenantServiceMode + capabilities *tenantcapabilitiespb.TenantCapabilities + + // notifyCh is closed when there are changes to the metadata. + notifyCh chan struct{} + } } // client represents an RPC client that proxies to a KV instance. @@ -263,6 +286,7 @@ func NewConnector(cfg ConnectorConfig, addrs []string) Connector { c.settingsMu.allTenantOverrides = make(map[string]settings.EncodedValue) c.settingsMu.specificOverrides = make(map[string]settings.EncodedValue) c.settingsMu.notifyCh = make(chan struct{}) + c.metadataMu.notifyCh = make(chan struct{}) return c } diff --git a/pkg/kv/kvclient/kvtenant/connector_test.go b/pkg/kv/kvclient/kvtenant/connector_test.go index aefc4da03727..63236a382f74 100644 --- a/pkg/kv/kvclient/kvtenant/connector_test.go +++ b/pkg/kv/kvclient/kvtenant/connector_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -52,6 +53,8 @@ type mockServer struct { rangeLookupFn func(context.Context, *kvpb.RangeLookupRequest) (*kvpb.RangeLookupResponse, error) gossipSubFn func(*kvpb.GossipSubscriptionRequest, kvpb.Internal_GossipSubscriptionServer) error tenantSettingsFn func(request *kvpb.TenantSettingsRequest, server kvpb.Internal_TenantSettingsServer) error + + emulateOldVersionSettingServer bool } func (m *mockServer) RangeLookup( @@ -70,18 +73,45 @@ func (m *mockServer) TenantSettings( req *kvpb.TenantSettingsRequest, stream kvpb.Internal_TenantSettingsServer, ) error { if m.tenantSettingsFn == nil { + // First message - required by startup protocol. if err := stream.Send(&kvpb.TenantSettingsEvent{ + EventType: kvpb.TenantSettingsEvent_SETTING_EVENT, Precedence: kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES, Incremental: false, Overrides: nil, }); err != nil { return err } - return stream.Send(&kvpb.TenantSettingsEvent{ + if !m.emulateOldVersionSettingServer { + // Initial tenant metadata. + if err := stream.Send(&kvpb.TenantSettingsEvent{ + EventType: kvpb.TenantSettingsEvent_METADATA_EVENT, + Name: "foo", + // TODO(knz): remove cast after the dep cycle has been resolved. + DataState: uint32(mtinfopb.DataStateReady), + ServiceMode: uint32(mtinfopb.ServiceModeExternal), + + // Need to ensure this looks like a fake no-op setting override event. + Precedence: kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES, + Incremental: true, + }); err != nil { + return err + } + } + // Finish startup. + if err := stream.Send(&kvpb.TenantSettingsEvent{ + EventType: kvpb.TenantSettingsEvent_SETTING_EVENT, Precedence: kvpb.TenantSettingsEvent_ALL_TENANTS_OVERRIDES, Incremental: false, Overrides: nil, - }) + }); err != nil { + return err + } + + // Ensure the stream doesn't immediately finish, which can cause + // flakes in tests due to the retry loop in the client. + <-stream.Context().Done() + return nil } return m.tenantSettingsFn(req, stream) } @@ -212,6 +242,7 @@ func newConnector(cfg ConnectorConfig, addrs []string) *connector { // kvcoord.NodeDescStore and as a config.SystemConfigProvider. func TestConnectorGossipSubscription(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() stopper := stop.NewStopper() @@ -370,6 +401,7 @@ func TestConnectorGossipSubscription(t *testing.T) { // kvcoord.RangeDescriptorDB. func TestConnectorRangeLookup(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() stopper := stop.NewStopper() @@ -455,6 +487,7 @@ func TestConnectorRangeLookup(t *testing.T) { // on one of them. func TestConnectorRetriesUnreachable(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() stopper := stop.NewStopper() @@ -542,6 +575,7 @@ func TestConnectorRetriesUnreachable(t *testing.T) { // immediately if it is not. func TestConnectorRetriesError(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() stopper := stop.NewStopper() diff --git a/pkg/kv/kvclient/kvtenant/setting_overrides.go b/pkg/kv/kvclient/kvtenant/setting_overrides.go index 1cb8a465d3c4..9447c9a0ac21 100644 --- a/pkg/kv/kvclient/kvtenant/setting_overrides.go +++ b/pkg/kv/kvclient/kvtenant/setting_overrides.go @@ -16,6 +16,8 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" + "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -48,6 +50,11 @@ func (c *connector) runTenantSettingsSubscription(ctx context.Context, startupCh c.settingsMu.receivedFirstAllTenantOverrides = false c.settingsMu.receivedFirstSpecificOverrides = false }() + func() { + c.metadataMu.Lock() + defer c.metadataMu.Unlock() + c.metadataMu.receivedFirstMetadata = false + }() for { e, err := stream.Recv() @@ -87,31 +94,99 @@ func (c *connector) runTenantSettingsSubscription(ctx context.Context, startupCh continue } - settingsReady, err := c.processSettingsEvent(e) - if err != nil { - log.Errorf(ctx, "error processing tenant settings event: %v", err) - _ = stream.CloseSend() - c.tryForgetClient(ctx, client) - break + if c.testingEmulateOldVersionSettingsClient { + // The old version of the settings client does not understand anything + // else than setting events. + e.EventType = kvpb.TenantSettingsEvent_SETTING_EVENT } - // Signal that startup is complete once we have enough events to start. - if settingsReady { - log.Infof(ctx, "received initial tenant settings") + var reconnect bool + switch e.EventType { + case kvpb.TenantSettingsEvent_METADATA_EVENT: + err := c.processMetadataEvent(ctx, e) + if err != nil { + log.Errorf(ctx, "error processing tenant settings event: %v", err) + reconnect = true + } + + case kvpb.TenantSettingsEvent_SETTING_EVENT: + settingsReady, err := c.processSettingsEvent(ctx, e) + if err != nil { + log.Errorf(ctx, "error processing tenant settings event: %v", err) + reconnect = true + break + } - if startupCh != nil { - startupCh <- nil - close(startupCh) - startupCh = nil + // Signal that startup is complete once we have enough events + // to start. Note: we do not connect this condition to + // receiving the tenant metadata (via processMetadataEvent) for + // compatibility with pre-v23.2 servers which only send + // setting override events. + // + // Luckily, we are guaranteed that once we receive the setting + // overrides the metadata has been received as well (in v23.1+ + // servers that send it) because when it is sent it is always + // sent prior to the setting overrides. + if settingsReady { + log.Infof(ctx, "received initial tenant settings") + + if startupCh != nil { + startupCh <- nil + close(startupCh) + startupCh = nil + } } } + + if reconnect { + _ = stream.CloseSend() + c.tryForgetClient(ctx, client) + break + } } } } +// processMetadataEvent updates the tenant metadata based on the event. +func (c *connector) processMetadataEvent(ctx context.Context, e *kvpb.TenantSettingsEvent) error { + c.metadataMu.Lock() + defer c.metadataMu.Unlock() + + c.metadataMu.receivedFirstMetadata = true + c.metadataMu.capabilities = e.Capabilities + c.metadataMu.tenantName = e.Name + // TODO(knz): Remove the cast once we have proper typing in the + // protobuf, which requires breaking a dependency cycle. + c.metadataMu.dataState = mtinfopb.TenantDataState(e.DataState) + c.metadataMu.serviceMode = mtinfopb.TenantServiceMode(e.ServiceMode) + + log.Infof(ctx, "received tenant metadata: name=%q dataState=%v serviceMode=%v\ncapabilities=%+v", + c.metadataMu.tenantName, c.metadataMu.dataState, c.metadataMu.serviceMode, c.metadataMu.capabilities) + + // Signal watchers that there was an update. + close(c.metadataMu.notifyCh) + c.metadataMu.notifyCh = make(chan struct{}) + + return nil +} + +// TenantInfo accesses the tenant metadata. +func (c *connector) TenantInfo() (tenantcapabilities.Entry, <-chan struct{}) { + c.metadataMu.Lock() + defer c.metadataMu.Unlock() + + return tenantcapabilities.Entry{ + TenantID: c.tenantID, + TenantCapabilities: c.metadataMu.capabilities, + Name: c.metadataMu.tenantName, + DataState: c.metadataMu.dataState, + ServiceMode: c.metadataMu.serviceMode, + }, c.metadataMu.notifyCh +} + // processSettingsEvent updates the setting overrides based on the event. func (c *connector) processSettingsEvent( - e *kvpb.TenantSettingsEvent, + ctx context.Context, e *kvpb.TenantSettingsEvent, ) (settingsReady bool, err error) { c.settingsMu.Lock() defer c.settingsMu.Unlock() @@ -143,6 +218,8 @@ func (c *connector) processSettingsEvent( return false, errors.Newf("unknown precedence value %d", e.Precedence) } + log.Infof(ctx, "received %d setting overrides with precedence %v (incremental=%v)", len(e.Overrides), e.Precedence, e.Incremental) + // If the event is not incremental, clear the map. if !e.Incremental { for k := range m { diff --git a/pkg/kv/kvclient/kvtenant/setting_overrides_test.go b/pkg/kv/kvclient/kvtenant/setting_overrides_test.go index 864c0bf7847b..f04cae1a7b1a 100644 --- a/pkg/kv/kvclient/kvtenant/setting_overrides_test.go +++ b/pkg/kv/kvclient/kvtenant/setting_overrides_test.go @@ -18,6 +18,8 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" + "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiespb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/settings" @@ -30,14 +32,11 @@ import ( "github.com/stretchr/testify/require" ) -// TestConnectorSettingOverrides tests connector's role as a -// settingswatcher.OverridesMonitor. -func TestConnectorSettingOverrides(t *testing.T) { - defer leaktest.AfterTest(t)() - - ctx := context.Background() +func newTestConnector( + t *testing.T, ctx context.Context, +) (*connector, func(), <-chan error, chan<- *kvpb.TenantSettingsEvent) { stopper := stop.NewStopper() - defer stopper.Stop(ctx) + cleanup := func() { stopper.Stop(ctx) } clock := hlc.NewClockForTesting(nil) rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) s, err := rpc.NewServer(rpcContext) @@ -48,7 +47,8 @@ func TestConnectorSettingOverrides(t *testing.T) { return stream.Send(gossipEventForClusterID(rpcContext.StorageClusterID.Get())) } eventCh := make(chan *kvpb.TenantSettingsEvent, 2) - defer close(eventCh) + prevCleanup := cleanup + cleanup = func() { close(eventCh); prevCleanup() } settingsFn := func(req *kvpb.TenantSettingsRequest, stream kvpb.Internal_TenantSettingsServer) error { if req.TenantID != tenantID { t.Errorf("invalid tenantID %s (expected %s)", req.TenantID, tenantID) @@ -87,7 +87,21 @@ func TestConnectorSettingOverrides(t *testing.T) { case <-time.After(10 * time.Millisecond): } + return c, cleanup, startedC, eventCh +} + +// TestConnectorSettingOverrides tests connector's role as a +// settingswatcher.OverridesMonitor. +func TestConnectorSettingOverrides(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + c, cleanup, startedC, eventCh := newTestConnector(t, ctx) + defer cleanup() + ev := &kvpb.TenantSettingsEvent{ + EventType: kvpb.TenantSettingsEvent_SETTING_EVENT, Precedence: kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES, Incremental: false, Overrides: nil, @@ -101,6 +115,7 @@ func TestConnectorSettingOverrides(t *testing.T) { } ev = &kvpb.TenantSettingsEvent{ + EventType: kvpb.TenantSettingsEvent_SETTING_EVENT, Precedence: kvpb.TenantSettingsEvent_ALL_TENANTS_OVERRIDES, Incremental: false, Overrides: nil, @@ -129,7 +144,7 @@ func TestConnectorSettingOverrides(t *testing.T) { Overrides: []kvpb.TenantSetting{st("foo", "all"), st("bar", "all")}, } eventCh <- ev - waitForSettings(t, ch) + waitForNotify(t, ch) ch = expectSettings(t, c, "foo=all bar=all baz=default") // Set some tenant-specific overrides, with all-tenant overlap. @@ -139,7 +154,7 @@ func TestConnectorSettingOverrides(t *testing.T) { Overrides: []kvpb.TenantSetting{st("foo", "specific"), st("baz", "specific")}, } eventCh <- ev - waitForSettings(t, ch) + waitForNotify(t, ch) ch = expectSettings(t, c, "foo=specific bar=all baz=specific") // Remove an all-tenant override that has a specific override. @@ -149,7 +164,7 @@ func TestConnectorSettingOverrides(t *testing.T) { Overrides: []kvpb.TenantSetting{st("foo", "")}, } eventCh <- ev - waitForSettings(t, ch) + waitForNotify(t, ch) ch = expectSettings(t, c, "foo=specific bar=all baz=specific") // Remove a specific override. @@ -159,7 +174,7 @@ func TestConnectorSettingOverrides(t *testing.T) { Overrides: []kvpb.TenantSetting{st("foo", "")}, } eventCh <- ev - waitForSettings(t, ch) + waitForNotify(t, ch) ch = expectSettings(t, c, "foo=default bar=all baz=specific") // Non-incremental change to all-tenants override. @@ -169,17 +184,17 @@ func TestConnectorSettingOverrides(t *testing.T) { Overrides: []kvpb.TenantSetting{st("bar", "all")}, } eventCh <- ev - waitForSettings(t, ch) + waitForNotify(t, ch) _ = expectSettings(t, c, "foo=default bar=all baz=specific") } -func waitForSettings(t *testing.T, ch <-chan struct{}) { +func waitForNotify(t *testing.T, ch <-chan struct{}) { t.Helper() select { case <-ch: return case <-time.After(10 * time.Second): - t.Fatalf("waitForSettings timed out") + t.Fatalf("waitForNotify timed out") } } @@ -207,3 +222,166 @@ func expectSettings(t *testing.T, c *connector, exp string) <-chan struct{} { return updateCh } + +// TestConnectorTenantMetadata tests the connector's role as a +// tenant metadata provider. +func TestConnectorTenantMetadata(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + c, cleanup, startedC, eventCh := newTestConnector(t, ctx) + defer cleanup() + + // First message - required by startup protocol. + const firstPrecedence = kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES + ev := &kvpb.TenantSettingsEvent{ + EventType: kvpb.TenantSettingsEvent_SETTING_EVENT, + Precedence: firstPrecedence, + } + eventCh <- ev + select { + case err := <-startedC: + t.Fatalf("Start unexpectedly completed with err=%v", err) + case <-time.After(10 * time.Millisecond): + } + + // Initial tenant metadata. + ev = &kvpb.TenantSettingsEvent{ + EventType: kvpb.TenantSettingsEvent_METADATA_EVENT, + Precedence: firstPrecedence, + Incremental: true, + Name: "initial", + // TODO(knz): remove cast after the dep cycle has been resolved. + DataState: uint32(mtinfopb.DataStateReady), + ServiceMode: uint32(mtinfopb.ServiceModeExternal), + Capabilities: &tenantcapabilitiespb.TenantCapabilities{CanViewNodeInfo: true}, + } + eventCh <- ev + select { + case err := <-startedC: + t.Fatalf("Start unexpectedly completed with err=%v", err) + case <-time.After(10 * time.Millisecond): + } + + // Finish startup. + ev = &kvpb.TenantSettingsEvent{ + EventType: kvpb.TenantSettingsEvent_SETTING_EVENT, + Precedence: kvpb.TenantSettingsEvent_ALL_TENANTS_OVERRIDES, + } + eventCh <- ev + select { + case err := <-startedC: + require.NoError(t, err) + case <-time.After(10 * time.Second): + t.Fatalf("failed to see start complete") + } + + ch := expectMetadata(t, c, `tid=5 name="initial" data=ready service=external caps=can_view_node_info:true `) + + // Change some metadata fields. + const anyPrecedence = kvpb.TenantSettingsEvent_ALL_TENANTS_OVERRIDES + ev = &kvpb.TenantSettingsEvent{ + EventType: kvpb.TenantSettingsEvent_METADATA_EVENT, + Precedence: anyPrecedence, + Incremental: true, + Name: "initial", + // TODO(knz): remove cast after the dep cycle has been resolved. + DataState: uint32(mtinfopb.DataStateDrop), + ServiceMode: uint32(mtinfopb.ServiceModeShared), + Capabilities: &tenantcapabilitiespb.TenantCapabilities{ExemptFromRateLimiting: true}, + } + eventCh <- ev + waitForNotify(t, ch) + + _ = expectMetadata(t, c, `tid=5 name="initial" data=dropping service=shared caps=exempt_from_rate_limiting:true `) +} + +func expectMetadata(t *testing.T, c *connector, exp string) <-chan struct{} { + t.Helper() + info, updateCh := c.TenantInfo() + str := fmt.Sprintf("tid=%v name=%q data=%v service=%v caps=%+v", + info.TenantID, info.Name, info.DataState, info.ServiceMode, info.TenantCapabilities) + if str != exp { + t.Errorf("\nexpected: %q\ngot : %q", exp, str) + } + + return updateCh +} + +// TestCrossVersionMetadataSupport tests that and old-version +// connector can talk to a new-version server and a new-version server +// can talk to an old-version connector. +func TestCrossVersionMetadataSupport(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + for _, b := range []bool{false, true} { + oldVersionClient := b + oldVersionServer := !b + strs := map[bool]string{false: "new", true: "old"} + t.Run(fmt.Sprintf("client=%s/server=%s", strs[oldVersionClient], strs[oldVersionServer]), func(t *testing.T) { + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + clock := hlc.NewClockForTesting(nil) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + s, err := rpc.NewServer(rpcContext) + require.NoError(t, err) + + gossipSubFn := func(req *kvpb.GossipSubscriptionRequest, stream kvpb.Internal_GossipSubscriptionServer) error { + return stream.Send(gossipEventForClusterID(rpcContext.StorageClusterID.Get())) + } + server := &mockServer{ + gossipSubFn: gossipSubFn, + emulateOldVersionSettingServer: oldVersionServer, + } + kvpb.RegisterInternalServer(s, server) + ln, err := netutil.ListenAndServeGRPC(stopper, s, util.TestAddr) + require.NoError(t, err) + + cfg := ConnectorConfig{ + TenantID: roachpb.MustMakeTenantID(5), + AmbientCtx: log.MakeTestingAmbientContext(stopper.Tracer()), + RPCContext: rpcContext, + RPCRetryOptions: rpcRetryOpts, + } + addrs := []string{ln.Addr().String()} + c := newConnector(cfg, addrs) + c.testingEmulateOldVersionSettingsClient = oldVersionClient + + // Start the connector. + startedC := make(chan error) + go func() { + startedC <- c.Start(ctx) + }() + select { + case err := <-startedC: + require.NoError(t, err) + case <-time.After(10 * time.Second): + t.Fatalf("failed to see start complete") + } + + // In any case check that the overrides are available. + func() { + t.Helper() + c.settingsMu.Lock() + defer c.settingsMu.Unlock() + + require.True(t, c.settingsMu.receivedFirstAllTenantOverrides) + require.True(t, c.settingsMu.receivedFirstSpecificOverrides) + }() + + // If either the client or the server is new, the metadata is + // not communicated or not processed. + receivedFirstMetadata := func() bool { + c.metadataMu.Lock() + defer c.metadataMu.Unlock() + return c.metadataMu.receivedFirstMetadata + }() + require.False(t, receivedFirstMetadata) + expectMetadata(t, c, `tid=5 name="" data=add service=none caps=`) + }) + } +} diff --git a/pkg/kv/kvclient/kvtenant/tenant_range_lookup_test.go b/pkg/kv/kvclient/kvtenant/tenant_range_lookup_test.go index 0cefa3decd19..6e6d5018342b 100644 --- a/pkg/kv/kvclient/kvtenant/tenant_range_lookup_test.go +++ b/pkg/kv/kvclient/kvtenant/tenant_range_lookup_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/stretchr/testify/require" ) @@ -29,6 +30,7 @@ import ( // range results are filtered for the client. func TestRangeLookupPrefetchFiltering(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ diff --git a/pkg/kv/kvclient/kvtenant/tenant_scan_range_descriptors_test.go b/pkg/kv/kvclient/kvtenant/tenant_scan_range_descriptors_test.go index db426010e457..8c8b626027c3 100644 --- a/pkg/kv/kvclient/kvtenant/tenant_scan_range_descriptors_test.go +++ b/pkg/kv/kvclient/kvtenant/tenant_scan_range_descriptors_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/rangedesc" "github.com/stretchr/testify/require" ) @@ -51,6 +52,7 @@ func setup( // scan range descriptors iff they correspond to tenant owned ranges. func TestScanRangeDescriptors(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() tc, tenant2, iteratorFactory := setup(t, ctx) @@ -124,6 +126,7 @@ func TestScanRangeDescriptors(t *testing.T) { func TestScanRangeDescriptorsOutsideTenantKeyspace(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() tc, _, iteratorFactory := setup(t, ctx) diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 04843aa899f7..e855e58e6746 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -952,11 +952,6 @@ func (ts *TestServer) StartSharedProcessTenant( } } - // Wait for the rangefeed to catch up. - if err := ts.WaitForTenantReadiness(ctx, tenantID); err != nil { - return nil, nil, err - } - if justCreated { // Also mark it for shared-process execution. _, err := ts.InternalExecutor().(*sql.InternalExecutor).ExecEx( @@ -972,6 +967,11 @@ func (ts *TestServer) StartSharedProcessTenant( } } + // Wait for the rangefeed to catch up. + if err := ts.WaitForTenantReadiness(ctx, tenantID); err != nil { + return nil, nil, err + } + // Instantiate the tenant server. s, err := ts.Server.serverController.startAndWaitForRunningServer(ctx, args.TenantName) if err != nil { @@ -1020,6 +1020,11 @@ func (t *TestTenant) HTTPAuthServer() interface{} { // WaitForTenantReadiness is part of TestServerInterface. func (ts *TestServer) WaitForTenantReadiness(ctx context.Context, tenantID roachpb.TenantID) error { + // Restarting the watcher forces a new initial scan which is + // faster than waiting out the closed timestamp interval + // required to see new updates. + ts.node.tenantInfoWatcher.TestingRestart() + log.Infof(ctx, "waiting for rangefeed to catch up with record for tenant %v", tenantID) _, infoWatcher, err := ts.node.waitForTenantWatcherReadiness(ctx) if err != nil { @@ -1220,10 +1225,9 @@ func (ts *TestServer) StartTenant( return nil, err } } else { - // Restart the capabilities watcher. Restarting the - // watcher forces a new initial scan which is faster - // than waiting out the closed timestamp interval - // required to see new updates. + // Restart the capabilities watcher. Restarting the watcher + // forces a new initial scan which is faster than waiting out + // the closed timestamp interval required to see new updates. ts.tenantCapabilitiesWatcher.TestingRestart() if err := testutils.SucceedsSoonError(func() error { capabilities, found := ts.TenantCapabilitiesReader().GetCapabilities(params.TenantID) diff --git a/pkg/sql/schemachanger/sctest/backup.go b/pkg/sql/schemachanger/sctest/backup.go index fca1251cd65f..b6d99054f251 100644 --- a/pkg/sql/schemachanger/sctest/backup.go +++ b/pkg/sql/schemachanger/sctest/backup.go @@ -70,6 +70,10 @@ func BackupSuccessMixedVersion(t *testing.T, path string, factory TestServerFact // and at least as expensive to run. skip.UnderShort(t) + if strings.Contains(path, "alter_table_add_primary_key_drop_rowid") { + skip.WithIssue(t, 107552, "flaky test") + } + factory = factory.WithMixedVersion() cumulativeTestForEachPostCommitStage(t, path, factory, func(t *testing.T, cs CumulativeTestCaseSpec) { backupSuccess(t, factory, cs)