Skip to content

Commit

Permalink
universe: extract fed envoy batch push request handler method
Browse files Browse the repository at this point in the history
This commit extracts a batch push request handler method from the
federation envoy event handler routine. The goal of this commit is code
simplification and to gain some line length breathing room.
  • Loading branch information
ffranr committed Dec 11, 2023
1 parent e606f1f commit a107752
Showing 1 changed file with 61 additions and 48 deletions.
109 changes: 61 additions & 48 deletions universe/auto_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,60 +429,18 @@ func (f *FederationEnvoy) syncer() {
err)
}

// Handle a new batch push request.
case pushReq := <-f.batchPushRequests:
log.Debug("Federation envoy handling batch push " +
"request")
ctx, cancel := f.WithCtxQuitNoTimeout()

// First, we'll attempt to registrar the proof leaf with
// the local registrar server.
err := f.cfg.LocalRegistrar.UpsertProofLeafBatch(
ctx, pushReq.Batch,
)
cancel()
if err != nil {
err := fmt.Errorf("unable to insert proof "+
"batch into local universe: %w", err)

log.Warnf(err.Error())

pushReq.err <- err
continue
}

// Now that we know we were able to register the proof,
// we'll return back to the caller.
pushReq.resp <- struct{}{}

// Fetch all universe servers in our federation.
fedServers, err := f.tryFetchServers()
err := f.handleBatchPushRequest(pushReq)
if err != nil {
err := fmt.Errorf("unable to fetch "+
"federation servers: %w", err)
log.Warnf(err.Error())
pushReq.err <- err
continue
}

if len(fedServers) == 0 {
log.Warnf("could not find any federation " +
"servers")
continue
// Warn, but don't exit the event handler
// routine.
log.Warnf("Unable to handle batch push "+
"request: %v", err)
}

// With the response sent above, we'll push this out to
// all the Universe servers in the background.
ctxPush, cancelPush := f.WithCtxQuitNoTimeout()
for idx := range pushReq.Batch {
item := pushReq.Batch[idx]

f.pushProofToFederation(
ctxPush, item.ID, item.Key, item.Leaf,
fedServers, item.LogProofSync,
)
}
cancelPush()

case <-f.Quit:
return
}
Expand Down Expand Up @@ -633,6 +591,61 @@ func (f *FederationEnvoy) handlePushRequest(pushReq *FederationPushReq) error {
return nil
}

// handleBatchPushRequest is called each time a new batch push request is
// received. It will perform an asynchronous registration with the local
// Universe registrar, then push each leaf from the batch out in an async manner
// to the federation members.
func (f *FederationEnvoy) handleBatchPushRequest(
pushReq *FederationProofBatchPushReq) error {

if pushReq == nil {
return fmt.Errorf("nil batch push request")
}

ctx, cancel := f.WithCtxQuitNoTimeout()
defer cancel()

// First, we'll attempt to registrar the proof leaf with the local
// registrar server.
err := f.cfg.LocalRegistrar.UpsertProofLeafBatch(ctx, pushReq.Batch)
if err != nil {
err = fmt.Errorf("unable to insert proof batch into local "+
"universe: %w", err)
pushReq.err <- err
return err
}

// Now that we know we were able to register the proof, we'll return
// back to the caller.
pushReq.resp <- struct{}{}

// Fetch all universe servers in our federation.
fedServers, err := f.tryFetchServers()
if err != nil {
err = fmt.Errorf("unable to fetch federation servers: %w", err)
pushReq.err <- err
return err
}

if len(fedServers) == 0 {
log.Warnf("could not find any federation servers")
return nil
}

// With the response sent above, we'll push this out to all the Universe
// servers in the background.
for idx := range pushReq.Batch {
item := pushReq.Batch[idx]

f.pushProofToFederation(
ctx, item.ID, item.Key, item.Leaf, fedServers,
item.LogProofSync,
)
}

return nil
}

// UpsertProofLeaf upserts a proof leaf within the target universe tree. This
// can be used to first push out a new update to the local registrar,
// ultimately queuing it to also be sent to the set of active universe servers.
Expand Down

0 comments on commit a107752

Please sign in to comment.