Skip to content

Commit

Permalink
chore(prover): prover node should not gossip attestations (#10672)
Browse files Browse the repository at this point in the history
fixes: #10668
  • Loading branch information
Maddiaa0 authored Dec 13, 2024
1 parent 3d3fabb commit 41fc0f0
Show file tree
Hide file tree
Showing 15 changed files with 150 additions and 68 deletions.
10 changes: 9 additions & 1 deletion yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
MerkleTreeId,
NullifierMembershipWitness,
type NullifierWithBlockSource,
P2PClientType,
type ProcessedTx,
type ProverConfig,
PublicDataWitness,
Expand Down Expand Up @@ -167,7 +168,14 @@ export class AztecNodeService implements AztecNode, Traceable {
}

// create the tx pool and the p2p client, which will need the l2 block source
const p2pClient = await createP2PClient(config, archiver, proofVerifier, worldStateSynchronizer, telemetry);
const p2pClient = await createP2PClient(
P2PClientType.Full,
config,
archiver,
proofVerifier,
worldStateSynchronizer,
telemetry,
);

// start both and wait for them to sync from the block source
await Promise.all([p2pClient.start(), worldStateSynchronizer.start()]);
Expand Down
27 changes: 17 additions & 10 deletions yarn-project/circuit-types/src/interfaces/p2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { type ApiSchemaFor, optional, schemas } from '@aztec/foundation/schemas'
import { z } from 'zod';

import { BlockAttestation } from '../p2p/block_attestation.js';
import { type P2PClientType } from '../p2p/client_type.js';
import { EpochProofQuote } from '../prover_coordination/epoch_proof_quote.js';
import { Tx } from '../tx/tx.js';

Expand All @@ -24,16 +25,7 @@ const PeerInfoSchema = z.discriminatedUnion('status', [
]);

/** Exposed API to the P2P module. */
export interface P2PApi {
/**
* Queries the Attestation pool for attestations for the given slot
*
* @param slot - the slot to query
* @param proposalId - the proposal id to query, or undefined to query all proposals for the slot
* @returns BlockAttestations
*/
getAttestationsForSlot(slot: bigint, proposalId?: string): Promise<BlockAttestation[]>;

export interface P2PApiWithoutAttestations {
/**
* Queries the EpochProofQuote pool for quotes for the given epoch
*
Expand All @@ -59,6 +51,21 @@ export interface P2PApi {
getPeers(includePending?: boolean): Promise<PeerInfo[]>;
}

export interface P2PClient extends P2PApiWithoutAttestations {
/**
* Queries the Attestation pool for attestations for the given slot
*
* @param slot - the slot to query
* @param proposalId - the proposal id to query, or undefined to query all proposals for the slot
* @returns BlockAttestations
*/
getAttestationsForSlot(slot: bigint, proposalId?: string): Promise<BlockAttestation[]>;
}

export type P2PApi<T extends P2PClientType = P2PClientType.Full> = T extends P2PClientType.Full
? P2PClient & P2PApiWithoutAttestations
: P2PApiWithoutAttestations;

export const P2PApiSchema: ApiSchemaFor<P2PApi> = {
getAttestationsForSlot: z
.function()
Expand Down
6 changes: 6 additions & 0 deletions yarn-project/circuit-types/src/p2p/client_type.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export enum P2PClientType {
// Full p2p clients will subscribe to all gossip topics
Full,
// Prove p2p clients will only subscribe to transaction and proving topics
Prover,
}
1 change: 1 addition & 0 deletions yarn-project/circuit-types/src/p2p/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export * from './gossipable.js';
export * from './interface.js';
export * from './signature_utils.js';
export * from './topic_type.js';
export * from './client_type.js';
9 changes: 9 additions & 0 deletions yarn-project/circuit-types/src/p2p/topic_type.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { P2PClientType } from './client_type.js';

/** Create Topic String
*
* The topic channel identifier
Expand All @@ -18,6 +20,13 @@ export enum TopicType {
epoch_proof_quote = 'epoch_proof_quote',
}

export function getTopicTypeForClientType(clientType: P2PClientType) {
if (clientType === P2PClientType.Full) {
return Object.values(TopicType);
}
return [TopicType.tx, TopicType.epoch_proof_quote];
}

/**
* Convert the topic string into a set of labels
*
Expand Down
46 changes: 34 additions & 12 deletions yarn-project/p2p/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import type { ClientProtocolCircuitVerifier, L2BlockSource, WorldStateSynchronizer } from '@aztec/circuit-types';
import {
type ClientProtocolCircuitVerifier,
type L2BlockSource,
P2PClientType,
type WorldStateSynchronizer,
} from '@aztec/circuit-types';
import { createLogger } from '@aztec/foundation/log';
import { type AztecKVStore } from '@aztec/kv-store';
import { type DataStoreConfig } from '@aztec/kv-store/config';
Expand All @@ -21,27 +26,35 @@ import { configureP2PClientAddresses, createLibP2PPeerIdFromPrivateKey, getPeerI

export * from './p2p_client.js';

export const createP2PClient = async (
type P2PClientDeps<T extends P2PClientType> = {
txPool?: TxPool;
store?: AztecKVStore;
attestationPool?: T extends P2PClientType.Full ? AttestationPool : undefined;
epochProofQuotePool?: EpochProofQuotePool;
};

export const createP2PClient = async <T extends P2PClientType>(
clientType: T,
_config: P2PConfig & DataStoreConfig,
l2BlockSource: L2BlockSource,
proofVerifier: ClientProtocolCircuitVerifier,
worldStateSynchronizer: WorldStateSynchronizer,
telemetry: TelemetryClient = new NoopTelemetryClient(),
deps: {
txPool?: TxPool;
store?: AztecKVStore;
attestationPool?: AttestationPool;
epochProofQuotePool?: EpochProofQuotePool;
} = {},
deps: P2PClientDeps<T> = {},
) => {
let config = { ..._config };
const logger = createLogger('p2p');
const store = deps.store ?? (await createStore('p2p', config, createLogger('p2p:lmdb')));

const mempools: MemPools = {
const mempools: MemPools<T> = {
txPool: deps.txPool ?? new AztecKVTxPool(store, telemetry),
attestationPool: deps.attestationPool ?? new InMemoryAttestationPool(telemetry),
epochProofQuotePool: deps.epochProofQuotePool ?? new MemoryEpochProofQuotePool(telemetry),
attestationPool:
clientType === P2PClientType.Full
? ((deps.attestationPool ?? new InMemoryAttestationPool(telemetry)) as T extends P2PClientType.Full
? AttestationPool
: undefined)
: undefined,
};

let p2pService;
Expand All @@ -55,7 +68,8 @@ export const createP2PClient = async (
const peerId = await createLibP2PPeerIdFromPrivateKey(peerIdPrivateKey);
const discoveryService = new DiscV5Service(peerId, config, telemetry);

p2pService = await LibP2PService.new(
p2pService = await LibP2PService.new<T>(
clientType,
config,
discoveryService,
peerId,
Expand All @@ -70,5 +84,13 @@ export const createP2PClient = async (
logger.verbose('P2P is disabled. Using dummy P2P service');
p2pService = new DummyP2PService();
}
return new P2PClient(store, l2BlockSource, mempools, p2pService, config.keepProvenTxsInPoolFor, telemetry);
return new P2PClient(
clientType,
store,
l2BlockSource,
mempools,
p2pService,
config.keepProvenTxsInPoolFor,
telemetry,
);
};
16 changes: 8 additions & 8 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { MockL2BlockSource } from '@aztec/archiver/test';
import { L2Block, mockEpochProofQuote, mockTx } from '@aztec/circuit-types';
import { L2Block, P2PClientType, mockEpochProofQuote, mockTx } from '@aztec/circuit-types';
import { Fr } from '@aztec/circuits.js';
import { retryUntil } from '@aztec/foundation/retry';
import { sleep } from '@aztec/foundation/sleep';
Expand Down Expand Up @@ -49,7 +49,7 @@ describe('In-Memory P2P Client', () => {
};

kvStore = openTmpStore();
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 0);
client = new P2PClient(P2PClientType.Full, kvStore, blockSource, mempools, p2pService, 0);
});

const advanceToProvenBlock = async (getProvenBlockNumber: number, provenEpochNumber = getProvenBlockNumber) => {
Expand Down Expand Up @@ -119,7 +119,7 @@ describe('In-Memory P2P Client', () => {
await client.start();
await client.stop();

const client2 = new P2PClient(kvStore, blockSource, mempools, p2pService, 0);
const client2 = new P2PClient(P2PClientType.Full, kvStore, blockSource, mempools, p2pService, 0);
expect(client2.getSyncedLatestBlockNum()).toEqual(client.getSyncedLatestBlockNum());
});

Expand All @@ -134,7 +134,7 @@ describe('In-Memory P2P Client', () => {
});

it('deletes txs after waiting the set number of blocks', async () => {
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 10);
client = new P2PClient(P2PClientType.Full, kvStore, blockSource, mempools, p2pService, 10);
blockSource.setProvenBlockNumber(0);
await client.start();
expect(txPool.deleteTxs).not.toHaveBeenCalled();
Expand All @@ -151,7 +151,7 @@ describe('In-Memory P2P Client', () => {
});

it('stores and returns epoch proof quotes', async () => {
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 0);
client = new P2PClient(P2PClientType.Full, kvStore, blockSource, mempools, p2pService, 0);

blockSource.setProvenEpochNumber(2);
await client.start();
Expand Down Expand Up @@ -182,7 +182,7 @@ describe('In-Memory P2P Client', () => {
});

it('deletes expired proof quotes', async () => {
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 0);
client = new P2PClient(P2PClientType.Full, kvStore, blockSource, mempools, p2pService, 0);

blockSource.setProvenEpochNumber(1);
blockSource.setProvenBlockNumber(1);
Expand Down Expand Up @@ -245,7 +245,7 @@ describe('In-Memory P2P Client', () => {
});

it('deletes txs created from a pruned block', async () => {
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 10);
client = new P2PClient(P2PClientType.Full, kvStore, blockSource, mempools, p2pService, 10);
blockSource.setProvenBlockNumber(0);
await client.start();

Expand All @@ -267,7 +267,7 @@ describe('In-Memory P2P Client', () => {
});

it('moves mined and valid txs back to the pending set', async () => {
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 10);
client = new P2PClient(P2PClientType.Full, kvStore, blockSource, mempools, p2pService, 10);
blockSource.setProvenBlockNumber(0);
await client.start();

Expand Down
21 changes: 13 additions & 8 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
type L2BlockStreamEvent,
type L2Tips,
type P2PApi,
type P2PClientType,
type PeerInfo,
type Tx,
type TxHash,
Expand Down Expand Up @@ -61,7 +62,7 @@ export interface P2PSyncState {
/**
* Interface of a P2P client.
**/
export interface P2P extends P2PApi {
export type P2P<T extends P2PClientType = P2PClientType.Full> = P2PApi<T> & {
/**
* Broadcasts a block proposal to other peers.
*
Expand Down Expand Up @@ -171,12 +172,15 @@ export interface P2P extends P2PApi {

/** Identifies a p2p client. */
isP2PClient(): true;
}
};

/**
* The P2P client implementation.
*/
export class P2PClient extends WithTracer implements P2P {
export class P2PClient<T extends P2PClientType = P2PClientType.Full>
extends WithTracer
implements P2P, P2P<P2PClientType.Prover>
{
/** Property that indicates whether the client is running. */
private stopping = false;

Expand All @@ -194,7 +198,7 @@ export class P2PClient extends WithTracer implements P2P {
private synchedProvenBlockNumber: AztecSingleton<number>;

private txPool: TxPool;
private attestationPool: AttestationPool;
private attestationPool: T extends P2PClientType.Full ? AttestationPool : undefined;
private epochProofQuotePool: EpochProofQuotePool;

/** How many slots to keep attestations for. */
Expand All @@ -212,9 +216,10 @@ export class P2PClient extends WithTracer implements P2P {
* @param log - A logger.
*/
constructor(
clientType: T,
store: AztecKVStore,
private l2BlockSource: L2BlockSource,
mempools: MemPools,
mempools: MemPools<T>,
private p2pService: P2PService,
private keepProvenTxsFor: number,
telemetry: TelemetryClient = new NoopTelemetryClient(),
Expand All @@ -238,8 +243,8 @@ export class P2PClient extends WithTracer implements P2P {
this.synchedProvenBlockNumber = store.openSingleton('p2p_pool_last_proven_l2_block');

this.txPool = mempools.txPool;
this.attestationPool = mempools.attestationPool;
this.epochProofQuotePool = mempools.epochProofQuotePool;
this.attestationPool = mempools.attestationPool!;
}

public isP2PClient(): true {
Expand Down Expand Up @@ -406,7 +411,7 @@ export class P2PClient extends WithTracer implements P2P {
}

public getAttestationsForSlot(slot: bigint, proposalId: string): Promise<BlockAttestation[]> {
return Promise.resolve(this.attestationPool.getAttestationsForSlot(slot, proposalId));
return Promise.resolve(this.attestationPool?.getAttestationsForSlot(slot, proposalId) ?? []);
}

// REVIEW: https://github.com/AztecProtocol/aztec-packages/issues/7963
Expand Down Expand Up @@ -651,7 +656,7 @@ export class P2PClient extends WithTracer implements P2P {
// We delete attestations older than the last block slot minus the number of slots we want to keep in the pool.
const lastBlockSlotMinusKeepAttestationsInPoolFor = lastBlockSlot - BigInt(this.keepAttestationsInPoolFor);
if (lastBlockSlotMinusKeepAttestationsInPoolFor >= BigInt(INITIAL_L2_BLOCK_NUM)) {
await this.attestationPool.deleteAttestationsOlderThan(lastBlockSlotMinusKeepAttestationsInPoolFor);
await this.attestationPool?.deleteAttestationsOlderThan(lastBlockSlotMinusKeepAttestationsInPoolFor);
}

await this.synchedProvenBlockNumber.set(lastBlockNum);
Expand Down
8 changes: 5 additions & 3 deletions yarn-project/p2p/src/mem_pools/interface.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { type P2PClientType } from '@aztec/circuit-types';

import { type AttestationPool } from './attestation_pool/attestation_pool.js';
import { type EpochProofQuotePool } from './epoch_proof_quote_pool/epoch_proof_quote_pool.js';
import { type TxPool } from './tx_pool/tx_pool.js';

/**
* A interface the combines all mempools
*/
export interface MemPools {
export type MemPools<T extends P2PClientType = P2PClientType.Full> = {
txPool: TxPool;
attestationPool: AttestationPool;
attestationPool?: T extends P2PClientType.Full ? AttestationPool : undefined;
epochProofQuotePool: EpochProofQuotePool;
}
};
9 changes: 6 additions & 3 deletions yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
type ClientProtocolCircuitVerifier,
type L2BlockSource,
type P2PClientType,
type Tx,
type WorldStateSynchronizer,
} from '@aztec/circuit-types';
Expand Down Expand Up @@ -95,11 +96,12 @@ export async function createLibp2pNode(
*
*
*/
export async function createTestLibP2PService(
export async function createTestLibP2PService<T extends P2PClientType>(
clientType: T,
boostrapAddrs: string[] = [],
l2BlockSource: L2BlockSource,
worldStateSynchronizer: WorldStateSynchronizer,
mempools: MemPools,
mempools: MemPools<T>,
telemetry: TelemetryClient,
port: number = 0,
peerId?: PeerId,
Expand All @@ -123,7 +125,8 @@ export async function createTestLibP2PService(
// No bootstrap nodes provided as the libp2p service will register them in the constructor
const p2pNode = await createLibp2pNode([], peerId, port, /*enable gossip */ true, /**start */ false);

return new LibP2PService(
return new LibP2PService<T>(
clientType,
config,
p2pNode as PubSubLibp2p,
discoveryService,
Expand Down
Loading

0 comments on commit 41fc0f0

Please sign in to comment.