Skip to content

Commit

Permalink
Merge branch 'vishal/connection_manager' of github.com:onflow/flow-go…
Browse files Browse the repository at this point in the history
… into vishal/connection_manager
  • Loading branch information
vishalchangrani committed Jun 15, 2021
2 parents ea0b075 + f2b0ce7 commit 08e9413
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 16 deletions.
1 change: 1 addition & 0 deletions module/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type EngineMetrics interface {

type ComplianceMetrics interface {
FinalizedHeight(height uint64)
CommittedEpochFinalView(view uint64)
SealedHeight(height uint64)
BlockFinalized(*flow.Block)
BlockSealed(*flow.Block)
Expand Down
12 changes: 12 additions & 0 deletions module/metrics/compliance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@ type ComplianceCollector struct {
sealedPayload *prometheus.CounterVec
lastBlockFinalizedAt time.Time
finalizedBlocksPerSecond prometheus.Summary
committedEpochFinalView prometheus.Gauge
}

func NewComplianceCollector() *ComplianceCollector {

cc := &ComplianceCollector{

committedEpochFinalView: promauto.NewGauge(prometheus.GaugeOpts{
Name: "committed_epoch_final_view",
Namespace: namespaceConsensus,
Subsystem: subsystemCompliance,
Help: "the final view of the committed epoch with the greatest counter",
}),

finalizedHeight: promauto.NewGauge(prometheus.GaugeOpts{
Name: "finalized_height",
Namespace: namespaceConsensus,
Expand Down Expand Up @@ -128,3 +136,7 @@ func (cc *ComplianceCollector) BlockSealed(block *flow.Block) {
func (cc *ComplianceCollector) BlockProposalDuration(duration time.Duration) {
cc.blockProposalDuration.Add(duration.Seconds())
}

func (cc *ComplianceCollector) CommittedEpochFinalView(view uint64) {
cc.committedEpochFinalView.Set(float64(view))
}
1 change: 1 addition & 0 deletions module/metrics/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (nc *NoopCollector) BlockProposed(*flow.Block)
func (nc *NoopCollector) BlockFinalized(*flow.Block) {}
func (nc *NoopCollector) BlockSealed(*flow.Block) {}
func (nc *NoopCollector) BlockProposalDuration(duration time.Duration) {}
func (nc *NoopCollector) CommittedEpochFinalView(view uint64) {}
func (nc *NoopCollector) CacheEntries(resource string, entries uint) {}
func (nc *NoopCollector) CacheHit(resource string) {}
func (nc *NoopCollector) CacheNotFound(resource string) {}
Expand Down
5 changes: 5 additions & 0 deletions module/mock/compliance_metrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 12 additions & 3 deletions state/protocol/badger/mutator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
// the non-consensus nodes have to perform.
type FollowerState struct {
*State

index storage.Index
payloads storage.Payloads
tracer module.Tracer
Expand Down Expand Up @@ -502,13 +503,13 @@ func (m *FollowerState) Finalize(blockID flow.Identifier) error {
if err != nil {
return fmt.Errorf("could not retrieve epoch state: %w", err)
}
setup, err := m.epoch.setups.ByID(epochStatus.CurrentEpoch.SetupID)
currentEpochSetup, err := m.epoch.setups.ByID(epochStatus.CurrentEpoch.SetupID)
if err != nil {
return fmt.Errorf("could not retrieve setup event for current epoch: %w", err)
}

payload := block.Payload
// track protocol events that should be emitted
// track service event driven metrics and protocol events that should be emitted
var events []func()
for _, seal := range payload.Seals {
result, err := m.results.ByID(seal.ResultID)
Expand All @@ -518,9 +519,17 @@ func (m *FollowerState) Finalize(blockID flow.Identifier) error {
for _, event := range result.ServiceEvents {
switch ev := event.Event.(type) {
case *flow.EpochSetup:
// track epoch phase transition (staking->setup)
events = append(events, func() { m.consumer.EpochSetupPhaseStarted(ev.Counter-1, header) })
case *flow.EpochCommit:
// track epoch phase transition (setup->committed)
events = append(events, func() { m.consumer.EpochCommittedPhaseStarted(ev.Counter-1, header) })
// track final view of committed epoch
nextEpochSetup, err := m.epoch.setups.ByID(epochStatus.NextEpoch.SetupID)
if err != nil {
return fmt.Errorf("could not retrieve setup event for next epoch: %w", err)
}
events = append(events, func() { m.metrics.CommittedEpochFinalView(nextEpochSetup.FinalView) })
default:
return fmt.Errorf("invalid service event type in payload (%T)", event)
}
Expand All @@ -536,7 +545,7 @@ func (m *FollowerState) Finalize(blockID flow.Identifier) error {
// if this block's view exceeds the final view of its parent's current epoch,
// this block begins the next epoch
if header.View > finalView {
events = append(events, func() { m.consumer.EpochTransition(setup.Counter, header) })
events = append(events, func() { m.consumer.EpochTransition(currentEpochSetup.Counter, header) })
}

// FINALLY: any block that is finalized is already a valid extension;
Expand Down
29 changes: 28 additions & 1 deletion state/protocol/badger/mutator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,29 @@ func TestExtendEpochTransitionValid(t *testing.T) {
consumer.On("BlockFinalized", mock.Anything)
rootSnapshot := unittest.RootSnapshotFixture(participants)

util.RunWithFullProtocolStateAndConsumer(t, rootSnapshot, consumer, func(db *badger.DB, state *protocol.MutableState) {
unittest.RunWithBadgerDB(t, func(db *badger.DB) {

// set up state and mock ComplianceMetrics object
metrics := new(mockmodule.ComplianceMetrics)
metrics.On("BlockSealed", mock.Anything)
metrics.On("SealedHeight", mock.Anything)
metrics.On("FinalizedHeight", mock.Anything)
metrics.On("BlockFinalized", mock.Anything)

// expect committed epoch final view metric at bootstrap
finalView, err := rootSnapshot.Epochs().Current().FinalView()
require.NoError(t, err)
metrics.On("CommittedEpochFinalView", finalView).Once()

tracer := trace.NewNoopTracer()
headers, _, seals, index, payloads, blocks, setups, commits, statuses, results := storeutil.StorageLayer(t, db)
protoState, err := protocol.Bootstrap(metrics, db, headers, seals, results, blocks, setups, commits, statuses, rootSnapshot)
require.NoError(t, err)
receiptValidator := util.MockReceiptValidator()
sealValidator := util.MockSealValidator(seals)
state, err := protocol.NewFullConsensusState(protoState, index, payloads, tracer, consumer, receiptValidator, sealValidator)
require.NoError(t, err)

head, err := rootSnapshot.Head()
require.NoError(t, err)
result, _, err := rootSnapshot.SealedResult()
Expand Down Expand Up @@ -598,9 +620,12 @@ func TestExtendEpochTransitionValid(t *testing.T) {

// expect epoch phase transition once we finalize block 6
consumer.On("EpochCommittedPhaseStarted", epoch2Setup.Counter-1, block6.Header)
// expect committed final view to be updated, since we are committing epoch 2
metrics.On("CommittedEpochFinalView", epoch2Setup.FinalView)
err = state.Finalize(block6.ID())
require.NoError(t, err)
consumer.AssertCalled(t, "EpochCommittedPhaseStarted", epoch2Setup.Counter-1, block6.Header)
metrics.AssertCalled(t, "CommittedEpochFinalView", epoch2Setup.FinalView)

// finalize block 7 so we can finalize subsequent blocks
err = state.Finalize(block7.ID())
Expand Down Expand Up @@ -651,6 +676,8 @@ func TestExtendEpochTransitionValid(t *testing.T) {
err = state.Finalize(block9.ID())
require.NoError(t, err)
consumer.AssertCalled(t, "EpochTransition", epoch2Setup.Counter, block9.Header)

metrics.AssertExpectations(t)
})
}

Expand Down
50 changes: 50 additions & 0 deletions state/protocol/badger/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func Bootstrap(
return fmt.Errorf("could not bootstrap epoch values: %w", err)
}

// 6) set metric values
err = state.updateCommittedEpochFinalView(root)
if err != nil {
return fmt.Errorf("could not set epoch final view value: %w", err)
}
state.metrics.BlockSealed(tail)
state.metrics.SealedHeight(tail.Header.Height)
state.metrics.FinalizedHeight(head.Header.Height)
Expand Down Expand Up @@ -389,6 +394,12 @@ func OpenState(
}
state := newState(metrics, db, headers, seals, results, blocks, setups, commits, statuses)

// update committed final view metric
err = state.updateCommittedEpochFinalView(state.Final())
if err != nil {
return nil, fmt.Errorf("failed to update committed epoch final view: %w", err)
}

return state, nil
}

Expand Down Expand Up @@ -476,3 +487,42 @@ func IsBootstrapped(db *badger.DB) (bool, error) {
}
return true, nil
}

// updateCommittedEpochFinalView updates the `committed_epoch_final_view` metric
// based on the current epoch phase of the input snapshot. It should be called
// at startup and during transitions between EpochSetup and EpochCommitted phases.
//
// For example, suppose we have epochs N and N+1.
// If we are in epoch N's Staking or Setup Phase, then epoch N's final view should be the value of the metric.
// If we are in epoch N's Committed Phase, then epoch N+1's final view should be the value of the metric.
func (state *State) updateCommittedEpochFinalView(snap protocol.Snapshot) error {

phase, err := snap.Phase()
if err != nil {
return fmt.Errorf("could not get epoch phase: %w", err)
}

// update metric based of epoch phase
switch phase {
case flow.EpochPhaseStaking, flow.EpochPhaseSetup:

// if we are in Staking or Setup phase, then set the metric value to the current epoch's final view
finalView, err := snap.Epochs().Current().FinalView()
if err != nil {
return fmt.Errorf("could not get current epoch final view from snapshot: %w", err)
}
state.metrics.CommittedEpochFinalView(finalView)
case flow.EpochPhaseCommitted:

// if we are in Committed phase, then set the metric value to the next epoch's final view
finalView, err := snap.Epochs().Next().FinalView()
if err != nil {
return fmt.Errorf("could not get next epoch final view from snapshot: %w", err)
}
state.metrics.CommittedEpochFinalView(finalView)
default:
return fmt.Errorf("invalid phase: %s", phase)
}

return nil
}
73 changes: 61 additions & 12 deletions state/protocol/badger/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/metrics"
mock "github.com/onflow/flow-go/module/mock"
"github.com/onflow/flow-go/state/protocol"
bprotocol "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/inmem"
Expand All @@ -32,25 +33,73 @@ func TestBootstrapAndOpen(t *testing.T) {
})

protoutil.RunWithBootstrapState(t, rootSnapshot, func(db *badger.DB, _ *bprotocol.State) {

// expect the final view metric to be set to current epoch's final view
finalView, err := rootSnapshot.Epochs().Current().FinalView()
require.NoError(t, err)
complianceMetrics := new(mock.ComplianceMetrics)
complianceMetrics.On("CommittedEpochFinalView", finalView).Once()

noopMetrics := new(metrics.NoopCollector)
all := storagebadger.InitAll(noopMetrics, db)
// protocol state has been bootstrapped, now open a protocol state with the database
metrics := new(metrics.NoopCollector)
all := storagebadger.InitAll(metrics, db)
state, err := bprotocol.OpenState(
metrics,
db,
all.Headers,
all.Seals,
all.Results,
all.Blocks,
all.Setups,
all.EpochCommits,
all.Statuses)
state, err := bprotocol.OpenState(complianceMetrics, db, all.Headers, all.Seals, all.Results, all.Blocks, all.Setups, all.EpochCommits, all.Statuses)
require.NoError(t, err)

// assert update final view was called
complianceMetrics.AssertExpectations(t)

unittest.AssertSnapshotsEqual(t, rootSnapshot, state.Final())
})
}

// TestBootstrapAndOpen_EpochCommitted verifies after bootstrapping with a
// root snapshot from EpochCommitted phase we should be able to open it and
// got the same state.
func TestBootstrapAndOpen_EpochCommitted(t *testing.T) {

// create a state root and bootstrap the protocol state with it
participants := unittest.CompleteIdentitySet()
rootSnapshot := unittest.RootSnapshotFixture(participants, func(block *flow.Block) {
block.Header.ParentID = unittest.IdentifierFixture()
})
rootBlock, err := rootSnapshot.Head()
require.NoError(t, err)

// build an epoch on the root state and return a snapshot from the committed phase
committedPhaseSnapshot := snapshotAfter(t, rootSnapshot, func(state *bprotocol.FollowerState) protocol.Snapshot {
unittest.NewEpochBuilder(t, state).BuildEpoch().CompleteEpoch()

// find the point where we transition to the epoch committed phase
for height := rootBlock.Height + 1; ; height++ {
phase, err := state.AtHeight(height).Phase()
require.NoError(t, err)
if phase == flow.EpochPhaseCommitted {
return state.AtHeight(height)
}
}
})

protoutil.RunWithBootstrapState(t, committedPhaseSnapshot, func(db *badger.DB, _ *bprotocol.State) {

// expect the final view metric to be set to next epoch's final view
finalView, err := committedPhaseSnapshot.Epochs().Next().FinalView()
require.NoError(t, err)
complianceMetrics := new(mock.ComplianceMetrics)
complianceMetrics.On("CommittedEpochFinalView", finalView).Once()

noopMetrics := new(metrics.NoopCollector)
all := storagebadger.InitAll(noopMetrics, db)
state, err := bprotocol.OpenState(complianceMetrics, db, all.Headers, all.Seals, all.Results, all.Blocks, all.Setups, all.EpochCommits, all.Statuses)
require.NoError(t, err)

// assert update final view was called
complianceMetrics.AssertExpectations(t)

unittest.AssertSnapshotsEqual(t, committedPhaseSnapshot, state.Final())
})
}

// TestBootstrapNonRoot tests bootstrapping the protocol state from arbitrary states.
//
// NOTE: for all these cases, we build a final child block (CHILD). This is
Expand Down

0 comments on commit 08e9413

Please sign in to comment.