Skip to content

Commit

Permalink
Merge pull request #1998 from eqlabs/sistemd/nonce-updates-interning
Browse files Browse the repository at this point in the history
feat(db): intern contract addresses in nonce_updates table
  • Loading branch information
sistemd authored May 10, 2024
2 parents 14c4f9e + befc3fc commit 4a18125
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 43 deletions.
Binary file modified crates/rpc/fixtures/mainnet.sqlite
Binary file not shown.
98 changes: 55 additions & 43 deletions crates/storage/src/connection/state_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@ impl Transaction<'_> {
block_number: BlockNumber,
state_update: &StateUpdate,
) -> anyhow::Result<()> {
let mut insert_nonce = self
.inner()
.prepare_cached(
"INSERT INTO nonce_updates (block_number, contract_address, nonce) VALUES (?, ?, \
?)",
)
.context("Preparing nonce insert statement")?;

let mut query_contract_address = self
.inner()
.prepare_cached("SELECT id FROM contract_addresses WHERE contract_address = ?")
Expand All @@ -61,6 +53,14 @@ impl Transaction<'_> {
)
.context("Preparing storage address insert statement")?;

let mut insert_nonce = self
.inner()
.prepare_cached(
"INSERT INTO nonce_updates (block_number, contract_address_id, nonce) VALUES (?, \
?, ?)",
)
.context("Preparing nonce insert statement")?;

let mut insert_storage = self
.inner()
.prepare_cached(
Expand Down Expand Up @@ -91,22 +91,22 @@ impl Transaction<'_> {
.context("Inserting deployed contract")?;
}

let contract_address_id = query_contract_address
.query_map(params![address], |row| row.get::<_, i64>(0))
.context("Querying contract address")?
.next()
.unwrap_or_else(|| {
insert_contract_address.query_row(params![address], |row| row.get::<_, i64>(0))
})
.context("Inserting contract address")?;

if let Some(nonce) = &update.nonce {
insert_nonce
.execute(params![&block_number, address, nonce])
.execute(params![&block_number, &contract_address_id, nonce])
.context("Inserting nonce update")?;
}

for (key, value) in &update.storage {
let contract_address_id = query_contract_address
.query_map(params![address], |row| row.get::<_, i64>(0))
.context("Querying contract address")?
.next()
.unwrap_or_else(|| {
insert_contract_address
.query_row(params![address], |row| row.get::<_, i64>(0))
})
.context("Inserting contract address")?;
let storage_address_id = query_storage_address
.query_map(params![key], |row| row.get::<_, i64>(0))
.context("Querying storage address")?
Expand All @@ -127,16 +127,15 @@ impl Transaction<'_> {
}

