-
Notifications
You must be signed in to change notification settings - Fork 293
/
Copy pathpublic_processor.ts
281 lines (248 loc) · 10.6 KB
/
public_processor.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
import {
type FailedTx,
NestedProcessReturnValues,
type ProcessedTx,
type ProcessedTxHandler,
Tx,
type TxValidator,
makeProcessedTx,
validateProcessedTx,
} from '@aztec/circuit-types';
import {
AztecAddress,
ContractClassRegisteredEvent,
FEE_JUICE_ADDRESS,
type GlobalVariables,
type Header,
MAX_PUBLIC_DATA_UPDATE_REQUESTS_PER_TX,
PROTOCOL_PUBLIC_DATA_UPDATE_REQUESTS_PER_TX,
PublicDataUpdateRequest,
} from '@aztec/circuits.js';
import { times } from '@aztec/foundation/collection';
import { createDebugLogger } from '@aztec/foundation/log';
import { Timer } from '@aztec/foundation/timer';
import { ClassRegistererAddress } from '@aztec/protocol-contracts/class-registerer';
import { Attributes, type TelemetryClient, type Tracer, trackSpan } from '@aztec/telemetry-client';
import { type ContractDataSource } from '@aztec/types/contracts';
import { type MerkleTreeOperations } from '@aztec/world-state';
import { type SimulationProvider } from '../providers/index.js';
import { EnqueuedCallsProcessor } from './enqueued_calls_processor.js';
import { PublicExecutor } from './executor.js';
import { computeFeePayerBalanceLeafSlot, computeFeePayerBalanceStorageSlot } from './fee_payment.js';
import { WorldStateDB } from './public_db_sources.js';
import { RealPublicKernelCircuitSimulator } from './public_kernel.js';
import { type PublicKernelCircuitSimulator } from './public_kernel_circuit_simulator.js';
import { PublicProcessorMetrics } from './public_processor_metrics.js';
/**
 * Factory that builds PublicProcessor instances on top of a given merkle tree db,
 * contract data source, simulation provider and telemetry client.
 */
export class PublicProcessorFactory {
  constructor(
    private merkleTree: MerkleTreeOperations,
    private contractDataSource: ContractDataSource,
    private simulator: SimulationProvider,
    private telemetryClient: TelemetryClient,
  ) {}

  /**
   * Builds a PublicProcessor together with the world state db, public executor and
   * public kernel simulator it depends on.
   * @param maybeHistoricalHeader - The header of a block previous to the one in which the tx is included.
   * When undefined, the merkle tree's initial header is used instead.
   * @param globalVariables - The global variables for the block being processed.
   * @returns A new instance of a PublicProcessor.
   */
  public create(maybeHistoricalHeader: Header | undefined, globalVariables: GlobalVariables): PublicProcessor {
    // Only ask the tree for its initial header when the caller did not supply one.
    const header = maybeHistoricalHeader ?? this.merkleTree.getInitialHeader();

    // The same world state db instance is shared by the executor and the processor.
    const stateDb = new WorldStateDB(this.merkleTree, this.contractDataSource);
    const executor = new PublicExecutor(stateDb, header, this.telemetryClient);
    const kernelSimulator = new RealPublicKernelCircuitSimulator(this.simulator);

    return PublicProcessor.create(
      this.merkleTree,
      executor,
      kernelSimulator,
      globalVariables,
      header,
      stateDb,
      this.telemetryClient,
    );
  }
}
/**
 * Converts Txs lifted from the P2P module into ProcessedTx objects by executing
 * any public function calls in them. Txs with private calls only are unaffected.
 */
export class PublicProcessor {
  /** Metrics recorder; also the source of the tracer exposed by the `tracer` getter. */
  private readonly metrics: PublicProcessorMetrics;

  /**
   * @param db - Merkle tree operations handle (not used directly by the methods in this class).
   * @param publicExecutor - Executor for public calls (not used directly by the methods in this class).
   * @param publicKernel - Public kernel circuit simulator (not used directly by the methods in this class).
   * @param globalVariables - Global variables of the block being processed; its gas fees price the tx fee.
   * @param historicalHeader - Historical header the processor was created with.
   * @param worldStateDB - Public state db that fee balance reads/writes and per-tx commits go through.
   * @param enqueuedCallsProcessor - Runs the enqueued public calls of a tx.
   * @param telemetryClient - Telemetry client used to build the metrics recorder.
   * @param log - Debug logger.
   */
  constructor(
    protected db: MerkleTreeOperations,
    protected publicExecutor: PublicExecutor,
    protected publicKernel: PublicKernelCircuitSimulator,
    protected globalVariables: GlobalVariables,
    protected historicalHeader: Header,
    protected worldStateDB: WorldStateDB,
    protected enqueuedCallsProcessor: EnqueuedCallsProcessor,
    telemetryClient: TelemetryClient,
    private log = createDebugLogger('aztec:sequencer:public-processor'),
  ) {
    this.metrics = new PublicProcessorMetrics(telemetryClient, 'PublicProcessor');
  }

