diff --git a/packages/lodestar/src/api/impl/beacon/pool/index.ts b/packages/lodestar/src/api/impl/beacon/pool/index.ts index 93bee203c164..8d5d5c0c8ff1 100644 --- a/packages/lodestar/src/api/impl/beacon/pool/index.ts +++ b/packages/lodestar/src/api/impl/beacon/pool/index.ts @@ -125,16 +125,33 @@ export function getBeaconPoolApi({ // Worst case if `signature` is not valid, gossip peers will drop it and slightly downscore us. await validateSyncCommitteeSigOnly(chain, state, signature); + // The same validator can appear multiple times in the sync committee. It can appear multiple times per + // subnet even. First compute on which subnet the signature must be broadcasted to. + const subnets: number[] = []; + + for (const indexInCommittee of indexesInCommittee) { + // Sync committee subnet members are just sequential in the order they appear in SyncCommitteeIndexes array + const subnet = Math.floor(indexInCommittee / SYNC_COMMITTEE_SUBNET_SIZE); + const indexInSubcommittee = indexInCommittee % SYNC_COMMITTEE_SUBNET_SIZE; + chain.syncCommitteeMessagePool.add(subnet, signature, indexInSubcommittee); + + // Cheap de-duplication code to avoid using a Set. indexesInCommittee is always sorted + if (subnets.length === 0 || subnets[subnets.length - 1] !== subnet) { + subnets.push(subnet); + } + } + + // TODO: Broadcast at once to all topics await Promise.all( - indexesInCommittee.map(async (indexInCommittee) => { - // Sync committee subnet members are just sequential in the order they appear in SyncCommitteeIndexes array - const subnet = Math.floor(indexInCommittee / SYNC_COMMITTEE_SUBNET_SIZE); - const indexInSubcommittee = indexInCommittee % SYNC_COMMITTEE_SUBNET_SIZE; - chain.syncCommitteeMessagePool.add(subnet, signature, indexInSubcommittee); - await network.gossip.publishSyncCommitteeSignature(signature, subnet); - }) + subnets.map(async (subnet) => network.gossip.publishSyncCommitteeSignature(signature, subnet)) ); } catch (e) { + // TODO: gossipsub should allow publishing same message to different topics + // https://github.com/ChainSafe/js-libp2p-gossipsub/issues/272 + if ((e as Error).message === "PublishError.Duplicate") { + return; + } + errors.push(e as Error); logger.error( `Error on submitPoolSyncCommitteeSignatures [${i}]`, diff --git a/packages/validator/src/services/syncCommitteeDuties.ts b/packages/validator/src/services/syncCommitteeDuties.ts index f8d334373829..d257dd565c24 100644 --- a/packages/validator/src/services/syncCommitteeDuties.ts +++ b/packages/validator/src/services/syncCommitteeDuties.ts @@ -5,7 +5,7 @@ import { isSyncCommitteeAggregator, } from "@chainsafe/lodestar-beacon-state-transition"; import {IChainForkConfig} from "@chainsafe/lodestar-config"; -import {BLSSignature, Epoch, Root, Slot, SyncPeriod, ValidatorIndex} from "@chainsafe/lodestar-types"; +import {BLSSignature, Epoch, RootHex, Slot, SyncPeriod, ValidatorIndex} from "@chainsafe/lodestar-types"; import {toHexString} from "@chainsafe/ssz"; import {Api, routes} from "@chainsafe/lodestar-api"; import {extendError} from "@chainsafe/lodestar-utils"; @@ -14,6 +14,7 @@ import {PubkeyHex} from "../types.js"; import {Metrics} from "../metrics.js"; import {ValidatorStore} from "./validatorStore.js"; import {IndicesService} from "./indices.js"; +import {syncCommitteeIndicesToSubnets} from "./utils.js"; /** Only retain `HISTORICAL_DUTIES_PERIODS` duties prior to the current periods. */ const HISTORICAL_DUTIES_PERIODS = 2; @@ -32,6 +33,18 @@ const ALTAIR_FORK_LOOKAHEAD_EPOCHS = 0; /** How many epochs prior from a subscription starting, ask the node to subscribe */ const SUBSCRIPTIONS_LOOKAHEAD_EPOCHS = 2; +export type SyncDutySubnet = { + pubkey: string; + /** Index of validator in validator registry. */ + validatorIndex: ValidatorIndex; + /** + * The indices of the validator in the sync committee. + * The same validator can appear multiples in the sync committee. Given how sync messages are constructor, the + * validator client only cares in which subnets the validator is in, not the specific index. + */ + subnets: number[]; +}; + export type SyncSelectionProof = { /** This value is only set to not null if the proof indicates that the validator is an aggregator. */ selectionProof: BLSSignature | null; @@ -40,7 +53,7 @@ export type SyncSelectionProof = { /** Neatly joins SyncDuty with the locally-generated `selectionProof`. */ export type SyncDutyAndProofs = { - duty: routes.validator.SyncDuty; + duty: SyncDutySubnet; /** * Array because the same validator can appear multiple times in the sync committee. * `routes.validator.SyncDuty` `.validatorSyncCommitteeIndices` is an array for that reason. @@ -51,7 +64,7 @@ export type SyncDutyAndProofs = { }; // To assist with readability -type DutyAtPeriod = {dependentRoot: Root; duty: routes.validator.SyncDuty}; +type DutyAtPeriod = {dependentRoot: RootHex; duty: SyncDutySubnet}; /** * Validators are part of a static long (~27h) sync committee, and part of static subnets. @@ -115,7 +128,7 @@ export class SyncCommitteeDutiesService { removeDutiesForKey(pubkey: PubkeyHex): void { for (const [syncPeriod, validatorDutyAtPeriodMap] of this.dutiesByIndexByPeriod) { for (const [validatorIndex, dutyAtPeriod] of validatorDutyAtPeriodMap) { - if (toHexString(dutyAtPeriod.duty.pubkey) === pubkey) { + if (dutyAtPeriod.duty.pubkey === pubkey) { validatorDutyAtPeriodMap.delete(validatorIndex); if (validatorDutyAtPeriodMap.size === 0) { this.dutiesByIndexByPeriod.delete(syncPeriod); @@ -194,7 +207,9 @@ export class SyncCommitteeDutiesService { if (currentEpoch >= fromEpoch - SUBSCRIPTIONS_LOOKAHEAD_EPOCHS) { syncCommitteeSubscriptions.push({ validatorIndex, - syncCommitteeIndices: dutyAtEpoch.duty.validatorSyncCommitteeIndices, + // prepareSyncCommitteeSubnets does not care about which specific index in the sync committee the + // validator is, but at what subnets is it participating. + syncCommitteeIndices: dutyAtEpoch.duty.subnets.map((subnet) => subnet * SYNC_COMMITTEE_SUBNET_SIZE), untilEpoch, // No need to send isAggregator here since the beacon node will assume validator always aggregates }); @@ -226,7 +241,7 @@ export class SyncCommitteeDutiesService { throw extendError(e, "Failed to obtain SyncDuties"); }); - const dependentRoot = syncDuties.dependentRoot; + const dependentRoot = toHexString(syncDuties.dependentRoot); const dutiesByIndex = new Map(); let count = 0; @@ -237,6 +252,19 @@ export class SyncCommitteeDutiesService { } count++; + // Note: For networks where `state.validators.length < SYNC_COMMITTEE_SIZE` the same validator can appear + // multiple times in the sync committee. So `routes.validator.SyncDuty` `.validatorSyncCommitteeIndices` + // is an array, with all of those apparences. + // + // Validator signs two messages: + // `SyncCommitteeMessage`: + // - depends on slot, blockRoot, and validatorIndex. + // - Validator signs and publishes only one message regardless of validatorSyncCommitteeIndices length + // `SyncCommitteeContribution`: + // - depends on slot, blockRoot, validatorIndex, and subnet. + // - Validator must sign and publish only one message per subnet MAX. Regarless of validatorSyncCommitteeIndices + const subnets = syncCommitteeIndicesToSubnets(duty.validatorSyncCommitteeIndices); + // TODO: Enable dependentRoot functionality // Meanwhile just overwrite them, since the latest duty will be older and less likely to re-org // @@ -246,9 +274,12 @@ export class SyncCommitteeDutiesService { // - The dependent root has changed, signalling a re-org. // // if (reorg) this.metrics?.syncCommitteeDutiesReorg.inc() - + // // Using `alreadyWarnedReorg` avoids excessive logs. - dutiesByIndex.set(validatorIndex, {dependentRoot, duty}); + + // TODO: Use memory-efficient toHexString() + const pubkeyHex = toHexString(duty.pubkey); + dutiesByIndex.set(validatorIndex, {dependentRoot, duty: {pubkey: pubkeyHex, validatorIndex, subnets}}); } // these could be redundant duties due to the state of next period query reorged @@ -257,25 +288,17 @@ export class SyncCommitteeDutiesService { const period = computeSyncPeriodAtEpoch(epoch); this.dutiesByIndexByPeriod.set(period, dutiesByIndex); - this.logger.debug("Downloaded SyncDuties", { - epoch, - dependentRoot: toHexString(dependentRoot), - count, - }); + this.logger.debug("Downloaded SyncDuties", {epoch, dependentRoot, count}); } - private async getSelectionProofs(slot: Slot, duty: routes.validator.SyncDuty): Promise { - // Fast indexing with precomputed pubkeyHex. Fallback to toHexString(duty.pubkey) - const pubkey = this.indicesService.index2pubkey.get(duty.validatorIndex) ?? toHexString(duty.pubkey); - + private async getSelectionProofs(slot: Slot, duty: SyncDutySubnet): Promise { const dutiesAndProofs: SyncSelectionProof[] = []; - for (const index of duty.validatorSyncCommitteeIndices) { - const subcommitteeIndex = Math.floor(index / SYNC_COMMITTEE_SUBNET_SIZE); - const selectionProof = await this.validatorStore.signSyncCommitteeSelectionProof(pubkey, slot, subcommitteeIndex); + for (const subnet of duty.subnets) { + const selectionProof = await this.validatorStore.signSyncCommitteeSelectionProof(duty.pubkey, slot, subnet); dutiesAndProofs.push({ // selectionProof === null is used to check if is aggregator selectionProof: isSyncCommitteeAggregator(selectionProof) ? selectionProof : null, - subcommitteeIndex, + subcommitteeIndex: subnet, }); } return dutiesAndProofs; diff --git a/packages/validator/src/services/utils.ts b/packages/validator/src/services/utils.ts index 5d69d0532d03..da0459bb5616 100644 --- a/packages/validator/src/services/utils.ts +++ b/packages/validator/src/services/utils.ts @@ -1,11 +1,11 @@ -import {routes} from "@chainsafe/lodestar-api"; +import {SYNC_COMMITTEE_SUBNET_SIZE} from "@chainsafe/lodestar-params"; import {CommitteeIndex, SubcommitteeIndex} from "@chainsafe/lodestar-types"; import {AttDutyAndProof} from "./attestationDuties.js"; -import {SyncDutyAndProofs, SyncSelectionProof} from "./syncCommitteeDuties.js"; +import {SyncDutyAndProofs, SyncDutySubnet, SyncSelectionProof} from "./syncCommitteeDuties.js"; /** Sync committee duty associated to a single sub committee subnet */ export type SubcommitteeDuty = { - duty: routes.validator.SyncDuty; + duty: SyncDutySubnet; selectionProof: SyncSelectionProof["selectionProof"]; }; @@ -43,3 +43,16 @@ export function groupSyncDutiesBySubcommitteeIndex( return dutiesBySubcommitteeIndex; } + +/** + * Given a list of indexes of a sync committee returns the list of unique subnet numbers the indexes are part of + */ +export function syncCommitteeIndicesToSubnets(indexesInCommittee: number[]): number[] { + const subnets = new Set(); + + for (const indexInCommittee of indexesInCommittee) { + subnets.add(Math.floor(indexInCommittee / SYNC_COMMITTEE_SUBNET_SIZE)); + } + + return Array.from(subnets); +} diff --git a/packages/validator/src/services/validatorStore.ts b/packages/validator/src/services/validatorStore.ts index 8034cc7be0b8..ec963ee21ecd 100644 --- a/packages/validator/src/services/validatorStore.ts +++ b/packages/validator/src/services/validatorStore.ts @@ -52,6 +52,8 @@ export type SignerRemote = { pubkeyHex: PubkeyHex; }; +type BLSPubkeyMaybeHex = BLSPubkey | string; + /** * Validator entity capable of producing signatures. Either: * - local: With BLS secret key @@ -213,7 +215,7 @@ export class ValidatorStore { } async signSyncCommitteeSignature( - pubkey: BLSPubkey, + pubkey: BLSPubkeyMaybeHex, validatorIndex: ValidatorIndex, slot: Slot, beaconBlockRoot: Root @@ -230,7 +232,7 @@ export class ValidatorStore { } async signContributionAndProof( - duty: Pick, + duty: {pubkey: BLSPubkeyMaybeHex; validatorIndex: number}, selectionProof: BLSSignature, contribution: altair.SyncCommitteeContribution ): Promise { @@ -249,7 +251,7 @@ export class ValidatorStore { }; } - async signAttestationSelectionProof(pubkey: BLSPubkey, slot: Slot): Promise { + async signAttestationSelectionProof(pubkey: BLSPubkeyMaybeHex, slot: Slot): Promise { const domain = this.config.getDomain(DOMAIN_SELECTION_PROOF, slot); const signingRoot = computeSigningRoot(ssz.Slot, slot, domain); @@ -257,7 +259,7 @@ export class ValidatorStore { } async signSyncCommitteeSelectionProof( - pubkey: BLSPubkey | string, + pubkey: BLSPubkeyMaybeHex, slot: Slot, subcommitteeIndex: number ): Promise { @@ -273,7 +275,7 @@ export class ValidatorStore { } async signVoluntaryExit( - pubkey: PubkeyHex, + pubkey: BLSPubkeyMaybeHex, validatorIndex: number, exitEpoch: Epoch ): Promise { @@ -288,7 +290,7 @@ export class ValidatorStore { }; } - private async getSignature(pubkey: BLSPubkey | string, signingRoot: Uint8Array): Promise { + private async getSignature(pubkey: BLSPubkeyMaybeHex, signingRoot: Uint8Array): Promise { // TODO: Refactor indexing to not have to run toHexString() on the pubkey every time const pubkeyHex = typeof pubkey === "string" ? pubkey : toHexString(pubkey); diff --git a/packages/validator/test/unit/services/syncCommittee.test.ts b/packages/validator/test/unit/services/syncCommittee.test.ts index 0ed825723dee..921d4da7613a 100644 --- a/packages/validator/test/unit/services/syncCommittee.test.ts +++ b/packages/validator/test/unit/services/syncCommittee.test.ts @@ -69,9 +69,9 @@ describe("SyncCommitteeService", function () { const duties: SyncDutyAndProofs[] = [ { duty: { - pubkey: pubkeys[0], + pubkey: toHexString(pubkeys[0]), validatorIndex: 0, - validatorSyncCommitteeIndices: [7], + subnets: [0], }, selectionProofs: [{selectionProof: ZERO_HASH, subcommitteeIndex: 0}], }, diff --git a/packages/validator/test/unit/services/utils.test.ts b/packages/validator/test/unit/services/utils.test.ts new file mode 100644 index 000000000000..5c18932c2725 --- /dev/null +++ b/packages/validator/test/unit/services/utils.test.ts @@ -0,0 +1,25 @@ +import {expect} from "chai"; +import {SYNC_COMMITTEE_SUBNET_SIZE} from "@chainsafe/lodestar-params"; +import {syncCommitteeIndicesToSubnets} from "../../../src/services/utils.js"; + +describe("services / utils / syncCommitteeIndicesToSubnets", () => { + before("Check SYNC_COMMITTEE_SUBNET_SIZE", () => { + expect(SYNC_COMMITTEE_SUBNET_SIZE).equals(128); + }); + + const testCases: {indexes: number[]; subnets: number[]}[] = [ + {indexes: [], subnets: []}, + {indexes: [0], subnets: [0]}, + {indexes: [0, 1, 2], subnets: [0]}, + {indexes: [0, 128, 256, 384], subnets: [0, 1, 2, 3]}, + {indexes: [0, 1, 128, 129, 256, 257, 384, 385], subnets: [0, 1, 2, 3]}, + // Non-sorted case + {indexes: [256, 0, 1, 2], subnets: [2, 0]}, + ]; + + for (const {indexes, subnets} of testCases) { + it(indexes.join(","), () => { + expect(syncCommitteeIndicesToSubnets(indexes)).deep.equals(subnets); + }); + } +});