Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Build blocks using txs with higher fee first #11093

Merged
merged 1 commit into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions yarn-project/circuit-types/src/tx/tx.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
ClientIvcProof,
Fr,
type GasSettings,
PrivateKernelTailCircuitPublicInputs,
PrivateLog,
type PrivateToPublicAccumulatedData,
Expand Down Expand Up @@ -88,6 +89,10 @@ export class Tx extends Gossipable {
return this.publicTeardownFunctionCall.isEmpty() ? undefined : this.publicTeardownFunctionCall;
}

getGasSettings(): GasSettings {
return this.data.constants.txContext.gasSettings;
}

/**
* Deserializes the Tx object from a Buffer.
* @param buffer - Buffer or BufferReader object to deserialize.
Expand Down
101 changes: 57 additions & 44 deletions yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Tx, TxHash } from '@aztec/circuit-types';
import { type TxAddedToPoolStats } from '@aztec/circuit-types/stats';
import { type Logger, createLogger } from '@aztec/foundation/log';
import { type AztecKVStore, type AztecMap, type AztecSet } from '@aztec/kv-store';
import { type AztecKVStore, type AztecMap, type AztecMultiMap } from '@aztec/kv-store';
import { type TelemetryClient } from '@aztec/telemetry-client';

import { PoolInstrumentation, PoolName } from '../instrumentation.js';
import { getPendingTxPriority } from './priority.js';
import { type TxPool } from './tx_pool.js';

/**
Expand All @@ -16,10 +17,11 @@ export class AztecKVTxPool implements TxPool {
/** Our tx pool, stored as a Map, with K: tx hash and V: the transaction. */
#txs: AztecMap<string, Buffer>;

/** Index for pending txs. */
#pendingTxs: AztecSet<string>;
/** Index for mined txs. */
#minedTxs: AztecMap<string, number>;
/** Index from tx hash to the block number in which they were mined, filtered by mined txs. */
#minedTxHashToBlock: AztecMap<string, number>;

/** Index from tx priority (stored as hex) to its tx hash, filtered by pending txs. */
#pendingTxPriorityToHash: AztecMultiMap<string, string>;

#log: Logger;

Expand All @@ -32,27 +34,34 @@ export class AztecKVTxPool implements TxPool {
*/
constructor(store: AztecKVStore, telemetry: TelemetryClient, log = createLogger('p2p:tx_pool')) {
this.#txs = store.openMap('txs');
this.#minedTxs = store.openMap('minedTxs');
this.#pendingTxs = store.openSet('pendingTxs');
this.#minedTxHashToBlock = store.openMap('txHashToBlockMined');
this.#pendingTxPriorityToHash = store.openMultiMap('pendingTxFeeToHash');

this.#store = store;
this.#log = log;
this.#metrics = new PoolInstrumentation(telemetry, PoolName.TX_POOL, () => store.estimateSize());
}

public markAsMined(txHashes: TxHash[], blockNumber: number): Promise<void> {
if (txHashes.length === 0) {
return Promise.resolve();
}

let deletedPending = 0;
return this.#store.transaction(() => {
let deleted = 0;
for (const hash of txHashes) {
const key = hash.toString();
void this.#minedTxs.set(key, blockNumber);
if (this.#pendingTxs.has(key)) {
deleted++;
void this.#pendingTxs.delete(key);
void this.#minedTxHashToBlock.set(key, blockNumber);

const tx = this.getTxByHash(hash);
if (tx) {
deletedPending++;
const fee = getPendingTxPriority(tx);
void this.#pendingTxPriorityToHash.deleteValue(fee, key);
}
}
this.#metrics.recordRemovedObjects(deleted, 'pending');
this.#metrics.recordAddedObjects(txHashes.length, 'mined');
this.#metrics.recordRemovedObjects(deletedPending, 'pending');
});
}

Expand All @@ -61,44 +70,41 @@ export class AztecKVTxPool implements TxPool {
return Promise.resolve();
}

