Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(BOUN-1299): bump ic-agent #51

Merged
merged 8 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
501 changes: 280 additions & 221 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 6 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ http-body = "1.0.1"
http-body-util = "0.1.2"
humantime = "2.1.0"
hyper-util = "0.1.10"
ic-agent = { version = "0.37.1", features = ["reqwest"] }
ic-agent = { version = "0.39.1", features = [
"ring",
"_internal_dynamic-routing",
] }
ic-bn-lib = { git = "https://github.com/dfinity/ic-bn-lib", rev = "526d34d15cfbf369d8baf2dae9932aa18d570a1d" }
ic-http-gateway = { git = "https://github.com/dfinity/http-gateway", tag = "0.1.0-b0" }
ic-http-gateway = { git = "https://github.com/dfinity/http-gateway", tag = "0.1.0-b2" }
itertools = "0.13.0"
lazy_static = "1.5.0"
maxminddb = "0.24.0"
mockall = "0.13.0"
moka = { version = "0.12.8", features = ["sync", "future"] }
ocsp-stapler = "0.4.1"
once_cell = "1.20.2"
Expand Down Expand Up @@ -99,6 +101,7 @@ x509-parser = "0.16.0"
zstd = "0.13.2"

[dev-dependencies]
mockall = "0.13.0"
hex-literal = "0.4.1"
hyper = "1.5.0"
criterion = { version = "0.5.1", features = ["async_tokio"] }
Expand All @@ -113,8 +116,3 @@ panic = "abort"

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }

