diff --git a/yarn-project/kv-store/src/interfaces/map.ts b/yarn-project/kv-store/src/interfaces/map.ts index 6ded76080db..f63505dae9f 100644 --- a/yarn-project/kv-store/src/interfaces/map.ts +++ b/yarn-project/kv-store/src/interfaces/map.ts @@ -62,6 +62,19 @@ export interface AztecMap extends AztecBaseMap { * @param range - The range of keys to iterate over */ keys(range?: Range): IterableIterator; + + /** + * Clears the map. + */ + clear(): Promise; +} + +export interface AztecMapWithSize extends AztecMap { + /** + * Gets the size of the map. + * @returns The size of the map + */ + size(): number; } /** @@ -82,6 +95,14 @@ export interface AztecMultiMap extends AztecMap { deleteValue(key: K, val: V): Promise; } +export interface AztecMultiMapWithSize extends AztecMultiMap { + /** + * Gets the size of the map. + * @returns The size of the map + */ + size(): number; +} + /** * A map backed by a persistent store. */ diff --git a/yarn-project/kv-store/src/interfaces/store.ts b/yarn-project/kv-store/src/interfaces/store.ts index 81c4d956bed..bee1e2e0e8a 100644 --- a/yarn-project/kv-store/src/interfaces/store.ts +++ b/yarn-project/kv-store/src/interfaces/store.ts @@ -1,7 +1,14 @@ import { type AztecArray, type AztecAsyncArray } from './array.js'; import { type Key } from './common.js'; import { type AztecAsyncCounter, type AztecCounter } from './counter.js'; -import { type AztecAsyncMap, type AztecAsyncMultiMap, type AztecMap, type AztecMultiMap } from './map.js'; +import { + type AztecAsyncMap, + type AztecAsyncMultiMap, + type AztecMap, + type AztecMapWithSize, + type AztecMultiMap, + type AztecMultiMapWithSize, +} from './map.js'; import { type AztecAsyncSet, type AztecSet } from './set.js'; import { type AztecAsyncSingleton, type AztecSingleton } from './singleton.js'; @@ -29,6 +36,20 @@ export interface AztecKVStore { */ openMultiMap(name: string): AztecMultiMap; + /** + * Creates a new multi-map with size. + * @param name - The name of the multi-map + * @returns The multi-map + */ + openMultiMapWithSize(name: string): AztecMultiMapWithSize; + + /** + * Creates a new map with size. + * @param name - The name of the map + * @returns The map + */ + openMapWithSize(name: string): AztecMapWithSize; + /** * Creates a new array. * @param name - The name of the array diff --git a/yarn-project/kv-store/src/lmdb/map.test.ts b/yarn-project/kv-store/src/lmdb/map.test.ts index 224750df8e9..2f78d4aca63 100644 --- a/yarn-project/kv-store/src/lmdb/map.test.ts +++ b/yarn-project/kv-store/src/lmdb/map.test.ts @@ -1,3 +1,6 @@ +import { expect } from 'chai'; + +import { type AztecMapWithSize, type AztecMultiMapWithSize } from '../interfaces/map.js'; import { describeAztecMap } from '../interfaces/map_test_suite.js'; import { openTmpStore } from './index.js'; @@ -6,3 +9,53 @@ describe('LMDBMap', () => { describeAztecMap('Async AztecMap', () => Promise.resolve(openTmpStore(true)), true); }); + +describe('AztecMultiMapWithSize', () => { + let map: AztecMultiMapWithSize; + let map2: AztecMultiMapWithSize; + + beforeEach(() => { + const store = openTmpStore(true); + map = store.openMultiMapWithSize('test'); + map2 = store.openMultiMapWithSize('test2'); + }); + + it('should be able to delete values', async () => { + await map.set('foo', 'bar'); + await map.set('foo', 'baz'); + + await map2.set('foo', 'bar'); + await map2.set('foo', 'baz'); + + expect(map.size()).to.equal(2); + expect(map2.size()).to.equal(2); + + await map.deleteValue('foo', 'bar'); + + expect(map.size()).to.equal(1); + expect(map.get('foo')).to.equal('baz'); + + expect(map2.size()).to.equal(2); + }); +}); + +describe('AztecMapWithSize', () => { + let map: AztecMapWithSize; + + beforeEach(() => { + const store = openTmpStore(true); + map = store.openMapWithSize('test'); + }); + + it('should be able to delete values', async () => { + await map.set('foo', 'bar'); + await map.set('fizz', 'buzz'); + + expect(map.size()).to.equal(2); + + await map.delete('foo'); + + expect(map.size()).to.equal(1); + expect(map.get('fizz')).to.equal('buzz'); + }); +}); diff --git a/yarn-project/kv-store/src/lmdb/map.ts b/yarn-project/kv-store/src/lmdb/map.ts index 38d87cf9c6b..4458c3c3539 100644 --- a/yarn-project/kv-store/src/lmdb/map.ts +++ b/yarn-project/kv-store/src/lmdb/map.ts @@ -1,7 +1,7 @@ import { type Database, type RangeOptions } from 'lmdb'; import { type Key, type Range } from '../interfaces/common.js'; -import { type AztecAsyncMultiMap, type AztecMultiMap } from '../interfaces/map.js'; +import { type AztecAsyncMultiMap, type AztecMapWithSize, type AztecMultiMap } from '../interfaces/map.js'; /** The slot where a key-value entry would be stored */ type MapValueSlot = ['map', string, 'slot', K]; @@ -13,8 +13,8 @@ export class LmdbAztecMap implements AztecMultiMap, Azte protected db: Database<[K, V], MapValueSlot>; protected name: string; - #startSentinel: MapValueSlot; - #endSentinel: MapValueSlot; + protected startSentinel: MapValueSlot; + protected endSentinel: MapValueSlot; constructor(rootDb: Database, mapName: string) { this.name = mapName; @@ -23,8 +23,8 @@ export class LmdbAztecMap implements AztecMultiMap, Azte // sentinels are used to define the start and end of the map // with LMDB's key encoding, no _primitive value_ can be "less than" an empty buffer or greater than Byte 255 // these will be used later to answer range queries - this.#startSentinel = ['map', this.name, 'slot', Buffer.from([])]; - this.#endSentinel = ['map', this.name, 'slot', Buffer.from([255])]; + this.startSentinel = ['map', this.name, 'slot', Buffer.from([])]; + this.endSentinel = ['map', this.name, 'slot', Buffer.from([255])]; } close(): Promise { @@ -32,7 +32,7 @@ export class LmdbAztecMap implements AztecMultiMap, Azte } get(key: K): V | undefined { - return this.db.get(this.#slot(key))?.[1]; + return this.db.get(this.slot(key))?.[1]; } getAsync(key: K): Promise { @@ -40,7 +40,7 @@ export class LmdbAztecMap implements AztecMultiMap, Azte } *getValues(key: K): IterableIterator { - const values = this.db.getValues(this.#slot(key)); + const values = this.db.getValues(this.slot(key)); for (const value of values) { yield value?.[1]; } @@ -53,7 +53,7 @@ export class LmdbAztecMap implements AztecMultiMap, Azte } has(key: K): boolean { - return this.db.doesExist(this.#slot(key)); + return this.db.doesExist(this.slot(key)); } hasAsync(key: K): Promise { @@ -61,30 +61,30 @@ export class LmdbAztecMap implements AztecMultiMap, Azte } async set(key: K, val: V): Promise { - await this.db.put(this.#slot(key), [key, val]); + await this.db.put(this.slot(key), [key, val]); } swap(key: K, fn: (val: V | undefined) => V): Promise { return this.db.childTransaction(() => { - const slot = this.#slot(key); + const slot = this.slot(key); const entry = this.db.get(slot); void this.db.put(slot, [key, fn(entry?.[1])]); }); } setIfNotExists(key: K, val: V): Promise { - const slot = this.#slot(key); + const slot = this.slot(key); return this.db.ifNoExists(slot, () => { void this.db.put(slot, [key, val]); }); } async delete(key: K): Promise { - await this.db.remove(this.#slot(key)); + await this.db.remove(this.slot(key)); } async deleteValue(key: K, val: V): Promise { - await this.db.remove(this.#slot(key), [key, val]); + await this.db.remove(this.slot(key), [key, val]); } *entries(range: Range = {}): IterableIterator<[K, V]> { @@ -93,19 +93,19 @@ export class LmdbAztecMap implements AztecMultiMap, Azte // in that case, we need to swap the start and end sentinels const start = reverse ? range.end - ? this.#slot(range.end) - : this.#endSentinel + ? this.slot(range.end) + : this.endSentinel : range.start - ? this.#slot(range.start) - : this.#startSentinel; + ? this.slot(range.start) + : this.startSentinel; const end = reverse ? range.start - ? this.#slot(range.start) - : this.#startSentinel + ? this.slot(range.start) + : this.startSentinel : range.end - ? this.#slot(range.end) - : this.#endSentinel; + ? this.slot(range.end) + : this.endSentinel; const lmdbRange: RangeOptions = { start, @@ -153,7 +153,82 @@ export class LmdbAztecMap implements AztecMultiMap, Azte } } - #slot(key: K): MapValueSlot { + protected slot(key: K): MapValueSlot { return ['map', this.name, 'slot', key]; } + + async clear(): Promise { + const lmdbRange: RangeOptions = { + start: this.startSentinel, + end: this.endSentinel, + }; + + const iterator = this.db.getRange(lmdbRange); + + for (const { key } of iterator) { + await this.db.remove(key); + } + } +} + +export class LmdbAztecMapWithSize + extends LmdbAztecMap + implements AztecMapWithSize, AztecAsyncMultiMap +{ + #sizeCache?: number; + + constructor(rootDb: Database, mapName: string) { + super(rootDb, mapName); + } + + override async set(key: K, val: V): Promise { + await this.db.childTransaction(() => { + const exists = this.db.doesExist(this.slot(key)); + this.db.putSync(this.slot(key), [key, val], { + appendDup: true, + }); + if (!exists) { + this.#sizeCache = undefined; // Invalidate cache + } + }); + } + + override async delete(key: K): Promise { + await this.db.childTransaction(async () => { + const exists = this.db.doesExist(this.slot(key)); + if (exists) { + await this.db.remove(this.slot(key)); + this.#sizeCache = undefined; // Invalidate cache + } + }); + } + + override async deleteValue(key: K, val: V): Promise { + await this.db.childTransaction(async () => { + const exists = this.db.doesExist(this.slot(key)); + if (exists) { + await this.db.remove(this.slot(key), [key, val]); + this.#sizeCache = undefined; // Invalidate cache + } + }); + } + + /** + * Gets the size of the map by counting entries. + * @returns The number of entries in the map + */ + size(): number { + if (this.#sizeCache === undefined) { + this.#sizeCache = this.db.getCount({ + start: this.startSentinel, + end: this.endSentinel, + }); + } + return this.#sizeCache; + } + + // Reset cache on clear/drop operations + clearCache() { + this.#sizeCache = undefined; + } } diff --git a/yarn-project/kv-store/src/lmdb/store.ts b/yarn-project/kv-store/src/lmdb/store.ts index f0f453a98ad..d78030ec373 100644 --- a/yarn-project/kv-store/src/lmdb/store.ts +++ b/yarn-project/kv-store/src/lmdb/store.ts @@ -9,13 +9,20 @@ import { join } from 'path'; import { type AztecArray, type AztecAsyncArray } from '../interfaces/array.js'; import { type Key } from '../interfaces/common.js'; import { type AztecAsyncCounter, type AztecCounter } from '../interfaces/counter.js'; -import { type AztecAsyncMap, type AztecAsyncMultiMap, type AztecMap, type AztecMultiMap } from '../interfaces/map.js'; +import { + type AztecAsyncMap, + type AztecAsyncMultiMap, + type AztecMap, + type AztecMapWithSize, + type AztecMultiMap, + type AztecMultiMapWithSize, +} from '../interfaces/map.js'; import { type AztecAsyncSet, type AztecSet } from '../interfaces/set.js'; import { type AztecAsyncSingleton, type AztecSingleton } from '../interfaces/singleton.js'; import { type AztecAsyncKVStore, type AztecKVStore } from '../interfaces/store.js'; import { LmdbAztecArray } from './array.js'; import { LmdbAztecCounter } from './counter.js'; -import { LmdbAztecMap } from './map.js'; +import { LmdbAztecMap, LmdbAztecMapWithSize } from './map.js'; import { LmdbAztecSet } from './set.js'; import { LmdbAztecSingleton } from './singleton.js'; @@ -118,6 +125,23 @@ export class AztecLmdbStore implements AztecKVStore, AztecAsyncKVStore { openCounter(name: string): AztecCounter & AztecAsyncCounter { return new LmdbAztecCounter(this.#data, name); } + /** + * Creates a new AztecMultiMapWithSize in the store. A multi-map with size stores multiple values for a single key automatically. + * @param name - Name of the map + * @returns A new AztecMultiMapWithSize + */ + openMultiMapWithSize(name: string): AztecMultiMapWithSize { + return new LmdbAztecMapWithSize(this.#multiMapData, name); + } + + /** + * Creates a new AztecMapWithSize in the store. + * @param name - Name of the map + * @returns A new AztecMapWithSize + */ + openMapWithSize(name: string): AztecMapWithSize { + return new LmdbAztecMapWithSize(this.#data, name); + } /** * Creates a new AztecArray in the store. diff --git a/yarn-project/p2p/src/client/index.ts b/yarn-project/p2p/src/client/index.ts index c2a01ff4294..9c4cca388db 100644 --- a/yarn-project/p2p/src/client/index.ts +++ b/yarn-project/p2p/src/client/index.ts @@ -14,7 +14,7 @@ import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { P2PClient } from '../client/p2p_client.js'; import { type P2PConfig } from '../config.js'; import { type AttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js'; -import { InMemoryAttestationPool } from '../mem_pools/attestation_pool/memory_attestation_pool.js'; +import { KvAttestationPool } from '../mem_pools/attestation_pool/kv_attestation_pool.js'; import { type EpochProofQuotePool } from '../mem_pools/epoch_proof_quote_pool/epoch_proof_quote_pool.js'; import { MemoryEpochProofQuotePool } from '../mem_pools/epoch_proof_quote_pool/memory_epoch_proof_quote_pool.js'; import { type MemPools } from '../mem_pools/interface.js'; @@ -51,7 +51,7 @@ export const createP2PClient = async ( epochProofQuotePool: deps.epochProofQuotePool ?? new MemoryEpochProofQuotePool(telemetry), attestationPool: clientType === P2PClientType.Full - ? ((deps.attestationPool ?? new InMemoryAttestationPool(telemetry)) as T extends P2PClientType.Full + ? ((deps.attestationPool ?? new KvAttestationPool(store, telemetry)) as T extends P2PClientType.Full ? AttestationPool : undefined) : undefined, diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts index bb7ecb5b704..d7f3e434d09 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts @@ -39,6 +39,16 @@ export interface AttestationPool { */ deleteAttestationsForSlot(slot: bigint): Promise; + /** + * Delete Attestations for slot and proposal + * + * Removes all attestations associated with a slot and proposal + * + * @param slot - The slot to delete. + * @param proposalId - The proposal to delete. + */ + deleteAttestationsForSlotAndProposal(slot: bigint, proposalId: string): Promise; + /** * Get Attestations for slot * diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts new file mode 100644 index 00000000000..f8f838a08ab --- /dev/null +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts @@ -0,0 +1,237 @@ +import { type BlockAttestation, TxHash } from '@aztec/circuit-types'; +import { Secp256k1Signer } from '@aztec/foundation/crypto'; +import { Fr } from '@aztec/foundation/fields'; + +import { jest } from '@jest/globals'; +import { type MockProxy, mock } from 'jest-mock-extended'; + +import { type PoolInstrumentation } from '../instrumentation.js'; +import { type AttestationPool } from './attestation_pool.js'; +import { mockAttestation } from './mocks.js'; + +const NUMBER_OF_SIGNERS_PER_TEST = 4; + +export function describeAttestationPool(getAttestationPool: () => AttestationPool) { + let ap: AttestationPool; + let signers: Secp256k1Signer[]; + + // Check that metrics are recorded correctly + let metricsMock: MockProxy>; + + beforeEach(() => { + ap = getAttestationPool(); + signers = Array.from({ length: NUMBER_OF_SIGNERS_PER_TEST }, () => Secp256k1Signer.random()); + + metricsMock = mock>(); + // Can i overwrite this like this?? + (ap as any).metrics = metricsMock; + }); + + const createAttestationsForSlot = (slotNumber: number) => { + const archive = Fr.random(); + return signers.map(signer => mockAttestation(signer, slotNumber, archive)); + }; + + // We compare buffers as the objects can have cached values attached to them which are not serialised + // using array containing as the kv store does not respect insertion order + const compareAttestations = (a1: BlockAttestation[], a2: BlockAttestation[]) => { + const a1Buffer = a1.map(attestation => attestation.toBuffer()); + const a2Buffer = a2.map(attestation => attestation.toBuffer()); + expect(a1Buffer.length).toBe(a2Buffer.length); + expect(a1Buffer).toEqual(expect.arrayContaining(a2Buffer)); + }; + + it('should add attestations to pool', async () => { + const slotNumber = 420; + const archive = Fr.random(); + const attestations = signers.map(signer => mockAttestation(signer, slotNumber, archive)); + + await ap.addAttestations(attestations); + + // Check metrics have been updated. + expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); + + const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); + + expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); + + compareAttestations(retreivedAttestations, attestations); + + // Delete by slot + await ap.deleteAttestationsForSlot(BigInt(slotNumber)); + + expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); + + const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); + expect(retreivedAttestationsAfterDelete.length).toBe(0); + }); + + it('Should handle duplicate proposals in a slot', async () => { + const slotNumber = 420; + const archive = Fr.random(); + const txs = [0, 1, 2, 3, 4, 5].map(() => TxHash.random()); + + // Use the same signer for all attestations + const attestations: BlockAttestation[] = []; + const signer = signers[0]; + for (let i = 0; i < NUMBER_OF_SIGNERS_PER_TEST; i++) { + attestations.push(mockAttestation(signer, slotNumber, archive, txs)); + } + + await ap.addAttestations(attestations); + + const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); + expect(retreivedAttestations.length).toBe(1); + expect(retreivedAttestations[0].toBuffer()).toEqual(attestations[0].toBuffer()); + expect(retreivedAttestations[0].payload.txHashes).toEqual(txs); + expect(retreivedAttestations[0].getSender().toString()).toEqual(signer.address.toString()); + }); + + it('Should store attestations by differing slot', async () => { + const slotNumbers = [1, 2, 3, 4]; + const attestations = signers.map((signer, i) => mockAttestation(signer, slotNumbers[i])); + + await ap.addAttestations(attestations); + + for (const attestation of attestations) { + const slot = attestation.payload.header.globalVariables.slotNumber; + const archive = attestation.archive.toString(); + + const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), archive); + expect(retreivedAttestations.length).toBe(1); + expect(retreivedAttestations[0].toBuffer()).toEqual(attestation.toBuffer()); + expect(retreivedAttestations[0].payload.header.globalVariables.slotNumber).toEqual(slot); + } + }); + + it('Should store attestations by differing slot and archive', async () => { + const slotNumbers = [1, 1, 2, 3]; + const archives = [Fr.random(), Fr.random(), Fr.random(), Fr.random()]; + const attestations = signers.map((signer, i) => mockAttestation(signer, slotNumbers[i], archives[i])); + + await ap.addAttestations(attestations); + + for (const attestation of attestations) { + const slot = attestation.payload.header.globalVariables.slotNumber; + const proposalId = attestation.archive.toString(); + + const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), proposalId); + expect(retreivedAttestations.length).toBe(1); + expect(retreivedAttestations[0].toBuffer()).toEqual(attestation.toBuffer()); + expect(retreivedAttestations[0].payload.header.globalVariables.slotNumber).toEqual(slot); + } + }); + + it('Should delete attestations', async () => { + const slotNumber = 420; + const archive = Fr.random(); + const attestations = signers.map(signer => mockAttestation(signer, slotNumber, archive)); + const proposalId = attestations[0].archive.toString(); + + await ap.addAttestations(attestations); + + expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); + + const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); + compareAttestations(retreivedAttestations, attestations); + + await ap.deleteAttestations(attestations); + + expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); + + const gottenAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + expect(gottenAfterDelete.length).toBe(0); + }); + + it('Should blanket delete attestations per slot', async () => { + const slotNumber = 420; + const archive = Fr.random(); + const attestations = await Promise.all(signers.map(signer => mockAttestation(signer, slotNumber, archive))); + const proposalId = attestations[0].archive.toString(); + + await ap.addAttestations(attestations); + + const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); + compareAttestations(retreivedAttestations, attestations); + + await ap.deleteAttestationsForSlot(BigInt(slotNumber)); + + const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + expect(retreivedAttestationsAfterDelete.length).toBe(0); + }); + + it('Should blanket delete attestations per slot and proposal', async () => { + const slotNumber = 420; + const archive = Fr.random(); + const attestations = signers.map(signer => mockAttestation(signer, slotNumber, archive)); + const proposalId = attestations[0].archive.toString(); + + // Add another set of attestations with a different proposalId, yet the same slot + const archive2 = Fr.random(); + const attestations2 = signers.map(signer => mockAttestation(signer, slotNumber, archive2)); + const proposalId2 = attestations2[0].archive.toString(); + + await ap.addAttestations(attestations); + await ap.addAttestations(attestations2); + + expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); + expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations2.length); + + const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); + compareAttestations(retreivedAttestations, attestations); + + await ap.deleteAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId); + + expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); + + const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + expect(retreivedAttestationsAfterDelete.length).toBe(0); + + const retreivedAttestationsAfterDeleteForOtherProposal = await ap.getAttestationsForSlot( + BigInt(slotNumber), + proposalId2, + ); + expect(retreivedAttestationsAfterDeleteForOtherProposal.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); + compareAttestations(retreivedAttestationsAfterDeleteForOtherProposal, attestations2); + }); + + it('Should blanket delete attestations per slot and proposal (does not perform db ops if there are no attestations)', async () => { + const slotNumber = 420; + const proposalId = 'proposalId'; + + const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + expect(retreivedAttestations.length).toBe(0); + + await ap.deleteAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId); + + expect(metricsMock.recordRemovedObjects).toHaveBeenCalledTimes(0); + }); + + it('Should delete attestations older than a given slot', async () => { + const slotNumbers = [1, 2, 3, 69, 72, 74, 88, 420]; + const attestations = slotNumbers.map(slotNumber => createAttestationsForSlot(slotNumber)).flat(); + const proposalId = attestations[0].archive.toString(); + + await ap.addAttestations(attestations); + + const attestationsForSlot1 = await ap.getAttestationsForSlot(BigInt(1), proposalId); + expect(attestationsForSlot1.length).toBe(signers.length); + + const deleteAttestationsSpy = jest.spyOn(ap, 'deleteAttestationsForSlot'); + + await ap.deleteAttestationsOlderThan(BigInt(73)); + + const attestationsForSlot1AfterDelete = await ap.getAttestationsForSlot(BigInt(1), proposalId); + expect(attestationsForSlot1AfterDelete.length).toBe(0); + + expect(deleteAttestationsSpy).toHaveBeenCalledTimes(5); + expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(1)); + expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(2)); + expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(3)); + expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(69)); + expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(72)); + }); +} diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.test.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.test.ts new file mode 100644 index 00000000000..2832694784c --- /dev/null +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.test.ts @@ -0,0 +1,18 @@ +import { type AztecKVStore } from '@aztec/kv-store'; +import { openTmpStore } from '@aztec/kv-store/lmdb'; +import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; + +import { describeAttestationPool } from './attestation_pool_test_suite.js'; +import { KvAttestationPool } from './kv_attestation_pool.js'; + +describe('KV Attestation Pool', () => { + let kvAttestationPool: KvAttestationPool; + let store: AztecKVStore; + + beforeEach(() => { + store = openTmpStore(); + kvAttestationPool = new KvAttestationPool(store, new NoopTelemetryClient()); + }); + + describeAttestationPool(() => kvAttestationPool); +}); diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts new file mode 100644 index 00000000000..8de98828eed --- /dev/null +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts @@ -0,0 +1,153 @@ +import { BlockAttestation } from '@aztec/circuit-types'; +import { Fr } from '@aztec/foundation/fields'; +import { createLogger } from '@aztec/foundation/log'; +import { type AztecKVStore, type AztecMapWithSize, type AztecMultiMap } from '@aztec/kv-store'; +import { type TelemetryClient } from '@aztec/telemetry-client'; + +import { PoolInstrumentation, PoolName } from '../instrumentation.js'; +import { type AttestationPool } from './attestation_pool.js'; + +export class KvAttestationPool implements AttestationPool { + private metrics: PoolInstrumentation; + + // Index of all proposal ids in a slot + private attestations: AztecMultiMap; + + constructor( + private store: AztecKVStore, + telemetry: TelemetryClient, + private log = createLogger('aztec:attestation_pool'), + ) { + this.attestations = store.openMultiMap('attestations'); + this.metrics = new PoolInstrumentation(telemetry, PoolName.ATTESTATION_POOL); + } + + private getProposalMapKey(slot: string, proposalId: string): string { + return `proposal-${slot}-${proposalId}`; + } + + /** + * Get the proposal map for a given slot and proposalId + * + * Essentially a nested mapping of address -> attestation + * + * @param slot - The slot to get the proposal map for + * @param proposalId - The proposalId to get the map for + * @returns The proposal map + */ + private getProposalMap(slot: string, proposalId: string): AztecMapWithSize { + const mapKey = this.getProposalMapKey(slot, proposalId); + return this.store.openMapWithSize(mapKey); + } + + public async addAttestations(attestations: BlockAttestation[]): Promise { + for (const attestation of attestations) { + const slotNumber = attestation.payload.header.globalVariables.slotNumber.toString(); + const proposalId = attestation.archive.toString(); + const address = attestation.getSender().toString(); + + // Index the proposalId in the slot map + await this.attestations.set(slotNumber, proposalId); + + // Store the actual attestation in the proposal map + const proposalMap = this.getProposalMap(slotNumber, proposalId); + await proposalMap.set(address, attestation.toBuffer()); + + this.log.verbose(`Added attestation for slot ${slotNumber} from ${address}`); + } + + this.metrics.recordAddedObjects(attestations.length); + } + + public getAttestationsForSlot(slot: bigint, proposalId: string): Promise { + const slotNumber = new Fr(slot).toString(); + const proposalMap = this.getProposalMap(slotNumber, proposalId); + const attestations = proposalMap.values(); + const attestationsArray = Array.from(attestations).map(attestation => BlockAttestation.fromBuffer(attestation)); + return Promise.resolve(attestationsArray); + } + + public async deleteAttestationsOlderThan(oldestSlot: bigint): Promise { + const olderThan = []; + + const slots = this.attestations.keys(); + for (const slot of slots) { + if (BigInt(slot) < oldestSlot) { + olderThan.push(slot); + } + } + + await Promise.all(olderThan.map(oldSlot => this.deleteAttestationsForSlot(BigInt(oldSlot)))); + return Promise.resolve(); + } + + public async deleteAttestationsForSlot(slot: bigint): Promise { + const deletionPromises = []; + + const slotString = new Fr(slot).toString(); + let numberOfAttestations = 0; + const proposalIds = this.attestations.getValues(slotString); + + if (proposalIds) { + for (const proposalId of proposalIds) { + const proposalMap = this.getProposalMap(slotString, proposalId); + numberOfAttestations += proposalMap.size(); + deletionPromises.push(proposalMap.clear()); + } + } + + await Promise.all(deletionPromises); + + this.log.verbose(`Removed ${numberOfAttestations} attestations for slot ${slot}`); + this.metrics.recordRemovedObjects(numberOfAttestations); + return Promise.resolve(); + } + + public async deleteAttestationsForSlotAndProposal(slot: bigint, proposalId: string): Promise { + const deletionPromises = []; + + const slotString = new Fr(slot).toString(); + const exists = this.attestations.get(slotString); + + if (exists) { + // Remove the proposalId from the slot index + deletionPromises.push(this.attestations.deleteValue(slotString, proposalId)); + + // Delete all attestations for the proposalId + const proposalMap = this.getProposalMap(slotString, proposalId); + const numberOfAttestations = proposalMap.size(); + deletionPromises.push(proposalMap.clear()); + + this.log.verbose(`Removed ${numberOfAttestations} attestations for slot ${slot} and proposal ${proposalId}`); + this.metrics.recordRemovedObjects(numberOfAttestations); + } + + await Promise.all(deletionPromises); + return Promise.resolve(); + } + + public async deleteAttestations(attestations: BlockAttestation[]): Promise { + const deletionPromises = []; + + for (const attestation of attestations) { + const slotNumber = attestation.payload.header.globalVariables.slotNumber.toString(); + const proposalId = attestation.archive.toString(); + const proposalMap = this.getProposalMap(slotNumber, proposalId); + + if (proposalMap) { + const address = attestation.getSender().toString(); + deletionPromises.push(proposalMap.delete(address)); + this.log.debug(`Deleted attestation for slot ${slotNumber} from ${address}`); + } + + if (proposalMap.size() === 0) { + deletionPromises.push(this.attestations.deleteValue(slotNumber, proposalId)); + } + } + + await Promise.all(deletionPromises); + + this.metrics.recordRemovedObjects(attestations.length); + return Promise.resolve(); + } +} diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts index ef80dad21ec..5d2cd81b625 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts @@ -1,205 +1,13 @@ -import { type BlockAttestation, TxHash } from '@aztec/circuit-types'; -import { Secp256k1Signer } from '@aztec/foundation/crypto'; -import { Fr } from '@aztec/foundation/fields'; import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; -import { jest } from '@jest/globals'; -import { type MockProxy, mock } from 'jest-mock-extended'; - -import { type PoolInstrumentation } from '../instrumentation.js'; +import { describeAttestationPool } from './attestation_pool_test_suite.js'; import { InMemoryAttestationPool } from './memory_attestation_pool.js'; -import { mockAttestation } from './mocks.js'; - -const NUMBER_OF_SIGNERS_PER_TEST = 4; - -describe('MemoryAttestationPool', () => { - let ap: InMemoryAttestationPool; - let signers: Secp256k1Signer[]; - const telemetry = new NoopTelemetryClient(); - - // Check that metrics are recorded correctly - let metricsMock: MockProxy>; +describe('In-Memory Attestation Pool', () => { + let inMemoryAttestationPool: InMemoryAttestationPool; beforeEach(() => { - // Use noop telemetry client while testing. - - ap = new InMemoryAttestationPool(telemetry); - signers = Array.from({ length: NUMBER_OF_SIGNERS_PER_TEST }, () => Secp256k1Signer.random()); - - metricsMock = mock>(); - // Can i overwrite this like this?? - (ap as any).metrics = metricsMock; - }); - - const createAttestationsForSlot = (slotNumber: number) => { - const archive = Fr.random(); - return signers.map(signer => mockAttestation(signer, slotNumber, archive)); - }; - - it('should add attestations to pool', async () => { - const slotNumber = 420; - const archive = Fr.random(); - const attestations = signers.map(signer => mockAttestation(signer, slotNumber, archive)); - - await ap.addAttestations(attestations); - - // Check metrics have been updated. - expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); - - const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); - - expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); - expect(retreivedAttestations).toEqual(attestations); - - // Delete by slot - await ap.deleteAttestationsForSlot(BigInt(slotNumber)); - - expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); - - const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); - expect(retreivedAttestationsAfterDelete.length).toBe(0); - }); - - it('Should handle duplicate proposals in a slot', async () => { - const slotNumber = 420; - const archive = Fr.random(); - const txs = [0, 1, 2, 3, 4, 5].map(() => TxHash.random()); - - // Use the same signer for all attestations - const attestations: BlockAttestation[] = []; - const signer = signers[0]; - for (let i = 0; i < NUMBER_OF_SIGNERS_PER_TEST; i++) { - attestations.push(mockAttestation(signer, slotNumber, archive, txs)); - } - - await ap.addAttestations(attestations); - - const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); - expect(retreivedAttestations.length).toBe(1); - expect(retreivedAttestations[0]).toEqual(attestations[0]); - expect(retreivedAttestations[0].payload.txHashes).toEqual(txs); - expect(retreivedAttestations[0].getSender().toString()).toEqual(signer.address.toString()); - }); - - it('Should store attestations by differing slot', async () => { - const slotNumbers = [1, 2, 3, 4]; - const attestations = signers.map((signer, i) => mockAttestation(signer, slotNumbers[i])); - - await ap.addAttestations(attestations); - - for (const attestation of attestations) { - const slot = attestation.payload.header.globalVariables.slotNumber; - const archive = attestation.archive.toString(); - - const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), archive); - expect(retreivedAttestations.length).toBe(1); - expect(retreivedAttestations[0]).toEqual(attestation); - expect(retreivedAttestations[0].payload.header.globalVariables.slotNumber).toEqual(slot); - } - }); - - it('Should store attestations by differing slot and archive', async () => { - const slotNumbers = [1, 2, 3, 4]; - const archives = [Fr.random(), Fr.random(), Fr.random(), Fr.random()]; - const attestations = signers.map((signer, i) => mockAttestation(signer, slotNumbers[i], archives[i])); - - await ap.addAttestations(attestations); - - for (const attestation of attestations) { - const slot = attestation.payload.header.globalVariables.slotNumber; - const proposalId = attestation.archive.toString(); - - const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), proposalId); - expect(retreivedAttestations.length).toBe(1); - expect(retreivedAttestations[0]).toEqual(attestation); - expect(retreivedAttestations[0].payload.header.globalVariables.slotNumber).toEqual(slot); - } - }); - - it('Should delete attestations', async () => { - const slotNumber = 420; - const archive = Fr.random(); - const attestations = signers.map(signer => mockAttestation(signer, slotNumber, archive)); - const proposalId = attestations[0].archive.toString(); - - await ap.addAttestations(attestations); - - expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); - - const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); - expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); - expect(retreivedAttestations).toEqual(attestations); - - await ap.deleteAttestations(attestations); - - expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); - - const gottenAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); - expect(gottenAfterDelete.length).toBe(0); - }); - - it('Should blanket delete attestations per slot', async () => { - const slotNumber = 420; - const archive = Fr.random(); - const attestations = await Promise.all(signers.map(signer => mockAttestation(signer, slotNumber, archive))); - const proposalId = attestations[0].archive.toString(); - - await ap.addAttestations(attestations); - - const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); - expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); - expect(retreivedAttestations).toEqual(attestations); - - await ap.deleteAttestationsForSlot(BigInt(slotNumber)); - - const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); - expect(retreivedAttestationsAfterDelete.length).toBe(0); + inMemoryAttestationPool = new InMemoryAttestationPool(new NoopTelemetryClient()); }); - it('Should blanket delete attestations per slot and proposal', async () => { - const slotNumber = 420; - const archive = Fr.random(); - const attestations = signers.map(signer => mockAttestation(signer, slotNumber, archive)); - const proposalId = attestations[0].archive.toString(); - - await ap.addAttestations(attestations); - - expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); - - const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); - expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); - expect(retreivedAttestations).toEqual(attestations); - - await ap.deleteAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId); - - expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); - - const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); - expect(retreivedAttestationsAfterDelete.length).toBe(0); - }); - - it('Should delete attestations older than a given slot', async () => { - const slotNumbers = [1, 2, 3, 69, 72, 74, 88, 420]; - const attestations = slotNumbers.map(slotNumber => createAttestationsForSlot(slotNumber)).flat(); - const proposalId = attestations[0].archive.toString(); - - await ap.addAttestations(attestations); - - const attestationsForSlot1 = await ap.getAttestationsForSlot(BigInt(1), proposalId); - expect(attestationsForSlot1.length).toBe(signers.length); - - const deleteAttestationsSpy = jest.spyOn(ap, 'deleteAttestationsForSlot'); - - await ap.deleteAttestationsOlderThan(BigInt(73)); - - const attestationsForSlot1AfterDelete = await ap.getAttestationsForSlot(BigInt(1), proposalId); - expect(attestationsForSlot1AfterDelete.length).toBe(0); - - expect(deleteAttestationsSpy).toHaveBeenCalledTimes(5); - expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(1)); - expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(2)); - expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(3)); - expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(69)); - expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(72)); - }); + describeAttestationPool(() => inMemoryAttestationPool); }); diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts index 6b11f5b01c6..d94075e288c 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts @@ -69,6 +69,7 @@ const makeMockPools = () => { deleteAttestations: jest.fn(), deleteAttestationsForSlot: jest.fn(), deleteAttestationsOlderThan: jest.fn(), + deleteAttestationsForSlotAndProposal: jest.fn(), getAttestationsForSlot: jest.fn().mockReturnValue(undefined), }, epochProofQuotePool: {