Skip to content

Commit

Permalink
universe: extract fed envoy push request handler method
Browse files Browse the repository at this point in the history
This commit extracts a push request handler method from the federation
envoy event handler routine. The goal of this commit is code
simplification and to gain some line length breathing room.
  • Loading branch information
ffranr committed Dec 11, 2023
1 parent a5d9aca commit e606f1f
Showing 1 changed file with 75 additions and 68 deletions.
143 changes: 75 additions & 68 deletions universe/auto_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,79 +417,18 @@ func (f *FederationEnvoy) syncer() {
err)
}

// A new push request has just arrived. We'll perform a
// asynchronous registration with the local Universe registrar,
// then push it out in an async manner to the federation
// members.
// Handle a new push request.
case pushReq := <-f.pushRequests:
log.Debug("Federation envoy handling push request")
ctx, cancel := f.WithCtxQuit()

// First, we'll attempt to registrar the proof leaf with
// the local registrar server.
newProof, err := f.cfg.LocalRegistrar.UpsertProofLeaf(
ctx, pushReq.ID, pushReq.Key, pushReq.Leaf,
)
cancel()
if err != nil {
err := fmt.Errorf("unable to insert proof "+
"into local universe: %w", err)

log.Warnf(err.Error())

pushReq.err <- err
continue
}

// Now that we know we were able to register the proof,
// we'll return back to the caller, and push the new
// proof out to the federation in the background.
pushReq.resp <- newProof

// Fetch all universe servers in our federation.
fedServers, err := f.tryFetchServers()
err := f.handlePushRequest(pushReq)
if err != nil {
err := fmt.Errorf("unable to fetch "+
"federation servers: %w", err)
log.Warnf(err.Error())
pushReq.err <- err
continue
}

if len(fedServers) == 0 {
log.Warnf("could not find any federation " +
"servers")
continue
}

if pushReq.LogProofSync {
// We are attempting to sync using the
// logged proof sync procedure. We will
// therefore narrow down the set of target
// servers based on the sync log. Only servers
// that are not yet push sync complete will be
// targeted.
fedServers, err = f.filterProofSyncPending(
fedServers, pushReq.ID, pushReq.Key,
)
if err != nil {
log.Warnf("failed to filter " +
"federation servers")
continue
}
// Warn, but don't exit the syncer. The syncer
// should continue to run and attempt handle
// more events.
log.Warnf("Unable to handle push request: %v",
err)
}

// With the response sent above, we'll push this
// out to all the Universe servers in the
// background.
ctxPush, cancelPush := f.WithCtxQuitNoTimeout()
f.pushProofToFederation(
ctxPush, pushReq.ID, pushReq.Key,
pushReq.Leaf, fedServers,
pushReq.LogProofSync,
)
cancelPush()

case pushReq := <-f.batchPushRequests:
log.Debug("Federation envoy handling batch push " +
"request")
Expand Down Expand Up @@ -626,6 +565,74 @@ func (f *FederationEnvoy) handleTickEvent() error {
return nil
}

// handlePushRequest is called each time a new push request is received. It will
// perform an asynchronous registration with the local Universe registrar, then
// push the proof leaf out in an async manner to the federation members.
func (f *FederationEnvoy) handlePushRequest(pushReq *FederationPushReq) error {
if pushReq == nil {
return fmt.Errorf("nil push request")
}

// First, we'll attempt to registrar the proof leaf with the local
// registrar server.
ctx, cancel := f.WithCtxQuit()
defer cancel()
newProof, err := f.cfg.LocalRegistrar.UpsertProofLeaf(
ctx, pushReq.ID, pushReq.Key, pushReq.Leaf,
)
if err != nil {
err = fmt.Errorf("unable to insert proof into local "+
"universe: %w", err)
pushReq.err <- err
return err
}

// Now that we know we were able to register the proof, we'll return
// back to the caller, and push the new proof out to the federation in
// the background.
pushReq.resp <- newProof

// Fetch all universe servers in our federation.
fedServers, err := f.tryFetchServers()
if err != nil {
err = fmt.Errorf("unable to fetch federation servers: %w", err)
pushReq.err <- err
return err
}

if len(fedServers) == 0 {
log.Warnf("could not find any federation servers")
return nil
}

if pushReq.LogProofSync {
// We are attempting to sync using the logged proof sync
// procedure. We will therefore narrow down the set of target
// servers based on the sync log. Only servers that are not yet
// push sync complete will be targeted.
fedServers, err = f.filterProofSyncPending(
fedServers, pushReq.ID, pushReq.Key,
)
if err != nil {
err = fmt.Errorf("failed to filter federation "+
"servers: %w", err)
pushReq.err <- err
return err
}
}

// With the response sent above, we'll push this out to all the Universe
// servers in the background.
ctx, cancel = f.WithCtxQuitNoTimeout()
defer cancel()
f.pushProofToFederation(
ctx, pushReq.ID, pushReq.Key, pushReq.Leaf, fedServers,
pushReq.LogProofSync,
)

return nil
}

// UpsertProofLeaf upserts a proof leaf within the target universe tree. This
// can be used to first push out a new update to the local registrar,
// ultimately queuing it to also be sent to the set of active universe servers.
Expand Down

0 comments on commit e606f1f

Please sign in to comment.