Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2P Message Execution Tracing #517

Merged
merged 12 commits into from
Sep 20, 2018
18 changes: 18 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -575,3 +575,21 @@ go_repository(
commit = "c99c65617cd3d686aea8365fe563d6542f01d940",
importpath = "github.com/steakknife/hamming",
)

go_repository(
name = "io_opencensus_go",
commit = "f21fe3feadc5461b952191052818685a410428d4",
importpath = "go.opencensus.io",
)

go_repository(
name = "org_golang_google_api",
commit = "7ca32eb868bf53ea2fc406698eb98583a8073d19",
importpath = "google.golang.org/api",
)

go_repository(
name = "org_golang_x_sync",
commit = "1d60e4601c6fd243af51cc01ddf169918a5407ca",
importpath = "golang.org/x/sync",
)
3 changes: 3 additions & 0 deletions beacon-chain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ VERSION:
utils.KeyFlag,
cmd.DataDirFlag,
cmd.VerbosityFlag,
cmd.EnableTracingFlag,
cmd.TracingEndpointFlag,
cmd.TraceSampleFractionFlag,
debug.PProfFlag,
debug.PProfAddrFlag,
debug.PProfPortFlag,
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/node/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//shared/database:go_default_library",
"//shared/debug:go_default_library",
"//shared/p2p:go_default_library",
"//shared/p2p/adapter/tracer:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//ethclient:go_default_library",
"@com_github_ethereum_go_ethereum//rpc:go_default_library",
Expand Down
6 changes: 3 additions & 3 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
return nil, err
}

