Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: skip POI fetch until cache TTL expires #860

Merged
merged 1 commit into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 25 additions & 55 deletions graph-gateway/src/network/indexer_indexing_poi_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@ use url::Url;

use crate::{indexers, indexers::public_poi::Error as PublicPoiFetchError};

/// The default TTL for cache entries is 20 minutes. Entries are considered expired after this time.
pub const DEFAULT_CACHE_TLL: Duration = Duration::from_secs(20 * 60); // 20 minutes

/// The timeout for the indexer indexings' POI resolution.
pub const DEFAULT_INDEXER_INDEXING_POIS_RESOLUTION_TIMEOUT: Duration = Duration::from_secs(5);

/// The number of Public POI queries in a single request.
const POIS_PER_REQUEST_BATCH_SIZE: usize = 10;

Expand All @@ -39,7 +33,8 @@ pub enum ResolutionError {
Timeout,
}

/// A resolver for the Proof of Indexing (POI) of indexers.
/// A resolver for the Proof of Indexing (POI) of indexers. Results are cached for some TTL to avoid
/// making the same request multiple times.
#[allow(clippy::type_complexity)]
pub struct PoiResolver {
client: reqwest::Client,
Expand All @@ -48,27 +43,8 @@ pub struct PoiResolver {
}

impl PoiResolver {
/// Create a new [`PoiResolver`] with the given client.
///
/// The client is used to make requests to indexers. The resolver caches the results of these
/// requests to avoid making the same request multiple times.
///
/// By default, the cache has a TTL of 20 minutes, [`DEFAULT_CACHE_TLL`]. Entries are considered
/// expired after this time causing the resolver to make a new requests to the indexer.
pub fn new(client: reqwest::Client) -> Self {
Self {
client,
timeout: DEFAULT_INDEXER_INDEXING_POIS_RESOLUTION_TIMEOUT,
cache: RwLock::new(TtlHashMap::with_ttl(DEFAULT_CACHE_TLL)),
}
}

/// Create a new [`PoiResolver`] with the given timeout and cache TTL.
pub fn with_timeout_and_cache_ttl(
client: reqwest::Client,
timeout: Duration,
cache_ttl: Duration,
) -> Self {
pub fn new(client: reqwest::Client, timeout: Duration, cache_ttl: Duration) -> Self {
Self {
client,
timeout,
Expand Down Expand Up @@ -149,37 +125,31 @@ impl PoiResolver {
poi_requests: &[(DeploymentId, BlockNumber)],
) -> HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> {
let url_string = url.to_string();
let mut results = self.get_from_cache(&url_string, poi_requests);
let missing_requests: Vec<(DeploymentId, BlockNumber)> = poi_requests
.iter()
.filter(|r| !results.contains_key(r))
.cloned()
.collect();
if missing_requests.is_empty() {
return results;
}

// Fetch the indexings' indexing progress
let fetched = self.fetch_indexer_public_pois(url, poi_requests).await;

// Filter out the failures
let fresh_data = fetched
let fetched: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> = self
.fetch_indexer_public_pois(url, &missing_requests)
.await
.into_iter()
.filter_map(|(meta, result)| {
// TODO: Report the errors instead of filtering them out
Some((meta, result.ok()?))
.filter_map(|(key, result)| match result {
Ok(poi) => Some((key, poi)),
Err(poi_fetch_err) => {
tracing::warn!(%poi_fetch_err);
None
}
})
.collect::<HashMap<_, _>>();

// Update the cache with the fetched data, if any
if !fresh_data.is_empty() {
self.update_cache(&url_string, &fresh_data);
}

// Get the cached data for the missing deployments
let cached_info = {
// Get the list of deployments that are missing from the fetched data
let missing_indexings = fresh_data
.keys()
.filter(|meta| !poi_requests.contains(meta));

// Get the cached data for the missing deployments
self.get_from_cache(&url_string, missing_indexings)
};

// Merge the fetched and cached data
cached_info.into_iter().chain(fresh_data).collect()
.collect();
self.update_cache(&url_string, &fetched);
results.extend(fetched);
results
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ fn test_service_state(

if !pois_blocklist.is_empty() {
let pois_blocklist = PoiBlocklist::new(pois_blocklist);
let pois_resolver = PoiResolver::new(indexers_http_client.clone());
let pois_resolver = PoiResolver::new(
indexers_http_client.clone(),
Duration::from_secs(30),
Duration::MAX,
);
state.indexer_indexing_pois_blocklist = Some((pois_resolver, pois_blocklist));
}

Expand Down
10 changes: 4 additions & 6 deletions graph-gateway/src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ use super::{
CostModelResolver, DEFAULT_INDEXER_INDEXING_COST_MODEL_RESOLUTION_TIMEOUT,
},
indexer_indexing_poi_blocklist::PoiBlocklist,
indexer_indexing_poi_resolver::{
PoiResolver, DEFAULT_INDEXER_INDEXING_POIS_RESOLUTION_TIMEOUT,
},
indexer_indexing_poi_resolver::PoiResolver,
indexer_indexing_progress_resolver::IndexingProgressResolver,
indexer_version_resolver::{VersionResolver, DEFAULT_INDEXER_VERSION_RESOLUTION_TIMEOUT},
internal::{
Expand Down Expand Up @@ -276,10 +274,10 @@ impl NetworkServiceBuilder {
mut self,
blocklist: HashSet<((DeploymentId, BlockNumber), ProofOfIndexing)>,
) -> Self {
let resolver = PoiResolver::with_timeout_and_cache_ttl(
let resolver = PoiResolver::new(
self.indexer_client.clone(),
DEFAULT_INDEXER_INDEXING_POIS_RESOLUTION_TIMEOUT, // 5s
DEFAULT_TTL, // Duration::MAX
Duration::from_secs(5),
Duration::from_secs(20 * 60),
);
let blocklist = PoiBlocklist::new(blocklist);

Expand Down
Loading