Skip to content

Commit

Permalink
working on plans
Browse files Browse the repository at this point in the history
  • Loading branch information
scx1332 committed Nov 15, 2023
1 parent 3d5e2df commit ecb7b88
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 0 deletions.
1 change: 1 addition & 0 deletions crates/web3_test_proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ structopt = { workspace = true }
tokio = { workspace = true }
toml = { workspace = true }
uuid = { workspace = true }
snafu = "0.7.5"
83 changes: 83 additions & 0 deletions crates/web3_test_proxy/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod error;
mod frontend;
mod problems;
mod plan;

extern crate core;

Expand All @@ -23,6 +24,7 @@ use structopt::StructOpt;
use crate::frontend::{frontend_serve, redirect_to_frontend};
use crate::problems::EndpointSimulateProblems;
use tokio::sync::Mutex;
use crate::plan::{ProblemProject, SortedProblemIterator};

#[derive(Debug, StructOpt, Clone)]
pub struct CliOptions {
Expand Down Expand Up @@ -63,6 +65,13 @@ pub struct CliOptions {
default_value = "10000"
)]
pub request_queue_size: usize,


#[structopt(
long = "problem-plan",
help = "Predefined schedule of problems",
)]
pub problem_plan: Option<String>,
}
macro_rules! return_on_error_json {
( $e:expr ) => {
Expand Down Expand Up @@ -610,6 +619,7 @@ pub async fn get_call(req: HttpRequest, server_data: Data<Box<ServerData>>) -> i
}))
}


async fn main_internal() -> Result<(), Web3ProxyError> {
if let Err(err) = dotenv::dotenv() {
log::error!("Cannot load .env file: {err}");
Expand All @@ -624,6 +634,75 @@ async fn main_internal() -> Result<(), Web3ProxyError> {
})),
}));

let server_data_ = server_data.clone();

let exit_cnd = Arc::new(std::sync::atomic::AtomicBool::new(false));
let exit_cnd_ = exit_cnd.clone();

#[allow(clippy::manual_map)]
let fut = if let Some(problem_plan) = cli.problem_plan.clone() {
Some(tokio::task::spawn(
async move {
let str = std::fs::read(problem_plan).expect("Cannot read problem plan");
let problem_plan: ProblemProject = serde_json::from_slice(&str).expect("Cannot parse problem plan");

let frame_interval = Duration::from_secs_f64(problem_plan.frame_interval);
let mut problem_project = SortedProblemIterator::from_problem_project(problem_plan);

let mut last_time = Instant::now();
let mut frame_no = 0;
loop {
if exit_cnd_.load(std::sync::atomic::Ordering::Relaxed) {
return;
}
let server_data = server_data_.clone();

loop {
let sleep_time = frame_interval.as_secs_f64() - last_time.elapsed().as_secs_f64();
let sleep_time = sleep_time.min(0.1);
if frame_no > 0 && sleep_time > 0.0 {
tokio::time::sleep(Duration::from_secs_f64(sleep_time)).await;
} else {
break;
}
}

while let Some(problem_entry) = problem_project.get_next_entry(frame_no) {
let mut shared_data = server_data.shared_data.lock().await;
for key in &problem_entry.keys {


let key_data = match shared_data.keys.get_mut(key) {
Some(key_data) => key_data,
None => {
shared_data.keys.insert(key.to_string(), KeyData {
key: key.to_string(),
value: "1".to_string(),
total_calls: 0,
total_requests: 0,
calls: VecDeque::new(),
problems: EndpointSimulateProblems::default(),
});
shared_data.keys.get_mut(key).unwrap()
}
};

key_data.problems.apply_change(&problem_entry.values);
log::info!("Applied change for key: {}, frame: {}", key, frame_no);
}
}


frame_no += 1;
last_time = Instant::now();
}
}
))
} else {
None
};


