From a10775243b5cac993b52ca2b22a13170fb95967c Mon Sep 17 00:00:00 2001 From: ffranr Date: Mon, 11 Dec 2023 13:02:38 +0000 Subject: [PATCH] universe: extract fed envoy batch push request handler method This commit extracts a batch push request handler method from the federation envoy event handler routine. The goal of this commit is code simplification and to gain some line length breathing room. --- universe/auto_syncer.go | 109 ++++++++++++++++++++++------------------ 1 file changed, 61 insertions(+), 48 deletions(-) diff --git a/universe/auto_syncer.go b/universe/auto_syncer.go index 762633459..01e36e3a4 100644 --- a/universe/auto_syncer.go +++ b/universe/auto_syncer.go @@ -429,60 +429,18 @@ func (f *FederationEnvoy) syncer() { err) } + // Handle a new batch push request. case pushReq := <-f.batchPushRequests: log.Debug("Federation envoy handling batch push " + "request") - ctx, cancel := f.WithCtxQuitNoTimeout() - - // First, we'll attempt to registrar the proof leaf with - // the local registrar server. - err := f.cfg.LocalRegistrar.UpsertProofLeafBatch( - ctx, pushReq.Batch, - ) - cancel() - if err != nil { - err := fmt.Errorf("unable to insert proof "+ - "batch into local universe: %w", err) - - log.Warnf(err.Error()) - - pushReq.err <- err - continue - } - - // Now that we know we were able to register the proof, - // we'll return back to the caller. - pushReq.resp <- struct{}{} - - // Fetch all universe servers in our federation. - fedServers, err := f.tryFetchServers() + err := f.handleBatchPushRequest(pushReq) if err != nil { - err := fmt.Errorf("unable to fetch "+ - "federation servers: %w", err) - log.Warnf(err.Error()) - pushReq.err <- err - continue - } - - if len(fedServers) == 0 { - log.Warnf("could not find any federation " + - "servers") - continue + // Warn, but don't exit the event handler + // routine. + log.Warnf("Unable to handle batch push "+ + "request: %v", err) } - // With the response sent above, we'll push this out to - // all the Universe servers in the background. - ctxPush, cancelPush := f.WithCtxQuitNoTimeout() - for idx := range pushReq.Batch { - item := pushReq.Batch[idx] - - f.pushProofToFederation( - ctxPush, item.ID, item.Key, item.Leaf, - fedServers, item.LogProofSync, - ) - } - cancelPush() - case <-f.Quit: return } @@ -633,6 +591,61 @@ func (f *FederationEnvoy) handlePushRequest(pushReq *FederationPushReq) error { return nil } +// handleBatchPushRequest is called each time a new batch push request is +// received. It will perform an asynchronous registration with the local +// Universe registrar, then push each leaf from the batch out in an async manner +// to the federation members. +func (f *FederationEnvoy) handleBatchPushRequest( + pushReq *FederationProofBatchPushReq) error { + + if pushReq == nil { + return fmt.Errorf("nil batch push request") + } + + ctx, cancel := f.WithCtxQuitNoTimeout() + defer cancel() + + // First, we'll attempt to registrar the proof leaf with the local + // registrar server. + err := f.cfg.LocalRegistrar.UpsertProofLeafBatch(ctx, pushReq.Batch) + if err != nil { + err = fmt.Errorf("unable to insert proof batch into local "+ + "universe: %w", err) + pushReq.err <- err + return err + } + + // Now that we know we were able to register the proof, we'll return + // back to the caller. + pushReq.resp <- struct{}{} + + // Fetch all universe servers in our federation. + fedServers, err := f.tryFetchServers() + if err != nil { + err = fmt.Errorf("unable to fetch federation servers: %w", err) + pushReq.err <- err + return err + } + + if len(fedServers) == 0 { + log.Warnf("could not find any federation servers") + return nil + } + + // With the response sent above, we'll push this out to all the Universe + // servers in the background. + for idx := range pushReq.Batch { + item := pushReq.Batch[idx] + + f.pushProofToFederation( + ctx, item.ID, item.Key, item.Leaf, fedServers, + item.LogProofSync, + ) + } + + return nil +} + // UpsertProofLeaf upserts a proof leaf within the target universe tree. This // can be used to first push out a new update to the local registrar, // ultimately queuing it to also be sent to the set of active universe servers.