From 55fde19c7cf1ac10794e4c7de3d919808318fe29 Mon Sep 17 00:00:00 2001
From: ffranr
Date: Mon, 11 Dec 2023 12:38:08 +0000
Subject: [PATCH 1/3] universe: extract fed envoy tick handler method

This commit extracts a tick handler method from the federation envoy
event handler routine. The goal of this commit is to simplify the code
and to gain some line length breathing room.
---
 universe/auto_syncer.go | 160 +++++++++++++++++++++-------------------
 1 file changed, 83 insertions(+), 77 deletions(-)

diff --git a/universe/auto_syncer.go b/universe/auto_syncer.go
index 688b599b3..12be88522 100644
--- a/universe/auto_syncer.go
+++ b/universe/auto_syncer.go
@@ -403,88 +403,18 @@ func (f *FederationEnvoy) syncer() {
 	syncTicker := time.NewTicker(f.cfg.SyncInterval)
 	defer syncTicker.Stop()
 
-	// We'll use a timeout that's slightly less than the sync interval to
-	// help avoid ticking into a new sync event before the previous event
-	// has finished.
-	syncContextTimeout := f.cfg.SyncInterval - 1*time.Second
-	if syncContextTimeout < 0 {
-		// If the sync interval is less than a second, then we'll use
-		// the sync interval as the timeout.
-		syncContextTimeout = f.cfg.SyncInterval
-	}
-
 	for {
 		select {
-		// A new sync event has just been triggered, so we'll attempt
-		// to synchronize state with all the active universe servers in
-		// the federation.
+		// Handle a new sync tick event.
 		case <-syncTicker.C:
 			log.Debug("Federation envoy handling new tick event")
-
-			// Error propagation is handled in tryFetchServers, we
-			// only need to exit here.
-			fedServers, err := f.tryFetchServers()
+			err := f.handleTickEvent()
 			if err != nil {
-				log.Warnf("unable to fetch set of universe "+
-					"servers: %v", err)
-				continue
-			}
-
-			log.Infof("Synchronizing with %v federation members",
-				len(fedServers))
-			err = f.SyncServers(fedServers)
-			if err != nil {
-				log.Warnf("unable to sync with federation "+
-					"server: %v", err)
-				continue
-			}
-
-			// After we've synced with the federation, we'll
-			// attempt to push out any pending proofs that we
-			// haven't yet completed.
-			ctxFetchLog, cancelFetchLog := f.WithCtxQuitNoTimeout()
-			syncDirection := SyncDirectionPush
-			db := f.cfg.FederationDB
-			logEntries, err := db.FetchPendingProofsSyncLog(
-				ctxFetchLog, &syncDirection,
-			)
-			cancelFetchLog()
-			if err != nil {
-				log.Warnf("unable to query pending push "+
-					"sync log: %w", err)
-				continue
-			}
-
-			if len(logEntries) > 0 {
-				log.Debugf("Handling pending proof sync log "+
-					"entries (entries_count=%d)",
-					len(logEntries))
-			}
-
-			// TODO(ffranr): Take account of any new servers that
-			// have been added since the last time we populated the
-			// log for a given proof leaf. Pending proof sync log
-			// entries are only relevant for the set of servers
-			// that existed at the time the log entry was created.
-			// If a new server is added, then we should create a
-			// new log entry for the new server.
-
-			for idx := range logEntries {
-				entry := logEntries[idx]
-
-				servers := []ServerAddr{
-					entry.ServerAddr,
-				}
-
-				ctxPush, cancelPush :=
-					f.CtxBlockingCustomTimeout(
-						syncContextTimeout,
-					)
-				f.pushProofToFederation(
-					ctxPush, entry.UniID, entry.LeafKey,
-					&entry.Leaf, servers, true,
-				)
-				cancelPush()
+				// Warn, but don't exit the syncer. The syncer
+				// should continue to run and attempt to handle
+				// more events.
+				log.Warnf("Unable to handle tick event: %v",
+					err)
 			}
 
 		// A new push request has just arrived. We'll perform a
@@ -620,6 +550,82 @@ func (f *FederationEnvoy) syncer() {
 	}
 }
 
+// handleTickEvent is called each time the sync ticker fires. It will attempt
+// to synchronize state with all the active universe servers in the federation.
+func (f *FederationEnvoy) handleTickEvent() error {
+	// Error propagation is handled in tryFetchServers; we only need to
+	// return here.
+	fedServers, err := f.tryFetchServers()
+	if err != nil {
+		return fmt.Errorf("unable to fetch set of universe servers: "+
+			"%w", err)
+	}
+
+	log.Infof("Synchronizing with %v federation members", len(fedServers))
+	err = f.SyncServers(fedServers)
+	if err != nil {
+		return fmt.Errorf("unable to sync with federation server: %w",
+			err)
+	}
+
+	// After we've synced with the federation, we'll attempt to push out any
+	// pending proofs that we haven't yet completed.
+	ctx, cancel := f.WithCtxQuitNoTimeout()
+	defer cancel()
+
+	syncDirection := SyncDirectionPush
+	db := f.cfg.FederationDB
+
+	logEntries, err := db.FetchPendingProofsSyncLog(
+		ctx, &syncDirection,
+	)
+	if err != nil {
+		return fmt.Errorf("unable to query pending push sync log: %w",
+			err)
+	}
+
+	if len(logEntries) > 0 {
+		log.Debugf("Handling pending proof sync log entries "+
+			"(entries_count=%d)", len(logEntries))
+	}
+
+	// TODO(ffranr): Take account of any new servers that have been added
+	// since the last time we populated the log for a given proof leaf.
+	// Pending proof sync log entries are only relevant for the set of
+	// servers that existed at the time the log entry was created. If a new
+	// server is added, then we should create a new log entry for the new
+	// server.
+
+	// We'll use a timeout that's slightly less than the sync interval to
+	// help avoid ticking into a new sync event before the previous event
+	// has finished.
+	syncContextTimeout := f.cfg.SyncInterval - 1*time.Second
+	if syncContextTimeout < 0 {
+		// If the sync interval is less than a second, then we'll use
+		// the sync interval as the timeout.
+		syncContextTimeout = f.cfg.SyncInterval
+	}
+
+	for idx := range logEntries {
+		entry := logEntries[idx]
+
+		servers := []ServerAddr{
+			entry.ServerAddr,
+		}
+
+		ctxPush, cancelPush := f.CtxBlockingCustomTimeout(
+			syncContextTimeout,
+		)
+		f.pushProofToFederation(
+			ctxPush, entry.UniID, entry.LeafKey, &entry.Leaf,
+			servers, true,
+		)
+		cancelPush()
+	}
+
+	return nil
+}
+
 // UpsertProofLeaf upserts a proof leaf within the target universe tree. This
 // can be used to first push out a new update to the local registrar,
 // ultimately queuing it to also be sent to the set of active universe servers.
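The timeout rule that moves into handleTickEvent above is worth spelling out: each pending proof is pushed under a context timeout of one second less than the sync interval (falling back to the full interval when the interval is under a second), so a slow push cannot run into the next tick. Below is a minimal, self-contained sketch of that rule using only the standard library; deriveTimeout and pushWithTimeout are hypothetical names, not part of the tapd codebase.

// Standalone illustration (not from the tapd repository) of the timeout rule
// used in handleTickEvent: derive a per-push timeout that is one second less
// than the sync interval, falling back to the full interval for sub-second
// intervals, and bound each push with its own context.
package main

import (
	"context"
	"fmt"
	"time"
)

// deriveTimeout mirrors the syncContextTimeout calculation from the patch.
func deriveTimeout(syncInterval time.Duration) time.Duration {
	timeout := syncInterval - time.Second
	if timeout < 0 {
		// If the sync interval is less than a second, use the sync
		// interval itself as the timeout.
		timeout = syncInterval
	}
	return timeout
}

// pushWithTimeout is a hypothetical stand-in for pushProofToFederation: each
// pending entry is pushed under its own bounded context so one slow server
// cannot stall into the next tick.
func pushWithTimeout(parent context.Context, timeout time.Duration,
	entry string) error {

	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	select {
	case <-time.After(10 * time.Millisecond): // simulated push latency
		fmt.Println("pushed", entry)
		return nil

	case <-ctx.Done():
		return fmt.Errorf("push of %s timed out: %w", entry, ctx.Err())
	}
}

func main() {
	timeout := deriveTimeout(5 * time.Second)
	fmt.Println("per-push timeout:", timeout)

	for _, entry := range []string{"leaf-a", "leaf-b"} {
		err := pushWithTimeout(context.Background(), timeout, entry)
		if err != nil {
			fmt.Println("warn:", err)
		}
	}
}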
From c2d662a6960499687bd412bd5d6ac38aea5a0c36 Mon Sep 17 00:00:00 2001
From: ffranr
Date: Mon, 11 Dec 2023 12:51:49 +0000
Subject: [PATCH 2/3] universe: extract fed envoy push request handler method

This commit extracts a push request handler method from the federation
envoy event handler routine. The goal of this commit is to simplify the
code and to gain some line length breathing room.
---
 universe/auto_syncer.go | 143 +++++++++++++++++++++------------------
 1 file changed, 75 insertions(+), 68 deletions(-)

diff --git a/universe/auto_syncer.go b/universe/auto_syncer.go
index 12be88522..762633459 100644
--- a/universe/auto_syncer.go
+++ b/universe/auto_syncer.go
@@ -417,79 +417,18 @@ func (f *FederationEnvoy) syncer() {
 					err)
 			}
 
-		// A new push request has just arrived. We'll perform a
-		// asynchronous registration with the local Universe registrar,
-		// then push it out in an async manner to the federation
-		// members.
+		// Handle a new push request.
 		case pushReq := <-f.pushRequests:
 			log.Debug("Federation envoy handling push request")
-			ctx, cancel := f.WithCtxQuit()
-
-			// First, we'll attempt to registrar the proof leaf with
-			// the local registrar server.
-			newProof, err := f.cfg.LocalRegistrar.UpsertProofLeaf(
-				ctx, pushReq.ID, pushReq.Key, pushReq.Leaf,
-			)
-			cancel()
-			if err != nil {
-				err := fmt.Errorf("unable to insert proof "+
-					"into local universe: %w", err)
-
-				log.Warnf(err.Error())
-
-				pushReq.err <- err
-				continue
-			}
-
-			// Now that we know we were able to register the proof,
-			// we'll return back to the caller, and push the new
-			// proof out to the federation in the background.
-			pushReq.resp <- newProof
-
-			// Fetch all universe servers in our federation.
-			fedServers, err := f.tryFetchServers()
+			err := f.handlePushRequest(pushReq)
 			if err != nil {
-				err := fmt.Errorf("unable to fetch "+
-					"federation servers: %w", err)
-				log.Warnf(err.Error())
-				pushReq.err <- err
-				continue
-			}
-
-			if len(fedServers) == 0 {
-				log.Warnf("could not find any federation " +
-					"servers")
-				continue
-			}
-
-			if pushReq.LogProofSync {
-				// We are attempting to sync using the
-				// logged proof sync procedure. We will
-				// therefore narrow down the set of target
-				// servers based on the sync log. Only servers
-				// that are not yet push sync complete will be
-				// targeted.
-				fedServers, err = f.filterProofSyncPending(
-					fedServers, pushReq.ID, pushReq.Key,
-				)
-				if err != nil {
-					log.Warnf("failed to filter " +
-						"federation servers")
-					continue
-				}
+				// Warn, but don't exit the syncer. The syncer
+				// should continue to run and attempt to handle
+				// more events.
+				log.Warnf("Unable to handle push request: %v",
+					err)
 			}
 
-			// With the response sent above, we'll push this
-			// out to all the Universe servers in the
-			// background.
-			ctxPush, cancelPush := f.WithCtxQuitNoTimeout()
-			f.pushProofToFederation(
-				ctxPush, pushReq.ID, pushReq.Key,
-				pushReq.Leaf, fedServers,
-				pushReq.LogProofSync,
-			)
-			cancelPush()
-
 		case pushReq := <-f.batchPushRequests:
 			log.Debug("Federation envoy handling batch push " +
 				"request")
@@ -626,6 +565,74 @@ func (f *FederationEnvoy) handleTickEvent() error {
 	return nil
 }
 
+// handlePushRequest is called each time a new push request is received. It will
+// perform an asynchronous registration with the local Universe registrar, then
+// push the proof leaf out in an async manner to the federation members.
+func (f *FederationEnvoy) handlePushRequest(pushReq *FederationPushReq) error {
+	if pushReq == nil {
+		return fmt.Errorf("nil push request")
+	}
+
+	// First, we'll attempt to register the proof leaf with the local
+	// registrar server.
+	ctx, cancel := f.WithCtxQuit()
+	defer cancel()
+	newProof, err := f.cfg.LocalRegistrar.UpsertProofLeaf(
+		ctx, pushReq.ID, pushReq.Key, pushReq.Leaf,
+	)
+	if err != nil {
+		err = fmt.Errorf("unable to insert proof into local "+
+			"universe: %w", err)
+		pushReq.err <- err
+		return err
+	}
+
+	// Now that we know we were able to register the proof, we'll return
+	// back to the caller, and push the new proof out to the federation in
+	// the background.
+	pushReq.resp <- newProof
+
+	// Fetch all universe servers in our federation.
+	fedServers, err := f.tryFetchServers()
+	if err != nil {
+		err = fmt.Errorf("unable to fetch federation servers: %w", err)
+		pushReq.err <- err
+		return err
+	}
+
+	if len(fedServers) == 0 {
+		log.Warnf("could not find any federation servers")
+		return nil
+	}
+
+	if pushReq.LogProofSync {
+		// We are attempting to sync using the logged proof sync
+		// procedure. We will therefore narrow down the set of target
+		// servers based on the sync log. Only servers that are not yet
+		// push sync complete will be targeted.
+		fedServers, err = f.filterProofSyncPending(
+			fedServers, pushReq.ID, pushReq.Key,
+		)
+		if err != nil {
+			err = fmt.Errorf("failed to filter federation "+
+				"servers: %w", err)
+			pushReq.err <- err
+			return err
+		}
+	}
+
+	// With the response sent above, we'll push this out to all the Universe
+	// servers in the background.
+	ctx, cancel = f.WithCtxQuitNoTimeout()
+	defer cancel()
+	f.pushProofToFederation(
+		ctx, pushReq.ID, pushReq.Key, pushReq.Leaf, fedServers,
+		pushReq.LogProofSync,
+	)
+
+	return nil
+}
+
 // UpsertProofLeaf upserts a proof leaf within the target universe tree. This
 // can be used to first push out a new update to the local registrar,
 // ultimately queuing it to also be sent to the set of active universe servers.
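One convention in handlePushRequest above deserves a closer look: every failure path both sends the error on pushReq.err, so a caller blocked on the request is released, and returns the error, so the syncer loop can log a warning. Below is a minimal, self-contained sketch of that request/response shape; pushRequest and handlePush are hypothetical stand-ins for the envoy's own types.

// Standalone illustration (not from the tapd repository) of the error
// delivery convention in handlePushRequest: every failure is sent to the
// waiting caller on the request's err channel and also returned, so the
// event loop can log it while the caller is unblocked.
package main

import (
	"errors"
	"fmt"
)

// pushRequest is a hypothetical stand-in for FederationPushReq: the caller
// blocks on resp or err, so a handler must send on exactly one of them.
type pushRequest struct {
	payload string
	resp    chan string
	err     chan error
}

// handlePush mimics the handlePushRequest convention: deliver the error to
// the caller, then return it for logging by the event loop.
func handlePush(req *pushRequest) error {
	if req == nil {
		return fmt.Errorf("nil push request")
	}

	if req.payload == "" {
		err := errors.New("empty payload")
		req.err <- err
		return err
	}

	req.resp <- "registered: " + req.payload
	return nil
}

func main() {
	req := &pushRequest{
		payload: "proof-leaf",
		resp:    make(chan string, 1),
		err:     make(chan error, 1),
	}

	// The event loop side: log the error but keep running.
	if err := handlePush(req); err != nil {
		fmt.Println("warn:", err)
	}

	// The caller side: wait for whichever channel fires first.
	select {
	case r := <-req.resp:
		fmt.Println("caller got response:", r)

	case e := <-req.err:
		fmt.Println("caller got error:", e)
	}
}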
From 52ac13fb30cd8584cb41f052bdc714b11c59853a Mon Sep 17 00:00:00 2001
From: ffranr
Date: Mon, 11 Dec 2023 13:02:38 +0000
Subject: [PATCH 3/3] universe: extract fed envoy batch push request handler method

This commit extracts a batch push request handler method from the
federation envoy event handler routine. The goal of this commit is to
simplify the code and to gain some line length breathing room.
---
 universe/auto_syncer.go | 109 ++++++++++++++++++++++------------------
 1 file changed, 61 insertions(+), 48 deletions(-)

diff --git a/universe/auto_syncer.go b/universe/auto_syncer.go
index 762633459..01e36e3a4 100644
--- a/universe/auto_syncer.go
+++ b/universe/auto_syncer.go
@@ -429,60 +429,18 @@ func (f *FederationEnvoy) syncer() {
 					err)
 			}
 
+		// Handle a new batch push request.
 		case pushReq := <-f.batchPushRequests:
 			log.Debug("Federation envoy handling batch push " +
 				"request")
-			ctx, cancel := f.WithCtxQuitNoTimeout()
-
-			// First, we'll attempt to registrar the proof leaf with
-			// the local registrar server.
-			err := f.cfg.LocalRegistrar.UpsertProofLeafBatch(
-				ctx, pushReq.Batch,
-			)
-			cancel()
-			if err != nil {
-				err := fmt.Errorf("unable to insert proof "+
-					"batch into local universe: %w", err)
-
-				log.Warnf(err.Error())
-
-				pushReq.err <- err
-				continue
-			}
-
-			// Now that we know we were able to register the proof,
-			// we'll return back to the caller.
-			pushReq.resp <- struct{}{}
-
-			// Fetch all universe servers in our federation.
-			fedServers, err := f.tryFetchServers()
+			err := f.handleBatchPushRequest(pushReq)
 			if err != nil {
-				err := fmt.Errorf("unable to fetch "+
-					"federation servers: %w", err)
-				log.Warnf(err.Error())
-				pushReq.err <- err
-				continue
-			}
-
-			if len(fedServers) == 0 {
-				log.Warnf("could not find any federation " +
-					"servers")
-				continue
+				// Warn, but don't exit the event handler
+				// routine.
+				log.Warnf("Unable to handle batch push "+
+					"request: %v", err)
 			}
 
-			// With the response sent above, we'll push this out to
-			// all the Universe servers in the background.
-			ctxPush, cancelPush := f.WithCtxQuitNoTimeout()
-			for idx := range pushReq.Batch {
-				item := pushReq.Batch[idx]
-
-				f.pushProofToFederation(
-					ctxPush, item.ID, item.Key, item.Leaf,
-					fedServers, item.LogProofSync,
-				)
-			}
-			cancelPush()
-
 		case <-f.Quit:
 			return
 		}
@@ -633,6 +591,61 @@ func (f *FederationEnvoy) handlePushRequest(pushReq *FederationPushReq) error {
 	return nil
 }
 
+// handleBatchPushRequest is called each time a new batch push request is
+// received. It will perform an asynchronous registration with the local
+// Universe registrar, then push each leaf from the batch out in an async manner
+// to the federation members.
+func (f *FederationEnvoy) handleBatchPushRequest(
+	pushReq *FederationProofBatchPushReq) error {
+
+	if pushReq == nil {
+		return fmt.Errorf("nil batch push request")
+	}
+
+	ctx, cancel := f.WithCtxQuitNoTimeout()
+	defer cancel()
+
+	// First, we'll attempt to register the proof leaf batch with the
+	// local registrar server.
+	err := f.cfg.LocalRegistrar.UpsertProofLeafBatch(ctx, pushReq.Batch)
+	if err != nil {
+		err = fmt.Errorf("unable to insert proof batch into local "+
+			"universe: %w", err)
+		pushReq.err <- err
+		return err
+	}
+
+	// Now that we know we were able to register the proof, we'll return
+	// back to the caller.
+	pushReq.resp <- struct{}{}
+
+	// Fetch all universe servers in our federation.
+	fedServers, err := f.tryFetchServers()
+	if err != nil {
+		err = fmt.Errorf("unable to fetch federation servers: %w", err)
+		pushReq.err <- err
+		return err
+	}
+
+	if len(fedServers) == 0 {
+		log.Warnf("could not find any federation servers")
+		return nil
+	}
+
+	// With the response sent above, we'll push this out to all the Universe
+	// servers in the background.
+	for idx := range pushReq.Batch {
+		item := pushReq.Batch[idx]
+
+		f.pushProofToFederation(
+			ctx, item.ID, item.Key, item.Leaf, fedServers,
+			item.LogProofSync,
+		)
+	}
+
+	return nil
+}
+
 // UpsertProofLeaf upserts a proof leaf within the target universe tree. This
 // can be used to first push out a new update to the local registrar,
 // ultimately queuing it to also be sent to the set of active universe servers.
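Taken together, the three patches apply the same refactor to each select case: the case body becomes an error-returning handler method, and the event loop only logs failures and keeps running. Below is a minimal, self-contained sketch of that shape under assumed names (worker, handleTick), independent of the tapd codebase.

// Standalone illustration (not from the tapd repository) of the refactor
// shape shared by all three patches: each select case body becomes an
// error-returning handler, and the event loop only logs failures and keeps
// running until it is asked to quit.
package main

import (
	"errors"
	"log"
	"time"
)

// worker is a hypothetical stand-in for the federation envoy.
type worker struct {
	quit chan struct{}
}

// handleTick plays the role of handleTickEvent: it returns an error instead
// of logging and continuing inline, keeping the event loop short.
func (w *worker) handleTick() error {
	// Real work would go here; fail on odd nanoseconds just to exercise
	// the warning path.
	if time.Now().UnixNano()%2 != 0 {
		return errors.New("transient sync failure")
	}
	return nil
}

// run has the same shape as the refactored syncer: a one-line handler call
// per case plus uniform error logging.
func (w *worker) run() {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := w.handleTick(); err != nil {
				// Warn, but keep the loop alive so later
				// events are still handled.
				log.Printf("unable to handle tick: %v", err)
			}

		case <-w.quit:
			return
		}
	}
}

func main() {
	w := &worker{quit: make(chan struct{})}
	go w.run()

	time.Sleep(500 * time.Millisecond)
	close(w.quit)
}

The payoff is the one the commit messages aim for: each case in the event loop shrinks to a handler call plus a uniform warning, and the extracted handler bodies get their indentation and line length budget back.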