diff --git a/pkg/kv/kvclient/kvtenant/setting_overrides.go b/pkg/kv/kvclient/kvtenant/setting_overrides.go index 278d88cf3f19..bb8a9fb5d1ee 100644 --- a/pkg/kv/kvclient/kvtenant/setting_overrides.go +++ b/pkg/kv/kvclient/kvtenant/setting_overrides.go @@ -38,6 +38,16 @@ func (c *connector) runTenantSettingsSubscription(ctx context.Context, startupCh c.tryForgetClient(ctx, client) continue } + + // Reset the sentinel checks. We start a new sequence of messages + // from the server every time we (re)connect. + func() { + c.settingsMu.Lock() + defer c.settingsMu.Unlock() + c.settingsMu.receivedFirstAllTenantOverrides = false + c.settingsMu.receivedFirstSpecificOverrides = false + }() + for { e, err := stream.Recv() if err != nil { @@ -51,7 +61,8 @@ func (c *connector) runTenantSettingsSubscription(ctx context.Context, startupCh } if e.Error != (errorspb.EncodedError{}) { // Hard logical error. We expect io.EOF next. - log.Errorf(ctx, "error consuming TenantSettings RPC: %v", e.Error) + err := errors.DecodeError(ctx, e.Error) + log.Errorf(ctx, "error consuming TenantSettings RPC: %v", err) continue } @@ -83,18 +94,29 @@ func (c *connector) processSettingsEvent( c.settingsMu.Lock() defer c.settingsMu.Unlock() - if (!c.settingsMu.receivedFirstAllTenantOverrides || !c.settingsMu.receivedFirstSpecificOverrides) && e.Incremental { - return false, errors.Newf("need to receive non-incremental setting events first") - } - var m map[string]settings.EncodedValue switch e.Precedence { case kvpb.TenantSettingsEvent_ALL_TENANTS_OVERRIDES: + if !c.settingsMu.receivedFirstAllTenantOverrides && e.Incremental { + return false, errors.Newf( + "need to receive non-incremental setting event first for precedence %v", + e.Precedence, + ) + } + c.settingsMu.receivedFirstAllTenantOverrides = true m = c.settingsMu.allTenantOverrides + case kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES: + if !c.settingsMu.receivedFirstSpecificOverrides && e.Incremental { + return false, errors.Newf( + "need to receive non-incremental setting events first for precedence %v", + e.Precedence, + ) + } c.settingsMu.receivedFirstSpecificOverrides = true m = c.settingsMu.specificOverrides + default: return false, errors.Newf("unknown precedence value %d", e.Precedence) } diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index 0154c5bd864b..2a2997006632 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -366,8 +366,9 @@ func (s *SettingsWatcher) updateOverrides(ctx context.Context) { if key == versionSettingKey { var newVersion clusterversion.ClusterVersion if err := protoutil.Unmarshal([]byte(val.Value), &newVersion); err != nil { - log.Warningf(ctx, "ignoring invalid cluster version: %newVersion - "+ - "the lack of a refreshed storage cluster version in a secondary tenant may prevent tenant upgrade", err) + log.Warningf(ctx, "ignoring invalid cluster version: %s - %v\n"+ + "Note: the lack of a refreshed storage cluster version in a secondary tenant may prevent tenant upgrade.", + newVersion, err) } else { // We don't want to fully process the override in the case // where we're dealing with the "version" setting, as we want