let server = HttpServer::new(move || {
let cors = actix_cors::Cors::default()
.allow_any_origin()
Expand Down Expand Up @@ -678,6 +757,10 @@ async fn main_internal() -> Result<(), Web3ProxyError> {

server.await.unwrap();

if let Some(fut) = fut {
exit_cnd.store(true, std::sync::atomic::Ordering::Relaxed);
fut.await.unwrap();
}
println!("Hello, world!");
Ok(())
}
Expand Down
88 changes: 88 additions & 0 deletions crates/web3_test_proxy/src/plan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::collections::HashSet;
use serde::{Deserialize, Serialize};
use crate::problems::ValuesChangeOptions;



#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProblemEntry {
frames: Vec<u64>,
keys: Vec<String>,
values: ValuesChangeOptions,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProblemProject {
name : String,
plan_type: String,
pub frame_interval: f64,
entries: Vec<ProblemEntry>
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SortedProblemIterator {
sorted_entries: Vec<SortedProblemEntry>,
current_index: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SortedProblemEntry {
frame: u64,
pub keys: Vec<String>,
pub values: ValuesChangeOptions,
}

impl SortedProblemIterator {

// sort problems by frame
pub fn from_problem_project(problem_project: ProblemProject) -> SortedProblemIterator {
let sorted_entries: Vec<SortedProblemEntry> = problem_project.entries
.iter()
.flat_map(|entry| { // Use flat_map to handle nested structure
entry.frames.iter().map(move |&frame| { // Iterate over frames and map to SortedProblemEntry
SortedProblemEntry {
frame,
keys: entry.keys.clone(),
values: entry.values.clone(),
}
})
})
.collect(); // Collect into a Vec

// Sort the entries by frame.
let mut sorted_entries = sorted_entries;
sorted_entries.sort_by_key(|e| e.frame);

let mut check_for_conflict = HashSet::<(String, u64)>::new();

// Check for conflicts
for entry in &sorted_entries {
for key in &entry.keys {
let key_frame = (key.clone(), entry.frame);
if check_for_conflict.contains(&key_frame) {
panic!("Duplicate key frame: {:?}", key_frame);
}
check_for_conflict.insert(key_frame);
}
}

SortedProblemIterator{
sorted_entries,
current_index: 0,
}
}

pub fn get_next_entry(&mut self, current_frame: u64) -> Option<SortedProblemEntry> {
if let Some(problem_entry) = self.sorted_entries.get(self.current_index) {
if problem_entry.frame <= current_frame {
self.current_index += 1;
return Some(problem_entry.clone())
}
None
} else {
None
}
}
}
46 changes: 46 additions & 0 deletions crates/web3_test_proxy/src/problems.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@ pub struct EndpointSimulateProblems {
pub allow_only_single_calls: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ValuesChangeOptions {
pub timeout_chance: Option<f64>,
pub min_timeout_ms: Option<f64>,
pub max_timeout_ms: Option<f64>,
pub error_chance: Option<f64>,
pub malformed_response_chance: Option<f64>,
pub skip_sending_raw_transaction_chance: Option<f64>,
pub send_transaction_but_report_failure_chance: Option<f64>,
pub allow_only_parsed_calls: Option<bool>,
pub allow_only_single_calls: Option<bool>,
}

impl Default for EndpointSimulateProblems {
fn default() -> Self {
Self {
Expand All @@ -29,3 +43,35 @@ impl Default for EndpointSimulateProblems {
}
}
}

impl EndpointSimulateProblems {
pub fn apply_change(&mut self, change: &ValuesChangeOptions) {
if let Some(timeout_chance) = change.timeout_chance {
self.timeout_chance = timeout_chance;
}
if let Some(min_timeout_ms) = change.min_timeout_ms {
self.min_timeout_ms = min_timeout_ms;
}
if let Some(max_timeout_ms) = change.max_timeout_ms {
self.max_timeout_ms = max_timeout_ms;
}
if let Some(error_chance) = change.error_chance {
self.error_chance = error_chance;
}
if let Some(malformed_response_chance) = change.malformed_response_chance {
self.malformed_response_chance = malformed_response_chance;
}
if let Some(skip_sending_raw_transaction_chance) = change.skip_sending_raw_transaction_chance {
self.skip_sending_raw_transaction_chance = skip_sending_raw_transaction_chance;
}
if let Some(send_transaction_but_report_failure_chance) = change.send_transaction_but_report_failure_chance {
self.send_transaction_but_report_failure_chance = send_transaction_but_report_failure_chance;
}
if let Some(allow_only_parsed_calls) = change.allow_only_parsed_calls {
self.allow_only_parsed_calls = allow_only_parsed_calls;
}
if let Some(allow_only_single_calls) = change.allow_only_single_calls {
self.allow_only_single_calls = allow_only_single_calls;
}
}
}

0 comments on commit ecb7b88

Please sign in to comment.