Skip to content

Commit

Permalink
Implement distributed attestation aggregation selection
Browse files Browse the repository at this point in the history
  • Loading branch information
nflaig committed Apr 5, 2023
1 parent 611b280 commit 8899e32
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 25 deletions.
109 changes: 103 additions & 6 deletions packages/validator/src/services/attestation.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {phase0, Slot, ssz} from "@lodestar/types";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {computeEpochAtSlot, isAggregatorFromCommitteeLength} from "@lodestar/state-transition";
import {sleep} from "@lodestar/utils";
import {Api, ApiError} from "@lodestar/api";
import {Api, ApiError, routes} from "@lodestar/api";
import {toHexString} from "@chainsafe/ssz";
import {IClock, LoggerVc} from "../util/index.js";
import {PubkeyHex} from "../types.js";
Expand All @@ -15,6 +15,7 @@ import {ValidatorEventEmitter} from "./emitter.js";
export type AttestationServiceOpts = {
afterBlockDelaySlotFraction?: number;
disableAttestationGrouping?: boolean;
combinedSelectionProofs?: boolean;
};

/**
Expand Down Expand Up @@ -42,7 +43,9 @@ export class AttestationService {
private readonly metrics: Metrics | null,
private readonly opts?: AttestationServiceOpts
) {
this.dutiesService = new AttestationDutiesService(logger, api, clock, validatorStore, chainHeadTracker, metrics);
this.dutiesService = new AttestationDutiesService(logger, api, clock, validatorStore, chainHeadTracker, metrics, {
skipIsAggregator: opts?.combinedSelectionProofs,
});

// At most every slot, check existing duties from AttestationDutiesService and run tasks
clock.runEverySlot(this.runAttestationTasks);
Expand All @@ -59,6 +62,18 @@ export class AttestationService {
return;
}

if (this.opts?.combinedSelectionProofs) {
// Validator in distributed cluster only has a key share, not the full private key.
// The partial selection proofs must be exchanged for combined selection proofs by
// calling submitBeaconCommitteeSelections on the distributed validator middleware client.
// This will run in parallel to other attestation tasks but must be finished before starting
// attestation aggregation as it is required to correctly determine if validator is aggregator
// and to produce a AggregateAndProof that can be threshold aggregated by the middleware client.
this.combineSelectionProofs(duties, slot, signal).catch((e) =>
this.logger.error("Error combining selection proofs", {slot}, e)
);
}

// A validator should create and broadcast the attestation to the associated attestation subnet when either
// (a) the validator has received a valid block from the expected block proposer for the assigned slot or
// (b) one-third of the slot has transpired (SECONDS_PER_SLOT / 3 seconds after the start of slot) -- whichever comes first.
Expand Down Expand Up @@ -231,7 +246,7 @@ export class AttestationService {
const logCtx = {slot: attestation.slot, index: attestation.index};

// No validator is aggregator, skip
if (duties.every(({selectionProof}) => selectionProof === null)) {
if (duties.every(({isAggregator}) => !isAggregator)) {
return;
}

Expand All @@ -247,11 +262,11 @@ export class AttestationService {
const signedAggregateAndProofs: phase0.SignedAggregateAndProof[] = [];

await Promise.all(
duties.map(async ({duty, selectionProof}) => {
duties.map(async ({duty, selectionProof, isAggregator}) => {
const logCtxValidator = {...logCtx, validatorIndex: duty.validatorIndex};
try {
// Produce signed aggregates only for validators that are subscribed aggregators.
if (selectionProof !== null) {
if (isAggregator) {
signedAggregateAndProofs.push(
await this.validatorStore.signAggregateAndProof(duty, selectionProof, aggregate.data)
);
Expand All @@ -276,4 +291,86 @@ export class AttestationService {
}
}
}

/**
* Performs additional steps required if validator is part of distributed cluster
*
* - exchange partial for combined selection proofs
* - determine validators that should aggregate attestations
* - resubscribe aggregators on beacon committee subnet
*
* See https://docs.google.com/document/d/1q9jOTPcYQa-3L8luRvQJ-M0eegtba4Nmon3dpO79TMk/mobilebasic
*/
private async combineSelectionProofs(duties: AttDutyAndProof[], slot: number, signal: AbortSignal): Promise<void> {
const partialSelections: routes.validator.BeaconCommitteeSelection[] = duties.map(({duty, selectionProof}) => ({
validatorIndex: duty.validatorIndex,
slot,
selectionProof,
}));

this.logger.debug("Submitting partial selection proofs", {slot, count: partialSelections.length});

const res = await Promise.race([
this.api.validator.submitBeaconCommitteeSelections(partialSelections),
// Exit aggregations flow if there is no response after 1/3 of slot as
// beacon node would likely not have enough time to prepare an aggregate attestation.
// Note that the aggregations flow is not explicitly exited but rather will be skipped
// due to the fact that calculation of `is_aggregator` in duties service is not done
// and defaulted to `isAggregator=false`, meaning no validator will be an aggregator.
sleep(this.clock.msToSlot(slot + 1 / 3), signal),
]);

if (!res) {
throw new Error("No response after 1/3 of slot");
}
ApiError.assert(res);

const combinedSelections = res.response.data;
this.logger.debug("Received combined selection proofs", {slot, count: combinedSelections.length});

const beaconCommitteeSubscriptions: routes.validator.BeaconCommitteeSubscription[] = [];

for (const dutyAndProof of duties) {
const {validatorIndex, committeeIndex, committeeLength, committeesAtSlot} = dutyAndProof.duty;
const selection = combinedSelections.find((s) => s.validatorIndex === validatorIndex);
const logCtxValidator = {slot, index: committeeIndex, validatorIndex};

if (!selection) {
this.logger.warn("Did not receive combined selection proof", logCtxValidator);
continue;
}

const isAggregator = isAggregatorFromCommitteeLength(committeeLength, selection.selectionProof);

// Replace partial with combined selection proof by mutating object
dutyAndProof.selectionProof = selection.selectionProof;
dutyAndProof.isAggregator = isAggregator;

if (isAggregator) {
// Only push subnet subscriptions with `isAggregator=true` as all validators
// with duties for slot are already subscribed to subnets with `isAggregator=false`.
beaconCommitteeSubscriptions.push({
validatorIndex,
committeesAtSlot,
committeeIndex,
slot,
isAggregator,
});
this.logger.debug("Resubscribing validator as aggregator on beacon committee subnet", logCtxValidator);
}
}

// If there are any subscriptions with aggregators, push them out to the beacon node.
if (beaconCommitteeSubscriptions.length > 0) {
try {
ApiError.assert(await this.api.validator.prepareBeaconCommitteeSubnet(beaconCommitteeSubscriptions));
this.logger.debug("Resubscribed validators as aggregators on beacon committee subnet", {
slot,
count: beaconCommitteeSubscriptions.length,
});
} catch (e) {
this.logger.error("Failed to subscribe to beacon committee subnets", {slot}, e as Error);
}
}
}
}
31 changes: 20 additions & 11 deletions packages/validator/src/services/attestationDuties.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ const SUBSCRIPTIONS_PER_REQUEST = 8738;
/** Neatly joins the server-generated `AttesterData` with the locally-generated `selectionProof`. */
export type AttDutyAndProof = {
duty: routes.validator.AttesterDuty;
/** This value is only set to not null if the proof indicates that the validator is an aggregator. */
selectionProof: BLSSignature | null;
/** Locally-generated selection proof, only partial if validator is part of distributed cluster */
selectionProof: BLSSignature;
/** Whether the validator is an aggregator */
isAggregator: boolean;
};

