Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement SubscribeCommitteeSubnet method #5299

Merged
merged 19 commits into from
Apr 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -1309,7 +1309,7 @@ go_repository(

go_repository(
name = "com_github_prysmaticlabs_ethereumapis",
commit = "90224d88dd65813ba2f35197bb1bb8915b2d0c6b",
commit = "3f6a75ac9460621b140270b90057a5a445d66436",
importpath = "github.com/prysmaticlabs/ethereumapis",
patch_args = ["-p1"],
patches = [
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_test(
"attestation_data_test.go",
"checkpoint_state_test.go",
"committee_fuzz_test.go",
"committee_ids_test.go",
"committee_test.go",
"eth1_data_test.go",
"feature_flag_test.go",
Expand Down
70 changes: 55 additions & 15 deletions beacon-chain/cache/committee_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,79 @@ import (
"sync"

lru "github.com/hashicorp/golang-lru"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
)

type committeeIDs struct {
cache *lru.Cache
lock sync.RWMutex
attester *lru.Cache
attesterLock sync.RWMutex
aggregator *lru.Cache
aggregatorLock sync.RWMutex
}

// CommitteeIDs for attestations.
// CommitteeIDs for attester and aggregator.
var CommitteeIDs = newCommitteeIDs()

func newCommitteeIDs() *committeeIDs {
cache, err := lru.New(8)
// Given a node can calculate committee assignments of current epoch and next epoch.
// Max size is set to 2 epoch length.
cacheSize := int(params.BeaconConfig().MaxCommitteesPerSlot * params.BeaconConfig().SlotsPerEpoch * 2)
attesterCache, err := lru.New(cacheSize)
if err != nil {
panic(err)
}
return &committeeIDs{cache: cache}
aggregatorCache, err := lru.New(cacheSize)
if err != nil {
panic(err)
}
return &committeeIDs{attester: attesterCache, aggregator: aggregatorCache}
}

// AddIDs to the cache for attestation committees by epoch.
func (t *committeeIDs) AddIDs(indices []uint64, epoch uint64) {
t.lock.Lock()
defer t.lock.Unlock()
val, exists := t.cache.Get(epoch)
// AddAttesterCommiteeID adds committee ID for subscribing subnet for the attester of a given slot.
func (c *committeeIDs) AddAttesterCommiteeID(slot uint64, committeeID uint64) {
c.attesterLock.Lock()
defer c.attesterLock.Unlock()

ids := []uint64{committeeID}
val, exists := c.attester.Get(slot)
if exists {
indices = sliceutil.UnionUint64(append(indices, val.([]uint64)...))
ids = sliceutil.UnionUint64(append(val.([]uint64), ids...))
}
t.cache.Add(epoch, indices)
c.attester.Add(slot, ids)
}

// GetIDs from the cache for attestation committees by epoch.
func (t *committeeIDs) GetIDs(epoch uint64) []uint64 {
val, exists := t.cache.Get(epoch)
// GetAttesterCommitteeIDs gets the committee ID for subscribing subnet for attester of the slot.
func (c *committeeIDs) GetAttesterCommitteeIDs(slot uint64) []uint64 {
c.attesterLock.RLock()
defer c.attesterLock.RUnlock()

val, exists := c.attester.Get(slot)
if !exists {
return []uint64{}
}
return val.([]uint64)
}

// AddAggregatorCommiteeID adds committee ID for subscribing subnet for the aggregator of a given slot.
func (c *committeeIDs) AddAggregatorCommiteeID(slot uint64, committeeID uint64) {
c.aggregatorLock.Lock()
defer c.aggregatorLock.Unlock()

ids := []uint64{committeeID}
val, exists := c.aggregator.Get(slot)
if exists {
ids = sliceutil.UnionUint64(append(val.([]uint64), ids...))
}
c.aggregator.Add(slot, ids)
}

// GetAggregatorCommitteeIDs gets the committee ID for subscribing subnet for aggregator of the slot.
func (c *committeeIDs) GetAggregatorCommitteeIDs(slot uint64) []uint64 {
c.aggregatorLock.RLock()
defer c.aggregatorLock.RUnlock()

val, exists := c.aggregator.Get(slot)
if !exists {
return []uint64{}
}
Expand Down
56 changes: 56 additions & 0 deletions beacon-chain/cache/committee_ids_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package cache

import (
"reflect"
"testing"
)

func TestCommitteeIDCache_RoundTrip(t *testing.T) {
c := newCommitteeIDs()
slot := uint64(100)
committeeIDs := c.GetAggregatorCommitteeIDs(slot)
if len(committeeIDs) != 0 {
t.Errorf("Empty cache returned an object: %v", committeeIDs)
}

c.AddAggregatorCommiteeID(slot, 1)
res := c.GetAggregatorCommitteeIDs(slot)
if !reflect.DeepEqual(res, []uint64{1}) {
t.Error("Expected equal value to return from cache")
}

c.AddAggregatorCommiteeID(slot, 2)
res = c.GetAggregatorCommitteeIDs(slot)
if !reflect.DeepEqual(res, []uint64{1, 2}) {
t.Error("Expected equal value to return from cache")
}

c.AddAggregatorCommiteeID(slot, 3)
res = c.GetAggregatorCommitteeIDs(slot)
if !reflect.DeepEqual(res, []uint64{1, 2, 3}) {
t.Error("Expected equal value to return from cache")
}

committeeIDs = c.GetAttesterCommitteeIDs(slot)
if len(committeeIDs) != 0 {
t.Errorf("Empty cache returned an object: %v", committeeIDs)
}

c.AddAttesterCommiteeID(slot, 11)
res = c.GetAttesterCommitteeIDs(slot)
if !reflect.DeepEqual(res, []uint64{11}) {
t.Error("Expected equal value to return from cache")
}

c.AddAttesterCommiteeID(slot, 22)
res = c.GetAttesterCommitteeIDs(slot)
if !reflect.DeepEqual(res, []uint64{11, 22}) {
t.Error("Expected equal value to return from cache")
}

c.AddAttesterCommiteeID(slot, 33)
res = c.GetAttesterCommitteeIDs(slot)
if !reflect.DeepEqual(res, []uint64{11, 22, 33}) {
t.Error("Expected equal value to return from cache")
}
}
1 change: 1 addition & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"//shared/iputils:go_default_library",
"//shared/params:go_default_library",
"//shared/runutil:go_default_library",
"//shared/sliceutil:go_default_library",
"//shared/traceutil:go_default_library",
"@com_github_btcsuite_btcd//btcec:go_default_library",
"@com_github_dgraph_io_ristretto//:go_default_library",
Expand Down
9 changes: 8 additions & 1 deletion beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -354,7 +355,13 @@ func (s *Service) RefreshENR(epoch uint64) {
return
}
bitV := bitfield.NewBitvector64()
committees := cache.CommitteeIDs.GetIDs(epoch)

var committees []uint64
epochStartSlot := helpers.StartSlot(epoch)
for i := epochStartSlot; i < epochStartSlot+2*params.BeaconConfig().SlotsPerEpoch; i++ {
committees = append(committees, sliceutil.UnionUint64(cache.CommitteeIDs.GetAttesterCommitteeIDs(i),
cache.CommitteeIDs.GetAggregatorCommitteeIDs(i))...)
}
for _, idx := range committees {
bitV.SetBitAt(idx, true)
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/subnets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {

// Update ENR of a peer.
testService := &Service{dv5Listener: listeners[0]}
cache.CommitteeIDs.AddIDs([]uint64{10}, 0)
cache.CommitteeIDs.AddAttesterCommiteeID(0, 10)
testService.RefreshENR(0)
time.Sleep(2 * time.Second)

Expand Down
7 changes: 0 additions & 7 deletions beacon-chain/rpc/validator/assignments.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"context"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -87,11 +85,6 @@ func (vs *Server) GetDuties(ctx context.Context, req *ethpb.DutiesRequest) (*eth

}

if featureconfig.Get().EnableDynamicCommitteeSubnets {
cache.CommitteeIDs.AddIDs(committeeIDs, req.Epoch)
cache.CommitteeIDs.AddIDs(nextCommitteeIDs, req.Epoch+1)
}

return &ethpb.DutiesResponse{
Duties: validatorAssignments,
}, nil
Expand Down
24 changes: 12 additions & 12 deletions beacon-chain/rpc/validator/attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

ptypes "github.com/gogo/protobuf/types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
Expand Down Expand Up @@ -36,12 +37,6 @@ func (vs *Server) GetAttestationData(ctx context.Context, req *ethpb.Attestation
trace.Int64Attribute("committeeIndex", int64(req.CommitteeIndex)),
)

// If attestation committee subnets are enabled, we track the committee
// index into a cache.
if featureconfig.Get().EnableDynamicCommitteeSubnets {
cache.CommitteeIDs.AddIDs([]uint64{req.CommitteeIndex}, helpers.SlotToEpoch(req.Slot))
}

if vs.SyncChecker.Syncing() {
return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
Expand Down Expand Up @@ -159,12 +154,6 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation
return nil, status.Error(codes.InvalidArgument, "Incorrect attestation signature")
}

// If attestation committee subnets are enabled, we track the committee
// index into a cache.
if featureconfig.Get().EnableDynamicCommitteeSubnets {
cache.CommitteeIDs.AddIDs([]uint64{att.Data.CommitteeIndex}, helpers.SlotToEpoch(att.Data.Slot))
}

root, err := ssz.HashTreeRoot(att.Data)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not tree hash attestation: %v", err)
Expand Down Expand Up @@ -235,3 +224,14 @@ func (vs *Server) waitToOneThird(ctx context.Context, slot uint64) {
}
}
}

// SubscribeCommitteeSubnet subscribes to the committee ID subnet given subscribe request.
func (vs *Server) SubscribeCommitteeSubnet(ctx context.Context, req *ethpb.CommitteeSubnetSubscribeRequest) (*ptypes.Empty, error) {
cache.CommitteeIDs.AddAttesterCommiteeID(req.Slot, req.CommitteeId)

if req.IsAggregator {
cache.CommitteeIDs.AddAggregatorCommiteeID(req.Slot, req.CommitteeId)
}

return &ptypes.Empty{}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (r *Service) committeesCount() int {
}

func (r *Service) committeeIndices() []uint64 {
currentEpoch := helpers.SlotToEpoch(r.chain.HeadSlot())
return sliceutil.UnionUint64(cache.CommitteeIDs.GetIDs(currentEpoch),
cache.CommitteeIDs.GetIDs(currentEpoch+1))
currentSlot := r.chain.CurrentSlot()
return sliceutil.UnionUint64(cache.CommitteeIDs.GetAttesterCommitteeIDs(currentSlot),
cache.CommitteeIDs.GetAggregatorCommitteeIDs(currentSlot))
}
22 changes: 12 additions & 10 deletions third_party/com_github_prysmaticlabs_ethereumapis-tags.patch
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ index 8ee263b..60607de 100644

// The epoch for which this set of validator assignments is valid.
diff --git a/eth/v1alpha1/validator.proto b/eth/v1alpha1/validator.proto
index 068a04c..80f73b8 100644
index 3455de7..f4fab59 100644
--- a/eth/v1alpha1/validator.proto
+++ b/eth/v1alpha1/validator.proto
@@ -15,6 +15,7 @@ syntax = "proto3";
Expand All @@ -458,7 +458,7 @@ index 068a04c..80f73b8 100644
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
import "eth/v1alpha1/beacon_block.proto";
@@ -197,7 +198,7 @@ message DomainResponse {
@@ -208,7 +209,7 @@ message DomainResponse {

message ValidatorActivationRequest {
// A list of 48 byte validator public keys.
Expand All @@ -467,7 +467,7 @@ index 068a04c..80f73b8 100644
}

message ValidatorActivationResponse {
@@ -223,7 +224,7 @@ message ChainStartResponse {
@@ -234,7 +235,7 @@ message ChainStartResponse {

message ValidatorIndexRequest {
// A 48 byte validator public key.
Expand All @@ -476,7 +476,7 @@ index 068a04c..80f73b8 100644
}

message ValidatorIndexResponse {
@@ -233,7 +234,7 @@ message ValidatorIndexResponse {
@@ -244,7 +245,7 @@ message ValidatorIndexResponse {

message ValidatorStatusRequest {
// A 48 byte validator public key.
Expand All @@ -485,7 +485,7 @@ index 068a04c..80f73b8 100644
}

enum ValidatorStatus {
@@ -271,7 +272,7 @@ message DutiesRequest {
@@ -282,7 +283,7 @@ message DutiesRequest {
uint64 epoch = 1;

// Array of byte encoded BLS public keys.
Expand All @@ -494,7 +494,7 @@ index 068a04c..80f73b8 100644
}

message DutiesResponse {
@@ -290,7 +291,7 @@ message DutiesResponse {
@@ -301,7 +302,7 @@ message DutiesResponse {
uint64 proposer_slot = 4;

// 48 byte BLS public key for the validator who's assigned to perform a duty.
Expand All @@ -503,7 +503,7 @@ index 068a04c..80f73b8 100644

// The current status of the validator assigned to perform the duty.
ValidatorStatus status = 6;
@@ -305,15 +306,16 @@ message BlockRequest {
@@ -316,15 +317,16 @@ message BlockRequest {
uint64 slot = 1;

// Validator's 32 byte randao reveal secret of the current epoch.
Expand All @@ -523,7 +523,7 @@ index 068a04c..80f73b8 100644
}

message AttestationDataRequest {
@@ -326,7 +328,7 @@ message AttestationDataRequest {
@@ -337,7 +339,7 @@ message AttestationDataRequest {

message AttestResponse {
// The root of the attestation data successfully submitted to the beacon node.
Expand All @@ -532,7 +532,7 @@ index 068a04c..80f73b8 100644
}

message AggregateSelectionRequest {
@@ -335,10 +337,10 @@ message AggregateSelectionRequest {
@@ -346,10 +348,10 @@ message AggregateSelectionRequest {
// Committee index of the validator at the given slot.
uint64 committee_index = 2;
// 48 byte public key of the validator.
Expand All @@ -545,14 +545,16 @@ index 068a04c..80f73b8 100644
}

message AggregateSelectionResponse {
@@ -353,16 +355,16 @@ message SignedAggregateSubmitRequest {
@@ -364,7 +366,7 @@ message SignedAggregateSubmitRequest {

message SignedAggregateSubmitResponse {
// The 32 byte hash tree root of the aggregated attestation data.
- bytes attestation_data_root = 1;
+ bytes attestation_data_root = 1 [(gogoproto.moretags) = "ssz-size:\"32\""];
}

message CommitteeSubnetSubscribeRequest {
@@ -381,10 +383,10 @@ message CommitteeSubnetSubscribeRequest {
// An Ethereum 2.0 validator.
message Validator {
// 48 byte BLS public key used for the validator's activities.
Expand Down
Loading