Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added a JSON RPC to simulating L1 for consensus attestation (BFT-495) #2480

Merged
merged 16 commits into from
Jul 26, 2024
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 12 additions & 24 deletions core/lib/dal/src/consensus_dal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::Context as _;
use bigdecimal::Zero;
use zksync_consensus_roles::{attester, validator};
use zksync_consensus_storage::{BlockStoreState, ReplicaState};
use zksync_db_connection::{
Expand Down Expand Up @@ -378,9 +377,7 @@ impl ConsensusDal<'_, '_> {
) -> Result<(), InsertCertificateError> {
use InsertCertificateError as E;
let header = &cert.message.proposal;
let mut txn = self.storage.start_transaction().await?;
let want_payload = txn
.consensus_dal()
let want_payload = self
.block_payload(cert.message.proposal.number)
.await?
.ok_or(E::MissingPayload)?;
Expand All @@ -399,9 +396,8 @@ impl ConsensusDal<'_, '_> {
)
.instrument("insert_block_certificate")
.report_latency()
.execute(&mut txn)
.execute(self.storage)
.await?;
txn.commit().await.context("commit")?;
Ok(())
}

Expand All @@ -410,40 +406,31 @@ impl ConsensusDal<'_, '_> {
/// Insertion is allowed even if it creates gaps in the L1 batch history.
///
/// This method assumes that all payload validation has been carried out by the caller.
pub async fn insert_batch_certificate(
&mut self,
cert: &attester::BatchQC,
) -> Result<(), InsertCertificateError> {
let l1_batch_number = cert.message.number.0 as i64;

let res = sqlx::query!(
/// Verification cannot be performed internally, due to circular dependency on
/// `zksync_l1_contract_interface`.
pub async fn insert_batch_certificate(&mut self, cert: &attester::BatchQC) -> DalResult<()> {
sqlx::query!(
r#"
INSERT INTO
l1_batches_consensus (l1_batch_number, certificate, created_at, updated_at)
VALUES
($1, $2, NOW(), NOW())
ON CONFLICT (l1_batch_number) DO NOTHING
"#,
l1_batch_number,
cert.message.number.0 as i64,
zksync_protobuf::serde::serialize(cert, serde_json::value::Serializer).unwrap(),
)
.instrument("insert_batch_certificate")
.report_latency()
.execute(self.storage)
.await?;

if res.rows_affected().is_zero() {
tracing::debug!(l1_batch_number, "duplicate batch certificate");
}
pompon0 marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}

/// Gets a number of the last L1 batch that was inserted. It might have gaps before it,
/// depending on the order in which votes have been collected over gossip by consensus.
pub async fn get_last_batch_certificate_number(
&mut self,
) -> DalResult<Option<attester::BatchNumber>> {
) -> anyhow::Result<Option<attester::BatchNumber>> {
let row = sqlx::query!(
r#"
SELECT
Expand All @@ -457,9 +444,10 @@ impl ConsensusDal<'_, '_> {
.fetch_one(self.storage)
.await?;

Ok(row
.number
.map(|number| attester::BatchNumber(number as u64)))
let Some(n) = row.number else { return Ok(None) };
Ok(Some(attester::BatchNumber(
n.try_into().context("overflow")?,
)))
pompon0 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
7 changes: 7 additions & 0 deletions core/lib/types/src/api/en.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,10 @@ pub struct SyncBlock {

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsensusGenesis(pub serde_json::Value);

/// Status of L1 simulated by the main node.
/// Used for testing L1 batch signing by consensus attesters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SimulatedL1Status {
pub next_batch_to_commit: L1BatchNumber,
aakoshh marked this conversation as resolved.
Show resolved Hide resolved
}
7 changes: 7 additions & 0 deletions core/lib/web3_decl/src/namespaces/en.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ pub trait EnNamespace {
#[method(name = "genesisConfig")]
async fn genesis_config(&self) -> RpcResult<GenesisConfig>;

/// INTERNAL RPC:
/// Gets the status of L1 simulated by the main node.
/// This is a temporary RPC used for testing L1 batch signing
/// by consensus attesters.
#[method(name = "simulated_l1_status")]
pompon0 marked this conversation as resolved.
Show resolved Hide resolved
async fn simulated_l1_status(&self) -> RpcResult<en::SimulatedL1Status>;

/// Get tokens that are white-listed and it can be used by paymasters.
#[method(name = "whitelistedTokensForAA")]
async fn whitelisted_tokens_for_aa(&self) -> RpcResult<Vec<Address>>;
Expand Down
1 change: 1 addition & 0 deletions core/node/api_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ categories.workspace = true

[dependencies]
zksync_config.workspace = true
zksync_consensus_roles.workspace = true
zksync_contracts.workspace = true
zksync_types.workspace = true
zksync_dal.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ impl EnNamespaceServer for EnNamespace {
.map_err(|err| self.current_method().map_err(err))
}

async fn simulated_l1_status(&self) -> RpcResult<en::SimulatedL1Status> {
self.simulated_l1_status_impl()
.await
.map_err(|err| self.current_method().map_err(err))
}

async fn sync_tokens(&self, block_number: Option<L2BlockNumber>) -> RpcResult<Vec<TokenInfo>> {
self.sync_tokens_impl(block_number)
.await
Expand Down
5 changes: 4 additions & 1 deletion core/node/api_server/src/web3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,10 @@ impl ApiServer {
.context("cannot merge zks namespace")?;
}
if namespaces.contains(&Namespace::En) {
rpc.merge(EnNamespace::new(rpc_state.clone()).into_rpc())
let n = EnNamespace::new(rpc_state.clone())
.await
.context("EnNamespace:::new()")?;
rpc.merge(n.into_rpc())
.context("cannot merge en namespace")?;
}
if namespaces.contains(&Namespace::Snapshots) {
Expand Down
70 changes: 66 additions & 4 deletions core/node/api_server/src/web3/namespaces/en.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Context as _;
use zksync_config::{configs::EcosystemContracts, GenesisConfig};
use zksync_consensus_roles::attester;
use zksync_dal::{CoreDal, DalError};
use zksync_types::{
api::en, protocol_version::ProtocolSemanticVersion, tokens::TokenInfo, Address, L1BatchNumber,
Expand All @@ -14,16 +15,60 @@ use crate::web3::{backend_jsonrpsee::MethodTracer, state::RpcState};
#[derive(Debug)]
pub(crate) struct EnNamespace {
state: RpcState,
/// First batch to commit to L1 simulated by the main node.
/// This is temporary and used only for testing L1 batch signing by consensus attesters.
first_batch_to_commit: L1BatchNumber,
}

fn to_l1_batch_number(n: attester::BatchNumber) -> anyhow::Result<L1BatchNumber> {
Ok(L1BatchNumber(
n.0.try_into().context("L1BatchNumber overflow")?,
))
}

impl EnNamespace {
pub fn new(state: RpcState) -> Self {
Self { state }
pub async fn new(state: RpcState) -> anyhow::Result<Self> {
let first_batch_to_commit = async {
let mut conn = state.acquire_connection().await.context("connection()")?;
// Try to continue from where we left.
if let Some(last) = conn
.consensus_dal()
.get_last_batch_certificate_number()
.await
.context("get_last_batch_certificate_number()")?
{
return to_l1_batch_number(last + 1);
}
// Otherwise start with the next sealed L1 batch.
if let Some(sealed) = conn
.blocks_dal()
.get_sealed_l1_batch_number()
.await
.context("get_sealed_l1_batch_number()")?
{
return Ok(sealed + 1);
pompon0 marked this conversation as resolved.
Show resolved Hide resolved
}
// Otherwise start from the first non-pruned batch.
let info = conn
.pruning_dal()
.get_pruning_info()
.await
.context("get_pruning_info()")?;
Ok(info
.last_soft_pruned_l1_batch
.map(|n| n + 1)
.unwrap_or(L1BatchNumber(0)))
}
aakoshh marked this conversation as resolved.
Show resolved Hide resolved
.await?;
Ok(Self {
state,
first_batch_to_commit,
})
}

pub async fn consensus_genesis_impl(&self) -> Result<Option<en::ConsensusGenesis>, Web3Error> {
let mut storage = self.state.acquire_connection().await?;
let Some(genesis) = storage
let mut conn = self.state.acquire_connection().await?;
let Some(genesis) = conn
.consensus_dal()
.genesis()
.await
Expand All @@ -36,6 +81,23 @@ impl EnNamespace {
)))
}

#[tracing::instrument(skip(self))]
pub async fn simulated_l1_status_impl(&self) -> Result<en::SimulatedL1Status, Web3Error> {
let mut conn = self.state.acquire_connection().await?;
let next_batch_to_commit = match conn
.consensus_dal()
.get_last_batch_certificate_number()
.await
.context("get_last_batch_certificate_number()")?
{
Some(n) => to_l1_batch_number(n + 1)?,
None => self.first_batch_to_commit,
};
pompon0 marked this conversation as resolved.
Show resolved Hide resolved
Ok(en::SimulatedL1Status {
next_batch_to_commit,
})
}

pub(crate) fn current_method(&self) -> &MethodTracer {
&self.state.current_method
}
Expand Down
51 changes: 31 additions & 20 deletions core/node/consensus/src/storage/connection.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, time};
use zksync_consensus_crypto::keccak256::Keccak256;
use zksync_consensus_roles::{attester, validator};
use zksync_consensus_storage::{self as storage, BatchStoreState};
use zksync_dal::{consensus_dal::Payload, Core, CoreDal, DalError};
use zksync_dal::{consensus_dal, consensus_dal::Payload, Core, CoreDal, DalError};
use zksync_l1_contract_interface::i_executor::structures::StoredBatchInfo;
use zksync_node_sync::{fetcher::IoCursorExt as _, ActionQueueSender, SyncState};
use zksync_state_keeper::io::common::IoCursor;
Expand Down Expand Up @@ -115,35 +116,26 @@ impl<'a> Connection<'a> {
.await??)
}

/// Wrapper for `consensus_dal().insert_batch_certificate()`.
/// Wrapper for `consensus_dal().insert_batch_certificate()`,
/// which additionally verifies that the batch hash matches the stored batch.
pub async fn insert_batch_certificate(
&mut self,
ctx: &ctx::Ctx,
cert: &attester::BatchQC,
) -> Result<(), InsertCertificateError> {
use crate::storage::consensus_dal::InsertCertificateError as E;

let l1_batch_number = L1BatchNumber(cert.message.number.0 as u32);

let Some(l1_batch) = self
.0
.blocks_dal()
.get_l1_batch_metadata(l1_batch_number)
use consensus_dal::InsertCertificateError as E;
let want_hash = self
.batch_hash(ctx, cert.message.number)
.await
.map_err(E::Dal)?
else {
return Err(E::MissingPayload.into());
};

let l1_batch_info = StoredBatchInfo::from(&l1_batch);

if l1_batch_info.hash().0 != *cert.message.hash.0.as_bytes() {
.wrap("batch_hash()")?
.ok_or(E::MissingPayload)?;
if want_hash != cert.message.hash {
return Err(E::PayloadMismatch.into());
}

Ok(ctx
.wait(self.0.consensus_dal().insert_batch_certificate(cert))
.await??)
.await?
.map_err(E::Dal)?)
}

/// Wrapper for `consensus_dal().replica_state()`.
Expand All @@ -166,6 +158,25 @@ impl<'a> Connection<'a> {
.context("sqlx")?)
}

/// Wrapper for `consensus_dal().batch_hash()`.
pub async fn batch_hash(
&mut self,
ctx: &ctx::Ctx,
number: attester::BatchNumber,
) -> ctx::Result<Option<attester::BatchHash>> {
let n = L1BatchNumber(number.0.try_into().context("overflow")?);
let Some(meta) = ctx
.wait(self.0.blocks_dal().get_l1_batch_metadata(n))
.await?
.context("sqlx")?
pompon0 marked this conversation as resolved.
Show resolved Hide resolved
else {
return Ok(None);
};
Ok(Some(attester::BatchHash(Keccak256::from_bytes(
StoredBatchInfo::from(&meta).hash().0,
))))
}

/// Wrapper for `blocks_dal().get_l1_batch_metadata()`.
pub async fn batch(
&mut self,
Expand Down
9 changes: 9 additions & 0 deletions core/node/consensus/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ pub enum InsertCertificateError {
Inner(#[from] consensus_dal::InsertCertificateError),
}

impl From<ctx::Error> for InsertCertificateError {
fn from(err: ctx::Error) -> Self {
match err {
ctx::Error::Canceled(err) => Self::Canceled(err),
ctx::Error::Internal(err) => Self::Inner(err.into()),
}
}
}

#[derive(Debug)]
pub(crate) struct PayloadQueue {
inner: IoCursor,
Expand Down
Loading
Loading