diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index ce7c17fb3ef..9dc10da7611 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -91,6 +91,7 @@ export type EnvVar = | 'P2P_TCP_LISTEN_ADDR' | 'P2P_TCP_ANNOUNCE_ADDR' | 'P2P_TX_POOL_KEEP_PROVEN_FOR' + | 'P2P_ATTESTATION_POOL_KEEP_FOR' | 'P2P_TX_PROTOCOL' | 'P2P_UDP_ANNOUNCE_ADDR' | 'P2P_UDP_LISTEN_ADDR' diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index b81929903a2..b277a21509a 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -58,6 +58,7 @@ describe('In-Memory P2P Client', () => { addAttestations: jest.fn(), deleteAttestations: jest.fn(), deleteAttestationsForSlot: jest.fn(), + deleteAttestationsOlderThan: jest.fn(), getAttestationsForSlot: jest.fn().mockReturnValue(undefined), }; diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index dfc74254bd9..9fcb8dd1f69 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -202,6 +202,9 @@ export class P2PClient extends WithTracer implements P2P { private attestationPool: AttestationPool; private epochProofQuotePool: EpochProofQuotePool; + /** How many slots to keep attestations for. */ + private keepAttestationsInPoolFor: number; + private blockStream; /** @@ -224,7 +227,10 @@ export class P2PClient extends WithTracer implements P2P { ) { super(telemetry, 'P2PClient'); - const { blockCheckIntervalMS, blockRequestBatchSize } = getP2PConfigFromEnv(); + const { blockCheckIntervalMS, blockRequestBatchSize, keepAttestationsInPoolFor } = + getP2PConfigFromEnv(); + + this.keepAttestationsInPoolFor = keepAttestationsInPoolFor; this.blockStream = new L2BlockStream(l2BlockSource, this, this, { batchSize: blockRequestBatchSize, @@ -615,6 +621,7 @@ export class P2PClient extends WithTracer implements P2P { const firstBlockNum = blocks[0].number; const lastBlockNum = blocks[blocks.length - 1].number; + const lastBlockSlot = blocks[blocks.length - 1].header.globalVariables.slotNumber.toBigInt(); if (this.keepProvenTxsFor === 0) { await this.deleteTxsFromBlocks(blocks); @@ -626,12 +633,19 @@ export class P2PClient extends WithTracer implements P2P { await this.deleteTxsFromBlocks(blocksToDeleteTxsFrom); } + // We delete attestations older than the last block slot minus the number of slots we want to keep in the pool. + if (lastBlockSlot - BigInt(this.keepAttestationsInPoolFor) >= BigInt(INITIAL_L2_BLOCK_NUM)) { + await this.attestationPool.deleteAttestationsOlderThan(lastBlockSlot - BigInt(this.keepAttestationsInPoolFor)); + } + await this.synchedProvenBlockNumber.set(lastBlockNum); this.log.debug(`Synched to proven block ${lastBlockNum}`); const provenEpochNumber = await this.l2BlockSource.getProvenL2EpochNumber(); if (provenEpochNumber !== undefined) { this.epochProofQuotePool.deleteQuotesToEpoch(BigInt(provenEpochNumber)); } + + await this.startServiceIfSynched(); } diff --git a/yarn-project/p2p/src/config.ts b/yarn-project/p2p/src/config.ts index 7cff1711b48..b16f0f46543 100644 --- a/yarn-project/p2p/src/config.ts +++ b/yarn-project/p2p/src/config.ts @@ -91,6 +91,10 @@ export interface P2PConfig extends P2PReqRespConfig { /** How many blocks have to pass after a block is proven before its txs are deleted (zero to delete immediately once proven) */ keepProvenTxsInPoolFor: number; + /** How many slots to keep attestations for. */ + keepAttestationsInPoolFor: number; + + /** * The interval of the gossipsub heartbeat to perform maintenance tasks. */ @@ -229,6 +233,11 @@ export const p2pConfigMappings: ConfigMappingsType = { 'How many blocks have to pass after a block is proven before its txs are deleted (zero to delete immediately once proven)', ...numberConfigHelper(0), }, + keepAttestationsInPoolFor: { + env: 'P2P_ATTESTATION_POOL_KEEP_FOR', + description: 'How many slots to keep attestations for.', + ...numberConfigHelper(96), + }, gossipsubInterval: { env: 'P2P_GOSSIPSUB_INTERVAL_MS', description: 'The interval of the gossipsub heartbeat to perform maintenance tasks.', diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts index cdfe8729911..5c57e85b87b 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts @@ -21,6 +21,16 @@ export interface AttestationPool { */ deleteAttestations(attestations: BlockAttestation[]): Promise; + /** + * Delete Attestations with a slot number smaller than the given slot + * + * Removes all attestations associated with a slot + * + * @param slot - The oldest slot to keep. + */ + deleteAttestationsOlderThan(slot: bigint): Promise; + + /** * Delete Attestations for slot * diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts index b8bb71f30ce..0840f5e7d94 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts @@ -5,6 +5,7 @@ import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { type MockProxy, mock } from 'jest-mock-extended'; +import { jest } from '@jest/globals'; import { type PoolInstrumentation } from '../instrumentation.js'; import { InMemoryAttestationPool } from './memory_attestation_pool.js'; import { mockAttestation } from './mocks.js'; @@ -30,6 +31,11 @@ describe('MemoryAttestationPool', () => { (ap as any).metrics = metricsMock; }); + const createAttestationsForSlot = (slotNumber: number) => { + const archive = Fr.random(); + return signers.map(signer => mockAttestation(signer, slotNumber, archive)); + }; + it('should add attestations to pool', async () => { const slotNumber = 420; const archive = Fr.random(); @@ -171,4 +177,29 @@ describe('MemoryAttestationPool', () => { const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); expect(retreivedAttestationsAfterDelete.length).toBe(0); }); + + it('Should delete attestations older than a given slot', async () => { + const slotNumbers = [1, 2, 3, 69, 72, 74, 88, 420]; + const attestations = slotNumbers.map(slotNumber => createAttestationsForSlot(slotNumber)).flat(); + const proposalId = attestations[0].archive.toString(); + + await ap.addAttestations(attestations); + + const attestationsForSlot1 = await ap.getAttestationsForSlot(BigInt(1), proposalId); + expect(attestationsForSlot1.length).toBe(signers.length); + + const deleteAttestationsSpy = jest.spyOn(ap, 'deleteAttestationsForSlot'); + + await ap.deleteAttestationsOlderThan(BigInt(73)); + + const attestationsForSlot1AfterDelete = await ap.getAttestationsForSlot(BigInt(1), proposalId); + expect(attestationsForSlot1AfterDelete.length).toBe(0); + + expect(deleteAttestationsSpy).toHaveBeenCalledTimes(5); + expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(1)); + expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(2)); + expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(3)); + expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(69)); + expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(72)); + }); }); diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.ts index 9364130c4f8..fa738340c9a 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.ts @@ -58,6 +58,27 @@ export class InMemoryAttestationPool implements AttestationPool { return total; } + public deleteAttestationsOlderThan(oldestSlot: bigint): Promise { + const olderThan = []; + + // Entries are iterated in insertion order, so we can break as soon as we find a slot that is older than the oldestSlot. + // Note: this will only prune correctly if attestations are added in order of rising slot, it is important that we do not allow + // insertion of attestations that are old. #(https://github.com/AztecProtocol/aztec-packages/issues/10322) + const slots = this.attestations.keys(); + for (const slot of slots) { + if (slot < oldestSlot) { + olderThan.push(slot); + } else { + break; + } + } + + for (const oldSlot of olderThan) { + this.deleteAttestationsForSlot(oldSlot); + } + return Promise.resolve(); + } + public deleteAttestationsForSlot(slot: bigint): Promise { // We count the number of attestations we are removing const numberOfAttestations = this.#getNumberOfAttestationsInSlot(slot); diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts b/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts index 3e28c031a0d..57445e9d396 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts @@ -64,6 +64,7 @@ const makeMockPools = () => { addAttestations: jest.fn(), deleteAttestations: jest.fn(), deleteAttestationsForSlot: jest.fn(), + deleteAttestationsOlderThan: jest.fn(), getAttestationsForSlot: jest.fn().mockReturnValue(undefined), }, epochProofQuotePool: {