Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
scx1332 committed Nov 16, 2023
1 parent 618de29 commit 17b1623
Showing 1 changed file with 18 additions and 19 deletions.
37 changes: 18 additions & 19 deletions crates/erc20_rpc_pool/src/rpc_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct Web3RpcPool {
//last_verified: Option<DateTime<Utc>>,
pub endpoints: Vec<Arc<RwLock<Web3RpcEndpoint>>>,
pub verify_mutex: tokio::sync::Mutex<()>,
pub last_chosen_endpoints: Arc<Mutex<VecDeque<usize>>>,
pub last_success_endpoints: Arc<Mutex<VecDeque<usize>>>,
}

impl Web3RpcPool {
Expand Down Expand Up @@ -78,7 +78,7 @@ impl Web3RpcPool {
chain_id,
endpoints: web3_endpoints,
verify_mutex: tokio::sync::Mutex::new(()),
last_chosen_endpoints: Arc::new(Mutex::new(VecDeque::new())),
last_success_endpoints: Arc::new(Mutex::new(VecDeque::new())),
}
}

Expand Down Expand Up @@ -120,30 +120,30 @@ impl Web3RpcPool {
let mut extra_score = 0;

{
let mut last_chosen_endpoints = self.last_chosen_endpoints.lock().unwrap();
while last_chosen_endpoints.len() > 4 {
last_chosen_endpoints.pop_back();
let mut last_success_endpoints = self.last_success_endpoints.lock().unwrap();
while last_success_endpoints.len() > 4 {
last_success_endpoints.pop_back();
}

if let Some(last_chosen) = last_chosen_endpoints.get(0) {
if let Some(last_chosen) = last_success_endpoints.get(0) {
extra_score_idx = *last_chosen as i64;
extra_score += 10;
}
if let Some(last_chosen) = last_chosen_endpoints.get(1) {
if let Some(last_chosen) = last_success_endpoints.get(1) {
if extra_score_idx == *last_chosen as i64 {
extra_score += 7;
} else {
return (extra_score_idx, extra_score);
}
}
if let Some(last_chosen) = last_chosen_endpoints.get(2) {
if let Some(last_chosen) = last_success_endpoints.get(2) {
if extra_score_idx == *last_chosen as i64 {
extra_score += 5;
} else {
return (extra_score_idx, extra_score);
}
}
if let Some(last_chosen) = last_chosen_endpoints.get(3) {
if let Some(last_chosen) = last_success_endpoints.get(3) {
if extra_score_idx == *last_chosen as i64 {
extra_score += 3;
} else {
Expand All @@ -165,7 +165,7 @@ impl Web3RpcPool {
};
}

let mut end = self
let mut allowed_endpoints = self
.endpoints
.iter()
.enumerate()
Expand All @@ -177,19 +177,16 @@ impl Web3RpcPool {
.map(|(idx, _element)| idx)
.collect::<Vec<usize>>();

end.sort_by_key(|idx| (self.endpoints[*idx].read().unwrap().get_score() * 1000.0) as i64);
end.reverse();
allowed_endpoints
.sort_by_key(|idx| (self.endpoints[*idx].read().unwrap().get_score() * 1000.0) as i64);
allowed_endpoints.reverse();

if let Some(first) = end.first() {
if !allowed_endpoints.is_empty() {
//todo change type system to allow that call

let self_cloned = self.clone();
tokio::task::spawn(self_cloned.verify_unverified_endpoints());
self.last_chosen_endpoints
.lock()
.unwrap()
.push_front(*first);
end
allowed_endpoints
} else {
let self_cloned = self.clone();
let verify_task = tokio::task::spawn(self_cloned.verify_unverified_endpoints());
Expand All @@ -210,7 +207,6 @@ impl Web3RpcPool {
})
.map(|(idx, _element)| idx)
{
self.last_chosen_endpoints.lock().unwrap().push_front(el);
return vec![el];
}

Expand Down Expand Up @@ -254,6 +250,9 @@ impl Web3RpcPool {
.write()
.unwrap()
.web3_rpc_info;

self.last_success_endpoints.lock().unwrap().push_front(idx);

let el = if let Some(entry) = stats.web3_rpc_stats.request_stats.get_mut(&method) {
entry
} else {
Expand Down

0 comments on commit 17b1623

Please sign in to comment.