let markedAsPending = 0;
return this.#store.transaction(() => {
let deleted = 0;
let added = 0;
for (const hash of txHashes) {
const key = hash.toString();
if (this.#minedTxs.has(key)) {
deleted++;
void this.#minedTxs.delete(key);
}
void this.#minedTxHashToBlock.delete(key);

if (this.#txs.has(key)) {
added++;
void this.#pendingTxs.add(key);
const tx = this.getTxByHash(hash);
if (tx) {
void this.#pendingTxPriorityToHash.set(getPendingTxPriority(tx), key);
markedAsPending++;
}
}

this.#metrics.recordRemovedObjects(deleted, 'mined');
this.#metrics.recordAddedObjects(added, 'pending');
this.#metrics.recordAddedObjects(markedAsPending, 'pending');
this.#metrics.recordRemovedObjects(markedAsPending, 'mined');
});
}

public getPendingTxHashes(): TxHash[] {
return Array.from(this.#pendingTxs.entries()).map(x => TxHash.fromString(x));
return Array.from(this.#pendingTxPriorityToHash.values({ reverse: true })).map(x => TxHash.fromString(x));
}

public getMinedTxHashes(): [TxHash, number][] {
return Array.from(this.#minedTxs.entries()).map(([txHash, blockNumber]) => [
return Array.from(this.#minedTxHashToBlock.entries()).map(([txHash, blockNumber]) => [
TxHash.fromString(txHash),
blockNumber,
]);
}

public getTxStatus(txHash: TxHash): 'pending' | 'mined' | undefined {
const key = txHash.toString();
if (this.#pendingTxs.has(key)) {
return 'pending';
} else if (this.#minedTxs.has(key)) {
if (this.#minedTxHashToBlock.has(key)) {
return 'mined';
} else if (this.#txs.has(key)) {
return 'pending';
} else {
return undefined;
}
Expand All @@ -120,22 +126,22 @@ export class AztecKVTxPool implements TxPool {
* @returns Empty promise.
*/
public addTxs(txs: Tx[]): Promise<void> {
const txHashes = txs.map(tx => tx.getTxHash());
return this.#store.transaction(() => {
let pendingCount = 0;
for (const [i, tx] of txs.entries()) {
const txHash = txHashes[i];
for (const tx of txs) {
const txHash = tx.getTxHash();
this.#log.verbose(`Adding tx ${txHash.toString()} to pool`, {
eventName: 'tx-added-to-pool',
...tx.getStats(),
} satisfies TxAddedToPoolStats);

const key = txHash.toString();
void this.#txs.set(key, tx.toBuffer());
if (!this.#minedTxs.has(key)) {

if (!this.#minedTxHashToBlock.has(key)) {
pendingCount++;
// REFACTOR: Use an lmdb conditional write to avoid race conditions with this write tx
void this.#pendingTxs.add(key);
void this.#pendingTxPriorityToHash.set(getPendingTxPriority(tx), key);
this.#metrics.recordSize(tx);
}
}
Expand All @@ -150,20 +156,27 @@ export class AztecKVTxPool implements TxPool {
* @returns The number of transactions that was deleted from the pool.
*/
public deleteTxs(txHashes: TxHash[]): Promise<void> {
let pendingDeleted = 0;
let minedDeleted = 0;

return this.#store.transaction(() => {
let pendingDeleted = 0;
let minedDeleted = 0;
for (const hash of txHashes) {
const key = hash.toString();
void this.#txs.delete(key);
if (this.#pendingTxs.has(key)) {
pendingDeleted++;
void this.#pendingTxs.delete(key);
}
const tx = this.getTxByHash(hash);

if (tx) {
const fee = getPendingTxPriority(tx);
void this.#pendingTxPriorityToHash.deleteValue(fee, key);

const isMined = this.#minedTxHashToBlock.has(key);
if (isMined) {
minedDeleted++;
} else {
pendingDeleted++;
}

if (this.#minedTxs.has(key)) {
minedDeleted++;
void this.#minedTxs.delete(key);
void this.#txs.delete(key);
void this.#minedTxHashToBlock.delete(key);
}
}

Expand Down
6 changes: 5 additions & 1 deletion yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createLogger } from '@aztec/foundation/log';
import { type TelemetryClient } from '@aztec/telemetry-client';

import { PoolInstrumentation, PoolName } from '../instrumentation.js';
import { getPendingTxPriority } from './priority.js';
import { type TxPool } from './tx_pool.js';

/**
Expand Down Expand Up @@ -68,7 +69,10 @@ export class InMemoryTxPool implements TxPool {
}

public getPendingTxHashes(): TxHash[] {
return Array.from(this.pendingTxs).map(x => TxHash.fromBigInt(x));
return this.getAllTxs()
.sort((tx1, tx2) => -getPendingTxPriority(tx1).localeCompare(getPendingTxPriority(tx2)))
.map(tx => tx.getTxHash())
.filter(txHash => this.pendingTxs.has(txHash.toBigInt()));
}

public getMinedTxHashes(): [TxHash, number][] {
Expand Down
13 changes: 13 additions & 0 deletions yarn-project/p2p/src/mem_pools/tx_pool/priority.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { type Tx } from '@aztec/circuit-types';
import { Buffer32 } from '@aztec/foundation/buffer';

/**
* Returns a string representing the priority of a tx.
* Txs with a higher priority value are returned first when retrieving pending tx hashes.
* We currently use the sum of the priority fees for the tx for this value, represented as hex.
*/
export function getPendingTxPriority(tx: Tx): string {
const priorityFees = tx.getGasSettings().maxPriorityFeesPerGas;
const totalFees = priorityFees.feePerDaGas.toBigInt() + priorityFees.feePerL2Gas.toBigInt();
return Buffer32.fromBigInt(totalFees).toString();
}
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export interface TxPool {
getAllTxHashes(): TxHash[];

/**
* Gets the hashes of pending transactions currently in the tx pool.
* Gets the hashes of pending transactions currently in the tx pool sorted by priority (see getPendingTxPriority).
* @returns An array of pending transaction hashes found in the tx pool.
*/
getPendingTxHashes(): TxHash[];
Expand Down
22 changes: 21 additions & 1 deletion yarn-project/p2p/src/mem_pools/tx_pool/tx_pool_test_suite.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { mockTx } from '@aztec/circuit-types';
import { type Tx, mockTx } from '@aztec/circuit-types';
import { GasFees } from '@aztec/circuits.js';
import { unfreeze } from '@aztec/foundation/types';

import { type TxPool } from './tx_pool.js';

Expand Down Expand Up @@ -101,4 +103,22 @@ export function describeTxPool(getTxPool: () => TxPool) {
expect(poolTxHashes).toHaveLength(3);
expect(poolTxHashes).toEqual(expect.arrayContaining([tx1.getTxHash(), tx2.getTxHash(), tx3.getTxHash()]));
});

it('Returns pending tx hashes sorted by priority', async () => {
const withPriorityFee = (tx: Tx, fee: number) => {
unfreeze(tx.data.constants.txContext.gasSettings).maxPriorityFeesPerGas = new GasFees(fee, fee);
return tx;
};

const tx1 = withPriorityFee(mockTx(0), 1000);
const tx2 = withPriorityFee(mockTx(1), 100);
const tx3 = withPriorityFee(mockTx(2), 200);
const tx4 = withPriorityFee(mockTx(3), 3000);

await pool.addTxs([tx1, tx2, tx3, tx4]);

const poolTxHashes = pool.getPendingTxHashes();
expect(poolTxHashes).toHaveLength(4);
expect(poolTxHashes).toEqual([tx4, tx1, tx3, tx2].map(tx => tx.getTxHash()));
});
}
Loading