Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validator batch subscribe subnets #5332

Merged
merged 6 commits into from
Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -1309,7 +1309,7 @@ go_repository(

go_repository(
name = "com_github_prysmaticlabs_ethereumapis",
commit = "3f6a75ac9460621b140270b90057a5a445d66436",
commit = "59479f4a647fcec5d8dbf7c50435cc10fb5751fc",
importpath = "github.com/prysmaticlabs/ethereumapis",
patch_args = ["-p1"],
patches = [
Expand Down
15 changes: 10 additions & 5 deletions beacon-chain/rpc/validator/attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,17 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation
}, nil
}

// SubscribeCommitteeSubnet subscribes to the committee ID subnet given subscribe request.
func (vs *Server) SubscribeCommitteeSubnet(ctx context.Context, req *ethpb.CommitteeSubnetSubscribeRequest) (*ptypes.Empty, error) {
cache.CommitteeIDs.AddAttesterCommiteeID(req.Slot, req.CommitteeId)
// SubscribeCommitteeSubnets subscribes to the committee ID subnet given subscribe request.
func (vs *Server) SubscribeCommitteeSubnets(ctx context.Context, req *ethpb.CommitteeSubnetsSubscribeRequest) (*ptypes.Empty, error) {
if len(req.Slots) != len(req.CommitteeIds) && len(req.CommitteeIds) != len(req.IsAggregator) {
return nil, status.Error(codes.InvalidArgument, "request fields are not the same length")
}

if req.IsAggregator {
cache.CommitteeIDs.AddAggregatorCommiteeID(req.Slot, req.CommitteeId)
for i := 0; i < len(req.Slots); i++ {
cache.CommitteeIDs.AddAttesterCommiteeID(req.Slots[i], req.CommitteeIds[i])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it will be good to check that req.Slots[i] has the same size as req.CommitteeIds[i]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they are singular objects though , both are uint64

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i meant the same length

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

	if len(req.Slots) != len(req.CommitteeIds) && len(req.CommitteeIds) != len(req.IsAggregator) {
		return nil, status.Error(codes.InvalidArgument, "request fields are not the same length")
	}

we already check that above so it should be fine

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's checked on L184, am I missing anything?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i missed it

if req.IsAggregator[i] {
cache.CommitteeIDs.AddAggregatorCommiteeID(req.Slots[i], req.CommitteeIds[i])
}
}

return &ptypes.Empty{}, nil
Expand Down
6 changes: 3 additions & 3 deletions third_party/com_github_prysmaticlabs_ethereumapis-tags.patch
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ index 8ee263b..60607de 100644

// The epoch for which this set of validator assignments is valid.
diff --git a/eth/v1alpha1/validator.proto b/eth/v1alpha1/validator.proto
index 3455de7..f4fab59 100644
index 3bc824b..53cd49b 100644
--- a/eth/v1alpha1/validator.proto
+++ b/eth/v1alpha1/validator.proto
@@ -15,6 +15,7 @@ syntax = "proto3";
Expand Down Expand Up @@ -553,8 +553,8 @@ index 3455de7..f4fab59 100644
+ bytes attestation_data_root = 1 [(gogoproto.moretags) = "ssz-size:\"32\""];
}

message CommitteeSubnetSubscribeRequest {
@@ -381,10 +383,10 @@ message CommitteeSubnetSubscribeRequest {
message CommitteeSubnetsSubscribeRequest {
@@ -382,10 +384,10 @@ message CommitteeSubnetsSubscribeRequest {
// An Ethereum 2.0 validator.
message Validator {
// 48 byte BLS public key used for the validator's activities.
Expand Down
31 changes: 16 additions & 15 deletions validator/client/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error {
}

v.duties = resp
subscribeSlots := make([]uint64, 0, len(validatingKeys))
subscribeCommitteeIDs := make([]uint64, 0, len(validatingKeys))
subscribeIsAggregator := make([]bool, 0, len(validatingKeys))
// Only log the full assignments output on epoch start to be less verbose.
// Also log out on first launch so the user doesn't have to wait a whole epoch to see their assignments.
if slot%params.BeaconConfig().SlotsPerEpoch == 0 || firstDutiesReceived {
Expand Down Expand Up @@ -316,13 +319,9 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error {
if err != nil {
return errors.Wrap(err, "could not check if a validator is an aggregator")
}
if _, err := v.validatorClient.SubscribeCommitteeSubnet(ctx, &ethpb.CommitteeSubnetSubscribeRequest{
Slot: duty.AttesterSlot,
CommitteeId: duty.CommitteeIndex,
IsAggregator: aggregator,
}); err != nil {
return err
}
subscribeSlots = append(subscribeSlots, duty.AttesterSlot)
subscribeCommitteeIDs = append(subscribeCommitteeIDs, duty.CommitteeIndex)
subscribeIsAggregator = append(subscribeIsAggregator, aggregator)
}

log.WithFields(lFields).Info("New assignment")
Expand All @@ -343,18 +342,20 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error {
if err != nil {
return errors.Wrap(err, "could not check if a validator is an aggregator")
}
if _, err := v.validatorClient.SubscribeCommitteeSubnet(ctx, &ethpb.CommitteeSubnetSubscribeRequest{
Slot: duty.AttesterSlot,
CommitteeId: duty.CommitteeIndex,
IsAggregator: aggregator,
}); err != nil {
return err
}
subscribeSlots = append(subscribeSlots, duty.AttesterSlot)
subscribeCommitteeIDs = append(subscribeCommitteeIDs, duty.CommitteeIndex)
subscribeIsAggregator = append(subscribeIsAggregator, aggregator)
}
}
}

return nil
_, err = v.validatorClient.SubscribeCommitteeSubnets(ctx, &ethpb.CommitteeSubnetsSubscribeRequest{
Slots: subscribeSlots,
CommitteeIds: subscribeCommitteeIDs,
IsAggregator: subscribeIsAggregator,
})

return err
}

// RolesAt slot returns the validator roles at the given slot. Returns nil if the
Expand Down
5 changes: 5 additions & 0 deletions validator/client/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,11 @@ func TestUpdateDuties_OK(t *testing.T) {
gomock.Any(),
).Return(resp, nil)

client.EXPECT().SubscribeCommitteeSubnets(
gomock.Any(),
gomock.Any(),
).Return(nil, nil)

if err := v.UpdateDuties(context.Background(), slot); err != nil {
t.Fatalf("Could not update assignments: %v", err)
}
Expand Down
12 changes: 6 additions & 6 deletions validator/internal/beacon_node_validator_service_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.