if err := beacon.registerP2P(); err != nil {
if err := beacon.registerP2P(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -143,8 +143,8 @@ func (b *BeaconNode) startDB(ctx *cli.Context) error {
return nil
}

func (b *BeaconNode) registerP2P() error {
beaconp2p, err := configureP2P()
func (b *BeaconNode) registerP2P(ctx *cli.Context) error {
beaconp2p, err := configureP2P(ctx)
if err != nil {
return fmt.Errorf("could not register p2p service: %v", err)
}
Expand Down
17 changes: 14 additions & 3 deletions beacon-chain/node/p2p_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package node

import (
"github.com/golang/protobuf/proto"
"github.com/prysmaticlabs/prysm/shared/cmd"
"github.com/prysmaticlabs/prysm/shared/p2p"
"github.com/prysmaticlabs/prysm/shared/p2p/adapter/tracer"
"github.com/urfave/cli"

pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)
Expand All @@ -20,14 +23,22 @@ var topicMappings = map[pb.Topic]proto.Message{
pb.Topic_ACTIVE_STATE_RESPONSE: &pb.ActiveStateResponse{},
}

func configureP2P() (*p2p.Server, error) {
func configureP2P(ctx *cli.Context) (*p2p.Server, error) {
s, err := p2p.NewServer()
if err != nil {
return nil, err
}

// TODO(437, 438): Define default adapters for logging, monitoring, etc.
var adapters []p2p.Adapter
traceAdapter, err := tracer.New("beacon-chain",
ctx.GlobalString(cmd.TracingEndpointFlag.Name),
ctx.GlobalFloat64(cmd.TraceSampleFractionFlag.Name),
ctx.GlobalBool(cmd.EnableTracingFlag.Name))
if err != nil {
return nil, err
}

// TODO(437): Define default adapters for logging, monitoring, etc.
adapters := []p2p.Adapter{traceAdapter}
for k, v := range topicMappings {
s.RegisterTopic(k.String(), v, adapters...)
}
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//shared/p2p:go_default_library",
"@com_github_ethereum_go_ethereum//event:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)

Expand Down
207 changes: 122 additions & 85 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/p2p"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

var log = logrus.WithField("prefix", "sync")
Expand Down Expand Up @@ -112,25 +113,6 @@ func (ss *Service) BlockAnnouncementFeed() *event.Feed {
return ss.blockAnnouncementFeed
}

// ReceiveBlockHash accepts a block hash.
// New hashes are forwarded to other peers in the network (unimplemented), and
// the contents of the block are requested if the local chain doesn't have the block.
func (ss *Service) ReceiveBlockHash(data *pb.BeaconBlockHashAnnounce, peer p2p.Peer) error {
var h [32]byte
copy(h[:], data.Hash[:32])
blockExists, err := ss.chainService.ContainsBlock(h)
if err != nil {
return err
}
if blockExists {
return nil
}
log.WithField("blockHash", fmt.Sprintf("0x%x", h)).Debug("Received incoming block hash, requesting full block data from sender")
// Request the full block data from peer that sent the block hash.
ss.p2p.Send(&pb.BeaconBlockRequest{Hash: h[:]}, peer)
return nil
}

// run handles incoming block sync.
func (ss *Service) run() {
announceBlockHashSub := ss.p2p.Subscribe(&pb.BeaconBlockHashAnnounce{}, ss.announceBlockHashBuf)
Expand All @@ -147,74 +129,129 @@ func (ss *Service) run() {
log.Debug("Exiting goroutine")
return
case msg := <-ss.announceBlockHashBuf:
data := msg.Data.(*pb.BeaconBlockHashAnnounce)
if err := ss.ReceiveBlockHash(data, msg.Peer); err != nil {
log.Errorf("Received block hash failed: %v", err)
}
ss.receiveBlockHash(msg)
case msg := <-ss.blockBuf:
response := msg.Data.(*pb.BeaconBlockResponse)
block := types.NewBlock(response.Block)
blockHash, err := block.Hash()
if err != nil {
log.Errorf("Could not hash received block: %v", err)
}

blockExists, err := ss.chainService.ContainsBlock(blockHash)
if err != nil {
log.Errorf("Can not check for block in DB: %v", err)
continue
}
if blockExists {
continue
}

// Verify attestation coming from proposer then forward block to the subscribers.
attestation := types.NewAttestation(response.Attestation)
cState := ss.chainService.CurrentCrystallizedState()
parentSlot, err := ss.chainService.GetBlockSlotNumber(block.ParentHash())
if err != nil {
log.Errorf("Failed to get parent slot: %v", err)
continue
}
proposerShardID, _, err := casper.GetProposerIndexAndShard(cState.ShardAndCommitteesForSlots(), cState.LastStateRecalc(), parentSlot)
if err != nil {
log.Errorf("Failed to get proposer shard ID: %v", err)
continue
}
if err := attestation.VerifyAttestation(proposerShardID); err != nil {
log.Errorf("Failed to verify proposer attestation: %v", err)
continue
}

log.WithField("blockHash", fmt.Sprintf("0x%x", blockHash)).Debug("Sending newly received block to subscribers")
ss.chainService.IncomingBlockFeed().Send(block)
log.WithField("attestationHash", fmt.Sprintf("0x%x", attestation.Key())).Debug("Sending newly received attestation to subscribers")
ss.chainService.IncomingAttestationFeed().Send(attestation)

ss.receiveBlock(msg)
case msg := <-ss.blockRequestBySlot:
request, ok := msg.Data.(*pb.BeaconBlockRequestBySlotNumber)
if !ok {
log.Error("Received malformed beacon block request p2p message")
continue
}

blockExists, err := ss.chainService.CheckForCanonicalBlockBySlot(request.GetSlotNumber())
if err != nil {
log.Errorf("Error checking db for block %v", err)
continue
}
if !blockExists {
continue
}

block, err := ss.chainService.GetCanonicalBlockBySlotNumber(request.GetSlotNumber())
if err != nil {
log.Errorf("Error retrieving block from db %v", err)
continue
}

log.WithField("slotNumber", fmt.Sprintf("%d", request.GetSlotNumber())).Debug("Sending requested block to peer")
ss.p2p.Send(block.Proto(), msg.Peer)
ss.handleBlockRequestBySlot(msg)
}
}
}

// receiveBlockHash accepts a block hash.
// New hashes are forwarded to other peers in the network (unimplemented), and
// the contents of the block are requested if the local chain doesn't have the block.
func (ss *Service) receiveBlockHash(msg p2p.Message) {
ctx, receiveBlockSpan := trace.StartSpan(msg.Ctx, "receiveBlockHash")
defer receiveBlockSpan.End()

data := msg.Data.(*pb.BeaconBlockHashAnnounce)
var h [32]byte
copy(h[:], data.Hash[:32])

ctx, containsBlockSpan := trace.StartSpan(ctx, "containsBlock")
blockExists, err := ss.chainService.ContainsBlock(h)
containsBlockSpan.End()
if err != nil {
log.Errorf("Received block hash failed: %v", err)
}
if blockExists {
return
}

log.WithField("blockHash", fmt.Sprintf("0x%x", h)).Debug("Received incoming block hash, requesting full block data from sender")
// Request the full block data from peer that sent the block hash.
_, sendBlockRequestSpan := trace.StartSpan(ctx, "sendBlockRequest")
ss.p2p.Send(&pb.BeaconBlockRequest{Hash: h[:]}, msg.Peer)
sendBlockRequestSpan.End()
}

// receiveBlock processes a block from the p2p layer.
func (ss *Service) receiveBlock(msg p2p.Message) {
ctx, receiveBlockSpan := trace.StartSpan(msg.Ctx, "receiveBlock")
defer receiveBlockSpan.End()

response := msg.Data.(*pb.BeaconBlockResponse)
block := types.NewBlock(response.Block)
blockHash, err := block.Hash()
if err != nil {
log.Errorf("Could not hash received block: %v", err)
}

ctx, containsBlockSpan := trace.StartSpan(ctx, "containsBlock")
blockExists, err := ss.chainService.ContainsBlock(blockHash)
containsBlockSpan.End()
if err != nil {
log.Errorf("Can not check for block in DB: %v", err)
return
}
if blockExists {
return
}

// Verify attestation coming from proposer then forward block to the subscribers.
attestation := types.NewAttestation(response.Attestation)
cState := ss.chainService.CurrentCrystallizedState()
parentSlot, err := ss.chainService.GetBlockSlotNumber(block.ParentHash())
if err != nil {
log.Errorf("Failed to get parent slot: %v", err)
return
}
proposerShardID, _, err := casper.GetProposerIndexAndShard(cState.ShardAndCommitteesForSlots(), cState.LastStateRecalc(), parentSlot)
if err != nil {
log.Errorf("Failed to get proposer shard ID: %v", err)
return
}
if err := attestation.VerifyAttestation(proposerShardID); err != nil {
log.Errorf("Failed to verify proposer attestation: %v", err)
return
}

_, sendBlockSpan := trace.StartSpan(ctx, "sendBlock")
log.WithField("blockHash", fmt.Sprintf("0x%x", blockHash)).Debug("Sending newly received block to subscribers")
ss.chainService.IncomingBlockFeed().Send(block)
sendBlockSpan.End()

_, sendAttestationSpan := trace.StartSpan(ctx, "sendAttestation")
log.WithField("attestationHash", fmt.Sprintf("0x%x", attestation.Key())).Debug("Sending newly received attestation to subscribers")
ss.chainService.IncomingAttestationFeed().Send(attestation)
sendAttestationSpan.End()
}

// handleBlockRequestBySlot processes a block request from the p2p layer.
// if found, the block is sent to the requesting peer.
func (ss *Service) handleBlockRequestBySlot(msg p2p.Message) {
ctx, blockRequestSpan := trace.StartSpan(msg.Ctx, "blockRequestBySlot")
defer blockRequestSpan.End()

request, ok := msg.Data.(*pb.BeaconBlockRequestBySlotNumber)
// TODO: Handle this at p2p layer.
if !ok {
log.Error("Received malformed beacon block request p2p message")
return
}

ctx, checkForBlockSpan := trace.StartSpan(ctx, "checkForBlockBySlot")
blockExists, err := ss.chainService.CheckForCanonicalBlockBySlot(request.GetSlotNumber())
checkForBlockSpan.End()
if err != nil {
log.Errorf("Error checking db for block %v", err)
return
}
if !blockExists {
return
}

ctx, getBlockSpan := trace.StartSpan(ctx, "getBlockBySlot")
block, err := ss.chainService.GetCanonicalBlockBySlotNumber(request.GetSlotNumber())
getBlockSpan.End()
if err != nil {
log.Errorf("Error retrieving block from db %v", err)
return
}

_, sendBlockSpan := trace.StartSpan(ctx, "sendBlock")
log.WithField("slotNumber", fmt.Sprintf("%d", request.GetSlotNumber())).Debug("Sending requested block to peer")
ss.p2p.Send(block.Proto(), msg.Peer)
sendBlockSpan.End()
}
8 changes: 8 additions & 0 deletions beacon-chain/sync/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func TestProcessBlockHash(t *testing.T) {
}

msg := p2p.Message{
Ctx: context.Background(),
Peer: p2p.Peer{},
Data: hashAnnounce,
}
Expand Down Expand Up @@ -150,6 +151,7 @@ func TestProcessBlock(t *testing.T) {
}

msg := p2p.Message{
Ctx: context.Background(),
Peer: p2p.Peer{},
Data: responseBlock,
}
Expand Down Expand Up @@ -186,6 +188,7 @@ func TestProcessMultipleBlocks(t *testing.T) {
}

msg1 := p2p.Message{
Ctx: context.Background(),
Peer: p2p.Peer{},
Data: responseBlock1,
}
Expand All @@ -200,6 +203,7 @@ func TestProcessMultipleBlocks(t *testing.T) {
}

msg2 := p2p.Message{
Ctx: context.Background(),
Peer: p2p.Peer{},
Data: responseBlock2,
}
Expand Down Expand Up @@ -232,6 +236,7 @@ func TestBlockRequestErrors(t *testing.T) {
}

invalidmsg := p2p.Message{
Ctx: context.Background(),
Data: malformedRequest,
Peer: p2p.Peer{},
}
Expand All @@ -244,6 +249,7 @@ func TestBlockRequestErrors(t *testing.T) {
}

msg1 := p2p.Message{
Ctx: context.Background(),
Data: request1,
Peer: p2p.Peer{},
}
Expand Down Expand Up @@ -273,6 +279,7 @@ func TestBlockRequestGetCanonicalError(t *testing.T) {
}

msg1 := p2p.Message{
Ctx: context.Background(),
Data: request1,
Peer: p2p.Peer{},
}
Expand Down Expand Up @@ -304,6 +311,7 @@ func TestBlockRequestBySlot(t *testing.T) {
}

msg1 := p2p.Message{
Ctx: context.Background(),
Data: request1,
Peer: p2p.Peer{},
}
Expand Down
Loading