  /**
   * Builds a PublicProcessor, creating the EnqueuedCallsProcessor it needs from the same dependencies.
   * @returns A new PublicProcessor using the default logger.
   */
  static create(
    db: MerkleTreeOperations,
    publicExecutor: PublicExecutor,
    publicKernelSimulator: PublicKernelCircuitSimulator,
    globalVariables: GlobalVariables,
    historicalHeader: Header,
    worldStateDB: WorldStateDB,
    telemetryClient: TelemetryClient,
  ) {
    const enqueuedCallsProcessor = EnqueuedCallsProcessor.create(
      db,
      publicExecutor,
      publicKernelSimulator,
      globalVariables,
      historicalHeader,
      worldStateDB,
    );

    return new PublicProcessor(
      db,
      publicExecutor,
      publicKernelSimulator,
      globalVariables,
      historicalHeader,
      worldStateDB,
      enqueuedCallsProcessor,
      telemetryClient,
    );
  }

  /** Tracer taken from the metrics recorder (presumably what `@trackSpan` looks up; confirm in telemetry-client). */
  get tracer(): Tracer {
    return this.metrics.tracer;
  }

  /**
   * Run each tx through the public circuit and the public kernel circuit if needed.
   * @param txs - Txs to process. They are cloned first, so the caller's objects are not modified.
   * @param maxTransactions - Stop once this many txs have been processed successfully. Defaults to all of them.
   * @param processedTxHandler - Handler for processed txs in the context of block building or proving.
   * @param txValidator - Optional validator run on each tx after its public functions were processed.
   * @returns A tuple of: the processed txs, the txs that failed along with their error, and the
   * return values of the processed txs (with one empty entry added for every failed tx).
   */
  public async process(
    txs: Tx[],
    maxTransactions = txs.length,
    processedTxHandler?: ProcessedTxHandler,
    txValidator?: TxValidator<ProcessedTx>,
  ): Promise<[ProcessedTx[], FailedTx[], NestedProcessReturnValues[]]> {
    // The processor modifies the tx objects in place, so we need to clone them.
    const clonedTxs = txs.map(tx => Tx.clone(tx));

    const result: ProcessedTx[] = [];
    const failed: FailedTx[] = [];
    const returns: NestedProcessReturnValues[] = [];

    for (const tx of clonedTxs) {
      // only process up to the limit of the block
      if (result.length >= maxTransactions) {
        break;
      }

      try {
        // Txs without public calls skip the enqueued calls processor entirely.
        const [processedTx, returnValues] = !tx.hasPublicCalls()
          ? [makeProcessedTx(tx, tx.data.toKernelCircuitPublicInputs(), [])]
          : await this.processTxWithPublicCalls(tx);

        this.log.debug(`Processed tx`, {
          txHash: processedTx.hash,
          historicalHeaderHash: processedTx.data.constants.historicalHeader.hash(),
          blockNumber: processedTx.data.constants.globalVariables.blockNumber,
          lastArchiveRoot: processedTx.data.constants.historicalHeader.lastArchive.root,
        });

        // Set fee payment update request into the processed tx
        processedTx.finalPublicDataUpdateRequests = await this.createFinalDataUpdateRequests(processedTx);

        // Commit the state updates from this transaction
        // NOTE(review): commit() runs before validateProcessedTx and txValidator below, so a tx
        // rejected by those checks is reported as failed after this commit call was already made.
        // Confirm whether that is intended or whether a rollback is needed.
        await this.worldStateDB.commit();

        validateProcessedTx(processedTx);

        // Re-validate the transaction
        if (txValidator) {
          // Only accept processed transactions that are not double-spends,
          // public functions emitting nullifiers would pass earlier check but fail here.
          // Note that we're checking all nullifiers generated in the private execution twice,
          // we could store the ones already checked and skip them here as an optimization.
          const [, invalid] = await txValidator.validateTxs([processedTx]);
          if (invalid.length) {
            throw new Error(`Transaction ${invalid[0].hash} invalid after processing public functions`);
          }
        }

        // if we were given a handler then send the transaction to it for block building or proving
        if (processedTxHandler) {
          await processedTxHandler.addNewTx(processedTx);
        }

        result.push(processedTx);
        // Append in place instead of re-copying the whole accumulator for every tx.
        returns.push(...(returnValues ?? []));
      } catch (err: unknown) {
        // Anything can be thrown, so narrow before touching Error-only fields.
        const errorMessage = err instanceof Error ? err.message : 'Unknown error';
        const stack = err instanceof Error ? err.stack : undefined;
        this.log.warn(`Failed to process tx ${tx.getTxHash()}: ${errorMessage} ${stack}`);

        failed.push({
          tx,
          error: err instanceof Error ? err : new Error(errorMessage),
        });
        // A failed tx still contributes one (empty) entry to the return values.
        returns.push(new NestedProcessReturnValues([]));
      }
    }

    return [result, failed, returns];
  }

