fix: Prevent race conditions around data pulled from L1 #2577

Merged · 7 commits · Sep 28, 2023
91 changes: 84 additions & 7 deletions yarn-project/archiver/src/archiver/archiver.test.ts
@@ -63,27 +63,27 @@ describe('Archiver', () => {
blocks[1].newL1ToL2Messages.map(key => key.toString(true)),
),
makeL1ToL2MessageAddedEvents(
1000n,
2501n,
blocks[2].newL1ToL2Messages.map(key => key.toString(true)),
),
makeL1ToL2MessageAddedEvents(102n, [
makeL1ToL2MessageAddedEvents(2502n, [
messageToCancel1,
messageToCancel2,
messageToStayPending1,
messageToStayPending2,
]),
];
publicClient.getBlockNumber.mockResolvedValueOnce(2500n).mockResolvedValueOnce(2501n).mockResolvedValueOnce(2502n);
publicClient.getBlockNumber.mockResolvedValueOnce(2500n).mockResolvedValueOnce(2600n).mockResolvedValueOnce(2700n);
// logs should be created in the order in which the archiver syncs.
publicClient.getLogs
.mockResolvedValueOnce(l1ToL2MessageAddedEvents.slice(0, 2).flat())
.mockResolvedValueOnce([]) // no messages to cancel
.mockResolvedValueOnce([makeL2BlockProcessedEvent(101n, 1n)])
.mockResolvedValueOnce([makeContractDeploymentEvent(103n, blocks[0])])
.mockResolvedValueOnce([makeContractDeploymentEvent(103n, blocks[0])]) // the first loop of the archiver ends here at block 2500
.mockResolvedValueOnce(l1ToL2MessageAddedEvents.slice(2, 4).flat())
.mockResolvedValueOnce(makeL1ToL2MessageCancelledEvents(1100n, l1ToL2MessagesToCancel))
.mockResolvedValueOnce([makeL2BlockProcessedEvent(1101n, 2n), makeL2BlockProcessedEvent(1150n, 3n)])
.mockResolvedValueOnce([makeContractDeploymentEvent(1102n, blocks[1])])
.mockResolvedValueOnce(makeL1ToL2MessageCancelledEvents(2503n, l1ToL2MessagesToCancel))
.mockResolvedValueOnce([makeL2BlockProcessedEvent(2510n, 2n), makeL2BlockProcessedEvent(2520n, 3n)])
.mockResolvedValueOnce([makeContractDeploymentEvent(2540n, blocks[1])])
.mockResolvedValue([]);
rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx));

@@ -127,6 +127,83 @@ describe('Archiver', () => {

await archiver.stop();
}, 10_000);

it('does not sync past current block number', async () => {
const numL2BlocksInTest = 2;
const archiver = new Archiver(
publicClient,
EthAddress.fromString(rollupAddress),
EthAddress.fromString(inboxAddress),
EthAddress.fromString(registryAddress),
EthAddress.fromString(contractDeploymentEmitterAddress),
0,
archiverStore,
1000,
);

let latestBlockNum = await archiver.getBlockNumber();
expect(latestBlockNum).toEqual(0);

const createL1ToL2Messages = () => {
return [Fr.random().toString(true), Fr.random().toString(true)];
};

const blocks = blockNums.map(x => L2Block.random(x, 4, x, x + 1, x * 2, x * 3));
const rollupTxs = blocks.map(makeRollupTx);
// `L2Block.random(x)` creates some L1 to L2 messages. We add those here, since the
// test expects them to be consumed (the archiver removes consumed messages from the
// pending store). We also create additional messages in L1 blocks 102 and 103; only
// those up to the current L1 block number should be read by the archiver.

const additionalL1ToL2MessagesBlock102 = createL1ToL2Messages();
const additionalL1ToL2MessagesBlock103 = createL1ToL2Messages();

const l1ToL2MessageAddedEvents = [
makeL1ToL2MessageAddedEvents(
100n,
blocks[0].newL1ToL2Messages.map(key => key.toString(true)),
),
makeL1ToL2MessageAddedEvents(
101n,
blocks[1].newL1ToL2Messages.map(key => key.toString(true)),
),
makeL1ToL2MessageAddedEvents(102n, additionalL1ToL2MessagesBlock102),
makeL1ToL2MessageAddedEvents(103n, additionalL1ToL2MessagesBlock103),
];

// Here we set the current L1 block number to 102. L1 to L2 messages after this should not be read.
publicClient.getBlockNumber.mockResolvedValue(102n);
// add all of the L1 to L2 messages to the mock
publicClient.getLogs
.mockImplementationOnce((args?: any) => {
return Promise.resolve(
l1ToL2MessageAddedEvents
.flat()
.filter(x => x.blockNumber! >= args.fromBlock && x.blockNumber! < args.toBlock),
);
})
.mockResolvedValueOnce([])
.mockResolvedValueOnce([makeL2BlockProcessedEvent(70n, 1n), makeL2BlockProcessedEvent(80n, 2n)])
.mockResolvedValue([]);
rollupTxs.slice(0, numL2BlocksInTest).forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx));

await archiver.start(false);

// Wait until all test L2 blocks are processed. If this does not happen the test will fail with a timeout.
while ((await archiver.getBlockNumber()) !== numL2BlocksInTest) {
await sleep(100);
}

latestBlockNum = await archiver.getBlockNumber();
expect(latestBlockNum).toEqual(numL2BlocksInTest);

// Check that the only pending L1 to L2 messages are those from eth block 102
const expectedPendingMessageKeys = additionalL1ToL2MessagesBlock102;
const actualPendingMessageKeys = (await archiver.getPendingL1ToL2Messages(100)).map(key => key.toString(true));
expect(actualPendingMessageKeys).toEqual(expectedPendingMessageKeys);

await archiver.stop();
}, 10_000);
});

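The updated mocks above encode the range arithmetic at the heart of this fix: each archiver loop captures the L1 block number once (2500, then 2600, then 2700) and reads logs only over the inclusive range from the last processed block plus one up to that captured number. A minimal sketch of that arithmetic, assuming nothing beyond the mocked block numbers shown above:

```typescript
// A minimal sketch (not from the PR) of the per-iteration L1 ranges the updated test encodes.
// The archiver captures the L1 block number once per sync loop and reads logs over the
// inclusive range [lastProcessed + 1, current].
const mockedL1BlockNumbers = [2500n, 2600n, 2700n]; // the mocked getBlockNumber results above

let lastProcessed = 0n;
for (const current of mockedL1BlockNumbers) {
  const fromBlock = lastProcessed + 1n; // +1 so the last processed block is not re-read
  const toBlock = current; // never read past the block number captured at the start of the loop
  console.log(`sync range: [${fromBlock}, ${toBlock}]`);
  lastProcessed = current;
}
// Prints: [1, 2500], [2501, 2600], [2601, 2700] -- so the events mocked at blocks
// 2501-2503 are picked up by the second loop and are never read twice.
```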
35 changes: 31 additions & 4 deletions yarn-project/archiver/src/archiver/archiver.ts
@@ -148,22 +148,49 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
return;
}

// ********** Ensuring Consistency of data pulled from L1 **********

/**
* There are a number of calls in this sync operation to L1 for retrieving
* events and transaction data. There are a couple of things we need to bear in mind
* to ensure that data is read exactly once.
*
* The first is the problem of eventually consistent ETH service providers like Infura.
* We are not currently handling this correctly in the case of L1 to L2 messages, and we will
* want to revisit L2 Block and contract data retrieval at a later stage. This is not
* currently a problem but will need to be addressed before a mainnet release.
*
* The second is that, in between the various calls to L1, the block number can move, meaning some
* of the following calls will return data for blocks that were not present during earlier calls.
* This is a problem, for example, when setting the last block number marker for L1 to L2 messages -
* this.lastProcessedBlockNumber = currentBlockNumber;
* It's possible that we actually received messages in block currentBlockNumber + 1, meaning the next
* time we do this sync we would get the same messages again. Additionally, the call to get cancelled
* L1 to L2 messages could read from a block not present when retrieving pending messages. If a message
* was added and cancelled in the same eth block then we could try to cancel a non-existent pending message.
*
* To combat this, for the time being we simply ensure that all data retrieval methods only retrieve
* data up to the currentBlockNumber captured at the top of this function. We might want to improve
* on this in the future, but for now it should give us the guarantees that we need.
*
*/

// ********** Events that are processed in between blocks **********

// Process l1ToL2Messages, these are consumed as time passes, not each block
const retrievedPendingL1ToL2Messages = await retrieveNewPendingL1ToL2Messages(
this.publicClient,
this.inboxAddress,
blockUntilSynced,
currentBlockNumber,
this.lastProcessedBlockNumber + 1n, // + 1 to prevent re-including messages from the last processed block
currentBlockNumber,
);
const retrievedCancelledL1ToL2Messages = await retrieveNewCancelledL1ToL2Messages(
this.publicClient,
this.inboxAddress,
blockUntilSynced,
currentBlockNumber,
this.lastProcessedBlockNumber + 1n,
currentBlockNumber,
);

// TODO (#717): optimise this - there could be messages in confirmed that are also in pending.
@@ -188,8 +215,8 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
this.publicClient,
this.rollupAddress,
blockUntilSynced,
currentBlockNumber,
this.nextL2BlockFromBlock,
currentBlockNumber,
nextExpectedL2BlockNum,
);

@@ -202,8 +229,8 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
this.publicClient,
this.contractDeploymentEmitterAddress,
blockUntilSynced,
currentBlockNumber,
this.nextL2BlockFromBlock,
currentBlockNumber,
blockHashMapping,
);
if (retrievedBlocks.retrievedData.length === 0) {
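The comment block above describes the pattern; as a rough illustration (an assumed shape, not the actual Archiver.sync code, with a hypothetical `retrieve` helper standing in for the various retrieval calls), the flow looks roughly like this:

```typescript
// An assumed shape (not the actual Archiver.sync implementation) of the pattern this PR adopts:
// capture the L1 block number once, bound every retrieval to the same inclusive range, and only
// then advance the marker. `retrieve` is a hypothetical stand-in for the retrieval helpers.
async function syncOnce(archiver: {
  publicClient: { getBlockNumber(): Promise<bigint> };
  lastProcessedBlockNumber: bigint;
  retrieve(fromBlock: bigint, toBlock: bigint): Promise<unknown[]>;
}): Promise<void> {
  const currentBlockNumber = await archiver.publicClient.getBlockNumber(); // captured once per sync

  const fromBlock = archiver.lastProcessedBlockNumber + 1n; // +1: that block was already processed
  if (fromBlock > currentBlockNumber) {
    return; // the chain has not advanced; nothing new to read
  }

  // Every call in this sync iteration uses the same upper bound, so a block mined between
  // calls cannot be seen by one retrieval and missed (or re-read) by another.
  const pendingMessages = await archiver.retrieve(fromBlock, currentBlockNumber);
  const cancelledMessages = await archiver.retrieve(fromBlock, currentBlockNumber);

  // The marker only ever advances to the bound that was actually queried.
  archiver.lastProcessedBlockNumber = currentBlockNumber;
}
```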
48 changes: 30 additions & 18 deletions yarn-project/archiver/src/archiver/data_retrieval.ts
@@ -34,25 +34,30 @@ type DataRetrieval<T> = {
* @param publicClient - The viem public client to use for transaction retrieval.
* @param rollupAddress - The address of the rollup contract.
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
* @param currentL1BlockNum - Latest available block number in the ETH node.
* @param searchStartBlock - The block number to use for starting the search.
* @param searchEndBlock - The highest block number that we should search up to.
* @param expectedNextL2BlockNum - The next L2 block number that we expect to find.
* @returns An array of L2 Blocks and the next eth block to search from.
*/
export async function retrieveBlocks(
publicClient: PublicClient,
rollupAddress: EthAddress,
blockUntilSynced: boolean,
currentL1BlockNum: bigint,
searchStartBlock: bigint,
searchEndBlock: bigint,
expectedNextL2BlockNum: bigint,
): Promise<DataRetrieval<L2Block>> {
const retrievedBlocks: L2Block[] = [];
do {
if (searchStartBlock > currentL1BlockNum) {
if (searchStartBlock > searchEndBlock) {
break;
}
const l2BlockProcessedLogs = await getL2BlockProcessedLogs(publicClient, rollupAddress, searchStartBlock);
const l2BlockProcessedLogs = await getL2BlockProcessedLogs(
publicClient,
rollupAddress,
searchStartBlock,
searchEndBlock,
);
if (l2BlockProcessedLogs.length === 0) {
break;
}
@@ -61,7 +66,7 @@ export async function retrieveBlocks(
retrievedBlocks.push(...newBlocks);
searchStartBlock = l2BlockProcessedLogs[l2BlockProcessedLogs.length - 1].blockNumber! + 1n;
expectedNextL2BlockNum += BigInt(newBlocks.length);
} while (blockUntilSynced && searchStartBlock <= currentL1BlockNum);
} while (blockUntilSynced && searchStartBlock <= searchEndBlock);
return { nextEthBlockNumber: searchStartBlock, retrievedData: retrievedBlocks };
}

@@ -70,36 +75,37 @@ export async function retrieveBlocks(
* @param publicClient - The viem public client to use for transaction retrieval.
* @param contractDeploymentEmitterAddress - The address of the contract deployment emitter contract.
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
* @param currentBlockNumber - Latest available block number in the ETH node.
* @param searchStartBlock - The block number to use for starting the search.
* @param searchEndBlock - The highest block number that we should search up to.
* @param blockHashMapping - A mapping from block number to relevant block hash.
* @returns An array of ExtendedContractData and their equivalent L2 Block number, along with the next eth block to search from.
*/
export async function retrieveNewContractData(
publicClient: PublicClient,
contractDeploymentEmitterAddress: EthAddress,
blockUntilSynced: boolean,
currentBlockNumber: bigint,
searchStartBlock: bigint,
searchEndBlock: bigint,
blockHashMapping: { [key: number]: Buffer | undefined },
): Promise<DataRetrieval<[ExtendedContractData[], number]>> {
let retrievedNewContracts: [ExtendedContractData[], number][] = [];
do {
if (searchStartBlock > currentBlockNumber) {
if (searchStartBlock > searchEndBlock) {
break;
}
const contractDataLogs = await getContractDeploymentLogs(
publicClient,
contractDeploymentEmitterAddress,
searchStartBlock,
searchEndBlock,
);
if (contractDataLogs.length === 0) {
break;
}
const newContracts = processContractDeploymentLogs(blockHashMapping, contractDataLogs);
retrievedNewContracts = retrievedNewContracts.concat(newContracts);
searchStartBlock = (contractDataLogs.findLast(cd => !!cd)?.blockNumber || searchStartBlock) + 1n;
} while (blockUntilSynced && searchStartBlock <= currentBlockNumber);
} while (blockUntilSynced && searchStartBlock <= searchEndBlock);
return { nextEthBlockNumber: searchStartBlock, retrievedData: retrievedNewContracts };
}

@@ -108,28 +114,33 @@ export async function retrieveNewContractData(
* @param publicClient - The viem public client to use for transaction retrieval.
* @param inboxAddress - The address of the inbox contract to fetch messages from.
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
* @param currentBlockNumber - Latest available block number in the ETH node.
* @param searchStartBlock - The block number to use for starting the search.
* @param searchEndBlock - The highest block number that we should search up to.
* @returns An array of L1ToL2Message and next eth block to search from.
*/
export async function retrieveNewPendingL1ToL2Messages(
publicClient: PublicClient,
inboxAddress: EthAddress,
blockUntilSynced: boolean,
currentBlockNumber: bigint,
searchStartBlock: bigint,
searchEndBlock: bigint,
): Promise<DataRetrieval<L1ToL2Message>> {
const retrievedNewL1ToL2Messages: L1ToL2Message[] = [];
do {
if (searchStartBlock > currentBlockNumber) {
if (searchStartBlock > searchEndBlock) {
break;
}
const newL1ToL2MessageLogs = await getPendingL1ToL2MessageLogs(publicClient, inboxAddress, searchStartBlock);
const newL1ToL2MessageLogs = await getPendingL1ToL2MessageLogs(
publicClient,
inboxAddress,
searchStartBlock,
searchEndBlock,
);
const newL1ToL2Messages = processPendingL1ToL2MessageAddedLogs(newL1ToL2MessageLogs);
retrievedNewL1ToL2Messages.push(...newL1ToL2Messages);
// handles the case when there are no new messages:
searchStartBlock = (newL1ToL2MessageLogs.findLast(msgLog => !!msgLog)?.blockNumber || searchStartBlock) + 1n;
} while (blockUntilSynced && searchStartBlock <= currentBlockNumber);
} while (blockUntilSynced && searchStartBlock <= searchEndBlock);
return { nextEthBlockNumber: searchStartBlock, retrievedData: retrievedNewL1ToL2Messages };
}

@@ -138,32 +149,33 @@ export async function retrieveNewPendingL1ToL2Messages(
* @param publicClient - The viem public client to use for transaction retrieval.
* @param inboxAddress - The address of the inbox contract to fetch messages from.
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
* @param currentBlockNumber - Latest available block number in the ETH node.
* @param searchStartBlock - The block number to use for starting the search.
* @param searchEndBlock - The highest block number that we should search up to.
* @returns An array of message keys that were cancelled and next eth block to search from.
*/
export async function retrieveNewCancelledL1ToL2Messages(
publicClient: PublicClient,
inboxAddress: EthAddress,
blockUntilSynced: boolean,
currentBlockNumber: bigint,
searchStartBlock: bigint,
searchEndBlock: bigint,
): Promise<DataRetrieval<Fr>> {
const retrievedNewCancelledL1ToL2Messages: Fr[] = [];
do {
if (searchStartBlock > currentBlockNumber) {
if (searchStartBlock > searchEndBlock) {
break;
}
const newL1ToL2MessageCancelledLogs = await getL1ToL2MessageCancelledLogs(
publicClient,
inboxAddress,
searchStartBlock,
searchEndBlock,
);
const newCancelledL1ToL2Messages = processCancelledL1ToL2MessagesLogs(newL1ToL2MessageCancelledLogs);
retrievedNewCancelledL1ToL2Messages.push(...newCancelledL1ToL2Messages);
// handles the case when there are no new messages:
searchStartBlock =
(newL1ToL2MessageCancelledLogs.findLast(msgLog => !!msgLog)?.blockNumber || searchStartBlock) + 1n;
} while (blockUntilSynced && searchStartBlock <= currentBlockNumber);
} while (blockUntilSynced && searchStartBlock <= searchEndBlock);
return { nextEthBlockNumber: searchStartBlock, retrievedData: retrievedNewCancelledL1ToL2Messages };
}
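Each retrieval helper now threads an explicit searchEndBlock down to its log query. A minimal sketch of what such a bounded query can look like with viem (the event signature, contract address, and RPC URL are placeholders, not taken from this PR):

```typescript
import { createPublicClient, http, parseAbiItem } from 'viem';

// A minimal sketch (not the repo's getPendingL1ToL2MessageLogs) of a log query bounded by an
// explicit end block, as the new searchStartBlock/searchEndBlock parameters imply. The event
// signature and RPC URL are placeholders rather than values from this PR.
const client = createPublicClient({ transport: http('http://localhost:8545') });

export async function getLogsBounded(address: `0x${string}`, fromBlock: bigint, toBlock: bigint) {
  // viem's getLogs forwards fromBlock/toBlock to eth_getLogs, which treats them as an inclusive
  // range, so passing toBlock = searchEndBlock keeps the query from reading past the captured block.
  return client.getLogs({
    address,
    event: parseAbiItem('event MessageAdded(bytes32 indexed entryKey)'), // placeholder event
    fromBlock,
    toBlock,
  });
}
```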