Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
feat: cached provider PoC
Browse files Browse the repository at this point in the history
  • Loading branch information
gakonst committed Feb 22, 2022
1 parent 3effda2 commit 84c99a2
Showing 1 changed file with 101 additions and 2 deletions.
103 changes: 101 additions & 2 deletions ethers-providers/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,17 @@ use thiserror::Error;
use url::{ParseError, Url};

use futures_util::{lock::Mutex, try_join};
use std::{convert::TryFrom, fmt::Debug, str::FromStr, sync::Arc, time::Duration};
use std::{
collections::BTreeMap,
convert::TryFrom,
fmt::Debug,
fs::File,
io::{BufReader, BufWriter},
path::PathBuf,
str::FromStr,
sync::{Arc, Mutex as SyncMutex},
time::Duration,
};
use tracing::trace;
use tracing_futures::Instrument;

Expand Down Expand Up @@ -83,12 +93,77 @@ pub struct Provider<P> {
ens: Option<Address>,
interval: Option<Duration>,
from: Option<Address>,
cache: Option<Cache>,
/// Node client hasn't been checked yet = `None`
/// Unsupported node client = `Some(None)`
/// Supported node client = `Some(Some(NodeClient))`
_node_client: Arc<Mutex<Option<NodeClient>>>,
}

// ~/cache/forge-requests-cache/chain_id.json
#[derive(Clone, Debug, Default)]
struct Cache {
path: PathBuf,
// serialized request / response pair
requests: Arc<SyncMutex<BTreeMap<String, String>>>,
}

#[derive(Serialize, Deserialize)]
struct CachedRequest<'a, T> {
method: &'a str,
params: T,
}

impl Cache {
fn new(path: PathBuf) -> Result<Self, ProviderError> {
// try to read the already existing requests
let reader =
BufReader::new(File::options().write(true).read(true).create(true).open(&path)?);
let requests = serde_json::from_reader(reader).unwrap_or_default();
Ok(Self { path, requests: Arc::new(SyncMutex::new(requests)) })
}

pub fn get<T: Serialize, R: DeserializeOwned>(
&self,
method: &str,
params: &T,
) -> Result<Option<R>, ProviderError> {
let key = serde_json::to_string(&CachedRequest { method, params })?;
let requests = self.requests.lock().unwrap();
let value = requests.get(&key);
value.map(|x| serde_json::from_str(x).map_err(ProviderError::SerdeJson)).transpose()
}

pub fn set<T: Serialize, R: Serialize>(
&self,
method: &str,
params: T,
response: R,
) -> Result<(), ProviderError> {
let key = serde_json::to_string(&CachedRequest { method, params })?;
let value = serde_json::to_string(&response)?;
self.requests.lock().unwrap().insert(key, value);
Ok(())
}
}

impl Drop for Cache {
fn drop(&mut self) {
let file = match File::options().write(true).read(true).create(true).open(&self.path) {
Ok(inner) => BufWriter::new(inner),
Err(err) => {
tracing::error!("could not open cache file {}", err);
return
}
};

// overwrite the cache
if let Err(err) = serde_json::to_writer(file, &(*self.requests.lock().unwrap())) {
tracing::error!("could not write to cache file {}", err);
};
}
}

impl<P> AsRef<P> for Provider<P> {
fn as_ref(&self) -> &P {
&self.inner
Expand Down Expand Up @@ -132,6 +207,9 @@ pub enum ProviderError {

#[error("Attempted to sign a transaction with no available signer. Hint: did you mean to use a SignerMiddleware?")]
SignerUnavailable,

#[error(transparent)]
Io(#[from] std::io::Error),
}

/// Types of filters supported by the JSON-RPC.
Expand All @@ -157,6 +235,7 @@ impl<P: JsonRpcClient> Provider<P> {
interval: None,
from: None,
_node_client: Arc::new(Mutex::new(None)),
cache: None,
}
}

Expand Down Expand Up @@ -193,9 +272,22 @@ impl<P: JsonRpcClient> Provider<P> {
tracing::trace_span!("rpc", method = method, params = ?serde_json::to_string(&params)?);
// https://docs.rs/tracing/0.1.22/tracing/span/struct.Span.html#in-asynchronous-code
let res = async move {
// if there's a cache hit, return it
if let Some(ref cache) = self.cache {
if let Some(res) = cache.get(method, &params)? {
return Ok(res)
}
}

trace!("tx");
let res: R = self.inner.request(method, params).await.map_err(Into::into)?;
let res: R = self.inner.request(method, &params).await.map_err(Into::into)?;
trace!(rx = ?serde_json::to_string(&res)?);

// save the response if there was a cache set
if let Some(ref cache) = self.cache {
cache.set(method, params, &res)?;
}

Ok::<_, ProviderError>(res)
}
.instrument(span)
Expand Down Expand Up @@ -1176,6 +1268,13 @@ impl<P: JsonRpcClient> Provider<P> {
self
}

#[must_use]
/// Sets the provider's cache to avoid making redundant network requests.
pub fn with_cache(mut self, cache: PathBuf) -> Self {
self.cache = Some(Cache::new(cache).unwrap());
self
}

/// Gets the polling interval which the provider currently uses for event filters
/// and pending transactions (default: 7 seconds)
pub fn get_interval(&self) -> Duration {
Expand Down

0 comments on commit 84c99a2

Please sign in to comment.