[patch.crates-io]
ic-agent = { package = "ic-agent", git = "https://github.com/dfinity/agent-rs", branch = "dynamic_route", features = [
"reqwest",
] }
19 changes: 17 additions & 2 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,24 @@ pub struct Ic {
#[clap(env, long)]
pub ic_root_key: Option<PathBuf>,

/// Maximum number of request retries for connection failures.
/// Maximum number of request retries for connection failures and HTTP code 429.
/// First attempt is not counted.
#[clap(env, long, default_value = "5")]
pub ic_max_request_retries: u32,
pub ic_request_retries: usize,

/// How long to wait between retries.
/// With each retry this duration will be doubled.
/// E.g. first delay 25ms, next 50ms and so on.
#[clap(env, long, default_value = "25ms", value_parser = parse_duration)]
pub ic_request_retry_interval: Duration,

/// Max request body size to allow from the client
#[clap(env, long, default_value = "10MB", value_parser = parse_size_usize)]
pub ic_request_max_size: usize,

/// Max response size to allow from the IC
#[clap(env, long, default_value = "3MB", value_parser = parse_size_usize)]
pub ic_response_max_size: usize,

/// Disable response verification for the IC requests.
#[clap(env, long)]
Expand Down
7 changes: 3 additions & 4 deletions src/routing/ic/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use ic_http_gateway::{CanisterRequest, HttpGatewayClient, HttpGatewayRequestArgs
use crate::routing::{
error_cause::ErrorCause,
ic::{
transport::{Context, CONTEXT},
http_service::{Context, CONTEXT},
IcResponseStatus,
},
middleware::request_id::RequestId,
Expand All @@ -22,13 +22,12 @@ use crate::routing::{

use super::{BNRequestMetadata, BNResponseMetadata};

const MAX_REQUEST_BODY_SIZE: usize = 10 * 1_048_576;

#[derive(derive_new::new)]
pub struct HandlerState {
client: HttpGatewayClient,
verify_response: bool,
body_read_timeout: Duration,
request_max_size: usize,
}

// Main HTTP->IC request handler
Expand All @@ -46,7 +45,7 @@ pub async fn handler(

let (parts, body) = request.into_parts();

let body = buffer_body(body, MAX_REQUEST_BODY_SIZE, state.body_read_timeout).await;
let body = buffer_body(body, state.request_max_size, state.body_read_timeout).await;
let body = match body {
Ok(v) => v,
Err(e) => {
Expand Down
2 changes: 1 addition & 1 deletion src/routing/ic/health_check.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;
use http::{Method, StatusCode};
use ic_agent::agent::http_transport::dynamic_routing::{
use ic_agent::agent::route_provider::dynamic_routing::{
dynamic_route_provider::DynamicRouteProviderError,
health_check::{HealthCheck, HealthCheckStatus},
node::Node,
Expand Down
110 changes: 110 additions & 0 deletions src/routing/ic/http_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use std::{cell::RefCell, sync::Arc, time::Duration};

use async_trait::async_trait;
use http::StatusCode;
use ic_agent::{agent::HttpService, AgentError};
use ic_bn_lib::http::Client as HttpClient;
use reqwest::{
header::{HeaderMap, HeaderValue},
Request, Response,
};
use tokio::task_local;

pub struct Context {
pub hostname: Option<String>,
pub headers_in: HeaderMap<HeaderValue>,
pub headers_out: HeaderMap<HeaderValue>,
}

impl Context {
pub fn new() -> RefCell<Self> {
RefCell::new(Self {
hostname: None,
headers_in: HeaderMap::new(),
headers_out: HeaderMap::new(),
})
}
}

task_local! {
pub static CONTEXT: RefCell<Context>;
}

/// Service that executes requests on IC-Agent's behalf
#[derive(Debug, derive_new::new)]
pub struct AgentHttpService {
client: Arc<dyn HttpClient>,
retry_interval: Duration,
}

impl AgentHttpService {
async fn execute(&self, mut request: Request) -> Result<Response, reqwest::Error> {
let read_state = request.url().path().ends_with("/read_state");

// Add HTTP headers if requested
let _ = CONTEXT.try_with(|x| {
let mut ctx = x.borrow_mut();
ctx.hostname = Some(request.url().authority().to_string());

for (k, v) in &ctx.headers_out {
request.headers_mut().insert(k, v.clone());
}
});

let response = self.client.execute(request).await?;

// Add response headers.
// Don't do it for the read_state calls because for a single incoming request
// the agent can do several outgoing requests (e.g. read_state to get keys and then query)
// and we need only one set of response headers.
if !read_state {
let _ = CONTEXT.try_with(|x| {
let mut ctx = x.borrow_mut();

for (k, v) in response.headers() {
ctx.headers_in.insert(k, v.clone());
}
});
}

Ok(response)
}
}

#[async_trait]
impl HttpService for AgentHttpService {
async fn call<'a>(
&'a self,
req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
max_retries: usize,
) -> Result<Response, AgentError> {
let mut retry = 0;
let mut interval = self.retry_interval;

loop {
// TODO should we retry on Agent's request generation failure?
let request = req()?;

match self.execute(request).await {
Ok(v) => {
// Retry only on 429 for now
if v.status() != StatusCode::TOO_MANY_REQUESTS || retry >= max_retries {
return Ok(v);
}
blind-oracle marked this conversation as resolved.
Show resolved Hide resolved
}

Err(e) => {
// Don't retry on any errors except connect for now
if !e.is_connect() || retry >= max_retries {
return Err(AgentError::TransportError(e));
}
}
}

// Wait & backoff
tokio::time::sleep(interval).await;
retry += 1;
interval *= 2;
}
}
}
28 changes: 17 additions & 11 deletions src/routing/ic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@

pub mod handler;
pub mod health_check;
pub mod http_service;
pub mod nodes_fetcher;
pub mod route_provider;
pub mod transport;

use std::{fs, sync::Arc};

use anyhow::{Context, Error};
use http::{header::HeaderName, HeaderMap};
use http_body_util::Either;
use ic_agent::agent::http_transport::route_provider::RouteProvider;
use ic_agent::agent::route_provider::RouteProvider;
use ic_bn_lib::http::{
headers::{
X_IC_CACHE_BYPASS_REASON, X_IC_CACHE_STATUS, X_IC_CANISTER_ID_CBOR, X_IC_ERROR_CAUSE,
Expand All @@ -21,7 +21,9 @@ use ic_bn_lib::http::{
},
Client as HttpClient,
};
use ic_http_gateway::{HttpGatewayClient, HttpGatewayResponse, HttpGatewayResponseMetadata};
use ic_http_gateway::{
HttpGatewayClient, HttpGatewayClientBuilder, HttpGatewayResponse, HttpGatewayResponseMetadata,
};

use crate::Cli;

Expand Down Expand Up @@ -99,25 +101,29 @@ pub fn setup(
http_client: Arc<dyn HttpClient>,
route_provider: Arc<dyn RouteProvider>,
) -> Result<HttpGatewayClient, Error> {
let transport = transport::ReqwestTransport::create_with_client_route(
route_provider,
let http_service = Arc::new(http_service::AgentHttpService::new(
http_client,
cli.ic.ic_max_request_retries,
);
cli.ic.ic_request_retry_interval,
));

let agent = ic_agent::Agent::builder()
.with_transport(transport)
.with_arc_http_middleware(http_service)
.with_max_response_body_size(cli.ic.ic_response_max_size)
.with_max_tcp_error_retries(cli.ic.ic_request_retries)
.with_arc_route_provider(route_provider)
.with_verify_query_signatures(cli.ic.ic_enable_replica_signed_queries)
.build()?;
.build()
.context("unable to build Agent")?;

if let Some(v) = &cli.ic.ic_root_key {
let key = fs::read(v).context("unable to read IC root key")?;
agent.set_root_key(key);
}

let client = ic_http_gateway::HttpGatewayClientBuilder::new()
let client = HttpGatewayClientBuilder::new()
.with_agent(agent)
.build()?;
.build()
.context("unable to build HTTP gateway client")?;

Ok(client)
}
19 changes: 7 additions & 12 deletions src/routing/ic/nodes_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use async_trait::async_trait;
use candid::Principal;
use ic_agent::{
agent::http_transport::{
dynamic_routing::{
dynamic_route_provider::DynamicRouteProviderError, node::Node, nodes_fetch::Fetch,
},
ReqwestTransport,
agent::route_provider::dynamic_routing::{
dynamic_route_provider::DynamicRouteProviderError, node::Node, nodes_fetch::Fetch,
},
Agent,
};
Expand Down Expand Up @@ -39,23 +36,20 @@ impl NodesFetcher {
#[async_trait]
impl Fetch for NodesFetcher {
async fn fetch(&self, url: Url) -> Result<Vec<Node>, DynamicRouteProviderError> {
let transport = ReqwestTransport::create_with_client(url, self.http_client.clone())
.map_err(|err| {
DynamicRouteProviderError::NodesFetchError(format!(
"Failed to build transport: {err}"
))
})?;
let agent = Agent::builder()
.with_transport(transport)
.with_http_client(self.http_client.clone())
.with_url(url)
.build()
.map_err(|err| {
DynamicRouteProviderError::NodesFetchError(format!(
"Failed to build the agent: {err}"
))
})?;

if let Some(key) = self.root_key.clone() {
agent.set_root_key(key);
}

let api_bns = agent
.fetch_api_boundary_nodes_by_subnet_id(self.subnet_id)
.await
Expand All @@ -64,6 +58,7 @@ impl Fetch for NodesFetcher {
"Failed to fetch API nodes: {err}"
))
})?;

// If some API BNs have invalid domain names, they are discarded.
let nodes = api_bns
.iter()
Expand Down
4 changes: 2 additions & 2 deletions src/routing/ic/route_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::sync::Arc;
use anyhow::anyhow;
use candid::Principal;
use ic_agent::agent::http_transport::reqwest_transport::reqwest::Client as AgentClient;
use ic_agent::agent::http_transport::{
use ic_agent::agent::route_provider::{
dynamic_routing::{
dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
snapshot::latency_based_routing::LatencyRoutingSnapshot,
},
route_provider::{RoundRobinRouteProvider, RouteProvider},
RoundRobinRouteProvider, RouteProvider,
};
use tracing::info;
use url::Url;
Expand Down
Loading