Skip to content

Commit

Permalink
Add prometheus metrics test into E2E (#5673)
Browse files Browse the repository at this point in the history
* Progress on metrics tests
* Progress on metrics test
* Get metrics E2E working
* Merge branch 'master' of https://github.com/prysmaticlabs/prysm into e2e-metrics
* Complete most of metrics tests
* Change E2E polling to the middle of a slot, instead of at the start of the middle
* Add metrics to all E2E
* Remove extra types
* Update endtoend/evaluators/metrics.go

Co-Authored-By: Preston Van Loon <[email protected]>
* Merge branch 'master' into e2e-metrics
* Add more comments, address feedback
* Merge branch 'e2e-metrics' of https://github.com/prysmaticlabs/prysm into e2e-metrics
* Fix build
* Remove unneeded comment
* Set E2E_EPOCHS back
* Improve sync testing reliability
* Remove metrics check from slashing
* Improve time allotted to sync
* Remove possibly flaky sync test change
  • Loading branch information
0xKiwi authored Apr 29, 2020
1 parent 3b2c514 commit 84e51a5
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 34 deletions.
40 changes: 21 additions & 19 deletions endtoend/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ func runEndToEndTest(t *testing.T, config *types.E2EConfig) {
return
}

if config.TestSlasher {
slasherPIDs := components.StartSlashers(t)
defer helpers.KillProcesses(t, slasherPIDs)
}
if config.TestDeposits {
valCount := int(params.BeaconConfig().MinGenesisActiveValidatorCount) / e2e.TestParams.BeaconNodeCount
valPid := components.StartNewValidatorClient(t, config, valCount, e2e.TestParams.BeaconNodeCount)
defer helpers.KillProcesses(t, []int{valPid})
components.SendAndMineDeposits(t, keystorePath, valCount, int(params.BeaconConfig().MinGenesisActiveValidatorCount))
}

conns := make([]*grpc.ClientConn, e2e.TestParams.BeaconNodeCount)
for i := 0; i < len(conns); i++ {
conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%d", e2e.TestParams.BeaconNodeRPCPort+i), grpc.WithInsecure())
Expand All @@ -69,22 +80,14 @@ func runEndToEndTest(t *testing.T, config *types.E2EConfig) {
if err != nil {
t.Fatal(err)
}
// Small offset so evaluators perform in the middle of an epoch.
epochSeconds := params.BeaconConfig().SecondsPerSlot * params.BeaconConfig().SlotsPerEpoch
genesisTime := time.Unix(genesis.GenesisTime.Seconds+int64(epochSeconds/2), 0)

if config.TestSlasher {
slasherPIDs := components.StartSlashers(t)
defer helpers.KillProcesses(t, slasherPIDs)
}
if config.TestDeposits {
valCount := int(params.BeaconConfig().MinGenesisActiveValidatorCount) / e2e.TestParams.BeaconNodeCount
valPid := components.StartNewValidatorClient(t, config, valCount, e2e.TestParams.BeaconNodeCount)
defer helpers.KillProcesses(t, []int{valPid})
components.SendAndMineDeposits(t, keystorePath, valCount, int(params.BeaconConfig().MinGenesisActiveValidatorCount))
}
epochSeconds := params.BeaconConfig().SecondsPerSlot * params.BeaconConfig().SlotsPerEpoch
// Adding a half slot here to ensure the requests are in the middle of an epoch.
middleOfEpoch := int64(epochSeconds/2 + (params.BeaconConfig().SecondsPerSlot / 2))
// Offsetting the ticker from genesis so it ticks in the middle of an epoch, in order to keep results consistent.
tickingStartTime := time.Unix(genesis.GenesisTime.Seconds+middleOfEpoch, 0)

ticker := helpers.GetEpochTicker(genesisTime, epochSeconds)
ticker := helpers.GetEpochTicker(tickingStartTime, epochSeconds)
for currentEpoch := range ticker.C() {
for _, evaluator := range config.Evaluators {
// Only run if the policy says so.
Expand Down Expand Up @@ -119,11 +122,10 @@ func runEndToEndTest(t *testing.T, config *types.E2EConfig) {
}
conns = append(conns, syncConn)

// Sleep for a few epochs to give time for the newly started node to sync.
extraTimeToSync := (config.EpochsToRun+config.EpochsToRun/2)*epochSeconds + 60
genesisTime.Add(time.Duration(extraTimeToSync) * time.Second)
// Wait until middle of epoch to request to prevent conflicts.
time.Sleep(time.Until(genesisTime))
// Sleep a second for every 4 blocks that need to be synced for the newly started node.
extraSecondsToSync := (config.EpochsToRun)*epochSeconds + (params.BeaconConfig().SlotsPerEpoch / 4 * config.EpochsToRun)
waitForSync := tickingStartTime.Add(time.Duration(extraSecondsToSync) * time.Second)
time.Sleep(time.Until(waitForSync))

syncLogFile, err := os.Open(path.Join(e2e.TestParams.LogPath, fmt.Sprintf(e2e.BeaconNodeLogFileName, index)))
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions endtoend/evaluators/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
testonly = True,
srcs = [
"finality.go",
"metrics.go",
"node.go",
"slashing.go",
"validator.go",
Expand All @@ -16,6 +17,7 @@ go_library(
"//endtoend/params:go_default_library",
"//endtoend/types:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/p2putils:go_default_library",
"//shared/params:go_default_library",
"//shared/sliceutil:go_default_library",
"//shared/testutil:go_default_library",
Expand Down
206 changes: 206 additions & 0 deletions endtoend/evaluators/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package evaluators

import (
"context"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"

ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
e2e "github.com/prysmaticlabs/prysm/endtoend/params"
"github.com/prysmaticlabs/prysm/endtoend/types"
"github.com/prysmaticlabs/prysm/shared/p2putils"
"google.golang.org/grpc"
)

// MetricsCheck performs a check on metrics to make sure caches are functioning, and
// overall health is good. Not checking the first epoch so the sample size isn't too small.
var MetricsCheck = types.Evaluator{
Name: "metrics_check_epoch_%d",
Policy: afterNthEpoch(0),
Evaluation: metricsTest,
}

type equalityTest struct {
name string
topic string
value int
}

type comparisonTest struct {
name string
topic1 string
topic2 string
expectedComparison float64
}

var metricLessThanTests = []equalityTest{
{
name: "memory usage",
topic: "go_memstats_alloc_bytes",
value: 100000000, // 100 Mb
},
}

var metricComparisonTests = []comparisonTest{
{
name: "beacon aggregate and proof",
topic1: "p2p_message_failed_validation_total{topic=\"/eth2/%x/beacon_aggregate_and_proof/ssz_snappy\"}",
topic2: "p2p_message_received_total{topic=\"/eth2/%x/beacon_aggregate_and_proof/ssz_snappy\"}",
expectedComparison: 0.8,
},
{
name: "committee index 0 beacon attestation",
topic1: "p2p_message_failed_validation_total{topic=\"/eth2/%x/committee_index0_beacon_attestation/ssz_snappy\"}",
topic2: "p2p_message_received_total{topic=\"/eth2/%x/committee_index0_beacon_attestation/ssz_snappy\"}",
expectedComparison: 0.1,
},
{
name: "committee index 1 beacon attestation",
topic1: "p2p_message_failed_validation_total{topic=\"/eth2/%x/committee_index1_beacon_attestation/ssz_snappy\"}",
topic2: "p2p_message_received_total{topic=\"/eth2/%x/committee_index1_beacon_attestation/ssz_snappy\"}",
expectedComparison: 0.1,
},
{
name: "committee cache",
topic1: "committee_cache_miss",
topic2: "committee_cache_hit",
expectedComparison: 0.01,
},
{
name: "hot state cache",
topic1: "hot_state_cache_miss",
topic2: "hot_state_cache_hit",
expectedComparison: 0.01,
},
}

func metricsTest(conns ...*grpc.ClientConn) error {
for i := 0; i < len(conns); i++ {
response, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", e2e.TestParams.BeaconNodeMetricsPort+i))
if err != nil {
return errors.Wrap(err, "failed to reach prometheus metrics page")
}
dataInBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
return err
}
pageContent := string(dataInBytes)
if err := response.Body.Close(); err != nil {
return err
}

genesis, err := eth.NewNodeClient(conns[i]).GetGenesis(context.Background(), &ptypes.Empty{})
if err != nil {
return err
}
forkDigest, err := p2putils.CreateForkDigest(time.Unix(genesis.GenesisTime.Seconds, 0), genesis.GenesisValidatorsRoot)
if err != nil {
return err
}

chainHead, err := eth.NewBeaconChainClient(conns[i]).GetChainHead(context.Background(), &ptypes.Empty{})
if err != nil {
return err
}
timeSlot, err := getValueOfTopic(pageContent, "beacon_clock_time_slot")
if err != nil {
return err
}
if chainHead.HeadSlot != uint64(timeSlot) {
return fmt.Errorf("expected metrics slot to equal chain head slot, expected %d, received %d", chainHead.HeadSlot, timeSlot)
}

for _, test := range metricLessThanTests {
topic := test.topic
if strings.Contains(topic, "%x") {
topic = fmt.Sprintf(topic, forkDigest)
}
if err := metricCheckLessThan(pageContent, topic, test.value); err != nil {
return errors.Wrapf(err, "failed %s check", test.name)
}
}
for _, test := range metricComparisonTests {
topic1 := test.topic1
if strings.Contains(topic1, "%x") {
topic1 = fmt.Sprintf(topic1, forkDigest)
}
topic2 := test.topic2
if strings.Contains(topic2, "%x") {
topic2 = fmt.Sprintf(topic2, forkDigest)
}
if err := metricCheckComparison(pageContent, topic1, topic2, test.expectedComparison); err != nil {
return err
}
}
}
return nil
}

func metricCheckLessThan(pageContent string, topic string, value int) error {
topicValue, err := getValueOfTopic(pageContent, topic)
if err != nil {
return err
}
if topicValue >= value {
return fmt.Errorf(
"unexpected result for metric %s, expected less than %d, received %d",
topic,
value,
topicValue,
)
}
return nil
}

func metricCheckComparison(pageContent string, topic1 string, topic2 string, comparison float64) error {
topic2Value, err := getValueOfTopic(pageContent, topic2)
if err != nil {
return err
}
topic1Value, err := getValueOfTopic(pageContent, topic1)
// If we can't find the first topic (error metrics), then assume the test passes.
if topic1Value == -1 && topic2Value != -1 {
return nil
}
if err != nil {
return err
}
topicComparison := float64(topic1Value) / float64(topic2Value)
if topicComparison >= comparison {
return fmt.Errorf(
"unexpected result for comparison between metric %s and metric %s, expected comparison to be %.2f, received %.2f",
topic1,
topic2,
comparison,
topicComparison,
)
}
return nil
}

func getValueOfTopic(pageContent string, topic string) (int, error) {
// Adding a space to search exactly.
startIdx := strings.LastIndex(pageContent, topic+" ")
if startIdx == -1 {
return -1, fmt.Errorf("did not find requested text %s in %s", topic, pageContent)
}
endOfTopic := startIdx + len(topic)
// Adding 1 to skip the space after the topic name.
startOfValue := endOfTopic + 1
endOfValue := strings.Index(pageContent[startOfValue:], "\n")
if endOfValue == -1 {
return -1, fmt.Errorf("could not find next space in %s", pageContent[startOfValue:])
}
metricValue := pageContent[startOfValue : startOfValue+endOfValue]
floatResult, err := strconv.ParseFloat(metricValue, 64)
if err != nil {
return -1, errors.Wrapf(err, "could not parse %s for int", metricValue)
}
return int(floatResult), nil
}
24 changes: 9 additions & 15 deletions endtoend/evaluators/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,43 +138,37 @@ func allNodesHaveSameHead(conns ...*grpc.ClientConn) error {
}
}

for i, epoch := range headEpochs {
if headEpochs[0] != epoch {
for i := 0; i < len(conns); i++ {
if headEpochs[0] != headEpochs[i] {
return fmt.Errorf(
"received conflicting head epochs on node %d, expected %d, received %d",
i,
headEpochs[0],
epoch,
headEpochs[i],
)
}
}
for i, root := range justifiedRoots {
if !bytes.Equal(justifiedRoots[0], root) {
if !bytes.Equal(justifiedRoots[0], justifiedRoots[i]) {
return fmt.Errorf(
"received conflicting justified block roots on node %d, expected %#x, received %#x",
i,
justifiedRoots[0],
root,
justifiedRoots[i],
)
}
}
for i, root := range prevJustifiedRoots {
if !bytes.Equal(prevJustifiedRoots[0], root) {
if !bytes.Equal(prevJustifiedRoots[0], prevJustifiedRoots[i]) {
return fmt.Errorf(
"received conflicting previous justified block roots on node %d, expected %#x, received %#x",
i,
prevJustifiedRoots[0],
root,
prevJustifiedRoots[i],
)
}
}
for i, root := range finalizedRoots {
if !bytes.Equal(finalizedRoots[0], root) {
if !bytes.Equal(finalizedRoots[0], finalizedRoots[i]) {
return fmt.Errorf(
"received conflicting finalized epoch roots on node %d, expected %#x, received %#x",
i,
finalizedRoots[0],
root,
finalizedRoots[i],
)
}
}
Expand Down
1 change: 1 addition & 0 deletions endtoend/long_minimal_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestEndToEnd_Long_MinimalConfig(t *testing.T) {
ev.ValidatorsAreActive,
ev.ValidatorsParticipating,
ev.FinalizationOccurs,
ev.MetricsCheck,
ev.ProcessesDepositedValidators,
ev.DepositedValidatorsAreActive,
},
Expand Down
1 change: 1 addition & 0 deletions endtoend/minimal_antiflake_e2e_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestEndToEnd_AntiFlake_MinimalConfig_1(t *testing.T) {
ev.HealthzCheck,
ev.ValidatorsAreActive,
ev.ValidatorsParticipating,
ev.MetricsCheck,
},
}
if err := e2eParams.Init(4); err != nil {
Expand Down
1 change: 1 addition & 0 deletions endtoend/minimal_antiflake_e2e_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestEndToEnd_AntiFlake_MinimalConfig_2(t *testing.T) {
ev.HealthzCheck,
ev.ValidatorsAreActive,
ev.ValidatorsParticipating,
ev.MetricsCheck,
},
}
if err := e2eParams.Init(4); err != nil {
Expand Down
1 change: 1 addition & 0 deletions endtoend/minimal_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestEndToEnd_MinimalConfig(t *testing.T) {
ev.ValidatorsAreActive,
ev.ValidatorsParticipating,
ev.FinalizationOccurs,
ev.MetricsCheck,
},
}
if err := e2eParams.Init(4); err != nil {
Expand Down

0 comments on commit 84e51a5

Please sign in to comment.