From f05b0aefbb04ce715431bf039b8760e95f87dc93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Grze=C5=9Bkiewicz?= Date: Tue, 18 Jun 2024 17:40:40 +0200 Subject: [PATCH] feat(eth-sender): fix for missing eth_txs_history entries (#2236) This change introduces an invariant "If the transaction may have been sent, then the sent_at_block is set". This way we never remove eth_txs_history entries if the transaction may be included in block. The downside is that we can't distinguish between not sent transaction and one that has not been mined, but you can't have it both. The next step is to remove "send_unsent_transactions" step, but it can't be done in this PR as there might be transactions in db without sent_at_block set --------- Signed-off-by: tomg10 --- ...fc05eaa158a6f38a87187d7f2c2068a0112a.json} | 7 ++-- core/lib/dal/src/eth_sender_dal.rs | 9 ++++- core/node/eth_sender/src/eth_tx_manager.rs | 39 +++++++++---------- 3 files changed, 30 insertions(+), 25 deletions(-) rename core/lib/dal/.sqlx/{query-fe06e06c04466429bb85709e6fe8dd6c2ad2793c06071f4a067dcc31306adebc.json => query-45a968c6d667b13bbe9d895e7734fc05eaa158a6f38a87187d7f2c2068a0112a.json} (60%) diff --git a/core/lib/dal/.sqlx/query-fe06e06c04466429bb85709e6fe8dd6c2ad2793c06071f4a067dcc31306adebc.json b/core/lib/dal/.sqlx/query-45a968c6d667b13bbe9d895e7734fc05eaa158a6f38a87187d7f2c2068a0112a.json similarity index 60% rename from core/lib/dal/.sqlx/query-fe06e06c04466429bb85709e6fe8dd6c2ad2793c06071f4a067dcc31306adebc.json rename to core/lib/dal/.sqlx/query-45a968c6d667b13bbe9d895e7734fc05eaa158a6f38a87187d7f2c2068a0112a.json index 8f0065433012..36da129b5b77 100644 --- a/core/lib/dal/.sqlx/query-fe06e06c04466429bb85709e6fe8dd6c2ad2793c06071f4a067dcc31306adebc.json +++ b/core/lib/dal/.sqlx/query-45a968c6d667b13bbe9d895e7734fc05eaa158a6f38a87187d7f2c2068a0112a.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO\n eth_txs_history (\n eth_tx_id,\n base_fee_per_gas,\n priority_fee_per_gas,\n tx_hash,\n signed_raw_tx,\n created_at,\n updated_at,\n blob_base_fee_per_gas\n )\n VALUES\n ($1, $2, $3, $4, $5, NOW(), NOW(), $6)\n ON CONFLICT (tx_hash) DO NOTHING\n RETURNING\n id\n ", + "query": "\n INSERT INTO\n eth_txs_history (\n eth_tx_id,\n base_fee_per_gas,\n priority_fee_per_gas,\n tx_hash,\n signed_raw_tx,\n created_at,\n updated_at,\n blob_base_fee_per_gas,\n sent_at_block,\n sent_at\n )\n VALUES\n ($1, $2, $3, $4, $5, NOW(), NOW(), $6, $7, NOW())\n ON CONFLICT (tx_hash) DO NOTHING\n RETURNING\n id\n ", "describe": { "columns": [ { @@ -16,12 +16,13 @@ "Int8", "Text", "Bytea", - "Int8" + "Int8", + "Int4" ] }, "nullable": [ false ] }, - "hash": "fe06e06c04466429bb85709e6fe8dd6c2ad2793c06071f4a067dcc31306adebc" + "hash": "45a968c6d667b13bbe9d895e7734fc05eaa158a6f38a87187d7f2c2068a0112a" } diff --git a/core/lib/dal/src/eth_sender_dal.rs b/core/lib/dal/src/eth_sender_dal.rs index ad1e910af12e..d32ed082131e 100644 --- a/core/lib/dal/src/eth_sender_dal.rs +++ b/core/lib/dal/src/eth_sender_dal.rs @@ -221,6 +221,7 @@ impl EthSenderDal<'_, '_> { Ok(eth_tx.into()) } + #[allow(clippy::too_many_arguments)] pub async fn insert_tx_history( &mut self, eth_tx_id: u32, @@ -229,6 +230,7 @@ impl EthSenderDal<'_, '_> { blob_base_fee_per_gas: Option, tx_hash: H256, raw_signed_tx: &[u8], + sent_at_block: u32, ) -> anyhow::Result> { let priority_fee_per_gas = i64::try_from(priority_fee_per_gas).context("Can't convert u64 to i64")?; @@ -247,10 +249,12 @@ impl EthSenderDal<'_, '_> { signed_raw_tx, created_at, updated_at, - blob_base_fee_per_gas + blob_base_fee_per_gas, + sent_at_block, + sent_at ) VALUES - ($1, $2, $3, $4, $5, NOW(), NOW(), $6) + ($1, $2, $3, $4, $5, NOW(), NOW(), $6, $7, NOW()) ON CONFLICT (tx_hash) DO NOTHING RETURNING id @@ -261,6 +265,7 @@ impl EthSenderDal<'_, '_> { tx_hash, raw_signed_tx, blob_base_fee_per_gas.map(|v| v as i64), + sent_at_block as i32 ) .fetch_optional(self.storage.conn()) .await? diff --git a/core/node/eth_sender/src/eth_tx_manager.rs b/core/node/eth_sender/src/eth_tx_manager.rs index ea07248aa813..f635d12bae13 100644 --- a/core/node/eth_sender/src/eth_tx_manager.rs +++ b/core/node/eth_sender/src/eth_tx_manager.rs @@ -190,12 +190,13 @@ impl EthTxManager { blob_base_fee_per_gas, signed_tx.hash, signed_tx.raw_tx.as_ref(), + current_block.0, ) .await .unwrap() { if let Err(error) = self - .send_raw_transaction(storage, tx_history_id, signed_tx.raw_tx, current_block) + .send_raw_transaction(storage, tx_history_id, signed_tx.raw_tx) .await { tracing::warn!( @@ -216,17 +217,9 @@ impl EthTxManager { storage: &mut Connection<'_, Core>, tx_history_id: u32, raw_tx: RawTransactionBytes, - current_block: L1BlockNumber, ) -> Result<(), EthSenderError> { match self.l1_interface.send_raw_tx(raw_tx).await { - Ok(_) => { - storage - .eth_sender_dal() - .set_sent_at_block(tx_history_id, current_block.0) - .await - .unwrap(); - Ok(()) - } + Ok(_) => Ok(()), Err(error) => { // In transient errors, server may have received the transaction // we don't want to loose record about it in case that happens @@ -401,16 +394,22 @@ impl EthTxManager { self.apply_tx_status(storage, ð_tx, tx_status, l1_block_numbers.finalized) .await; - } else if let Err(error) = self - .send_raw_transaction( - storage, - tx.id, - RawTransactionBytes::new_unchecked(tx.signed_raw_tx.clone()), - l1_block_numbers.latest, - ) - .await - { - tracing::warn!("Error sending transaction {tx:?}: {error}"); + } else { + storage + .eth_sender_dal() + .set_sent_at_block(tx.id, l1_block_numbers.latest.0) + .await + .unwrap(); + if let Err(error) = self + .send_raw_transaction( + storage, + tx.id, + RawTransactionBytes::new_unchecked(tx.signed_raw_tx.clone()), + ) + .await + { + tracing::warn!("Error sending transaction {tx:?}: {error}"); + } } } }