Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

race on gateway #403

Merged
merged 6 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions iroh-gateway/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct Args {
#[clap(long)]
pub cfg: Option<PathBuf>,
#[clap(long)]
denylist: bool,
use_denylist: bool,
}

impl Args {
Expand All @@ -39,7 +39,7 @@ impl Args {
if let Some(cache) = self.cache {
map.insert("cache", cache.to_string());
}
map.insert("denylist", self.denylist.to_string());
map.insert("use_denylist", self.use_denylist.to_string());
map.insert("metrics.collect", self.metrics.to_string());
map.insert("metrics.tracing", self.tracing.to_string());
map
Expand Down
24 changes: 19 additions & 5 deletions iroh-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ pub struct Config {
/// default port to listen on
pub port: u16,
/// flag to toggle whether the gateway should use denylist on requests
pub denylist: bool,
pub use_denylist: bool,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

heads up @Arqu, I've added a use_ prefix to avoid using up the denylist configuration key on a boolean flag.

/// URL of gateways to be used by the racing resolver.
/// strings can either be urls or subdomain gateway roots
/// values without https:// prefix are treated as subdomain gateways (eg: dweb.link)
/// values with are treated as IPFS path gateways (eg: https://ipfs.io)
pub http_resolvers: Option<Vec<String>>,
/// rpc addresses for the gateway & addresses for the rpc client to dial
pub rpc_client: RpcClientConfig,
/// metrics configuration
Expand All @@ -46,8 +51,9 @@ impl Config {
headers: HeaderMap::new(),
port,
rpc_client,
http_resolvers: None,
metrics: MetricsConfig::default(),
denylist: false,
use_denylist: false,
}
}

Expand Down Expand Up @@ -113,8 +119,9 @@ impl Default for Config {
headers: HeaderMap::new(),
port: DEFAULT_PORT,
rpc_client,
http_resolvers: None,
metrics: MetricsConfig::default(),
denylist: false,
use_denylist: false,
};
t.set_default_headers();
t
Expand All @@ -130,14 +137,18 @@ impl Source for Config {
let rpc_client = self.rpc_client.collect()?;
let mut map: Map<String, Value> = Map::new();
insert_into_config_map(&mut map, "public_url_base", self.public_url_base.clone());
insert_into_config_map(&mut map, "denylist", self.denylist);
insert_into_config_map(&mut map, "use_denylist", self.use_denylist);
// Some issue between deserializing u64 & u16, converting this to
// an signed int fixes the issue
insert_into_config_map(&mut map, "port", self.port as i32);
insert_into_config_map(&mut map, "headers", collect_headers(&self.headers)?);
insert_into_config_map(&mut map, "rpc_client", rpc_client);
let metrics = self.metrics.collect()?;
insert_into_config_map(&mut map, "metrics", metrics);

if let Some(http_resolvers) = &self.http_resolvers {
insert_into_config_map(&mut map, "http_resolvers", http_resolvers.clone());
}
Ok(map)
}
}
Expand Down Expand Up @@ -200,7 +211,10 @@ mod tests {
Value::new(None, default.public_url_base.clone()),
);
expect.insert("port".to_string(), Value::new(None, default.port as i64));
expect.insert("denylist".to_string(), Value::new(None, default.denylist));
expect.insert(
"use_denylist".to_string(),
Value::new(None, default.use_denylist),
);
expect.insert(
"headers".to_string(),
Value::new(None, collect_headers(&default.headers).unwrap()),
Expand Down
9 changes: 7 additions & 2 deletions iroh-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use iroh_gateway::{
core::Core,
metrics,
};
use iroh_resolver::racing::RacingLoader;
use iroh_rpc_client::Client as RpcClient;
use iroh_util::lock::ProgramLock;
use iroh_util::{iroh_config_path, make_config};
Expand Down Expand Up @@ -38,14 +39,18 @@ async fn main() -> Result<()> {
println!("{:#?}", config);

let metrics_config = config.metrics.clone();
let bad_bits = match config.denylist {
let bad_bits = match config.use_denylist {
true => Arc::new(Some(RwLock::new(BadBits::new()))),
false => Arc::new(None),
};
let rpc_addr = config
.server_rpc_addr()?
.ok_or_else(|| anyhow!("missing gateway rpc addr"))?;
let content_loader = RpcClient::new(config.rpc_client.clone()).await?;

let content_loader = RacingLoader::new(
RpcClient::new(config.rpc_client.clone()).await?,
config.http_resolvers.clone().unwrap_or_default(),
);
let handler = Core::new(
Arc::new(config),
rpc_addr,
Expand Down
9 changes: 0 additions & 9 deletions iroh-one/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ pub struct Config {
/// Path for the UDS socket for the gateway.
#[cfg(feature = "uds-gateway")]
pub gateway_uds_path: Option<PathBuf>,
/// URL of the gateway used by the racing resolver.
pub resolver_gateway: Option<String>,
/// Gateway specific configuration.
pub gateway: iroh_gateway::config::Config,
/// Store specific configuration.
Expand All @@ -46,7 +44,6 @@ impl Config {
p2p: iroh_p2p::config::Config,
rpc_client: RpcClientConfig,
#[cfg(feature = "uds-gateway")] gateway_uds_path: Option<PathBuf>,
resolver_gateway: Option<String>,
) -> Self {
Self {
gateway,
Expand All @@ -56,7 +53,6 @@ impl Config {
metrics: MetricsConfig::default(),
#[cfg(feature = "uds-gateway")]
gateway_uds_path,
resolver_gateway,
}
}

Expand Down Expand Up @@ -111,7 +107,6 @@ impl Default for Config {
p2p: default_p2p_config(rpc_client, metrics_config, key_store_path),
#[cfg(feature = "uds-gateway")]
gateway_uds_path: Some(gateway_uds_path),
resolver_gateway: None,
}
}
}
Expand Down Expand Up @@ -163,10 +158,6 @@ impl Source for Config {
uds_path.to_str().unwrap().to_string(),
);
}
if let Some(resolver_gateway) = &self.resolver_gateway {
insert_into_config_map(&mut map, "resolver_gateway", resolver_gateway.clone());
}

Ok(map)
}
}
Expand Down
1 change: 0 additions & 1 deletion iroh-one/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
pub mod cli;
pub mod config;
pub mod content_loader;
pub mod mem_p2p;
pub mod mem_store;
#[cfg(feature = "uds-gateway")]
Expand Down
8 changes: 4 additions & 4 deletions iroh-one/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use iroh_one::{
cli::Args,
config::{Config, CONFIG_FILE_NAME, ENV_PREFIX},
};
use iroh_resolver::racing::RacingLoader;
use iroh_rpc_client::Client as RpcClient;
use iroh_rpc_types::Addr;
use iroh_util::lock::ProgramLock;
Expand Down Expand Up @@ -70,15 +71,14 @@ async fn main() -> Result<()> {
.server_rpc_addr()?
.ok_or_else(|| anyhow!("missing gateway rpc addr"))?;

let bad_bits = match config.gateway.denylist {
let bad_bits = match config.gateway.use_denylist {
true => Arc::new(Some(RwLock::new(BadBits::new()))),
false => Arc::new(None),
};

// let content_loader = RpcClient::new(config.rpc_client.clone()).await?;
let content_loader = iroh_one::content_loader::RacingLoader::new(
let content_loader = RacingLoader::new(
RpcClient::new(config.rpc_client.clone()).await?,
config.resolver_gateway.clone(),
config.gateway.http_resolvers.clone().unwrap_or_default(),
);
let shared_state = Core::make_state(
Arc::new(config.clone()),
Expand Down
2 changes: 2 additions & 0 deletions iroh-resolver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ once_cell = "1.13.0"
tokio-util = { version = "0.7", features = ["io"] }
libp2p = { version = "0.50", default-features = false }
async-channel = "1.7.1"
reqwest = {version = "0.11", features = ["rustls-tls"], default-features = false}
rand = "0.8.5"

[dev-dependencies]
criterion = { version = "0.4.0", features = ["async_tokio"] }
Expand Down
1 change: 1 addition & 0 deletions iroh-resolver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod balanced_tree;
pub mod chunker;
pub mod codecs;
pub mod hamt;
pub mod racing;
pub mod resolver;
pub mod unixfs;
pub mod unixfs_builder;
Expand Down
34 changes: 22 additions & 12 deletions iroh-one/src/content_loader.rs → iroh-resolver/src/racing.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
//! A content loader implementation for iroh-one.

use crate::resolver::{
parse_links, ContentLoader, ContextId, LoadedCid, LoaderContext, Source, IROH_STORE,
};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use bytes::Bytes;
use cid::{multibase, Cid};
use futures::{future::FutureExt, pin_mut, select};
use iroh_resolver::resolver::{
parse_links, ContentLoader, ContextId, LoadedCid, LoaderContext, Source, IROH_STORE,
};
use iroh_rpc_client::Client as RpcClient;
use rand::seq::SliceRandom;
use tracing::{debug, error, trace, warn};

#[derive(Clone, Debug)]
pub struct RacingLoader {
rpc_client: RpcClient,
gateway: Option<String>,
http_resolvers: Vec<String>,
}

impl RacingLoader {
pub fn new(rpc_client: RpcClient, gateway: Option<String>) -> Self {
pub fn new(rpc_client: RpcClient, http_resolvers: Vec<String>) -> Self {
Self {
rpc_client,
gateway,
http_resolvers,
}
}
}

impl RacingLoader {
pub fn try_raw_gateway(&self) -> Result<&String> {
self.gateway
.as_ref()
.ok_or_else(|| anyhow!("no gateway configured to fetch raw CIDs"))
match self.http_resolvers.len() {
0 => Err(anyhow!("no gateway configured to fetch raw CIDs")),
_ => {
let mut rng = rand::thread_rng();
let gw = self.http_resolvers.choose(&mut rng).unwrap();
Ok(gw)
}
}
}

async fn fetch_p2p(&self, ctx: ContextId, cid: &Cid) -> Result<Bytes, anyhow::Error> {
Expand All @@ -42,7 +46,13 @@ impl RacingLoader {
async fn fetch_http(&self, cid: &Cid) -> Result<(Bytes, String), anyhow::Error> {
let gateway = self.try_raw_gateway()?;
let cid_str = multibase::encode(multibase::Base::Base32Lower, cid.to_bytes().as_slice());
let gateway_url = format!("https://{}.ipfs.{}?format=raw", cid_str, gateway);
// support two gateway URL formats: subdomain gateways (eg: dweb.link)
// and full URL (eg: https://ipfs.io)
let gateway_url = if gateway.starts_with("https://") || gateway.starts_with("http://") {
format!("{}/ipfs/{}?format=raw", gateway, cid_str)
} else {
format!("https://{}.ipfs.{}?format=raw", cid_str, gateway)
};
debug!("Will fetch {}", gateway_url);
let response = reqwest::get(gateway_url).await?;
response
Expand Down