Skip to content

Commit

Permalink
todo: fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Mar 16, 2021
1 parent 8f8d2a3 commit 4b84c14
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 45 deletions.
76 changes: 45 additions & 31 deletions go/runtime/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,40 @@ func (r *runtime) stop() {
r.history.Close()
}

func (r *runtime) updateActiveDescriptor(ctx context.Context) bool {
state, err := r.consensus.RootHash().GetRuntimeState(ctx, r.id, consensus.HeightLatest)
if err != nil {
r.logger.Error("querying roothash state",
"err", err,
)
return false
}

h := hash.NewFrom(state.Runtime)
// This is only called from the watchUpdates thread and activeDescriptorHash
// is only mutated bellow, so no need for a lock here.
if h.Equal(&r.activeDescriptorHash) {
r.logger.Debug("active runtime descriptor didn't change",
"runtime", state.Runtime,
"hash", h,
)
return false
}

r.logger.Debug("updating active runtime descriptor",
"runtime", state.Runtime,
"hash", h,
)
r.Lock()
r.activeDescriptor = state.Runtime
r.activeDescriptorHash = h
r.Unlock()

r.activeDescriptorNotifier.Broadcast(state.Runtime)

return true
}

func (r *runtime) watchUpdates(
ctx context.Context,
epoCh <-chan beacon.EpochTime,
Expand All @@ -280,46 +314,18 @@ func (r *runtime) watchUpdates(
case <-ctx.Done():
return
case <-epoCh:
state, err := r.consensus.RootHash().GetRuntimeState(ctx, r.id, consensus.HeightLatest)
if err != nil {
r.logger.Error("querying roothash state",
"err", err,
)
continue
}

h := hash.NewFrom(state.Runtime)
// Only mutated bellow, so no need for a lock here.
if h.Equal(&r.activeDescriptorHash) {
r.logger.Debug("active runtime descriptor didn't change",
"runtime", state.Runtime,
"hash", h,
)
continue
}

r.logger.Debug("updating active runtime descriptor",
"runtime", state.Runtime,
"hash", h,
)
r.Lock()
r.activeDescriptor = state.Runtime
r.activeDescriptorHash = h
r.Unlock()

if !activeInitialized {
if up := r.updateActiveDescriptor(ctx); up && !activeInitialized {
close(r.activeDescriptorCh)
activeInitialized = true
}

r.activeDescriptorNotifier.Broadcast(state.Runtime)
case rt := <-regCh:
if !rt.ID.Equal(&r.id) {
continue
}

r.logger.Debug("updating registry runtime descriptor",
"runtime", rt,
"kind", rt.Kind,
)

r.Lock()
Expand All @@ -330,8 +336,16 @@ func (r *runtime) watchUpdates(
close(r.registryDescriptorCh)
regInitialized = true
}

r.registryDescriptorNotifier.Broadcast(rt)

// If this is a compute runtime and the active descriptor is not
// initialized, update the active descriptor.
if !activeInitialized && rt.Kind == registry.KindCompute {
if up := r.updateActiveDescriptor(ctx); up && !activeInitialized {
close(r.activeDescriptorCh)
activeInitialized = true
}
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion go/worker/common/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (n *Node) worker() {
n.logger.Info("consensus has finished initial synchronization")

// Wait for the runtime.
rt, err := n.Runtime.RegistryDescriptor(n.ctx)
rt, err := n.Runtime.ActiveDescriptor(n.ctx)
if err != nil {
n.logger.Error("failed to wait for registry descriptor",
"err", err,
Expand Down
2 changes: 1 addition & 1 deletion go/worker/compute/executor/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,7 @@ func (n *Node) worker() {
// is the version at the last epoch transition.
// This case will be handled by the first tick from the descriptor updates
// channel bellow, which will update the parameters.
runtime, err := n.commonNode.Runtime.RegistryDescriptor(n.ctx)
runtime, err := n.commonNode.Runtime.ActiveDescriptor(n.ctx)
if err != nil {
n.logger.Error("failed to fetch runtime registry descriptor",
"err", err,
Expand Down
13 changes: 2 additions & 11 deletions go/worker/storage/committee/checkpoint_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,18 +379,9 @@ func (n *Node) syncCheckpoints() (*blockSummary, error) {
var syncState blockSummary

// Try getting the active descriptor first.
adCtx, cancel := context.WithTimeout(n.ctx, 2*time.Second)
descriptor, err := n.commonNode.Runtime.ActiveDescriptor(adCtx)
cancel()
descriptor, err := n.commonNode.Runtime.ActiveDescriptor(n.ctx)
if err != nil {
n.logger.Warn("couldn't get active runtime descriptor, trying registry descriptor",
"err", err,
)
// Otherwise get the descriptor from registry.
descriptor, err = n.commonNode.Runtime.RegistryDescriptor(n.ctx)
if err != nil {
return nil, fmt.Errorf("can't get runtime descriptor: %w", err)
}
return nil, fmt.Errorf("can't get runtime descriptor: %w", err)
}

// Fetch metadata from the current committee.
Expand Down
2 changes: 1 addition & 1 deletion go/worker/storage/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ func (n *Node) worker() { // nolint: gocyclo
// Initialize genesis from the runtime descriptor.
if cachedLastRound == n.undefinedRound {
var rt *registryApi.Runtime
rt, err = n.commonNode.Runtime.RegistryDescriptor(n.ctx)
rt, err = n.commonNode.Runtime.ActiveDescriptor(n.ctx)
if err != nil {
n.logger.Error("failed to retrieve runtime registry descriptor",
"err", err,
Expand Down

0 comments on commit 4b84c14

Please sign in to comment.