Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement distributed aggregation selections #5344

Merged
merged 11 commits into from
Apr 14, 2023
106 changes: 106 additions & 0 deletions packages/api/src/beacon/routes/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
ValidatorIndex,
RootHex,
StringType,
SubcommitteeIndex,
Wei,
} from "@lodestar/types";
import {ApiClientResponse} from "../../interfaces.js";
Expand Down Expand Up @@ -97,6 +98,32 @@ export type SyncDuty = {
validatorSyncCommitteeIndices: number[];
};

/**
* From https://github.com/ethereum/beacon-APIs/pull/224
*/
export type BeaconCommitteeSelection = {
/** Index of the validator */
validatorIndex: ValidatorIndex;
/** The slot at which a validator is assigned to attest */
slot: Slot;
/** The `slot_signature` calculated by the validator for the upcoming attestation slot */
selectionProof: BLSSignature;
};

/**
* From https://github.com/ethereum/beacon-APIs/pull/224
*/
export type SyncCommitteeSelection = {
/** Index of the validator */
validatorIndex: ValidatorIndex;
/** The slot at which validator is assigned to produce a sync committee contribution */
slot: Slot;
/** SubcommitteeIndex to which the validator is assigned */
subcommitteeIndex: SubcommitteeIndex;
/** The `slot_signature` calculated by the validator for the upcoming sync committee slot */
selectionProof: BLSSignature;
};