for (address, update) in &state_update.system_contract_updates {
let contract_address_id = query_contract_address
.query_map(params![address], |row| row.get::<_, i64>(0))
.context("Querying contract address")?
.next()
.unwrap_or_else(|| {
insert_contract_address.query_row(params![address], |row| row.get::<_, i64>(0))
})
.context("Inserting contract address")?;
for (key, value) in &update.storage {
let contract_address_id = query_contract_address
.query_map(params![address], |row| row.get::<_, i64>(0))
.context("Querying contract address")?
.next()
.unwrap_or_else(|| {
insert_contract_address
.query_row(params![address], |row| row.get::<_, i64>(0))
})
.context("Inserting contract address")?;
let storage_address_id = query_storage_address
.query_map(params![key], |row| row.get::<_, i64>(0))
.context("Querying storage address")?
Expand Down Expand Up @@ -265,7 +264,11 @@ impl Transaction<'_> {
let mut stmt = self
.inner()
.prepare_cached(
"SELECT contract_address, nonce FROM nonce_updates WHERE block_number = ?",
r"
SELECT contract_address, nonce FROM nonce_updates
JOIN contract_addresses ON contract_addresses.id = nonce_updates.contract_address_id
WHERE block_number = ?
",
)
.context("Preparing nonce update query statement")?;

Expand Down Expand Up @@ -626,29 +629,38 @@ impl Transaction<'_> {
match block_id {
BlockId::Latest => {
let mut stmt = self.inner().prepare_cached(
r"SELECT nonce FROM nonce_updates
WHERE contract_address = ?
ORDER BY block_number DESC LIMIT 1",
r"
SELECT nonce FROM nonce_updates
JOIN contract_addresses ON contract_addresses.id = nonce_updates.contract_address_id
WHERE contract_address = ?
ORDER BY block_number DESC LIMIT 1
",
)?;
stmt.query_row(params![&contract_address], |row| row.get_contract_nonce(0))
}
BlockId::Number(number) => {
let mut stmt = self.inner().prepare_cached(
r"SELECT nonce FROM nonce_updates
WHERE contract_address = ? AND block_number <= ?
ORDER BY block_number DESC LIMIT 1",
r"
SELECT nonce FROM nonce_updates
JOIN contract_addresses ON contract_addresses.id = nonce_updates.contract_address_id
WHERE contract_address = ? AND block_number <= ?
ORDER BY block_number DESC LIMIT 1
",
)?;
stmt.query_row(params![&contract_address, &number], |row| {
row.get_contract_nonce(0)
})
}
BlockId::Hash(hash) => {
let mut stmt = self.inner().prepare_cached(
r"SELECT nonce FROM nonce_updates
WHERE contract_address = ? AND block_number <= (
SELECT number FROM canonical_blocks WHERE hash = ?
)
ORDER BY block_number DESC LIMIT 1",
r"
SELECT nonce FROM nonce_updates
JOIN contract_addresses ON contract_addresses.id = nonce_updates.contract_address_id
WHERE contract_address = ? AND block_number <= (
SELECT number FROM canonical_blocks WHERE hash = ?
)
ORDER BY block_number DESC LIMIT 1
",
)?;
stmt.query_row(params![&contract_address, &hash], |row| {
row.get_contract_nonce(0)
Expand Down Expand Up @@ -852,10 +864,10 @@ impl Transaction<'_> {
) -> anyhow::Result<Vec<(ContractAddress, Option<ContractNonce>)>> {
let mut stmt = self.inner().prepare_cached(
r"WITH
updated_nonces(contract_address) AS (
SELECT DISTINCT
contract_address
updated_nonces(contract_address_id, contract_address) AS (
SELECT DISTINCT contract_address_id, contract_address
FROM nonce_updates
JOIN contract_addresses ON contract_addresses.id = nonce_updates.contract_address_id
WHERE
block_number > ?2 AND block_number <= ?1
)
Expand All @@ -865,7 +877,7 @@ impl Transaction<'_> {
SELECT nonce
FROM nonce_updates
WHERE
contract_address=updated_nonces.contract_address AND block_number <= ?2
contract_address_id=updated_nonces.contract_address_id AND block_number <= ?2
ORDER BY block_number DESC
LIMIT 1
) AS old_nonce
Expand Down
2 changes: 2 additions & 0 deletions crates/storage/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod revision_0055;
mod revision_0056;
mod revision_0057;
mod revision_0058;
mod revision_0059;

pub(crate) use base::base_schema;

Expand All @@ -44,6 +45,7 @@ pub fn migrations() -> &'static [MigrationFn] {
revision_0056::migrate,
revision_0057::migrate,
revision_0058::migrate,
revision_0059::migrate,
]
}

Expand Down
98 changes: 98 additions & 0 deletions crates/storage/src/schema/revision_0059.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::time::Instant;

use anyhow::Context;
use rusqlite::params;

pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> {
tx.execute_batch(
r"
CREATE TABLE nonce_updates_normalized (
block_number INTEGER REFERENCES canonical_blocks(number) ON DELETE CASCADE,
contract_address_id INTEGER NOT NULL REFERENCES contract_addresses(id),
nonce BLOB NOT NULL
);
",
)
.context("Creating nonce_updates_normalized table")?;

let mut nonce_updates_query = tx
.prepare_cached("SELECT block_number, contract_address, nonce FROM nonce_updates")
.context("Querying nonce_updates")?;
let mut nonce_updates = nonce_updates_query
.query_map([], |row| {
Ok((
row.get::<_, i64>(0)?,
row.get::<_, Vec<u8>>(1)?,
row.get::<_, Vec<u8>>(2)?,
))
})
.context("Iterating over nonce_updates")?;
let mut contract_addresses_query = tx
.prepare_cached("SELECT id FROM contract_addresses WHERE contract_address = ?")
.context("Querying contract_addresses")?;
let mut contract_addresses_insert = tx
.prepare_cached("INSERT INTO contract_addresses (contract_address) VALUES (?) RETURNING id")
.context("Inserting into contract_addresses")?;
let mut nonce_updates_insert = tx
.prepare_cached(
"INSERT INTO nonce_updates_normalized (block_number, contract_address_id, nonce) \
VALUES (?, ?, ?)",
)
.context("Inserting into nonce_updates_normalized")?;
let nonce_updates_count = tx
.query_row("SELECT COUNT(*) FROM nonce_updates", [], |row| {
row.get::<_, i64>(0)
})
.context("Counting nonce_updates")?;
let mut last_progress = Instant::now();
let mut i = 0;
loop {
let Some((block_number, contract_address, nonce)) = nonce_updates.next().transpose()?
else {
break;
};
if last_progress.elapsed().as_secs() >= 10 {
tracing::info!(
"Migrating nonce_updates: {:.2}% ({}/{})",
i as f64 / nonce_updates_count as f64 * 100.0,
i,
nonce_updates_count
);
last_progress = Instant::now();
}
let contract_address_id = contract_addresses_query
.query_map(params![&contract_address], |row| row.get::<_, i64>(0))
.context("Querying contract_addresses")?
.next()
.unwrap_or_else(|| {
contract_addresses_insert
.query_row(params![&contract_address], |row| row.get::<_, i64>(0))
})
.context("Inserting into contract_addresses")?;
nonce_updates_insert
.execute(params![block_number, contract_address_id, nonce])
.context("Inserting into nonce_updates_normalized")?;
i += 1;
}

tracing::info!(
"Migrating nonce_updates: {:.2}% ({}/{})",
i as f64 / nonce_updates_count as f64 * 100.0,
i,
nonce_updates_count
);

tracing::info!("Dropping nonce_updates and renaming temporary table, creating indices");

tx.execute_batch(
r"
DROP TABLE nonce_updates;
ALTER TABLE nonce_updates_normalized RENAME TO nonce_updates;
CREATE INDEX nonce_updates_block_number ON nonce_updates(block_number);
CREATE INDEX nonce_updates_contract_address_id_block_number ON nonce_updates(contract_address_id, block_number);
",
)
.context("Dropping nonce_updates and creating indices")?;

Ok(())
}

0 comments on commit 4a18125

Please sign in to comment.