Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: simplify mirror implementation #1375

Merged
merged 3 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,12 +917,6 @@ pub struct MirrorConfig {
/// HTTP request headers to be passed to mirror server.
#[serde(default)]
pub headers: HashMap<String, String>,
/// Whether the authorization process is through mirror, default to false.
/// true: authorization through mirror, e.g. Using normal registry as mirror.
/// false: authorization through original registry,
/// e.g. when using Dragonfly server as mirror, authorization through it may affect performance.
#[serde(default)]
pub auth_through: bool,
/// Interval for mirror health checking, in seconds.
#[serde(default = "default_check_interval")]
pub health_check_interval: u64,
Expand All @@ -936,7 +930,6 @@ impl Default for MirrorConfig {
Self {
host: String::new(),
headers: HashMap::new(),
auth_through: false,
health_check_interval: 5,
failure_limit: 5,
ping_url: String::new(),
Expand Down Expand Up @@ -1825,7 +1818,6 @@ mod tests {
[[backend.oss.mirrors]]
host = "http://127.0.0.1:65001"
ping_url = "http://127.0.0.1:65001/ping"
auth_through = true
health_check_interval = 10
failure_limit = 10
"#;
Expand Down Expand Up @@ -1859,7 +1851,6 @@ mod tests {
let mirror = &oss.mirrors[0];
assert_eq!(mirror.host, "http://127.0.0.1:65001");
assert_eq!(mirror.ping_url, "http://127.0.0.1:65001/ping");
assert!(mirror.auth_through);
assert!(mirror.headers.is_empty());
assert_eq!(mirror.health_check_interval, 10);
assert_eq!(mirror.failure_limit, 10);
Expand Down Expand Up @@ -1891,7 +1882,6 @@ mod tests {
[[backend.registry.mirrors]]
host = "http://127.0.0.1:65001"
ping_url = "http://127.0.0.1:65001/ping"
auth_through = true
health_check_interval = 10
failure_limit = 10
"#;
Expand Down Expand Up @@ -1927,7 +1917,6 @@ mod tests {
let mirror = &registry.mirrors[0];
assert_eq!(mirror.host, "http://127.0.0.1:65001");
assert_eq!(mirror.ping_url, "http://127.0.0.1:65001/ping");
assert!(mirror.auth_through);
assert!(mirror.headers.is_empty());
assert_eq!(mirror.health_check_interval, 10);
assert_eq!(mirror.failure_limit, 10);
Expand Down
3 changes: 0 additions & 3 deletions docs/nydusd.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,6 @@ Currently, the mirror mode is only tested in the registry backend, and in theory
{
// Mirror server URL (including scheme), e.g. Dragonfly dfdaemon server URL
"host": "http://dragonfly1.io:65001",
// true: Send the authorization request to the mirror e.g. another docker registry.
// false: Authorization request won't be relayed by the mirror e.g. Dragonfly.
"auth_through": false,
// Headers for mirror server
"headers": {
// For Dragonfly dfdaemon server URL, we need to specify "X-Dragonfly-Registry" (including scheme).
Expand Down
4 changes: 0 additions & 4 deletions misc/configs/nydusd-blob-cache-entry-configuration-v2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ host = "http://127.0.0.1:65001"
ping_url = "http://127.0.0.1:65001/ping"
# HTTP request headers to be passed to mirror server.
# headers =
# Whether the authorization process is through mirror, default to false.
auth_through = true
# Interval for mirror health checking, in seconds.
health_check_interval = 5
# Maximum number of failures before marking a mirror as unusable.
Expand Down Expand Up @@ -108,8 +106,6 @@ host = "http://127.0.0.1:65001"
ping_url = "http://127.0.0.1:65001/ping"
# HTTP request headers to be passed to mirror server.
# headers =
# Whether the authorization process is through mirror, default to false.
auth_through = true
# Interval for mirror health checking, in seconds.
health_check_interval = 5
# Maximum number of failures before marking a mirror as unusable.
Expand Down
4 changes: 0 additions & 4 deletions misc/configs/nydusd-blob-cache-entry.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ host = "http://127.0.0.1:65001"
ping_url = "http://127.0.0.1:65001/ping"
# HTTP request headers to be passed to mirror server.
# headers =
# Whether the authorization process is through mirror, default to false.
auth_through = true
# Interval for mirror health checking, in seconds.
health_check_interval = 5
# Maximum number of failures before marking a mirror as unusable.
Expand Down Expand Up @@ -113,8 +111,6 @@ host = "http://127.0.0.1:65001"
ping_url = "http://127.0.0.1:65001/ping"
# HTTP request headers to be passed to mirror server.
# headers =
# Whether the authorization process is through mirror, default to false.
auth_through = true
# Interval for mirror health checking, in seconds.
health_check_interval = 5
# Maximum number of failures before marking a mirror as unusable.
Expand Down
4 changes: 0 additions & 4 deletions misc/configs/nydusd-config-v2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ host = "http://127.0.0.1:65001"
ping_url = "http://127.0.0.1:65001/ping"
# HTTP request headers to be passed to mirror server.
# headers =
# Whether the authorization process is through mirror, default to false.
auth_through = true
# Interval for mirror health checking, in seconds.
health_check_interval = 5
# Maximum number of failures before marking a mirror as unusable.
Expand Down Expand Up @@ -106,8 +104,6 @@ host = "http://127.0.0.1:65001"
ping_url = "http://127.0.0.1:65001/ping"
# HTTP request headers to be passed to mirror server.
# headers =
# Whether the authorization process is through mirror, default to false.
auth_through = true
# Interval for mirror health checking, in seconds.
health_check_interval = 5
# Maximum number of failures before marking a mirror as unusable.
Expand Down
59 changes: 20 additions & 39 deletions storage/src/backend/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,14 +403,17 @@ impl Connection {
} else {
mirror_cloned.config.ping_url.clone()
};
info!("Mirror health checking url: {}", mirror_health_url);
info!(
"[mirror] start health check, ping url: {}",
mirror_health_url
);

let client = Client::new();
loop {
// Try to recover the mirror server when it is unavailable.
if !mirror_cloned.status.load(Ordering::Relaxed) {
info!(
"Mirror server {} unhealthy, try to recover",
"[mirror] server unhealthy, try to recover: {}",
mirror_cloned.config.host
);

Expand All @@ -422,14 +425,17 @@ impl Connection {
// If the response status is less than StatusCode::INTERNAL_SERVER_ERROR,
// the mirror server is recovered.
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
info!("Mirror server {} recovered", mirror_cloned.config.host);
info!(
"[mirror] server recovered: {}",
mirror_cloned.config.host
);
mirror_cloned.failed_times.store(0, Ordering::Relaxed);
mirror_cloned.status.store(true, Ordering::Relaxed);
}
})
.map_err(|e| {
warn!(
"Mirror server {} is not recovered: {}",
"[mirror] failed to recover server: {}, {}",
mirror_cloned.config.host, e
);
});
Expand All @@ -448,13 +454,6 @@ impl Connection {
self.shutdown.store(true, Ordering::Release);
}

/// If the auth_through is enable, all requests are send to the mirror server.
/// If the auth_through disabled, e.g. P2P/Dragonfly, we try to avoid sending
/// non-authorization request to the mirror server, which causes performance loss.
/// requesting_auth means this request is to get authorization from a server,
/// which must be a non-authorization request.
/// IOW, only the requesting_auth is false and the headers contain authorization token,
/// we send this request to mirror.
#[allow(clippy::too_many_arguments)]
pub fn call<R: Read + Clone + Send + 'static>(
&self,
Expand All @@ -464,8 +463,6 @@ impl Connection {
data: Option<ReqBody<R>>,
headers: &mut HeaderMap,
catch_status: bool,
// This means the request is dedicated to authorization.
requesting_auth: bool,
) -> ConnectionResult<Response> {
if self.shutdown.load(Ordering::Acquire) {
return Err(ConnectionError::Disconnected);
Expand Down Expand Up @@ -524,27 +521,10 @@ impl Connection {
}
}

let mut mirror_enabled = false;
if !self.mirrors.is_empty() {
let mut fallback_due_auth = false;
mirror_enabled = true;
for mirror in self.mirrors.iter() {
// With configuration `auth_through` disabled, we should not intend to send authentication
// request to mirror. Mainly because mirrors like P2P/Dragonfly has a poor performance when
// relaying non-data requests. But it's still possible that ever returned token is expired.
// So mirror might still respond us with status code UNAUTHORIZED, which should be handle
// by sending authentication request to the original registry.
//
// - For non-authentication request with token in request header, handle is as usual requests to registry.
// This request should already take token in header.
// - For authentication request
// 1. auth_through is disabled(false): directly pass below mirror translations and jump to original registry handler.
// 2. auth_through is enabled(true): try to get authenticated from mirror and should also handle status code UNAUTHORIZED.
if !mirror.config.auth_through
&& (!headers.contains_key(HEADER_AUTHORIZATION) || requesting_auth)
{
fallback_due_auth = true;
break;
}

if mirror.status.load(Ordering::Relaxed) {
let data_cloned = data.as_ref().cloned();

Expand All @@ -556,7 +536,7 @@ impl Connection {
}

let current_url = mirror.mirror_url(url)?;
debug!("mirror server url {}", current_url);
debug!("[mirror] replace to: {}", current_url);

let result = self.call_inner(
&self.client,
Expand All @@ -578,14 +558,14 @@ impl Connection {
}
Err(err) => {
warn!(
"request mirror server failed, mirror: {:?}, error: {:?}",
mirror, err
"[mirror] request failed, server: {:?}, {:?}",
mirror.config.host, err
);
mirror.failed_times.fetch_add(1, Ordering::Relaxed);

if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit {
warn!(
"reach to failure limit {}, disable mirror: {:?}",
"[mirror] exceed failure limit {}, server disabled: {:?}",
mirror.failure_limit, mirror
);
mirror.status.store(false, Ordering::Relaxed);
Expand All @@ -598,9 +578,10 @@ impl Connection {
headers.remove(HeaderName::from_str(key).unwrap());
}
}
if !fallback_due_auth {
warn!("Request to all mirror server failed, fallback to original server.");
}
}

if mirror_enabled {
warn!("[mirror] request all servers failed, fallback to original server.");
}

self.call_inner(
Expand Down
11 changes: 1 addition & 10 deletions storage/src/backend/http_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ impl BlobReader for HttpProxyReader {
None,
&mut HeaderMap::new(),
true,
false,
)
.map(|resp| resp.headers().to_owned())
.map_err(|e| HttpProxyError::RemoteRequest(e).into())
Expand Down Expand Up @@ -255,15 +254,7 @@ impl BlobReader for HttpProxyReader {
.map_err(|e| HttpProxyError::ConstructHeader(format!("{}", e)))?,
);
let mut resp = connection
.call::<&[u8]>(
Method::GET,
uri.as_str(),
None,
None,
&mut headers,
true,
false,
)
.call::<&[u8]>(Method::GET, uri.as_str(), None, None, &mut headers, true)
.map_err(HttpProxyError::RemoteRequest)?;

Ok(resp
Expand Down
20 changes: 2 additions & 18 deletions storage/src/backend/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,7 @@ where

let resp = self
.connection
.call::<&[u8]>(
Method::HEAD,
url.as_str(),
None,
None,
&mut headers,
true,
false,
)
.call::<&[u8]>(Method::HEAD, url.as_str(), None, None, &mut headers, true)
.map_err(ObjectStorageError::Request)?;
let content_length = resp
.headers()
Expand Down Expand Up @@ -136,15 +128,7 @@ where
// Safe because the the call() is a synchronous operation.
let mut resp = self
.connection
.call::<&[u8]>(
Method::GET,
url.as_str(),
None,
None,
&mut headers,
true,
false,
)
.call::<&[u8]>(Method::GET, url.as_str(), None, None, &mut headers, true)
.map_err(ObjectStorageError::Request)?;
Ok(resp
.copy_to(&mut buf)
Expand Down
Loading