Skip to content

Commit

Permalink
feat: trigger block search for unknown block attestations (#5485)
Browse files Browse the repository at this point in the history
* feat: unknown block sync

* fix: only emit unknownBlock event once per slot

* feat: add elapsedTimeTillReceived metric and use debug log

* feat: prevent unbundling attack

* chore: log unknown block roots

* fix: merge issue

* chore: add peer return incorrect root block test case

* fix: trigger search for unknown root block in single place
  • Loading branch information
twoeths authored May 30, 2023
1 parent f408a30 commit 245e22c
Show file tree
Hide file tree
Showing 16 changed files with 479 additions and 96 deletions.
1 change: 1 addition & 0 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export function getBeaconPoolApi({
// see https://github.com/ChainSafe/lodestar/issues/5098
const {indexedAttestation, subnet, attDataRootHex} = await validateGossipFnRetryUnknownRoot(
validateFn,
network,
chain,
slot,
beaconBlockRoot
Expand Down
7 changes: 5 additions & 2 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,11 @@ export function getValidatorApi({
// when a validator is configured with multiple beacon node urls, this beaconBlockRoot may come from another beacon node
// and it hasn't been in our forkchoice since we haven't seen / processing that block
// see https://github.com/ChainSafe/lodestar/issues/5063
if (!chain.forkChoice.getBlock(beaconBlockRoot)) {
if (!chain.forkChoice.hasBlock(beaconBlockRoot)) {
const rootHex = toHexString(beaconBlockRoot);
network.searchUnknownSlotRoot({slot, root: rootHex});
// if result of this call is false, i.e. block hasn't seen after 1 slot then the below notOnOptimisticBlockRoot call will throw error
await chain.waitForBlock(slot, toHexString(beaconBlockRoot));
await chain.waitForBlock(slot, rootHex);
}

// Check the execution status as validator shouldn't contribute on an optimistic head
Expand Down Expand Up @@ -562,6 +564,7 @@ export function getValidatorApi({
// see https://github.com/ChainSafe/lodestar/issues/5098
const {indexedAttestation, committeeIndices, attDataRootHex} = await validateGossipFnRetryUnknownRoot(
validateFn,
network,
chain,
slot,
beaconBlockRoot
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ export async function processBlocks(
// TODO: De-duplicate with logic above
// ChainEvent.errorBlock
if (!(err instanceof BlockError)) {
this.logger.error("Non BlockError received", {}, err);
this.logger.debug("Non BlockError received", {}, err);
} else if (!opts.disableOnBlockError) {
this.logger.error("Block error", {slot: err.signedBlock.message.slot}, err);
this.logger.debug("Block error", {slot: err.signedBlock.message.slot}, err);

if (err.type.code === BlockErrorCode.INVALID_SIGNATURE) {
const {signedBlock} = err;
Expand Down
10 changes: 8 additions & 2 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -485,9 +485,10 @@ export function createLodestarMetrics(
},

syncUnknownBlock: {
requests: register.gauge({
requests: register.gauge<"type">({
name: "lodestar_sync_unknown_block_requests_total",
help: "Total number of unknownBlockParent events or requests",
help: "Total number of unknown block events or requests",
labelNames: ["type"],
}),
pendingBlocks: register.gauge({
name: "lodestar_sync_unknown_block_pending_blocks_size",
Expand Down Expand Up @@ -517,6 +518,11 @@ export function createLodestarMetrics(
name: "lodestar_sync_unknown_block_removed_blocks_total",
help: "Total number of removed bad blocks in UnknownBlockSync",
}),
elapsedTimeTillReceived: register.histogram({
name: "lodestar_sync_unknown_block_elapsed_time_till_received",
help: "Time elapsed between block slot time and the time block received via unknown block sync",
buckets: [0.5, 1, 2, 4, 6, 12],
}),
},

// Gossip sync committee
Expand Down
7 changes: 5 additions & 2 deletions packages/beacon-node/src/network/events.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {EventEmitter} from "events";
import {PeerId} from "@libp2p/interface-peer-id";
import {TopicValidatorResult} from "@libp2p/interface-pubsub";
import {phase0} from "@lodestar/types";
import {phase0, RootHex} from "@lodestar/types";
import {BlockInput} from "../chain/blocks/types.js";
import {StrictEventEmitterSingleArg} from "../util/strictEvents.js";
import {PeerIdStr} from "../util/peerId.js";
Expand All @@ -17,6 +17,7 @@ export enum NetworkEvent {
reqRespRequest = "req-resp.request",
// TODO remove this event, this is not a network-level concern, rather a chain / sync concern
unknownBlockParent = "unknownBlockParent",
unknownBlock = "unknownBlock",

// Network processor events
/** (Network -> App) A gossip message is ready for validation */
Expand All @@ -29,7 +30,8 @@ export type NetworkEventData = {
[NetworkEvent.peerConnected]: {peer: PeerIdStr; status: phase0.Status};
[NetworkEvent.peerDisconnected]: {peer: PeerIdStr};
[NetworkEvent.reqRespRequest]: {request: RequestTypedContainer; peer: PeerId};
[NetworkEvent.unknownBlockParent]: {blockInput: BlockInput; peer: string};
[NetworkEvent.unknownBlockParent]: {blockInput: BlockInput; peer: PeerIdStr};
[NetworkEvent.unknownBlock]: {rootHex: RootHex; peer?: PeerIdStr};
[NetworkEvent.pendingGossipsubMessage]: PendingGossipsubMessage;
[NetworkEvent.gossipMessageValidationResult]: {
msgId: string;
Expand All @@ -43,6 +45,7 @@ export const networkEventDirection: Record<NetworkEvent, EventDirection> = {
[NetworkEvent.peerDisconnected]: EventDirection.workerToMain,
[NetworkEvent.reqRespRequest]: EventDirection.none, // Only used internally in NetworkCore
[NetworkEvent.unknownBlockParent]: EventDirection.workerToMain,
[NetworkEvent.unknownBlock]: EventDirection.workerToMain,
[NetworkEvent.pendingGossipsubMessage]: EventDirection.workerToMain,
[NetworkEvent.gossipMessageValidationResult]: EventDirection.mainToWorker,
};
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {Libp2p as ILibp2p} from "libp2p";
import {Connection} from "@libp2p/interface-connection";
import {Registrar} from "@libp2p/interface-registrar";
import {ConnectionManager} from "@libp2p/interface-connection-manager";
import {Slot, allForks, altair, capella, deneb, phase0} from "@lodestar/types";
import {Slot, SlotRootHex, allForks, altair, capella, deneb, phase0} from "@lodestar/types";
import {BlockInput} from "../chain/blocks/types.js";
import {PeerIdStr} from "../util/peerId.js";
import {INetworkEventBus} from "./events.js";
Expand All @@ -29,7 +29,7 @@ export interface INetwork extends INetworkCorePublic {
reportPeer(peer: PeerIdStr, action: PeerAction, actionName: string): void;
shouldAggregate(subnet: number, slot: Slot): boolean;
reStatusPeers(peers: PeerIdStr[]): Promise<void>;

searchUnknownSlotRoot(slotRoot: SlotRootHex, peer?: PeerIdStr): void;
// ReqResp
sendBeaconBlocksByRange(
peerId: PeerIdStr,
Expand Down
6 changes: 5 additions & 1 deletion packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {BeaconConfig} from "@lodestar/config";
import {sleep} from "@lodestar/utils";
import {LoggerNode} from "@lodestar/logger/node";
import {computeStartSlotAtEpoch, computeTimeAtSlot} from "@lodestar/state-transition";
import {phase0, allForks, deneb, altair, Root, capella} from "@lodestar/types";
import {phase0, allForks, deneb, altair, Root, capella, SlotRootHex} from "@lodestar/types";
import {routes} from "@lodestar/api";
import {PeerScoreStatsDump} from "@chainsafe/libp2p-gossipsub/score";
import {ResponseIncoming} from "@lodestar/reqresp";
Expand Down Expand Up @@ -224,6 +224,10 @@ export class Network implements INetwork {
return this.core.reStatusPeers(peers);
}

searchUnknownSlotRoot(slotRoot: SlotRootHex, peer?: PeerIdStr): void {
this.networkProcessor.searchUnknownSlotRoot(slotRoot, peer);
}

async reportPeer(peer: PeerIdStr, action: PeerAction, actionName: string): Promise<void> {
return this.core.reportPeer(peer, action, actionName);
}
Expand Down
9 changes: 8 additions & 1 deletion packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {validateGossipBlobsSidecar} from "../../chain/validation/blobsSidecar.js
import {BlockInput, BlockSource, getBlockInput} from "../../chain/blocks/types.js";
import {sszDeserialize} from "../gossip/topic.js";
import {INetworkCore} from "../core/index.js";
import {INetwork} from "../interface.js";
import {AggregatorTracker} from "./aggregatorTracker.js";

/**
Expand Down Expand Up @@ -400,6 +401,7 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
*/
export async function validateGossipFnRetryUnknownRoot<T>(
fn: () => Promise<T>,
network: INetwork,
chain: IBeaconChain,
slot: Slot,
blockRoot: Root
Expand All @@ -414,8 +416,13 @@ export async function validateGossipFnRetryUnknownRoot<T>(
e instanceof AttestationError &&
e.type.code === AttestationErrorCode.UNKNOWN_OR_PREFINALIZED_BEACON_BLOCK_ROOT
) {
if (unknownBlockRootRetries++ < MAX_UNKNOWN_BLOCK_ROOT_RETRIES) {
if (unknownBlockRootRetries === 0) {
// Trigger unknown block root search here
const rootHex = toHexString(blockRoot);
network.searchUnknownSlotRoot({slot, root: rootHex});
}

if (unknownBlockRootRetries++ < MAX_UNKNOWN_BLOCK_ROOT_RETRIES) {
const foundBlock = await chain.waitForBlock(slot, toHexString(blockRoot));
// Returns true if the block was found on time. In that case, try to get it from the fork-choice again.
// Otherwise, throw the error below.
Expand Down
21 changes: 20 additions & 1 deletion packages/beacon-node/src/network/processor/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import {Logger, MapDef, mapValues, sleep} from "@lodestar/utils";
import {RootHex, Slot} from "@lodestar/types";
import {RootHex, Slot, SlotRootHex} from "@lodestar/types";
import {routes} from "@lodestar/api";
import {pruneSetToMax} from "@lodestar/utils";
import {IBeaconChain} from "../../chain/interface.js";
import {GossipErrorCode} from "../../chain/errors/gossipValidation.js";
import {Metrics} from "../../metrics/metrics.js";
import {IBeaconDb} from "../../db/interface.js";
import {ClockEvent} from "../../util/clock.js";
import {NetworkEvent, NetworkEventBus} from "../events.js";
import {GossipHandlers, GossipType, GossipValidatorFn} from "../gossip/interface.js";
import {PeerIdStr} from "../peers/index.js";
import {createGossipQueues} from "./gossipQueues.js";
import {PendingGossipsubMessage} from "./types.js";
import {ValidatorFnsModules, GossipHandlerOpts, getGossipHandlers} from "./gossipHandlers.js";
Expand All @@ -31,6 +33,11 @@ export type NetworkProcessorOpts = GossipHandlerOpts & {
maxGossipTopicConcurrency?: number;
};

/**
* Keep up to 3 slot of unknown roots, so we don't always emit to UnknownBlock sync.
*/
const MAX_UNKNOWN_ROOTS_SLOT_CACHE_SIZE = 3;

/**
* This is respective to gossipsub seenTTL (which is 550 * 0.7 = 385s), also it's respective
* to beacon_attestation ATTESTATION_PROPAGATION_SLOT_RANGE (32 slots).
Expand Down Expand Up @@ -143,6 +150,7 @@ export class NetworkProcessor {
private readonly awaitingGossipsubMessagesByRootBySlot: MapDef<Slot, MapDef<RootHex, Set<PendingGossipsubMessage>>>;
private unknownBlockGossipsubMessagesCount = 0;
private isProcessingCurrentSlotBlock = false;
private unknownRootsBySlot = new MapDef<Slot, Set<RootHex>>(() => new Set());

constructor(modules: NetworkProcessorModules, private readonly opts: NetworkProcessorOpts) {
const {chain, events, logger, metrics} = modules;
Expand Down Expand Up @@ -201,6 +209,14 @@ export class NetworkProcessor {
return queue.getAll();
}

searchUnknownSlotRoot({slot, root}: SlotRootHex, peer?: PeerIdStr): void {
// Search for the unknown block
if (!this.unknownRootsBySlot.getOrDefault(slot).has(root)) {
this.unknownRootsBySlot.getOrDefault(slot).add(root);
this.events.emit(NetworkEvent.unknownBlock, {rootHex: root, peer});
}
}

private onPendingGossipsubMessage(message: PendingGossipsubMessage): void {
const topicType = message.topic.type;
const extractBlockSlotRootFn = this.extractBlockSlotRootFns[topicType];
Expand Down Expand Up @@ -229,6 +245,8 @@ export class NetworkProcessor {
}
message.msgSlot = slot;
if (root && !this.chain.forkChoice.hasBlockHex(root)) {
this.searchUnknownSlotRoot({slot, root}, message.propagationSource.toString());

if (this.unknownBlockGossipsubMessagesCount > MAX_QUEUED_UNKNOWN_BLOCK_GOSSIP_OBJECTS) {
// TODO: Should report the dropped job to gossip? It will be eventually pruned from the mcache
this.metrics?.reprocessGossipAttestations.reject.inc({reason: ReprocessRejectReason.reached_limit});
Expand Down Expand Up @@ -310,6 +328,7 @@ export class NetworkProcessor {
this.awaitingGossipsubMessagesByRootBySlot.delete(slot);
}
}
pruneSetToMax(this.unknownRootsBySlot, MAX_UNKNOWN_ROOTS_SLOT_CACHE_SIZE);
this.unknownBlockGossipsubMessagesCount = 0;
}

Expand Down
35 changes: 31 additions & 4 deletions packages/beacon-node/src/sync/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,43 @@ export interface SyncModules {
wsCheckpoint?: phase0.Checkpoint;
}

/**
* onUnknownBlock: store 1 record with undefined parentBlockRootHex & blockInput, blockRootHex as key, status pending
* onUnknownBlockParent:
* - store 1 record with known parentBlockRootHex & blockInput, blockRootHex as key, status downloaded
* - store 1 record with undefined parentBlockRootHex & blockInput, parentBlockRootHex as key, status pending
*/
export type PendingBlock = {
blockRootHex: RootHex;
parentBlockRootHex: RootHex;
blockInput: BlockInput;
peerIdStrs: Set<string>;
status: PendingBlockStatus;
downloadAttempts: number;
};
} & (
| {
status: PendingBlockStatus.pending | PendingBlockStatus.fetching;
parentBlockRootHex: null;
blockInput: null;
}
| {
status: PendingBlockStatus.downloaded | PendingBlockStatus.processing;
parentBlockRootHex: RootHex;
blockInput: BlockInput;
}
);

export enum PendingBlockStatus {
pending = "pending",
fetching = "fetching",
downloaded = "downloaded",
processing = "processing",
}

export enum PendingBlockType {
/**
* We got a block root (from a gossip attestation, for exxample) but we don't have the block in forkchoice.
*/
UNKNOWN_BLOCK = "unknown_block",
/**
* During gossip time, we may get a block but the parent root is unknown (not in forkchoice).
*/
UNKNOWN_PARENT = "unknown_parent",
}
Loading

0 comments on commit 245e22c

Please sign in to comment.