Skip to content

Commit

Permalink
Merge 399eafc into 4bbe4a3
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion authored Jun 9, 2022
2 parents 4bbe4a3 + 399eafc commit c4fff81
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 58 deletions.
31 changes: 24 additions & 7 deletions packages/lodestar/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,33 @@ export function getBeaconPoolApi({
// Worst case if `signature` is not valid, gossip peers will drop it and slightly downscore us.
await validateSyncCommitteeSigOnly(chain, state, signature);

// The same validator can appear multiple times in the sync committee. It can appear multiple times per
// subnet even. First compute on which subnet the signature must be broadcasted to.
const subnets: number[] = [];

for (const indexInCommittee of indexesInCommittee) {
// Sync committee subnet members are just sequential in the order they appear in SyncCommitteeIndexes array
const subnet = Math.floor(indexInCommittee / SYNC_COMMITTEE_SUBNET_SIZE);
const indexInSubcommittee = indexInCommittee % SYNC_COMMITTEE_SUBNET_SIZE;
chain.syncCommitteeMessagePool.add(subnet, signature, indexInSubcommittee);

// Cheap de-duplication code to avoid using a Set. indexesInCommittee is always sorted
if (subnets.length === 0 || subnets[subnets.length - 1] !== subnet) {
subnets.push(subnet);
}
}

// TODO: Broadcast at once to all topics
await Promise.all(
indexesInCommittee.map(async (indexInCommittee) => {
// Sync committee subnet members are just sequential in the order they appear in SyncCommitteeIndexes array
const subnet = Math.floor(indexInCommittee / SYNC_COMMITTEE_SUBNET_SIZE);
const indexInSubcommittee = indexInCommittee % SYNC_COMMITTEE_SUBNET_SIZE;
chain.syncCommitteeMessagePool.add(subnet, signature, indexInSubcommittee);
await network.gossip.publishSyncCommitteeSignature(signature, subnet);
})
subnets.map(async (subnet) => network.gossip.publishSyncCommitteeSignature(signature, subnet))
);
} catch (e) {
// TODO: gossipsub should allow publishing same message to different topics
// https://github.com/ChainSafe/js-libp2p-gossipsub/issues/272
if ((e as Error).message === "PublishError.Duplicate") {
return;
}

errors.push(e as Error);
logger.error(
`Error on submitPoolSyncCommitteeSignatures [${i}]`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,9 @@ export class SyncContributionAndProofPool {
const {contribution} = contributionAndProof;
const {slot, beaconBlockRoot} = contribution;
const rootHex = toHexString(beaconBlockRoot);
const lowestPermissibleSlot = this.lowestPermissibleSlot;

// Reject if too old.
if (slot < lowestPermissibleSlot) {
if (slot < this.lowestPermissibleSlot) {
return InsertOutcome.Old;
}

Expand Down
71 changes: 50 additions & 21 deletions packages/validator/src/services/syncCommitteeDuties.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
isSyncCommitteeAggregator,
} from "@chainsafe/lodestar-beacon-state-transition";
import {IChainForkConfig} from "@chainsafe/lodestar-config";
import {BLSSignature, Epoch, Root, Slot, SyncPeriod, ValidatorIndex} from "@chainsafe/lodestar-types";
import {BLSSignature, Epoch, RootHex, Slot, SyncPeriod, ValidatorIndex} from "@chainsafe/lodestar-types";
import {toHexString} from "@chainsafe/ssz";
import {Api, routes} from "@chainsafe/lodestar-api";
import {extendError} from "@chainsafe/lodestar-utils";
Expand All @@ -14,6 +14,7 @@ import {PubkeyHex} from "../types.js";
import {Metrics} from "../metrics.js";
import {ValidatorStore} from "./validatorStore.js";
import {IndicesService} from "./indices.js";
import {syncCommitteeIndicesToSubnets} from "./utils.js";

/** Only retain `HISTORICAL_DUTIES_PERIODS` duties prior to the current periods. */
const HISTORICAL_DUTIES_PERIODS = 2;
Expand All @@ -32,6 +33,18 @@ const ALTAIR_FORK_LOOKAHEAD_EPOCHS = 0;
/** How many epochs prior from a subscription starting, ask the node to subscribe */
const SUBSCRIPTIONS_LOOKAHEAD_EPOCHS = 2;

export type SyncDutySubnet = {
pubkey: string;
/** Index of validator in validator registry. */
validatorIndex: ValidatorIndex;
/**
* The indices of the validator in the sync committee.
* The same validator can appear multiples in the sync committee. Given how sync messages are constructor, the
* validator client only cares in which subnets the validator is in, not the specific index.
*/
subnets: number[];
};

export type SyncSelectionProof = {
/** This value is only set to not null if the proof indicates that the validator is an aggregator. */
selectionProof: BLSSignature | null;
Expand All @@ -40,12 +53,18 @@ export type SyncSelectionProof = {

/** Neatly joins SyncDuty with the locally-generated `selectionProof`. */
export type SyncDutyAndProofs = {
duty: routes.validator.SyncDuty;
duty: SyncDutySubnet;
/**
* Array because the same validator can appear multiple times in the sync committee.
* `routes.validator.SyncDuty` `.validatorSyncCommitteeIndices` is an array for that reason.
* SelectionProof signs over slot + index in committee, so the length of `.selectionProofs` equals
* `.validatorSyncCommitteeIndices`.
*/
selectionProofs: SyncSelectionProof[];
};

// To assist with readability
type DutyAtPeriod = {dependentRoot: Root; duty: routes.validator.SyncDuty};
type DutyAtPeriod = {dependentRoot: RootHex; duty: SyncDutySubnet};

/**
* Validators are part of a static long (~27h) sync committee, and part of static subnets.
Expand Down Expand Up @@ -109,7 +128,7 @@ export class SyncCommitteeDutiesService {
removeDutiesForKey(pubkey: PubkeyHex): void {
for (const [syncPeriod, validatorDutyAtPeriodMap] of this.dutiesByIndexByPeriod) {
for (const [validatorIndex, dutyAtPeriod] of validatorDutyAtPeriodMap) {
if (toHexString(dutyAtPeriod.duty.pubkey) === pubkey) {
if (dutyAtPeriod.duty.pubkey === pubkey) {
validatorDutyAtPeriodMap.delete(validatorIndex);
if (validatorDutyAtPeriodMap.size === 0) {
this.dutiesByIndexByPeriod.delete(syncPeriod);
Expand Down Expand Up @@ -188,7 +207,9 @@ export class SyncCommitteeDutiesService {
if (currentEpoch >= fromEpoch - SUBSCRIPTIONS_LOOKAHEAD_EPOCHS) {
syncCommitteeSubscriptions.push({
validatorIndex,
syncCommitteeIndices: dutyAtEpoch.duty.validatorSyncCommitteeIndices,
// prepareSyncCommitteeSubnets does not care about which specific index in the sync committee the
// validator is, but at what subnets is it participating.
syncCommitteeIndices: dutyAtEpoch.duty.subnets.map((subnet) => subnet * SYNC_COMMITTEE_SUBNET_SIZE),
untilEpoch,
// No need to send isAggregator here since the beacon node will assume validator always aggregates
});
Expand Down Expand Up @@ -220,7 +241,7 @@ export class SyncCommitteeDutiesService {
throw extendError(e, "Failed to obtain SyncDuties");
});

const dependentRoot = syncDuties.dependentRoot;
const dependentRoot = toHexString(syncDuties.dependentRoot);
const dutiesByIndex = new Map<ValidatorIndex, DutyAtPeriod>();
let count = 0;

Expand All @@ -231,6 +252,19 @@ export class SyncCommitteeDutiesService {
}
count++;

// Note: For networks where `state.validators.length < SYNC_COMMITTEE_SIZE` the same validator can appear
// multiple times in the sync committee. So `routes.validator.SyncDuty` `.validatorSyncCommitteeIndices`
// is an array, with all of those apparences.
//
// Validator signs two messages:
// `SyncCommitteeMessage`:
// - depends on slot, blockRoot, and validatorIndex.
// - Validator signs and publishes only one message regardless of validatorSyncCommitteeIndices length
// `SyncCommitteeContribution`:
// - depends on slot, blockRoot, validatorIndex, and subnet.
// - Validator must sign and publish only one message per subnet MAX. Regarless of validatorSyncCommitteeIndices
const subnets = syncCommitteeIndicesToSubnets(duty.validatorSyncCommitteeIndices);

// TODO: Enable dependentRoot functionality
// Meanwhile just overwrite them, since the latest duty will be older and less likely to re-org
//
Expand All @@ -240,9 +274,12 @@ export class SyncCommitteeDutiesService {
// - The dependent root has changed, signalling a re-org.
//
// if (reorg) this.metrics?.syncCommitteeDutiesReorg.inc()

//
// Using `alreadyWarnedReorg` avoids excessive logs.
dutiesByIndex.set(validatorIndex, {dependentRoot, duty});

// TODO: Use memory-efficient toHexString()
const pubkeyHex = toHexString(duty.pubkey);
dutiesByIndex.set(validatorIndex, {dependentRoot, duty: {pubkey: pubkeyHex, validatorIndex, subnets}});
}

// these could be redundant duties due to the state of next period query reorged
Expand All @@ -251,25 +288,17 @@ export class SyncCommitteeDutiesService {
const period = computeSyncPeriodAtEpoch(epoch);
this.dutiesByIndexByPeriod.set(period, dutiesByIndex);

this.logger.debug("Downloaded SyncDuties", {
epoch,
dependentRoot: toHexString(dependentRoot),
count,
});
this.logger.debug("Downloaded SyncDuties", {epoch, dependentRoot, count});
}

private async getSelectionProofs(slot: Slot, duty: routes.validator.SyncDuty): Promise<SyncSelectionProof[]> {
// Fast indexing with precomputed pubkeyHex. Fallback to toHexString(duty.pubkey)
const pubkey = this.indicesService.index2pubkey.get(duty.validatorIndex) ?? toHexString(duty.pubkey);

private async getSelectionProofs(slot: Slot, duty: SyncDutySubnet): Promise<SyncSelectionProof[]> {
const dutiesAndProofs: SyncSelectionProof[] = [];
for (const index of duty.validatorSyncCommitteeIndices) {
const subcommitteeIndex = Math.floor(index / SYNC_COMMITTEE_SUBNET_SIZE);
const selectionProof = await this.validatorStore.signSyncCommitteeSelectionProof(pubkey, slot, subcommitteeIndex);
for (const subnet of duty.subnets) {
const selectionProof = await this.validatorStore.signSyncCommitteeSelectionProof(duty.pubkey, slot, subnet);
dutiesAndProofs.push({
// selectionProof === null is used to check if is aggregator
selectionProof: isSyncCommitteeAggregator(selectionProof) ? selectionProof : null,
subcommitteeIndex,
subcommitteeIndex: subnet,
});
}
return dutiesAndProofs;
Expand Down
19 changes: 16 additions & 3 deletions packages/validator/src/services/utils.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {routes} from "@chainsafe/lodestar-api";
import {SYNC_COMMITTEE_SUBNET_SIZE} from "@chainsafe/lodestar-params";
import {CommitteeIndex, SubcommitteeIndex} from "@chainsafe/lodestar-types";
import {AttDutyAndProof} from "./attestationDuties.js";
import {SyncDutyAndProofs, SyncSelectionProof} from "./syncCommitteeDuties.js";
import {SyncDutyAndProofs, SyncDutySubnet, SyncSelectionProof} from "./syncCommitteeDuties.js";

/** Sync committee duty associated to a single sub committee subnet */
export type SubcommitteeDuty = {
duty: routes.validator.SyncDuty;
duty: SyncDutySubnet;
selectionProof: SyncSelectionProof["selectionProof"];
};

Expand Down Expand Up @@ -43,3 +43,16 @@ export function groupSyncDutiesBySubcommitteeIndex(

return dutiesBySubcommitteeIndex;
}

/**
* Given a list of indexes of a sync committee returns the list of unique subnet numbers the indexes are part of
*/
export function syncCommitteeIndicesToSubnets(indexesInCommittee: number[]): number[] {
const subnets = new Set<number>();

for (const indexInCommittee of indexesInCommittee) {
subnets.add(Math.floor(indexInCommittee / SYNC_COMMITTEE_SUBNET_SIZE));
}

return Array.from(subnets);
}
14 changes: 8 additions & 6 deletions packages/validator/src/services/validatorStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ export type SignerRemote = {
pubkeyHex: PubkeyHex;
};

type BLSPubkeyMaybeHex = BLSPubkey | string;

/**
* Validator entity capable of producing signatures. Either:
* - local: With BLS secret key
Expand Down Expand Up @@ -213,7 +215,7 @@ export class ValidatorStore {
}

async signSyncCommitteeSignature(
pubkey: BLSPubkey,
pubkey: BLSPubkeyMaybeHex,
validatorIndex: ValidatorIndex,
slot: Slot,
beaconBlockRoot: Root
Expand All @@ -230,7 +232,7 @@ export class ValidatorStore {
}

async signContributionAndProof(
duty: Pick<routes.validator.SyncDuty, "pubkey" | "validatorIndex">,
duty: {pubkey: BLSPubkeyMaybeHex; validatorIndex: number},
selectionProof: BLSSignature,
contribution: altair.SyncCommitteeContribution
): Promise<altair.SignedContributionAndProof> {
Expand All @@ -249,15 +251,15 @@ export class ValidatorStore {
};
}

async signAttestationSelectionProof(pubkey: BLSPubkey, slot: Slot): Promise<BLSSignature> {
async signAttestationSelectionProof(pubkey: BLSPubkeyMaybeHex, slot: Slot): Promise<BLSSignature> {
const domain = this.config.getDomain(DOMAIN_SELECTION_PROOF, slot);
const signingRoot = computeSigningRoot(ssz.Slot, slot, domain);

return await this.getSignature(pubkey, signingRoot);
}

async signSyncCommitteeSelectionProof(
pubkey: BLSPubkey | string,
pubkey: BLSPubkeyMaybeHex,
slot: Slot,
subcommitteeIndex: number
): Promise<BLSSignature> {
Expand All @@ -273,7 +275,7 @@ export class ValidatorStore {
}

async signVoluntaryExit(
pubkey: PubkeyHex,
pubkey: BLSPubkeyMaybeHex,
validatorIndex: number,
exitEpoch: Epoch
): Promise<phase0.SignedVoluntaryExit> {
Expand All @@ -288,7 +290,7 @@ export class ValidatorStore {
};
}

private async getSignature(pubkey: BLSPubkey | string, signingRoot: Uint8Array): Promise<BLSSignature> {
private async getSignature(pubkey: BLSPubkeyMaybeHex, signingRoot: Uint8Array): Promise<BLSSignature> {
// TODO: Refactor indexing to not have to run toHexString() on the pubkey every time
const pubkeyHex = typeof pubkey === "string" ? pubkey : toHexString(pubkey);

Expand Down
Loading

0 comments on commit c4fff81

Please sign in to comment.