Skip to content

Commit

Permalink
Merge pull request #108202 from knz/backport23.1-105660
Browse files Browse the repository at this point in the history
release-23.1: kvtenant: relax the startup synchronization condition
  • Loading branch information
knz authored Aug 5, 2023
2 parents a17f8e8 + 796865d commit aad4fd4
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 7 deletions.
32 changes: 27 additions & 5 deletions pkg/kv/kvclient/kvtenant/setting_overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ func (c *connector) runTenantSettingsSubscription(ctx context.Context, startupCh
c.tryForgetClient(ctx, client)
continue
}

// Reset the sentinel checks. We start a new sequence of messages
// from the server every time we (re)connect.
func() {
c.settingsMu.Lock()
defer c.settingsMu.Unlock()
c.settingsMu.receivedFirstAllTenantOverrides = false
c.settingsMu.receivedFirstSpecificOverrides = false
}()

for {
e, err := stream.Recv()
if err != nil {
Expand All @@ -51,7 +61,8 @@ func (c *connector) runTenantSettingsSubscription(ctx context.Context, startupCh
}
if e.Error != (errorspb.EncodedError{}) {
// Hard logical error. We expect io.EOF next.
log.Errorf(ctx, "error consuming TenantSettings RPC: %v", e.Error)
err := errors.DecodeError(ctx, e.Error)
log.Errorf(ctx, "error consuming TenantSettings RPC: %v", err)
continue
}

Expand Down Expand Up @@ -83,18 +94,29 @@ func (c *connector) processSettingsEvent(
c.settingsMu.Lock()
defer c.settingsMu.Unlock()

if (!c.settingsMu.receivedFirstAllTenantOverrides || !c.settingsMu.receivedFirstSpecificOverrides) && e.Incremental {
return false, errors.Newf("need to receive non-incremental setting events first")
}

var m map[string]settings.EncodedValue
switch e.Precedence {
case kvpb.TenantSettingsEvent_ALL_TENANTS_OVERRIDES:
if !c.settingsMu.receivedFirstAllTenantOverrides && e.Incremental {
return false, errors.Newf(
"need to receive non-incremental setting event first for precedence %v",
e.Precedence,
)
}

c.settingsMu.receivedFirstAllTenantOverrides = true
m = c.settingsMu.allTenantOverrides

case kvpb.TenantSettingsEvent_TENANT_SPECIFIC_OVERRIDES:
if !c.settingsMu.receivedFirstSpecificOverrides && e.Incremental {
return false, errors.Newf(
"need to receive non-incremental setting events first for precedence %v",
e.Precedence,
)
}
c.settingsMu.receivedFirstSpecificOverrides = true
m = c.settingsMu.specificOverrides

default:
return false, errors.Newf("unknown precedence value %d", e.Precedence)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/server/settingswatcher/settings_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,9 @@ func (s *SettingsWatcher) updateOverrides(ctx context.Context) {
if key == versionSettingKey {
var newVersion clusterversion.ClusterVersion
if err := protoutil.Unmarshal([]byte(val.Value), &newVersion); err != nil {
log.Warningf(ctx, "ignoring invalid cluster version: %newVersion - "+
"the lack of a refreshed storage cluster version in a secondary tenant may prevent tenant upgrade", err)
log.Warningf(ctx, "ignoring invalid cluster version: %s - %v\n"+
"Note: the lack of a refreshed storage cluster version in a secondary tenant may prevent tenant upgrade.",
newVersion, err)
} else {
// We don't want to fully process the override in the case
// where we're dealing with the "version" setting, as we want
Expand Down

0 comments on commit aad4fd4

Please sign in to comment.