Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use All Attestation Subnets #5966

Merged
merged 9 commits into from
May 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions beacon-chain/blockchain/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ func (mb *mockBroadcaster) Broadcast(_ context.Context, _ proto.Message) error {
return nil
}

func (mb *mockBroadcaster) BroadcastAttestation(_ context.Context, _ uint64, _ *ethpb.Attestation) error {
mb.broadcastCalled = true
return nil
}

var _ = p2p.Broadcaster(&mockBroadcaster{})

func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service {
Expand Down
20 changes: 10 additions & 10 deletions beacon-chain/cache/committee_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,21 @@ func newCommitteeIDs() *committeeIDs {
return &committeeIDs{attester: attesterCache, aggregator: aggregatorCache, persistentSubnets: persistentCache}
}

// AddAttesterCommiteeID adds committee ID for subscribing subnet for the attester of a given slot.
func (c *committeeIDs) AddAttesterCommiteeID(slot uint64, committeeID uint64) {
// AddAttesterSubnetID adds the subnet index for subscribing subnet for the attester of a given slot.
func (c *committeeIDs) AddAttesterSubnetID(slot uint64, subnetID uint64) {
c.attesterLock.Lock()
defer c.attesterLock.Unlock()

ids := []uint64{committeeID}
ids := []uint64{subnetID}
val, exists := c.attester.Get(slot)
if exists {
ids = sliceutil.UnionUint64(append(val.([]uint64), ids...))
}
c.attester.Add(slot, ids)
}

// GetAttesterCommitteeIDs gets the committee ID for subscribing subnet for attester of the slot.
func (c *committeeIDs) GetAttesterCommitteeIDs(slot uint64) []uint64 {
// GetAttesterSubnetIDs gets the subnet IDs for subscribed subnets for attesters of the slot.
func (c *committeeIDs) GetAttesterSubnetIDs(slot uint64) []uint64 {
c.attesterLock.RLock()
defer c.attesterLock.RUnlock()

Expand All @@ -68,21 +68,21 @@ func (c *committeeIDs) GetAttesterCommitteeIDs(slot uint64) []uint64 {
return nil
}

// AddAggregatorCommiteeID adds committee ID for subscribing subnet for the aggregator of a given slot.
func (c *committeeIDs) AddAggregatorCommiteeID(slot uint64, committeeID uint64) {
// AddAggregatorSubnetID adds the subnet ID for subscribing subnet for the aggregator of a given slot.
func (c *committeeIDs) AddAggregatorSubnetID(slot uint64, subnetID uint64) {
c.aggregatorLock.Lock()
defer c.aggregatorLock.Unlock()

ids := []uint64{committeeID}
ids := []uint64{subnetID}
val, exists := c.aggregator.Get(slot)
if exists {
ids = sliceutil.UnionUint64(append(val.([]uint64), ids...))
}
c.aggregator.Add(slot, ids)
}

// GetAggregatorCommitteeIDs gets the committee ID for subscribing subnet for aggregator of the slot.
func (c *committeeIDs) GetAggregatorCommitteeIDs(slot uint64) []uint64 {
// GetAggregatorSubnetIDs gets the subnet IDs for subscribing subnet for aggregator of the slot.
func (c *committeeIDs) GetAggregatorSubnetIDs(slot uint64) []uint64 {
c.aggregatorLock.RLock()
defer c.aggregatorLock.RUnlock()

Expand Down
28 changes: 14 additions & 14 deletions beacon-chain/cache/committee_ids_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,48 +8,48 @@ import (
func TestCommitteeIDCache_RoundTrip(t *testing.T) {
c := newCommitteeIDs()
slot := uint64(100)
committeeIDs := c.GetAggregatorCommitteeIDs(slot)
committeeIDs := c.GetAggregatorSubnetIDs(slot)
if len(committeeIDs) != 0 {
t.Errorf("Empty cache returned an object: %v", committeeIDs)
}

c.AddAggregatorCommiteeID(slot, 1)
res := c.GetAggregatorCommitteeIDs(slot)
c.AddAggregatorSubnetID(slot, 1)
res := c.GetAggregatorSubnetIDs(slot)
if !reflect.DeepEqual(res, []uint64{1}) {
t.Error("Expected equal value to return from cache")
}

c.AddAggregatorCommiteeID(slot, 2)
res = c.GetAggregatorCommitteeIDs(slot)
c.AddAggregatorSubnetID(slot, 2)
res = c.GetAggregatorSubnetIDs(slot)
if !reflect.DeepEqual(res, []uint64{1, 2}) {
t.Error("Expected equal value to return from cache")
}

c.AddAggregatorCommiteeID(slot, 3)
res = c.GetAggregatorCommitteeIDs(slot)
c.AddAggregatorSubnetID(slot, 3)
res = c.GetAggregatorSubnetIDs(slot)
if !reflect.DeepEqual(res, []uint64{1, 2, 3}) {
t.Error("Expected equal value to return from cache")
}

committeeIDs = c.GetAttesterCommitteeIDs(slot)
committeeIDs = c.GetAttesterSubnetIDs(slot)
if len(committeeIDs) != 0 {
t.Errorf("Empty cache returned an object: %v", committeeIDs)
}

c.AddAttesterCommiteeID(slot, 11)
res = c.GetAttesterCommitteeIDs(slot)
c.AddAttesterSubnetID(slot, 11)
res = c.GetAttesterSubnetIDs(slot)
if !reflect.DeepEqual(res, []uint64{11}) {
t.Error("Expected equal value to return from cache")
}

c.AddAttesterCommiteeID(slot, 22)
res = c.GetAttesterCommitteeIDs(slot)
c.AddAttesterSubnetID(slot, 22)
res = c.GetAttesterSubnetIDs(slot)
if !reflect.DeepEqual(res, []uint64{11, 22}) {
t.Error("Expected equal value to return from cache")
}

c.AddAttesterCommiteeID(slot, 33)
res = c.GetAttesterCommitteeIDs(slot)
c.AddAttesterSubnetID(slot, 33)
res = c.GetAttesterSubnetIDs(slot)
if !reflect.DeepEqual(res, []uint64{11, 22, 33}) {
t.Error("Expected equal value to return from cache")
}
Expand Down
37 changes: 37 additions & 0 deletions beacon-chain/core/helpers/attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,40 @@ func AggregateSignature(attestations []*ethpb.Attestation) (*bls.Signature, erro
func IsAggregated(attestation *ethpb.Attestation) bool {
return attestation.AggregationBits.Count() > 1
}

// ComputeSubnetForAttestation returns the subnet for which the provided attestation will be broadcasted to.
// This differs from the spec definition by instead passing in the active validators indices in the attestation's
// given epoch.
//
// Spec pseudocode definition:
// def compute_subnet_for_attestation(state: BeaconState, attestation: Attestation) -> uint64:
// """
// Compute the correct subnet for an attestation for Phase 0.
// Note, this mimics expected Phase 1 behavior where attestations will be mapped to their shard subnet.
// """
// slots_since_epoch_start = attestation.data.slot % SLOTS_PER_EPOCH
// committees_since_epoch_start = get_committee_count_at_slot(state, attestation.data.slot) * slots_since_epoch_start
// return (committees_since_epoch_start + attestation.data.index) % ATTESTATION_SUBNET_COUNT
func ComputeSubnetForAttestation(activeValCount uint64, att *ethpb.Attestation) uint64 {
return ComputeSubnetFromCommitteeAndSlot(activeValCount, att.Data.CommitteeIndex, att.Data.Slot)
}

// ComputeSubnetFromCommitteeAndSlot is a flattened version of ComputeSubnetForAttestation where we only pass in
// the relevant fields from the attestation as function arguments.
//
// Spec pseudocode definition:
// def compute_subnet_for_attestation(state: BeaconState, attestation: Attestation) -> uint64:
// """
// Compute the correct subnet for an attestation for Phase 0.
// Note, this mimics expected Phase 1 behavior where attestations will be mapped to their shard subnet.
// """
// slots_since_epoch_start = attestation.data.slot % SLOTS_PER_EPOCH
// committees_since_epoch_start = get_committee_count_at_slot(state, attestation.data.slot) * slots_since_epoch_start
// return (committees_since_epoch_start + attestation.data.index) % ATTESTATION_SUBNET_COUNT
func ComputeSubnetFromCommitteeAndSlot(activeValCount, comIdx, attSlot uint64) uint64 {
slotSinceStart := SlotsSinceEpochStarts(attSlot)
comCount := SlotCommitteeCount(activeValCount)
commsSinceStart := comCount * slotSinceStart
computedSubnet := (commsSinceStart + comIdx) % params.BeaconNetworkConfig().AttestationSubnetCount
return computedSubnet
}
51 changes: 51 additions & 0 deletions beacon-chain/core/helpers/attestation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package helpers_test
import (
"bytes"
"sort"
"strconv"
"testing"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
Expand Down Expand Up @@ -348,3 +349,53 @@ func TestAggregateSignature_False(t *testing.T) {
t.Error("Signature not suppose to verify")
}
}

func TestComputeSubnetForAttestation_ComputeForAttestation(t *testing.T) {
// Create 10 committees
committeeCount := uint64(10)
validatorCount := committeeCount * params.BeaconConfig().TargetCommitteeSize
validators := make([]*ethpb.Validator, validatorCount)

for i := 0; i < len(validators); i++ {
k := make([]byte, 48)
copy(k, strconv.Itoa(i))
validators[i] = &ethpb.Validator{
PublicKey: k,
WithdrawalCredentials: make([]byte, 32),
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
}
}

state, err := beaconstate.InitializeFromProto(&pb.BeaconState{
Validators: validators,
Slot: 200,
BlockRoots: make([][]byte, params.BeaconConfig().SlotsPerHistoricalRoot),
StateRoots: make([][]byte, params.BeaconConfig().SlotsPerHistoricalRoot),
RandaoMixes: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector),
})
if err != nil {
t.Fatal(err)
}
att := &ethpb.Attestation{
AggregationBits: []byte{'A'},
Data: &ethpb.AttestationData{
Slot: 34,
CommitteeIndex: 4,
BeaconBlockRoot: []byte{'C'},
Source: nil,
Target: nil,
},
Signature: []byte{'B'},
XXX_NoUnkeyedLiteral: struct{}{},
XXX_unrecognized: nil,
XXX_sizecache: 0,
}
valCount, err := helpers.ActiveValidatorCount(state, helpers.SlotToEpoch(att.Data.Slot))
if err != nil {
t.Fatal(err)
}
sub := helpers.ComputeSubnetForAttestation(valCount, att)
if sub != 6 {
t.Errorf("Did not get correct subnet for attestation, wanted %d but got %d", 6, sub)
}
}
56 changes: 39 additions & 17 deletions beacon-chain/p2p/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,12 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
return err
}

var topic string
switch msg.(type) {
case *eth.Attestation:
topic = attestationToTopic(msg.(*eth.Attestation), forkDigest)
default:
var ok bool
topic, ok = GossipTypeMapping[reflect.TypeOf(msg)]
if !ok {
traceutil.AnnotateError(span, ErrMessageNotMapped)
return ErrMessageNotMapped
}
topic = fmt.Sprintf(topic, forkDigest)
topic, ok := GossipTypeMapping[reflect.TypeOf(msg)]
if !ok {
traceutil.AnnotateError(span, ErrMessageNotMapped)
return ErrMessageNotMapped
}
topic = fmt.Sprintf(topic, forkDigest)

span.AddAttributes(trace.StringAttribute("topic", topic))

Expand All @@ -64,11 +57,40 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
return nil
}

const attestationSubnetTopicFormat = "/eth2/%x/committee_index%d_beacon_attestation"
// BroadcastAttestation broadcasts an attestation to the p2p network.
func (s *Service) BroadcastAttestation(ctx context.Context, subnet uint64, att *eth.Attestation) error {
ctx, span := trace.StartSpan(ctx, "p2p.Broadcast")
defer span.End()
forkDigest, err := s.forkDigest()
if err != nil {
return err
}
topic := attestationToTopic(subnet, forkDigest)
span.AddAttributes(trace.StringAttribute("topic", topic))

func attestationToTopic(att *eth.Attestation, forkDigest [4]byte) string {
if att == nil || att.Data == nil {
return ""
buf := new(bytes.Buffer)
if _, err := s.Encoding().EncodeGossip(buf, att); err != nil {
err := errors.Wrap(err, "could not encode message")
traceutil.AnnotateError(span, err)
return err
}
return fmt.Sprintf(attestationSubnetTopicFormat, forkDigest, att.Data.CommitteeIndex)

if span.IsRecordingEvents() {
id := hashutil.FastSum64(buf.Bytes())
messageLen := int64(buf.Len())
span.AddMessageSendEvent(int64(id), messageLen /*uncompressed*/, messageLen /*compressed*/)
}

if err := s.pubsub.Publish(topic+s.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil {
err := errors.Wrap(err, "could not publish message")
traceutil.AnnotateError(span, err)
return err
}
return nil
}

const attestationSubnetTopicFormat = "/eth2/%x/beacon_attestation_%d"

func attestationToTopic(subnet uint64, forkDigest [4]byte) string {
return fmt.Sprintf(attestationSubnetTopicFormat, forkDigest, subnet)
}
21 changes: 10 additions & 11 deletions beacon-chain/p2p/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"

"github.com/gogo/protobuf/proto"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
Expand Down Expand Up @@ -109,36 +111,33 @@ func TestService_Attestation_Subnet(t *testing.T) {
att: &eth.Attestation{
Data: &eth.AttestationData{
CommitteeIndex: 0,
Slot: 2,
},
},
topic: "/eth2/00000000/committee_index0_beacon_attestation",
topic: "/eth2/00000000/beacon_attestation_2",
},
{
att: &eth.Attestation{
Data: &eth.AttestationData{
CommitteeIndex: 11,
Slot: 10,
},
},
topic: "/eth2/00000000/committee_index11_beacon_attestation",
topic: "/eth2/00000000/beacon_attestation_21",
},
{
att: &eth.Attestation{
Data: &eth.AttestationData{
CommitteeIndex: 55,
Slot: 529,
},
},
topic: "/eth2/00000000/committee_index55_beacon_attestation",
},
{
att: &eth.Attestation{},
topic: "",
},
{
topic: "",
topic: "/eth2/00000000/beacon_attestation_8",
},
}
for _, tt := range tests {
if res := attestationToTopic(tt.att, [4]byte{} /* fork digest */); res != tt.topic {
subnet := helpers.ComputeSubnetFromCommitteeAndSlot(100, tt.att.Data.CommitteeIndex, tt.att.Data.Slot)
if res := attestationToTopic(subnet, [4]byte{} /* fork digest */); res != tt.topic {
t.Errorf("Wrong topic, got %s wanted %s", res, tt.topic)
}
}
Expand Down
12 changes: 6 additions & 6 deletions beacon-chain/p2p/gossip_topic_mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
// GossipTopicMappings represent the protocol ID to protobuf message type map for easy
// lookup.
var GossipTopicMappings = map[string]proto.Message{
"/eth2/%x/beacon_block": &pb.SignedBeaconBlock{},
"/eth2/%x/committee_index%d_beacon_attestation": &pb.Attestation{},
"/eth2/%x/voluntary_exit": &pb.SignedVoluntaryExit{},
"/eth2/%x/proposer_slashing": &pb.ProposerSlashing{},
"/eth2/%x/attester_slashing": &pb.AttesterSlashing{},
"/eth2/%x/beacon_aggregate_and_proof": &pb.SignedAggregateAttestationAndProof{},
"/eth2/%x/beacon_block": &pb.SignedBeaconBlock{},
"/eth2/%x/beacon_attestation_%d": &pb.Attestation{},
"/eth2/%x/voluntary_exit": &pb.SignedVoluntaryExit{},
"/eth2/%x/proposer_slashing": &pb.ProposerSlashing{},
"/eth2/%x/attester_slashing": &pb.AttesterSlashing{},
"/eth2/%x/beacon_aggregate_and_proof": &pb.SignedAggregateAttestationAndProof{},
}

// GossipTypeMapping is the inverse of GossipTopicMappings so that an arbitrary protobuf message
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
Expand All @@ -28,6 +29,7 @@ type P2P interface {
// Broadcaster broadcasts messages to peers over the p2p pubsub protocol.
type Broadcaster interface {
Broadcast(context.Context, proto.Message) error
BroadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation) error
}

// SetStreamHandler configures p2p to handle streams of a certain topic ID.
Expand Down
Loading