diff --git a/WORKSPACE b/WORKSPACE index 3d086b3ee006..be4672cdb5d4 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -1309,7 +1309,7 @@ go_repository( go_repository( name = "com_github_prysmaticlabs_ethereumapis", - commit = "3f6a75ac9460621b140270b90057a5a445d66436", + commit = "59479f4a647fcec5d8dbf7c50435cc10fb5751fc", importpath = "github.com/prysmaticlabs/ethereumapis", patch_args = ["-p1"], patches = [ diff --git a/beacon-chain/rpc/validator/attester.go b/beacon-chain/rpc/validator/attester.go index 025a04150661..7e692ebad6dc 100644 --- a/beacon-chain/rpc/validator/attester.go +++ b/beacon-chain/rpc/validator/attester.go @@ -179,12 +179,17 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation }, nil } -// SubscribeCommitteeSubnet subscribes to the committee ID subnet given subscribe request. -func (vs *Server) SubscribeCommitteeSubnet(ctx context.Context, req *ethpb.CommitteeSubnetSubscribeRequest) (*ptypes.Empty, error) { - cache.CommitteeIDs.AddAttesterCommiteeID(req.Slot, req.CommitteeId) +// SubscribeCommitteeSubnets subscribes to the committee ID subnet given subscribe request. +func (vs *Server) SubscribeCommitteeSubnets(ctx context.Context, req *ethpb.CommitteeSubnetsSubscribeRequest) (*ptypes.Empty, error) { + if len(req.Slots) != len(req.CommitteeIds) && len(req.CommitteeIds) != len(req.IsAggregator) { + return nil, status.Error(codes.InvalidArgument, "request fields are not the same length") + } - if req.IsAggregator { - cache.CommitteeIDs.AddAggregatorCommiteeID(req.Slot, req.CommitteeId) + for i := 0; i < len(req.Slots); i++ { + cache.CommitteeIDs.AddAttesterCommiteeID(req.Slots[i], req.CommitteeIds[i]) + if req.IsAggregator[i] { + cache.CommitteeIDs.AddAggregatorCommiteeID(req.Slots[i], req.CommitteeIds[i]) + } } return &ptypes.Empty{}, nil diff --git a/third_party/com_github_prysmaticlabs_ethereumapis-tags.patch b/third_party/com_github_prysmaticlabs_ethereumapis-tags.patch index 47c06b1940ed..9d1d1c8b9941 100644 --- a/third_party/com_github_prysmaticlabs_ethereumapis-tags.patch +++ b/third_party/com_github_prysmaticlabs_ethereumapis-tags.patch @@ -447,7 +447,7 @@ index 8ee263b..60607de 100644 // The epoch for which this set of validator assignments is valid. diff --git a/eth/v1alpha1/validator.proto b/eth/v1alpha1/validator.proto -index 3455de7..f4fab59 100644 +index 3bc824b..53cd49b 100644 --- a/eth/v1alpha1/validator.proto +++ b/eth/v1alpha1/validator.proto @@ -15,6 +15,7 @@ syntax = "proto3"; @@ -553,8 +553,8 @@ index 3455de7..f4fab59 100644 + bytes attestation_data_root = 1 [(gogoproto.moretags) = "ssz-size:\"32\""]; } - message CommitteeSubnetSubscribeRequest { -@@ -381,10 +383,10 @@ message CommitteeSubnetSubscribeRequest { + message CommitteeSubnetsSubscribeRequest { +@@ -382,10 +384,10 @@ message CommitteeSubnetsSubscribeRequest { // An Ethereum 2.0 validator. message Validator { // 48 byte BLS public key used for the validator's activities. diff --git a/validator/client/validator.go b/validator/client/validator.go index 7d7cb5a121b8..3cc9261d2422 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -289,6 +289,9 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error { } v.duties = resp + subscribeSlots := make([]uint64, 0, len(validatingKeys)) + subscribeCommitteeIDs := make([]uint64, 0, len(validatingKeys)) + subscribeIsAggregator := make([]bool, 0, len(validatingKeys)) // Only log the full assignments output on epoch start to be less verbose. // Also log out on first launch so the user doesn't have to wait a whole epoch to see their assignments. if slot%params.BeaconConfig().SlotsPerEpoch == 0 || firstDutiesReceived { @@ -316,13 +319,9 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error { if err != nil { return errors.Wrap(err, "could not check if a validator is an aggregator") } - if _, err := v.validatorClient.SubscribeCommitteeSubnet(ctx, ðpb.CommitteeSubnetSubscribeRequest{ - Slot: duty.AttesterSlot, - CommitteeId: duty.CommitteeIndex, - IsAggregator: aggregator, - }); err != nil { - return err - } + subscribeSlots = append(subscribeSlots, duty.AttesterSlot) + subscribeCommitteeIDs = append(subscribeCommitteeIDs, duty.CommitteeIndex) + subscribeIsAggregator = append(subscribeIsAggregator, aggregator) } log.WithFields(lFields).Info("New assignment") @@ -343,18 +342,20 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error { if err != nil { return errors.Wrap(err, "could not check if a validator is an aggregator") } - if _, err := v.validatorClient.SubscribeCommitteeSubnet(ctx, ðpb.CommitteeSubnetSubscribeRequest{ - Slot: duty.AttesterSlot, - CommitteeId: duty.CommitteeIndex, - IsAggregator: aggregator, - }); err != nil { - return err - } + subscribeSlots = append(subscribeSlots, duty.AttesterSlot) + subscribeCommitteeIDs = append(subscribeCommitteeIDs, duty.CommitteeIndex) + subscribeIsAggregator = append(subscribeIsAggregator, aggregator) } } } - return nil + _, err = v.validatorClient.SubscribeCommitteeSubnets(ctx, ðpb.CommitteeSubnetsSubscribeRequest{ + Slots: subscribeSlots, + CommitteeIds: subscribeCommitteeIDs, + IsAggregator: subscribeIsAggregator, + }) + + return err } // RolesAt slot returns the validator roles at the given slot. Returns nil if the diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index 436fe5bf3641..3496f0158896 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -535,6 +535,11 @@ func TestUpdateDuties_OK(t *testing.T) { gomock.Any(), ).Return(resp, nil) + client.EXPECT().SubscribeCommitteeSubnets( + gomock.Any(), + gomock.Any(), + ).Return(nil, nil) + if err := v.UpdateDuties(context.Background(), slot); err != nil { t.Fatalf("Could not update assignments: %v", err) } diff --git a/validator/internal/beacon_node_validator_service_mock.go b/validator/internal/beacon_node_validator_service_mock.go index 1be712ad60a4..0925f4c9cc56 100644 --- a/validator/internal/beacon_node_validator_service_mock.go +++ b/validator/internal/beacon_node_validator_service_mock.go @@ -969,22 +969,22 @@ func (mr *MockBeaconNodeValidator_WaitForChainStartServerMockRecorder) RecvMsg(m return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockBeaconNodeValidator_WaitForChainStartServer)(nil).RecvMsg), m) } -// SubscribeCommitteeSubnet mocks base method -func (m *MockBeaconNodeValidatorClient) SubscribeCommitteeSubnet(arg0 context.Context, arg1 *eth.CommitteeSubnetSubscribeRequest, arg2 ...grpc.CallOption) (*ptypes.Empty, error) { +// SubscribeCommitteeSubnets mocks base method +func (m *MockBeaconNodeValidatorClient) SubscribeCommitteeSubnets(arg0 context.Context, arg1 *eth.CommitteeSubnetsSubscribeRequest, arg2 ...grpc.CallOption) (*ptypes.Empty, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { varargs = append(varargs, a) } - ret := m.ctrl.Call(m, "SubscribeCommitteeSubnet", varargs...) + ret := m.ctrl.Call(m, "SubscribeCommitteeSubnets", varargs...) ret0, _ := ret[0].(*ptypes.Empty) ret1, _ := ret[1].(error) return ret0, ret1 } -// SubscribeCommitteeSubnet indicates an expected call of SubscribeCommitteeSubnet -func (mr *MockBeaconNodeValidatorClientMockRecorder) SubscribeCommitteeSubnet(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { +// SubscribeCommitteeSubnets indicates an expected call of SubscribeCommitteeSubnets +func (mr *MockBeaconNodeValidatorClientMockRecorder) SubscribeCommitteeSubnets(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeCommitteeSubnet", reflect.TypeOf((*MockBeaconNodeValidatorClient)(nil).SubscribeCommitteeSubnet), varargs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeCommitteeSubnets", reflect.TypeOf((*MockBeaconNodeValidatorClient)(nil).SubscribeCommitteeSubnets), varargs...) }