// To assist with readability
type AttDutiesAtEpoch = {dependentRoot: RootHex; dutiesByIndex: Map<ValidatorIndex, AttDutyAndProof>};

type AttestationDutiesServiceOpts = {
skipIsAggregator?: boolean;
};

export class AttestationDutiesService {
/** Maps a validator public key to their duties for each epoch */
private readonly dutiesByIndexByEpoch = new Map<Epoch, AttDutiesAtEpoch>();
Expand All @@ -46,7 +52,8 @@ export class AttestationDutiesService {
private clock: IClock,
private readonly validatorStore: ValidatorStore,
chainHeadTracker: ChainHeaderTracker,
private readonly metrics: Metrics | null
private readonly metrics: Metrics | null,
private readonly opts?: AttestationDutiesServiceOpts
) {
// Running this task every epoch is safe since a re-org of two epochs is very unlikely
// TODO: If the re-org event is reliable consider re-running then
Expand Down Expand Up @@ -176,14 +183,14 @@ export class AttestationDutiesService {
for (const epoch of [currentEpoch, nextEpoch]) {
const epochDuties = this.dutiesByIndexByEpoch.get(epoch)?.dutiesByIndex;
if (epochDuties) {
for (const {duty, selectionProof} of epochDuties.values()) {
for (const {duty, isAggregator} of epochDuties.values()) {
if (indexSet.has(duty.validatorIndex)) {
beaconCommitteeSubscriptions.push({
validatorIndex: duty.validatorIndex,
committeesAtSlot: duty.committeesAtSlot,
committeeIndex: duty.committeeIndex,
slot: duty.slot,
isAggregator: selectionProof !== null,
isAggregator,
});
}
}
Expand Down Expand Up @@ -326,13 +333,15 @@ export class AttestationDutiesService {

private async getDutyAndProof(duty: routes.validator.AttesterDuty): Promise<AttDutyAndProof> {
const selectionProof = await this.validatorStore.signAttestationSelectionProof(duty.pubkey, duty.slot);
const isAggregator = isAggregatorFromCommitteeLength(duty.committeeLength, selectionProof);

return {
duty,
// selectionProof === null is used to check if is aggregator
selectionProof: isAggregator ? selectionProof : null,
};
if (this.opts?.skipIsAggregator) {
// Validator in distributed cluster only has a key share, not the full private key.
// Passing a partial selection proof to `is_aggregator` would produce incorrect result.
// Attestation service will combine selection proofs and determine aggregators at beginning of the slot.
return {duty, selectionProof, isAggregator: false};
}

return {duty, selectionProof, isAggregator: isAggregatorFromCommitteeLength(duty.committeeLength, selectionProof)};
}

/** Run once per epoch to prune duties map */
Expand Down
1 change: 1 addition & 0 deletions packages/validator/src/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ export class Validator {
{
afterBlockDelaySlotFraction: opts.afterBlockDelaySlotFraction,
disableAttestationGrouping: opts.disableAttestationGrouping || opts.distributed,
combinedSelectionProofs: opts.distributed,
}
);

Expand Down
1 change: 1 addition & 0 deletions packages/validator/test/unit/services/attestation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ describe("AttestationService", function () {
pubkey: pubkeys[0],
},
selectionProof: ZERO_HASH,
isAggregator: true,
},
];

Expand Down
21 changes: 13 additions & 8 deletions packages/validator/test/unit/services/attestationDuties.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {expect} from "chai";
import sinon from "sinon";
import {chainConfig} from "@lodestar/config/default";
import bls from "@chainsafe/bls";
import {toHexString} from "@chainsafe/ssz";
import {fromHexString, toHexString} from "@chainsafe/ssz";
import {HttpStatusCode, routes} from "@lodestar/api";
import {ssz} from "@lodestar/types";
import {computeEpochAtSlot} from "@lodestar/state-transition";
Expand Down Expand Up @@ -35,6 +35,11 @@ describe("AttestationDutiesService", function () {
status: "active",
validator: ssz.phase0.Validator.defaultValue(),
};
const signedAttSelectionProof = fromHexString(
"0x8d80fe4be57500d2fe4f99c7d5586d9bd65ea4f9e4def0591020dd66f7d1daad" +
"1cea7520beb815423e2bc8316949ac2606da80d5d00df34f352b5a946d6a4bb4" +
"7e402f5d1167dec97af9742e61820625c5c792ddd2b8796962243d8e8cbeadee"
);

before(() => {
const secretKeys = [bls.SecretKey.fromBytes(toBufferBE(BigInt(98), 32))];
Expand Down Expand Up @@ -96,23 +101,23 @@ describe("AttestationDutiesService", function () {
Object.fromEntries(dutiesService["dutiesByIndexByEpoch"].get(epoch)?.dutiesByIndex || new Map())
).to.deep.equal(
{
// Since the ZERO_HASH won't pass the isAggregator test, selectionProof is null
[index]: {duty, selectionProof: null},
// Since the ZERO_HASH won't pass the isAggregator test
[index]: {duty, selectionProof: signedAttSelectionProof, isAggregator: false},
},
"Wrong dutiesService.attesters Map at current epoch"
);
expect(
Object.fromEntries(dutiesService["dutiesByIndexByEpoch"].get(epoch + 1)?.dutiesByIndex || new Map())
).to.deep.equal(
{
// Since the ZERO_HASH won't pass the isAggregator test, selectionProof is null
[index]: {duty, selectionProof: null},
// Since the ZERO_HASH won't pass the isAggregator test
[index]: {duty, selectionProof: signedAttSelectionProof, isAggregator: false},
},
"Wrong dutiesService.attesters Map at next epoch"
);

expect(dutiesService.getDutiesAtSlot(slot)).to.deep.equal(
[{duty, selectionProof: null}],
[{duty, selectionProof: signedAttSelectionProof, isAggregator: false}],
"Wrong getAttestersAtSlot()"
);

Expand Down Expand Up @@ -165,13 +170,13 @@ describe("AttestationDutiesService", function () {
// first confirm duties for this and next epoch should be persisted
expect(Object.fromEntries(dutiesService["dutiesByIndexByEpoch"].get(0)?.dutiesByIndex || new Map())).to.deep.equal(
{
4: {duty: duty, selectionProof: null},
4: {duty: duty, selectionProof: signedAttSelectionProof, isAggregator: false},
},
"Wrong dutiesService.attesters Map at current epoch"
);
expect(Object.fromEntries(dutiesService["dutiesByIndexByEpoch"].get(1)?.dutiesByIndex || new Map())).to.deep.equal(
{
4: {duty: duty, selectionProof: null},
4: {duty: duty, selectionProof: signedAttSelectionProof, isAggregator: false},
},
"Wrong dutiesService.attesters Map at current epoch"
);
Expand Down

0 comments on commit 8899e32

Please sign in to comment.