export type LivenessResponseData = {
index: ValidatorIndex;
epoch: Epoch;
Expand Down Expand Up @@ -303,6 +330,52 @@ export type Api = {
proposers: ProposerPreparationData[]
): Promise<ApiClientResponse<{[HttpStatusCode.OK]: void}, HttpStatusCode.BAD_REQUEST>>;

/**
* Determine if a distributed validator has been selected to aggregate attestations
*
* This endpoint is implemented by a distributed validator middleware client to exchange
* partial beacon committee selection proofs for combined/aggregated selection proofs to allow
* a validator client to correctly determine if one of its validators has been selected to
* perform an aggregation duty in this slot.
*
* Note that this endpoint is not implemented by the beacon node and will return a 501 error
*
* @param requestBody An array of partial beacon committee selection proofs
* @returns An array of threshold aggregated beacon committee selection proofs
* @throws ApiError
*/
submitBeaconCommitteeSelections(
selections: BeaconCommitteeSelection[]
): Promise<
ApiClientResponse<
{[HttpStatusCode.OK]: {data: BeaconCommitteeSelection[]}},
HttpStatusCode.BAD_REQUEST | HttpStatusCode.NOT_IMPLEMENTED | HttpStatusCode.SERVICE_UNAVAILABLE
>
>;

/**
* Determine if a distributed validator has been selected to make a sync committee contribution
*
* This endpoint is implemented by a distributed validator middleware client to exchange
* partial sync committee selection proofs for combined/aggregated selection proofs to allow
* a validator client to correctly determine if one of its validators has been selected to
* perform a sync committee contribution (sync aggregation) duty in this slot.
*
* Note that this endpoint is not implemented by the beacon node and will return a 501 error
*
* @param requestBody An array of partial sync committee selection proofs
* @returns An array of threshold aggregated sync committee selection proofs
* @throws ApiError
*/
submitSyncCommitteeSelections(
selections: SyncCommitteeSelection[]
): Promise<
ApiClientResponse<
{[HttpStatusCode.OK]: {data: SyncCommitteeSelection[]}},
HttpStatusCode.BAD_REQUEST | HttpStatusCode.NOT_IMPLEMENTED | HttpStatusCode.SERVICE_UNAVAILABLE
>
>;

/** Returns validator indices that have been observed to be active on the network */
getLiveness(
indices: ValidatorIndex[],
Expand Down Expand Up @@ -332,6 +405,8 @@ export const routesData: RoutesData<Api> = {
prepareBeaconCommitteeSubnet: {url: "/eth/v1/validator/beacon_committee_subscriptions", method: "POST"},
prepareSyncCommitteeSubnets: {url: "/eth/v1/validator/sync_committee_subscriptions", method: "POST"},
prepareBeaconProposer: {url: "/eth/v1/validator/prepare_beacon_proposer", method: "POST"},
submitBeaconCommitteeSelections: {url: "/eth/v1/validator/beacon_committee_selections", method: "POST"},
submitSyncCommitteeSelections: {url: "/eth/v1/validator/sync_committee_selections", method: "POST"},
getLiveness: {url: "/eth/v1/validator/liveness", method: "GET"},
registerValidator: {url: "/eth/v1/validator/register_validator", method: "POST"},
};
Expand All @@ -352,10 +427,31 @@ export type ReqTypes = {
prepareBeaconCommitteeSubnet: {body: unknown};
prepareSyncCommitteeSubnets: {body: unknown};
prepareBeaconProposer: {body: unknown};
submitBeaconCommitteeSelections: {body: unknown};
submitSyncCommitteeSelections: {body: unknown};
getLiveness: {query: {indices: ValidatorIndex[]; epoch: Epoch}};
registerValidator: {body: unknown};
};

const BeaconCommitteeSelection = new ContainerType(
{
validatorIndex: ssz.ValidatorIndex,
slot: ssz.Slot,
selectionProof: ssz.BLSSignature,
},
{jsonCase: "eth2"}
);

const SyncCommitteeSelection = new ContainerType(
{
validatorIndex: ssz.ValidatorIndex,
slot: ssz.Slot,
subcommitteeIndex: ssz.SubcommitteeIndex,
selectionProof: ssz.BLSSignature,
},
{jsonCase: "eth2"}
);

export function getReqSerializers(): ReqSerializers<Api, ReqTypes> {
const BeaconCommitteeSubscription = new ContainerType(
{
Expand Down Expand Up @@ -461,6 +557,14 @@ export function getReqSerializers(): ReqSerializers<Api, ReqTypes> {
],
schema: {body: Schema.ObjectArray},
},
submitBeaconCommitteeSelections: {
writeReq: (items) => ({body: ArrayOf(BeaconCommitteeSelection).toJson(items)}),
parseReq: () => [[]],
},
submitSyncCommitteeSelections: {
writeReq: (items) => ({body: ArrayOf(SyncCommitteeSelection).toJson(items)}),
parseReq: () => [[]],
},
getLiveness: {
writeReq: (indices, epoch) => ({query: {indices, epoch}}),
parseReq: ({query}) => [query.indices, query.epoch],
Expand Down Expand Up @@ -532,6 +636,8 @@ export function getReturnTypes(): ReturnTypes<Api> {
produceAttestationData: ContainerData(ssz.phase0.AttestationData),
produceSyncCommitteeContribution: ContainerData(ssz.altair.SyncCommitteeContribution),
getAggregatedAttestation: ContainerData(ssz.phase0.Attestation),
submitBeaconCommitteeSelections: ContainerData(ArrayOf(BeaconCommitteeSelection)),
submitSyncCommitteeSelections: ContainerData(ArrayOf(SyncCommitteeSelection)),
getLiveness: jsonType("snake"),
};
}
9 changes: 9 additions & 0 deletions packages/api/test/unit/beacon/testData/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {GenericServerTestCases} from "../../../utils/genericServerTest.js";
const ZERO_HASH = Buffer.alloc(32, 0);
const ZERO_HASH_HEX = "0x" + ZERO_HASH.toString("hex");
const randaoReveal = Buffer.alloc(96, 1);
const selectionProof = Buffer.alloc(96, 1);
const graffiti = "a".repeat(32);

export const testData: GenericServerTestCases<Api> = {
Expand Down Expand Up @@ -90,6 +91,14 @@ export const testData: GenericServerTestCases<Api> = {
args: [[{validatorIndex: "1", feeRecipient: "0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b"}]],
res: undefined,
},
submitBeaconCommitteeSelections: {
args: [[]],
res: {data: [{validatorIndex: 1, slot: 2, selectionProof}]},
},
submitSyncCommitteeSelections: {
args: [[]],
res: {data: [{validatorIndex: 1, slot: 2, subcommitteeIndex: 3, selectionProof}]},
},
getLiveness: {
args: [[0], 0],
res: {data: []},
Expand Down
8 changes: 8 additions & 0 deletions packages/beacon-node/src/api/impl/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,11 @@ export class NodeIsSyncing extends ApiError {
super(503, `Node is syncing - ${statusMsg}`);
}
}

// Error thrown by beacon node APIs that are only supported by distributed validator middleware clients
// For example https://github.com/ethereum/beacon-APIs/blob/f087fbf2764e657578a6c29bdf0261b36ee8db1e/apis/validator/beacon_committee_selections.yaml
export class OnlySupportedByDVT extends ApiError {
constructor() {
super(501, "Only supported by distributed validator middleware clients");
}
}
10 changes: 9 additions & 1 deletion packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {ZERO_HASH} from "../../../constants/index.js";
import {SyncState} from "../../../sync/index.js";
import {isOptimisticBlock} from "../../../util/forkChoice.js";
import {toGraffitiBuffer} from "../../../util/graffiti.js";
import {ApiError, NodeIsSyncing} from "../errors.js";
import {ApiError, NodeIsSyncing, OnlySupportedByDVT} from "../errors.js";
import {validateSyncCommitteeGossipContributionAndProof} from "../../../chain/validation/syncCommitteeContributionAndProof.js";
import {CommitteeSubscription} from "../../../network/subnets/index.js";
import {ApiModules} from "../types.js";
Expand Down Expand Up @@ -698,6 +698,14 @@ export function getValidatorApi({
await chain.updateBeaconProposerData(chain.clock.currentEpoch, proposers);
},

async submitBeaconCommitteeSelections() {
throw new OnlySupportedByDVT();
},

async submitSyncCommitteeSelections() {
throw new OnlySupportedByDVT();
},

async getLiveness(indices: ValidatorIndex[], epoch: Epoch) {
if (indices.length === 0) {
return {
Expand Down
113 changes: 109 additions & 4 deletions packages/validator/src/services/attestation.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {phase0, Slot, ssz} from "@lodestar/types";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {BLSSignature, phase0, Slot, ssz} from "@lodestar/types";
import {computeEpochAtSlot, isAggregatorFromCommitteeLength} from "@lodestar/state-transition";
import {sleep} from "@lodestar/utils";
import {Api, ApiError} from "@lodestar/api";
import {Api, ApiError, routes} from "@lodestar/api";
import {toHexString} from "@chainsafe/ssz";
import {IClock, LoggerVc} from "../util/index.js";
import {PubkeyHex} from "../types.js";
Expand All @@ -15,6 +15,7 @@ import {ValidatorEventEmitter} from "./emitter.js";
export type AttestationServiceOpts = {
afterBlockDelaySlotFraction?: number;
disableAttestationGrouping?: boolean;
distributedAggregationSelection?: boolean;
};

/**
Expand Down Expand Up @@ -42,7 +43,9 @@ export class AttestationService {
private readonly metrics: Metrics | null,
private readonly opts?: AttestationServiceOpts
) {
this.dutiesService = new AttestationDutiesService(logger, api, clock, validatorStore, chainHeadTracker, metrics);
this.dutiesService = new AttestationDutiesService(logger, api, clock, validatorStore, chainHeadTracker, metrics, {
distributedAggregationSelection: opts?.distributedAggregationSelection,
});

// At most every slot, check existing duties from AttestationDutiesService and run tasks
clock.runEverySlot(this.runAttestationTasks);
Expand All @@ -59,6 +62,18 @@ export class AttestationService {
return;
}

if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// The partial selection proofs must be exchanged for combined selection proofs by
// calling submitBeaconCommitteeSelections on the distributed validator middleware client.
// This will run in parallel to other attestation tasks but must be finished before starting
// attestation aggregation as it is required to correctly determine if validator is aggregator
// and to produce a AggregateAndProof that can be threshold aggregated by the middleware client.
this.runDistributedAggregationSelectionTasks(duties, slot, signal).catch((e) =>
this.logger.error("Error on attestation aggregation selection", {slot}, e)
);
}

// A validator should create and broadcast the attestation to the associated attestation subnet when either
// (a) the validator has received a valid block from the expected block proposer for the assigned slot or
// (b) one-third of the slot has transpired (SECONDS_PER_SLOT / 3 seconds after the start of slot) -- whichever comes first.
Expand Down Expand Up @@ -276,4 +291,94 @@ export class AttestationService {
}
}
}

/**
* Performs additional attestation aggregation tasks required if validator is part of distributed cluster
*
* 1. Exchange partial for combined selection proofs
* 2. Determine validators that should aggregate attestations
* 3. Mutate duty objects to set selection proofs for aggregators
* 4. Resubscribe validators as aggregators on beacon committee subnets
*
* See https://docs.google.com/document/d/1q9jOTPcYQa-3L8luRvQJ-M0eegtba4Nmon3dpO79TMk/mobilebasic
*/
private async runDistributedAggregationSelectionTasks(
duties: AttDutyAndProof[],
slot: number,
signal: AbortSignal
): Promise<void> {
const partialSelections: routes.validator.BeaconCommitteeSelection[] = duties.map(
({duty, partialSelectionProof}) => ({
validatorIndex: duty.validatorIndex,
slot,
selectionProof: partialSelectionProof as BLSSignature,
})
);

this.logger.debug("Submitting partial beacon committee selection proofs", {slot, count: partialSelections.length});

const res = await Promise.race([
this.api.validator
.submitBeaconCommitteeSelections(partialSelections)
.catch((e) => this.logger.error("Error on submitBeaconCommitteeSelections", {slot}, e)),
nflaig marked this conversation as resolved.
Show resolved Hide resolved
// Exit attestation aggregation flow if there is no response after 1/3 of slot as
// beacon node would likely not have enough time to prepare an aggregate attestation.
// Note that the aggregations flow is not explicitly exited but rather will be skipped
// due to the fact that calculation of `is_aggregator` in AttestationDutiesService is not done
// and selectionProof is set to null, meaning no validator will be considered an aggregator.
sleep(this.clock.msToSlot(slot + 1 / 3), signal),
]);

if (!res) {
throw new Error("submitBeaconCommitteeSelections did not resolve after 1/3 of slot");
}
ApiError.assert(res, "Error receiving combined selection proofs");

const combinedSelections = res.response.data;
this.logger.debug("Received combined beacon committee selection proofs", {slot, count: combinedSelections.length});

const beaconCommitteeSubscriptions: routes.validator.BeaconCommitteeSubscription[] = [];

for (const dutyAndProof of duties) {
const {validatorIndex, committeeIndex, committeeLength, committeesAtSlot} = dutyAndProof.duty;
const logCtxValidator = {slot, index: committeeIndex, validatorIndex};

const combinedSelection = combinedSelections.find((s) => s.validatorIndex === validatorIndex && s.slot === slot);

if (!combinedSelection) {
this.logger.warn("Did not receive combined beacon committee selection proof", logCtxValidator);
continue;
}

const isAggregator = isAggregatorFromCommitteeLength(committeeLength, combinedSelection.selectionProof);

if (isAggregator) {
// Update selection proof by mutating duty object
dutyAndProof.selectionProof = combinedSelection.selectionProof;

// Only push subnet subscriptions with `isAggregator=true` as all validators
// with duties for slot are already subscribed to subnets with `isAggregator=false`.
beaconCommitteeSubscriptions.push({
validatorIndex,
committeesAtSlot,
committeeIndex,
slot,
isAggregator,
});
this.logger.debug("Resubscribing validator as aggregator on beacon committee subnet", logCtxValidator);
}
}

// If there are any subscriptions with aggregators, push them out to the beacon node.
if (beaconCommitteeSubscriptions.length > 0) {
ApiError.assert(
await this.api.validator.prepareBeaconCommitteeSubnet(beaconCommitteeSubscriptions),
"Failed to resubscribe to beacon committee subnets"
);
this.logger.debug("Resubscribed validators as aggregators on beacon committee subnets", {
slot,
count: beaconCommitteeSubscriptions.length,
});
}
}
}
Loading