Skip to content

Commit

Permalink
feat: updated test
Browse files Browse the repository at this point in the history
  • Loading branch information
ocdbytes committed Aug 29, 2024
1 parent e48f135 commit bf4350e
Show file tree
Hide file tree
Showing 18 changed files with 5,453 additions and 5,019 deletions.
9 changes: 9 additions & 0 deletions crates/orchestrator/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use alloy::primitives::Address;
use alloy::providers::RootProvider;
use std::str::FromStr;
use std::sync::Arc;

use crate::data_storage::aws_s3::config::{AWSS3Config, AWSS3ConfigType, S3LocalStackConfig};
Expand Down Expand Up @@ -172,6 +175,12 @@ pub async fn build_settlement_client(
) -> Box<dyn SettlementClient + Send + Sync> {
match get_env_var_or_panic("SETTLEMENT_LAYER").as_str() {
"ethereum" => Box::new(EthereumSettlementClient::with_settings(settings_provider)),
"ethereum_test" => Box::new(EthereumSettlementClient::with_test_settings(
RootProvider::new_http(get_env_var_or_panic("ETHEREUM_RPC_URL").as_str().parse().unwrap()),
Address::from_str(&get_env_var_or_panic("DEFAULT_L1_CORE_CONTRACT_ADDRESS")).unwrap(),
Url::from_str(get_env_var_or_panic("ETHEREUM_RPC_URL").as_str()).unwrap(),
Some(Address::from_str(get_env_var_or_panic("STARKNET_OPERATOR_ADDRESS").as_str()).unwrap()),
)),
"starknet" => Box::new(StarknetSettlementClient::with_settings(settings_provider).await),
_ => panic!("Unsupported Settlement layer"),
}
Expand Down
1 change: 1 addition & 0 deletions crates/orchestrator/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
// File names used as the suffix of per-block storage keys; callers build keys
// as "<block_number>/<file name>" before calling the storage client.
pub const BLOB_DATA_FILE_NAME: &str = "blob_data.txt";
pub const SNOS_OUTPUT_FILE_NAME: &str = "snos_output.json";
pub const PROGRAM_OUTPUT_FILE_NAME: &str = "program_output.txt";
31 changes: 11 additions & 20 deletions crates/orchestrator/src/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::ops::{Add, Mul, Rem};
use std::str::FromStr;

use async_trait::async_trait;
use bytes::Buf;
use color_eyre::eyre::WrapErr;
use lazy_static::lazy_static;
use majin_blob_types::state_diffs::DataJson;
use num_bigint::{BigUint, ToBigUint};
use num_traits::{Num, Zero};
use starknet::core::types::{BlockId, FieldElement, MaybePendingStateUpdate, StateUpdate, StorageEntry};
Expand All @@ -18,6 +18,7 @@ use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use super::{Job, JobError, OtherError};
use crate::config::Config;
use crate::constants::BLOB_DATA_FILE_NAME;
use crate::jobs::state_update_job::utils::biguint_vec_to_u8_vec;

lazy_static! {
/// EIP-4844 BLS12-381 modulus.
Expand Down Expand Up @@ -102,9 +103,12 @@ impl Job for DaJob {
// transforming the data so that we can apply FFT on this.
// @note: we can skip this step if in the above step we return vec<BigUint> directly
let blob_data_biguint = convert_to_biguint(blob_data.clone());

// data transformation on the data
let transformed_data = fft_transformation(blob_data_biguint);

store_blob_data(transformed_data.clone(), block_no, config).await?;

let max_bytes_per_blob = config.da_client().max_bytes_per_blob().await;
let max_blob_per_txn = config.da_client().max_blob_per_txn().await;

Expand Down Expand Up @@ -199,7 +203,7 @@ pub fn convert_to_biguint(elements: Vec<FieldElement>) -> Vec<BigUint> {
biguint_vec
}

fn data_to_blobs(blob_size: u64, block_data: Vec<BigUint>) -> Result<Vec<Vec<u8>>, JobError> {
pub fn data_to_blobs(blob_size: u64, block_data: Vec<BigUint>) -> Result<Vec<Vec<u8>>, JobError> {
// Validate blob size
if blob_size < 32 {
return Err(DaError::InsufficientBlobSize { blob_size })?;
Expand Down Expand Up @@ -269,8 +273,6 @@ pub async fn state_update_to_blob_data(
.await
.wrap_err("Failed to get nonce ".to_string())?;

log::info!("Address : {addr} | Nonce : {get_current_nonce_result}");

nonce = Some(get_current_nonce_result);
}
let da_word = da_word(class_flag.is_some(), nonce, writes.len() as u64);
Expand Down Expand Up @@ -302,29 +304,18 @@ pub async fn state_update_to_blob_data(
blob_data.push(*compiled_class_hash);
}

// saving the blob data of the block to storage client
store_blob_data(blob_data.clone(), block_no, config).await?;

Ok(blob_data)
}

/// To store the blob data using the storage client with path <block_number>/blob_data.txt
async fn store_blob_data(blob_data: Vec<FieldElement>, block_number: u64, config: &Config) -> Result<(), JobError> {
async fn store_blob_data(blob_data: Vec<BigUint>, block_number: u64, config: &Config) -> Result<(), JobError> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;
let data_blob_big_uint = convert_to_biguint(blob_data.clone());

let blobs_array = data_to_blobs(config.da_client().max_bytes_per_blob().await, data_blob_big_uint)?;

let blob = blobs_array.clone();

// converting Vec<Vec<u8> into Vec<u8>
let blob_vec_u8 = bincode::serialize(&blob)
.wrap_err("Unable to Serialize blobs (Vec<Vec<u8> into Vec<u8>)".to_string())
.map_err(|e| JobError::Other(OtherError(e)))?;
let blob_data_vec_u8 = biguint_vec_to_u8_vec(blob_data.as_slice());

if !blobs_array.is_empty() {
storage_client.put_data(blob_vec_u8.into(), &key).await.map_err(|e| JobError::Other(OtherError(e)))?;
if !blob_data_vec_u8.is_empty() {
storage_client.put_data(blob_data_vec_u8.into(), &key).await.map_err(|e| JobError::Other(OtherError(e)))?;
}

Ok(())
Expand Down Expand Up @@ -374,7 +365,7 @@ pub mod test {
use crate::jobs::da_job::da_word;
use std::fs;
use std::fs::File;
use std::io::{Read};
use std::io::Read;
use std::sync::Arc;

use crate::config::config;
Expand Down
11 changes: 7 additions & 4 deletions crates/orchestrator/src/jobs/state_update_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::{JobError, OtherError};
use crate::config::{config, Config};
use crate::constants::SNOS_OUTPUT_FILE_NAME;
use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY;
use crate::jobs::state_update_job::utils::fetch_blob_data_for_block;
use crate::jobs::state_update_job::utils::{fetch_blob_data_for_block, fetch_program_data_for_block};
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::Job;

Expand Down Expand Up @@ -146,6 +146,8 @@ impl Job for StateUpdateJob {
/// 1. the last settlement tx hash is successful,
/// 2. the expected last settled block from our configuration is indeed the one found in the provider.
async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
log::info!("job item : {:?}", job);

let attempt_no = job
.metadata
.get(JOB_PROCESS_ATTEMPT_METADATA_KEY)
Expand Down Expand Up @@ -280,12 +282,13 @@ impl StateUpdateJob {
unimplemented!("update_state_for_block not implemented as of now for calldata DA.")
} else if snos.use_kzg_da == Felt252::ONE {
let blob_data = fetch_blob_data_for_block(block_no).await.map_err(|e| JobError::Other(OtherError(e)))?;

// TODO : Fetch Program Output from data storage client
let program_output =
fetch_program_data_for_block(block_no).await.map_err(|e| JobError::Other(OtherError(e)))?;
// TODO :
// Fetching nonce before the transaction is run
// Sending update_state transaction from the settlement client
settlement_client
.update_state_with_blobs(vec![], blob_data, nonce)
.update_state_with_blobs(program_output, blob_data, nonce)
.await
.map_err(|e| JobError::Other(OtherError(e)))?
} else {
Expand Down
139 changes: 135 additions & 4 deletions crates/orchestrator/src/jobs/state_update_job/utils.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,147 @@
use std::fmt::Write;
use std::io::{BufRead, Cursor, Read};
use std::str::FromStr;

use crate::config::config;
use crate::constants::BLOB_DATA_FILE_NAME;
use crate::constants::{BLOB_DATA_FILE_NAME, PROGRAM_OUTPUT_FILE_NAME};
use alloy::primitives::U256;
use color_eyre::eyre::eyre;
use num_bigint::BigUint;

/// Fetching the blob data (stored in remote storage during DA job) for a particular block
pub async fn fetch_blob_data_for_block(block_number: u64) -> color_eyre::Result<Vec<Vec<u8>>> {
let config = config().await;
let storage_client = config.storage();
let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;
let blob_data = storage_client.get_data(&key).await?;
let blob_vec_data: Vec<Vec<u8>> =
bincode::deserialize(&blob_data).expect("Not able to convert Vec<u8> to Vec<Vec<u8>> during deserialization.");
Ok(blob_vec_data)
Ok(vec![blob_data.to_vec()])
}

/// Fetches the program output for a particular block from remote storage.
///
/// Reads the object stored under the key `<block_number>/program_output.txt`
/// and decodes it into fixed-width 32-byte words.
/// NOTE(review): doc previously said "blob data … during DA job" (copy-paste
/// from `fetch_blob_data_for_block`); which job writes this file is not
/// visible here — presumably the SNOS/proving flow, TODO confirm.
pub async fn fetch_program_data_for_block(block_number: u64) -> color_eyre::Result<Vec<[u8; 32]>> {
    let config = config().await;
    let storage_client = config.storage();
    // Storage key convention: "<block_number>/<file name>".
    let key = block_number.to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME;
    let blob_data = storage_client.get_data(&key).await?;
    // Parse the newline-separated integer file into 32-byte big-endian words.
    let transformed_blob_vec_u8 = bytes_to_vec_u8(blob_data.as_ref());
    Ok(transformed_blob_vec_u8)
}

/// Parses a newline-separated text buffer (one integer per line) into
/// fixed-width 32-byte big-endian words.
///
/// NOTE(review): each parsed `U256` goes bytes -> padded hex string -> bytes
/// again; `to_be_bytes_vec()` already yields exactly 32 bytes, so the hex
/// roundtrip looks redundant — confirm and simplify.
///
/// # Panics
/// Panics on an unreadable line, an empty/whitespace-only line, a line that
/// does not parse as `U256`, or a failed hex decode.
fn bytes_to_vec_u8(bytes: &[u8]) -> Vec<[u8; 32]> {
    let cursor = Cursor::new(bytes);
    let reader = std::io::BufReader::new(cursor);

    let mut program_output: Vec<[u8; 32]> = Vec::new();

    for line in reader.lines() {
        let line = line.expect("can't read line");
        let trimmed = line.trim();
        // A blank line is treated as corruption of the stored file.
        assert!(!trimmed.is_empty());

        // presumably the stored values are plain decimal integers — TODO
        // confirm the writer's format matches what `U256::from_str` accepts.
        let result = U256::from_str(trimmed).expect("Unable to convert line");
        let res_vec = result.to_be_bytes_vec();
        let hex = to_padded_hex(res_vec.as_slice());
        let vec_hex = hex_string_to_u8_vec(&hex).unwrap();
        // `vec_hex` is always 64 hex chars -> 32 bytes, so this cannot fail.
        program_output.push(vec_hex.try_into().unwrap());
    }

    program_output
}

/// Hex-encodes `slice` (at most 32 bytes) and right-pads the result with
/// `'0'` characters to a fixed width of 64 hex digits.
///
/// Note the padding is on the RIGHT (the original used `{:0<64}`), which is
/// only a no-op because callers pass exactly 32 bytes.
fn to_padded_hex(slice: &[u8]) -> String {
    assert!(slice.len() <= 32, "Slice length must not exceed 32");
    // Pre-size for the final width; every byte becomes two hex digits.
    let mut hex = String::with_capacity(64);
    for byte in slice {
        let _ = write!(hex, "{:02x}", byte);
    }
    // Right-pad to the fixed 64-character width.
    while hex.len() < 64 {
        hex.push('0');
    }
    hex
}

#[cfg(test)]
mod test {
    use crate::config::config;
    use crate::constants::BLOB_DATA_FILE_NAME;
    use crate::jobs::da_job::fft_transformation;
    use crate::jobs::state_update_job::utils::{biguint_vec_to_u8_vec, hex_string_to_u8_vec};
    use crate::tests::config::TestConfigBuilder;
    use majin_blob_core::blob;
    use rstest::rstest;
    use std::fs;

    /// Path to the blob fixture, relative to the crate root (cargo runs tests
    /// with the crate directory as the working directory). Replaces the
    /// absolute developer-machine path that was previously hard-coded here.
    const TEST_BLOB_PATH: &str = "src/tests/jobs/da_job/test_data/test_blob/671070.txt";

    /// Round-trips blob data through the storage client: recover the fixture
    /// blob, apply the same FFT transformation the DA job performs before
    /// upload, store it, fetch it back, and check the bytes match the hex
    /// fixture on disk.
    #[rstest]
    #[tokio::test]
    async fn test_fetch_blob_data_for_block() -> color_eyre::Result<()> {
        dotenvy::from_filename("../.env.test").expect("Failed to load the .env file");

        TestConfigBuilder::new().build().await;

        // Expected bytes, decoded straight from the hex fixture.
        let expected_blob_data = fs::read_to_string(TEST_BLOB_PATH).unwrap();
        let expected_blob_bytes = hex_string_to_u8_vec(&expected_blob_data).unwrap();

        // Recreate what the DA job stores: recover the field elements from the
        // blob and run the FFT transformation, then flatten to bytes.
        let original_blob_data = majin_blob_types::serde::parse_file_to_blob_data(TEST_BLOB_PATH);
        let recovered_blob_data = blob::recover(original_blob_data.clone());
        let fft_blob = fft_transformation(recovered_blob_data);
        let fft_blob_vec_u8 = biguint_vec_to_u8_vec(fft_blob.as_slice());

        // Store under the same key convention the DA job uses.
        let key = "671070/".to_string() + BLOB_DATA_FILE_NAME;
        let config = config().await;
        let storage_client = config.storage();

        storage_client.put_data(fft_blob_vec_u8.clone().into(), &key).await.unwrap();

        // Fetch back and compare against the fixture bytes.
        let stored_blob_data = storage_client.get_data(&key).await?;
        let stored_blob_bytes = stored_blob_data.to_vec();

        assert_eq!(expected_blob_bytes, stored_blob_bytes);

        Ok(())
    }
}

pub fn biguint_vec_to_u8_vec(nums: &[BigUint]) -> Vec<u8> {
let mut result: Vec<u8> = Vec::new();

for num in nums {
result.extend_from_slice(biguint_to_32_bytes(num).as_slice());
}

result
}

/// Encodes `num` as exactly 32 big-endian bytes: values wider than 32 bytes
/// are truncated to their 32 least-significant bytes; narrower values are
/// zero-padded on the left.
pub fn biguint_to_32_bytes(num: &BigUint) -> [u8; 32] {
    let be = num.to_bytes_be();
    let mut out = [0u8; 32];
    match be.len() {
        // Too wide: keep only the trailing (least-significant) 32 bytes.
        len if len > 32 => out.copy_from_slice(&be[len - 32..]),
        // Fits: right-align into the zeroed buffer (left zero-padding).
        len => out[32 - len..].copy_from_slice(&be),
    }
    out
}

// Util Functions
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub async fn create_sqs_queues() -> color_eyre::Result<()> {
for queue_url in queue_urls {
match sqs_client.delete_queue().queue_url(queue_url).send().await {
Ok(_) => log::debug!("Successfully deleted queue: {}", queue_url),
Err(_e) => { },
Err(_e) => {}
}
}

Expand Down
Loading

0 comments on commit bf4350e

Please sign in to comment.