Skip to content
This repository has been archived by the owner on Jul 9, 2021. It is now read-only.

Optimize SQL queries in pull_missing_blocks #1458

Merged
merged 2 commits into from
Jan 7, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions packages/pipeline/src/scripts/pull_missing_blocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,56 +14,56 @@ import { handleError, INFURA_ROOT_URL } from '../utils';
// Number of blocks to save at once.
const BATCH_SAVE_SIZE = 1000;
// Maximum number of requests to send at once.
// NOTE(review): this page is a rendered diff, so both sides of the change
// appear below. 10 looks like the removed value and 20 the added one (the
// review comment that follows says the limit was increased). Confirm against
// the merged file.
const MAX_CONCURRENCY = 10;
const MAX_CONCURRENCY = 20;
Copy link
Contributor Author

@albrow albrow Dec 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also increased the maximum number of concurrent requests. I've been monitoring memory and CPU usage and this should speed up the script without taxing resources too much. We can adjust further in the future.

// Maximum number of blocks to query for at once. This is also the maximum
// number of blocks we will hold in memory prior to being saved to the database.
// It is passed as the LIMIT parameter ($1) of the missing-blocks query.
const MAX_BLOCKS_PER_QUERY = 1000;

// Shared database connection. It is assigned by the startup routine via
// createConnection and then read by the functions below to obtain the Block
// repository and to run raw SQL.
let connection: Connection;

// Event tables that are scanned, one table per pass, for block numbers that
// have no matching row in raw.blocks. Names are schema-qualified because they
// are interpolated directly into the SQL text.
const tablesWithMissingBlocks: string[] = [
    'raw.exchange_fill_events',
    'raw.exchange_cancel_events',
    'raw.exchange_cancel_up_to_events',
    'raw.erc20_approval_events',
];

// Script entry point: opens the database connection, builds a Web3Source on
// top of an RPC provider pointed at INFURA_ROOT_URL, pulls the missing blocks,
// and then exits with status 0. Any rejection is handed to handleError.
(async () => {
connection = await createConnection(ormConfig as ConnectionOptions);
const provider = web3Factory.getRpcProvider({
rpcUrl: INFURA_ROOT_URL,
});
const web3Source = new Web3Source(provider);
// NOTE(review): the next call uses the one-argument signature and looks like
// the removed side of this diff; the loop after it appears to be its
// replacement. Confirm against the merged file.
await getAllMissingBlocksAsync(web3Source);
// Tables are handled sequentially: each call is awaited before the next starts.
for (const tableName of tablesWithMissingBlocks) {
await getAllMissingBlocksAsync(web3Source, tableName);
}
// Terminate explicitly once every table has been processed.
process.exit(0);
})().catch(handleError);

/**
 * One row of the missing-blocks query result. The raw query response is cast
 * to an array of these before the `block_number` values are plucked out.
 */
interface MissingBlocksResponse {
    /** Block number as it appears in the query result, typed as a string. */
    block_number: string;
}

// Repeatedly looks up block numbers that are missing from the blocks table,
// hands each batch to getAndSaveBlocksAsync, and stops as soon as a lookup
// returns no block numbers. Logs the total number of stored blocks at the end.
//
// Parameters:
//   web3Source - passed through unchanged to getAndSaveBlocksAsync.
//   tableName  - events table to scan (only in the two-argument signature).
//
// NOTE(review): this span is a rendered diff, so the old and new versions of
// several lines appear back to back (two signatures, two lookups, two final
// log lines). The variants that use `tableName` look like the added side.
// Confirm against the merged file.
async function getAllMissingBlocksAsync(web3Source: Web3Source): Promise<void> {
async function getAllMissingBlocksAsync(web3Source: Web3Source, tableName: string): Promise<void> {
const blocksRepository = connection.getRepository(Block);
// Each pass handles one batch of missing block numbers. Presumably the blocks
// saved in one pass stop being reported as missing, so the next lookup
// returns the following batch. Verify against getAndSaveBlocksAsync.
while (true) {
const blockNumbers = await getMissingBlockNumbersAsync();
console.log(`Checking for missing blocks in ${tableName}...`);
const blockNumbers = await getMissingBlockNumbersAsync(tableName);
if (blockNumbers.length === 0) {
// There are no more missing blocks. We're done.
break;
}
await getAndSaveBlocksAsync(web3Source, blocksRepository, blockNumbers);
}
// Unfiltered count: this is every row in the Block repository, not only the
// blocks saved during this call.
const totalBlocks = await blocksRepository.count();
console.log(`Done saving blocks. There are now ${totalBlocks} total blocks.`);
console.log(`Done saving blocks for ${tableName}. There are now ${totalBlocks} total blocks.`);
}

async function getMissingBlockNumbersAsync(): Promise<number[]> {
// Note(albrow): The easiest way to get all the blocks we need is to
// consider all the events tables together in a single query. If this query
// gets too slow, we should consider re-architecting so that we can work on
// getting the blocks for one type of event at a time.
async function getMissingBlockNumbersAsync(tableName: string): Promise<number[]> {
// This query returns up to `MAX_BLOCKS_PER_QUERY` distinct block numbers
// which are present in `tableName` but not in `raw.blocks`.
const response = (await connection.query(
`WITH all_events AS (
SELECT block_number FROM raw.exchange_fill_events
UNION SELECT block_number FROM raw.exchange_cancel_events
UNION SELECT block_number FROM raw.exchange_cancel_up_to_events
UNION SELECT block_number FROM raw.erc20_approval_events
)
SELECT DISTINCT(block_number) FROM all_events
WHERE block_number NOT IN (SELECT number FROM raw.blocks)
ORDER BY block_number ASC LIMIT $1`,
`SELECT DISTINCT(block_number) FROM ${tableName} LEFT JOIN raw.blocks ON ${tableName}.block_number = raw.blocks.number WHERE number IS NULL LIMIT $1;`,
[MAX_BLOCKS_PER_QUERY],
)) as MissingBlocksResponse[];
const blockNumberStrings = R.pluck('block_number', response);
Expand All @@ -86,4 +86,5 @@ async function getAndSaveBlocksAsync(
const blocks = R.map(parseBlock, rawBlocks);
console.log(`Saving ${blocks.length} blocks...`);
await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) });
console.log('Done saving this batch of blocks');
}