Skip to content

Commit

Permalink
feat: better tracing/metrics in validator and archiver (#9108)
Browse files Browse the repository at this point in the history
## Overview

Adds tracing to the block proposal + attestations 

This pr is ground work to tracking attestations and proposals across the
network

---------

Co-authored-by: Maddiaa0 <[email protected]>
  • Loading branch information
just-mitch and Maddiaa0 authored Oct 10, 2024
1 parent 3138078 commit 1801f5b
Show file tree
Hide file tree
Showing 20 changed files with 150 additions and 33 deletions.
1 change: 1 addition & 0 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ export class Archiver implements ArchiveSource {
// if we are here then we must have a valid proven epoch number
await this.store.setProvenL2EpochNumber(Number(provenEpochNumber));
}
this.instrumentation.updateLastProvenBlock(Number(provenBlockNumber));
};

// This is an edge case that we only hit if there are no proposed blocks.
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ export class AztecNodeService implements AztecNode {

const simulationProvider = await createSimulationProvider(config, log);

const validatorClient = createValidatorClient(config, p2pClient);
const validatorClient = createValidatorClient(config, p2pClient, telemetry);

// now create the sequencer
const sequencer = config.disableSequencer
Expand Down
11 changes: 8 additions & 3 deletions yarn-project/circuit-types/src/p2p/block_attestation.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { type EthAddress } from '@aztec/circuits.js';
import { Buffer32 } from '@aztec/foundation/buffer';
import { recoverAddress } from '@aztec/foundation/crypto';
import { keccak256, recoverAddress } from '@aztec/foundation/crypto';
import { type EthAddress } from '@aztec/foundation/eth-address';
import { Signature } from '@aztec/foundation/eth-signature';
import { type Fr } from '@aztec/foundation/fields';
import { BufferReader, serializeToBuffer } from '@aztec/foundation/serialize';

import { ConsensusPayload } from './consensus_payload.js';
Expand Down Expand Up @@ -37,7 +38,11 @@ export class BlockAttestation extends Gossipable {
}

override p2pMessageIdentifier(): Buffer32 {
return BlockAttestationHash.fromField(this.payload.archive);
return new BlockAttestationHash(keccak256(this.signature.toBuffer()));
}

get archive(): Fr {
return this.payload.archive;
}

/**Get sender
Expand Down
7 changes: 6 additions & 1 deletion yarn-project/circuit-types/src/p2p/block_proposal.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { type EthAddress } from '@aztec/circuits.js';
import { Buffer32 } from '@aztec/foundation/buffer';
import { recoverAddress } from '@aztec/foundation/crypto';
import { type EthAddress } from '@aztec/foundation/eth-address';
import { Signature } from '@aztec/foundation/eth-signature';
import { type Fr } from '@aztec/foundation/fields';
import { BufferReader, serializeToBuffer } from '@aztec/foundation/serialize';

import { ConsensusPayload } from './consensus_payload.js';
Expand Down Expand Up @@ -40,6 +41,10 @@ export class BlockProposal extends Gossipable {
return BlockProposalHash.fromField(this.payload.archive);
}

get archive(): Fr {
return this.payload.archive;
}

static async createProposalFromSigner(
payload: ConsensusPayload,
payloadSigner: (payload: Buffer32) => Promise<Signature>,
Expand Down
3 changes: 2 additions & 1 deletion yarn-project/p2p/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ export const createP2PClient = async (
proofVerifier,
worldStateSynchronizer,
store,
telemetry,
);
} else {
p2pService = new DummyP2PService();
}
return new P2PClient(store, l2BlockSource, mempools, p2pService, config.keepProvenTxsInPoolFor);
return new P2PClient(store, l2BlockSource, mempools, p2pService, config.keepProvenTxsInPoolFor, telemetry);
};

async function configureP2PClientAddresses(_config: P2PConfig & DataStoreConfig): Promise<P2PConfig & DataStoreConfig> {
Expand Down
13 changes: 8 additions & 5 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { mockEpochProofQuote, mockTx } from '@aztec/circuit-types';
import { retryUntil } from '@aztec/foundation/retry';
import { type AztecKVStore } from '@aztec/kv-store';
import { openTmpStore } from '@aztec/kv-store/utils';
import { type TelemetryClient } from '@aztec/telemetry-client';
import { NoopTelemetryClient } from '@aztec/telemetry-client/noop';

import { expect, jest } from '@jest/globals';

Expand All @@ -28,6 +30,7 @@ describe('In-Memory P2P Client', () => {
let p2pService: Mockify<P2PService>;
let kvStore: AztecKVStore;
let client: P2PClient;
const telemetryClient: TelemetryClient = new NoopTelemetryClient();

beforeEach(() => {
txPool = {
Expand Down Expand Up @@ -73,7 +76,7 @@ describe('In-Memory P2P Client', () => {
};

kvStore = openTmpStore();
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 0);
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 0, telemetryClient);
});

const advanceToProvenBlock = async (getProvenBlockNumber: number, provenEpochNumber = getProvenBlockNumber) => {
Expand Down Expand Up @@ -143,7 +146,7 @@ describe('In-Memory P2P Client', () => {
await client.start();
await client.stop();

const client2 = new P2PClient(kvStore, blockSource, mempools, p2pService, 0);
const client2 = new P2PClient(kvStore, blockSource, mempools, p2pService, 0, telemetryClient);
expect(client2.getSyncedLatestBlockNum()).toEqual(client.getSyncedLatestBlockNum());
});

Expand All @@ -158,7 +161,7 @@ describe('In-Memory P2P Client', () => {
});

it('deletes txs after waiting the set number of blocks', async () => {
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 10);
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 10, telemetryClient);
blockSource.setProvenBlockNumber(0);
await client.start();
expect(txPool.deleteTxs).not.toHaveBeenCalled();
Expand All @@ -175,7 +178,7 @@ describe('In-Memory P2P Client', () => {
});

it('stores and returns epoch proof quotes', async () => {
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 0);
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 0, telemetryClient);

blockSource.setProvenEpochNumber(2);
await client.start();
Expand Down Expand Up @@ -206,7 +209,7 @@ describe('In-Memory P2P Client', () => {
});

it('deletes expired proof quotes', async () => {
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 0);
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 0, telemetryClient);

blockSource.setProvenEpochNumber(1);
blockSource.setProvenBlockNumber(1);
Expand Down
12 changes: 11 additions & 1 deletion yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
import { INITIAL_L2_BLOCK_NUM } from '@aztec/circuits.js/constants';
import { createDebugLogger } from '@aztec/foundation/log';
import { type AztecKVStore, type AztecSingleton } from '@aztec/kv-store';
import { Attributes, type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client';

import { type ENR } from '@chainsafe/enr';

Expand Down Expand Up @@ -169,7 +170,7 @@ export interface P2P {
/**
* The P2P client implementation.
*/
export class P2PClient implements P2P {
export class P2PClient extends WithTracer implements P2P {
/** L2 block download to stay in sync with latest blocks. */
private latestBlockDownloader: L2BlockDownloader;

Expand Down Expand Up @@ -210,8 +211,11 @@ export class P2PClient implements P2P {
mempools: MemPools,
private p2pService: P2PService,
private keepProvenTxsFor: number,
telemetryClient: TelemetryClient,
private log = createDebugLogger('aztec:p2p'),
) {
super(telemetryClient, 'P2PClient');

const { blockCheckIntervalMS: checkInterval, l2QueueSize: p2pL2QueueSize } = getP2PConfigEnvVars();
const l2DownloaderOpts = { maxQueueSize: p2pL2QueueSize, pollIntervalMS: checkInterval };
// TODO(palla/prover-node): This effectively downloads blocks twice from the archiver, which is an issue
Expand Down Expand Up @@ -318,6 +322,12 @@ export class P2PClient implements P2P {
this.log.info('P2P client stopped.');
}

@trackSpan('p2pClient.broadcastProposal', proposal => ({
[Attributes.BLOCK_NUMBER]: proposal.payload.header.globalVariables.blockNumber.toNumber(),
[Attributes.SLOT_NUMBER]: proposal.payload.header.globalVariables.slotNumber.toNumber(),
[Attributes.BLOCK_ARCHIVE]: proposal.archive.toString(),
[Attributes.P2P_ID]: proposal.p2pMessageIdentifier().toString(),
}))
public broadcastProposal(proposal: BlockProposal): void {
this.log.verbose(`Broadcasting proposal ${proposal.p2pMessageIdentifier()} to peers`);
return this.p2pService.propagate(proposal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,12 @@ describe('MemoryAttestationPool', () => {
const archive = Fr.random();
const attestations = await Promise.all(signers.map(signer => mockAttestation(signer, slotNumber, archive)));

const proposalId = attestations[0].p2pMessageIdentifier().toString();

await ap.addAttestations(attestations);

// Check metrics have been updated.
expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length);

const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId);
const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString());

expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST);
expect(retreivedAttestations).toEqual(attestations);
Expand All @@ -52,7 +50,7 @@ describe('MemoryAttestationPool', () => {

expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length);

const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId);
const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString());
expect(retreivedAttestationsAfterDelete.length).toBe(0);
});

Expand All @@ -64,9 +62,9 @@ describe('MemoryAttestationPool', () => {

for (const attestation of attestations) {
const slot = attestation.payload.header.globalVariables.slotNumber;
const proposalId = attestation.p2pMessageIdentifier().toString();
const archive = attestation.archive.toString();

const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), proposalId);
const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), archive);
expect(retreivedAttestations.length).toBe(1);
expect(retreivedAttestations[0]).toEqual(attestation);
expect(retreivedAttestations[0].payload.header.globalVariables.slotNumber).toEqual(slot);
Expand All @@ -84,7 +82,7 @@ describe('MemoryAttestationPool', () => {

for (const attestation of attestations) {
const slot = attestation.payload.header.globalVariables.slotNumber;
const proposalId = attestation.p2pMessageIdentifier().toString();
const proposalId = attestation.archive.toString();

const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), proposalId);
expect(retreivedAttestations.length).toBe(1);
Expand All @@ -97,7 +95,7 @@ describe('MemoryAttestationPool', () => {
const slotNumber = 420;
const archive = Fr.random();
const attestations = await Promise.all(signers.map(signer => mockAttestation(signer, slotNumber, archive)));
const proposalId = attestations[0].p2pMessageIdentifier().toString();
const proposalId = attestations[0].archive.toString();

await ap.addAttestations(attestations);

Expand All @@ -119,7 +117,7 @@ describe('MemoryAttestationPool', () => {
const slotNumber = 420;
const archive = Fr.random();
const attestations = await Promise.all(signers.map(signer => mockAttestation(signer, slotNumber, archive)));
const proposalId = attestations[0].p2pMessageIdentifier().toString();
const proposalId = attestations[0].archive.toString();

await ap.addAttestations(attestations);

Expand All @@ -137,7 +135,7 @@ describe('MemoryAttestationPool', () => {
const slotNumber = 420;
const archive = Fr.random();
const attestations = await Promise.all(signers.map(signer => mockAttestation(signer, slotNumber, archive)));
const proposalId = attestations[0].p2pMessageIdentifier().toString();
const proposalId = attestations[0].archive.toString();

await ap.addAttestations(attestations);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class InMemoryAttestationPool implements AttestationPool {
// Perf: order and group by slot before insertion
const slotNumber = attestation.payload.header.globalVariables.slotNumber;

const proposalId = attestation.p2pMessageIdentifier().toString();
const proposalId = attestation.archive.toString();
const address = attestation.getSender();

const slotAttestationMap = getSlotOrDefault(this.attestations, slotNumber.toBigInt());
Expand Down Expand Up @@ -89,7 +89,7 @@ export class InMemoryAttestationPool implements AttestationPool {
const slotNumber = attestation.payload.header.globalVariables.slotNumber;
const slotAttestationMap = this.attestations.get(slotNumber.toBigInt());
if (slotAttestationMap) {
const proposalId = attestation.p2pMessageIdentifier().toString();
const proposalId = attestation.archive.toString();
const proposalAttestationMap = getProposalOrDefault(slotAttestationMap, proposalId);
if (proposalAttestationMap) {
const address = attestation.getSender();
Expand Down
37 changes: 35 additions & 2 deletions yarn-project/p2p/src/service/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { createDebugLogger } from '@aztec/foundation/log';
import { SerialQueue } from '@aztec/foundation/queue';
import { RunningPromise } from '@aztec/foundation/running-promise';
import type { AztecKVStore } from '@aztec/kv-store';
import { Attributes, type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client';

import { type ENR } from '@chainsafe/enr';
import { type GossipSub, type GossipSubComponents, gossipsub } from '@chainsafe/libp2p-gossipsub';
Expand Down Expand Up @@ -77,7 +78,7 @@ export async function createLibP2PPeerId(privateKey?: string): Promise<PeerId> {
/**
* Lib P2P implementation of the P2PService interface.
*/
export class LibP2PService implements P2PService {
export class LibP2PService extends WithTracer implements P2PService {
private jobQueue: SerialQueue = new SerialQueue();
private peerManager: PeerManager;
private discoveryRunningPromise?: RunningPromise;
Expand All @@ -100,9 +101,13 @@ export class LibP2PService implements P2PService {
private l2BlockSource: L2BlockSource,
private proofVerifier: ClientProtocolCircuitVerifier,
private worldStateSynchronizer: WorldStateSynchronizer,
telemetry: TelemetryClient,
private requestResponseHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS,
private logger = createDebugLogger('aztec:libp2p_service'),
) {
// Instatntiate tracer
super(telemetry, 'LibP2PService');

this.peerManager = new PeerManager(node, peerDiscoveryService, config, logger);
this.node.services.pubsub.score.params.appSpecificScore = (peerId: string) => {
return this.peerManager.getPeerScore(peerId);
Expand Down Expand Up @@ -204,6 +209,7 @@ export class LibP2PService implements P2PService {
proofVerifier: ClientProtocolCircuitVerifier,
worldStateSynchronizer: WorldStateSynchronizer,
store: AztecKVStore,
telemetry: TelemetryClient,
) {
const { tcpListenAddress, tcpAnnounceAddress, minPeerCount, maxPeerCount } = config;
const bindAddrTcp = convertToMultiaddr(tcpListenAddress, 'tcp');
Expand Down Expand Up @@ -306,6 +312,7 @@ export class LibP2PService implements P2PService {
l2BlockSource,
proofVerifier,
worldStateSynchronizer,
telemetry,
requestResponseHandlers,
);
}
Expand Down Expand Up @@ -397,6 +404,12 @@ export class LibP2PService implements P2PService {
*
* @param attestation - The attestation to process.
*/
@trackSpan('Libp2pService.processAttestationFromPeer', attestation => ({
[Attributes.BLOCK_NUMBER]: attestation.payload.header.globalVariables.blockNumber.toNumber(),
[Attributes.SLOT_NUMBER]: attestation.payload.header.globalVariables.slotNumber.toNumber(),
[Attributes.BLOCK_ARCHIVE]: attestation.archive.toString(),
[Attributes.P2P_ID]: attestation.p2pMessageIdentifier().toString(),
}))
private async processAttestationFromPeer(attestation: BlockAttestation): Promise<void> {
this.logger.debug(`Received attestation ${attestation.p2pMessageIdentifier()} from external peer.`);
await this.mempools.attestationPool.addAttestations([attestation]);
Expand All @@ -409,17 +422,37 @@ export class LibP2PService implements P2PService {
* @param block - The block to process.
*/
// REVIEW: callback pattern https://github.com/AztecProtocol/aztec-packages/issues/7963
@trackSpan('Libp2pService.processBlockFromPeer', block => ({
[Attributes.BLOCK_NUMBER]: block.payload.header.globalVariables.blockNumber.toNumber(),
[Attributes.SLOT_NUMBER]: block.payload.header.globalVariables.slotNumber.toNumber(),
[Attributes.BLOCK_ARCHIVE]: block.archive.toString(),
[Attributes.P2P_ID]: block.p2pMessageIdentifier().toString(),
}))
private async processBlockFromPeer(block: BlockProposal): Promise<void> {
this.logger.verbose(`Received block ${block.p2pMessageIdentifier()} from external peer.`);
const attestation = await this.blockReceivedCallback(block);

// TODO: fix up this pattern - the abstraction is not nice
// The attestation can be undefined if no handler is registered / the validator deems the block invalid
if (attestation != undefined) {
this.propagate(attestation);
this.broadcastAttestation(attestation);
}
}

/**
* Broadcast an attestation to all peers.
* @param attestation - The attestation to broadcast.
*/
@trackSpan('Libp2pService.broadcastAttestation', attestation => ({
[Attributes.BLOCK_NUMBER]: attestation.payload.header.globalVariables.blockNumber.toNumber(),
[Attributes.SLOT_NUMBER]: attestation.payload.header.globalVariables.slotNumber.toNumber(),
[Attributes.BLOCK_ARCHIVE]: attestation.archive.toString(),
[Attributes.P2P_ID]: attestation.p2pMessageIdentifier().toString(),
}))
private broadcastAttestation(attestation: BlockAttestation): void {
this.propagate(attestation);
}

private processEpochProofQuoteFromPeer(epochProofQuote: EpochProofQuote): void {
this.logger.verbose(`Received epoch proof quote ${epochProofQuote.p2pMessageIdentifier()} from external peer.`);
this.mempools.epochProofQuotePool.addQuote(epochProofQuote);
Expand Down
5 changes: 5 additions & 0 deletions yarn-project/sequencer-client/src/sequencer/sequencer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,11 @@ export class Sequencer {
this.isFlushing = true;
}

@trackSpan('Sequencer.collectAttestations', (block, txHashes) => ({
[Attributes.BLOCK_NUMBER]: block.number,
[Attributes.BLOCK_ARCHIVE]: block.archive.toString(),
[Attributes.BLOCK_TXS_COUNT]: txHashes.length,
}))
protected async collectAttestations(block: L2Block, txHashes: TxHash[]): Promise<Signature[] | undefined> {
// TODO(https://github.com/AztecProtocol/aztec-packages/issues/7962): inefficient to have a round trip in here - this should be cached
const committee = await this.publisher.getCurrentEpochCommittee();
Expand Down
Loading

0 comments on commit 1801f5b

Please sign in to comment.