  /**
   * Creates the final set of data update requests for the transaction. This includes the
   * set of public data update requests as returned by the public kernel, plus a data update
   * request for updating fee balance. It also updates the local public state db.
   * See build_or_patch_payment_update_request in base_rollup_inputs.nr for more details.
   * @param tx - The processed tx whose kernel output and fee payer are used.
   * @returns The kernel's update requests followed by the protocol slots; unchanged apart from the
   * empty protocol slots when the tx has no fee payer.
   * @throws If the fee payer's balance is lower than the transaction fee.
   */
  private async createFinalDataUpdateRequests(tx: ProcessedTx): Promise<PublicDataUpdateRequest[]> {
    const finalPublicDataUpdateRequests = [
      ...tx.data.end.publicDataUpdateRequests,
      ...times(PROTOCOL_PUBLIC_DATA_UPDATE_REQUESTS_PER_TX, () => PublicDataUpdateRequest.empty()),
    ];

    const feePayer = tx.data.feePayer;
    if (feePayer.isZero()) {
      // No fee payer: nothing to deduct.
      return finalPublicDataUpdateRequests;
    }

    const feeJuiceAddress = AztecAddress.fromBigInt(FEE_JUICE_ADDRESS);
    const balanceSlot = computeFeePayerBalanceStorageSlot(feePayer);
    const leafSlot = computeFeePayerBalanceLeafSlot(feePayer);
    const txFee = tx.data.getTransactionFee(this.globalVariables.gasFees);

    this.log.debug(`Deducting ${txFee} balance in Fee Juice for ${feePayer}`);

    // If the tx already wrote to the fee payer's balance slot, start from that written value
    // and patch that same request; otherwise read the balance from the state db.
    const existingBalanceWriteIndex = finalPublicDataUpdateRequests.findIndex(request =>
      request.leafSlot.equals(leafSlot),
    );
    const hasExistingBalanceWrite = existingBalanceWriteIndex > -1;

    const balance = hasExistingBalanceWrite
      ? finalPublicDataUpdateRequests[existingBalanceWriteIndex].newValue
      : await this.worldStateDB.storageRead(feeJuiceAddress, balanceSlot);

    if (balance.lt(txFee)) {
      throw new Error(`Not enough balance for fee payer to pay for transaction (got ${balance} needs ${txFee})`);
    }

    const updatedBalance = balance.sub(txFee);
    await this.worldStateDB.storageWrite(feeJuiceAddress, balanceSlot, updatedBalance);

    // Without an existing write the request goes at index MAX_PUBLIC_DATA_UPDATE_REQUESTS_PER_TX
    // (presumably the first protocol slot, assuming the kernel array has exactly that length).
    const targetIndex = hasExistingBalanceWrite ? existingBalanceWriteIndex : MAX_PUBLIC_DATA_UPDATE_REQUESTS_PER_TX;
    finalPublicDataUpdateRequests[targetIndex] = new PublicDataUpdateRequest(leafSlot, updatedBalance, 0);

    return finalPublicDataUpdateRequests;
  }

  /**
   * Runs the enqueued public calls of a tx, records metrics for the run and wraps the
   * outcome into a ProcessedTx.
   * @param tx - A tx that has public calls.
   * @returns The processed tx and the return values reported by the enqueued calls processor.
   * @throws If the enqueued calls processor did not produce a tail kernel output.
   */
  @trackSpan('PublicProcessor.processTxWithPublicCalls', tx => ({
    [Attributes.TX_HASH]: tx.getTxHash().toString(),
  }))
  private async processTxWithPublicCalls(tx: Tx): Promise<[ProcessedTx, NestedProcessReturnValues[]]> {
    const timer = new Timer();

    const { tailKernelOutput, returnValues, revertReason, provingRequests, gasUsed, processedPhases } =
      await this.enqueuedCallsProcessor.process(tx);

    if (!tailKernelOutput) {
      this.metrics.recordFailedTx();
      throw new Error('Final public kernel was not executed.');
    }

    // A reverted phase is counted as reverted; only non-reverted phases report a duration.
    for (const phase of processedPhases) {
      if (phase.revertReason) {
        this.metrics.recordRevertedPhase(phase.phase);
      } else {
        this.metrics.recordPhaseDuration(phase.phase, phase.durationMs);
      }
    }

    this.metrics.recordClassRegistration(
      ...ContractClassRegisteredEvent.fromLogs(tx.unencryptedLogs.unrollLogs(), ClassRegistererAddress),
    );

    const phaseCount = processedPhases.length;
    this.metrics.recordTx(phaseCount, timer.ms());

    const processedTx = makeProcessedTx(tx, tailKernelOutput, provingRequests, revertReason, gasUsed);
    return [processedTx, returnValues];
  }
}