Skip to content

Commit

Permalink
storage: implement simpler first token request
Browse files Browse the repository at this point in the history
Nydusd uses a registry backend which generates a surge of blob requests without
auth tokens on initial startup. This caused mirror backends (e.g. dragonfly)
to process very slowly, the commit fixes this problem.

It implements waiting for the first blob request to complete before making other
blob requests, this ensures the first request caches a valid registry auth token,
and subsequent concurrent blob requests can reuse the cached token.

This change is worthwhile to reduce concurrent token requests, it also makes the
behavior consistent with containerd, which first requests the image manifest and
caches the token before concurrently requesting blobs.

Signed-off-by: Yan Song <[email protected]>
  • Loading branch information
imeoer committed Jul 18, 2023
1 parent 4e3c954 commit 73f5703
Showing 1 changed file with 63 additions and 20 deletions.
83 changes: 63 additions & 20 deletions storage/src/backend/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::collections::HashMap;
use std::error::Error;
use std::io::{Read, Result};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Once, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{fmt, thread};

Expand Down Expand Up @@ -369,6 +369,7 @@ struct RegistryReader {
connection: Arc<Connection>,
state: Arc<RegistryState>,
metrics: Arc<BackendMetrics>,
first: Arc<Once>,
}

impl RegistryReader {
Expand Down Expand Up @@ -637,18 +638,52 @@ impl RegistryReader {
.map_err(RegistryError::Transport)
.map(|size| size as usize)
}

/// When invoking concurrently, only one of the handle methods will be executed first,
/// then subsequent handle methods will be allowed to execute concurrently.
///
/// Nydusd uses a registry backend which generates a surge of blob requests without
/// auth tokens on initial startup, this caused mirror backends (e.g. dragonfly)
/// to process very slowly. The method implements waiting for the first blob request
/// to complete before making other blob requests, this ensures the first request
/// caches a valid registry auth token, and subsequent concurrent blob requests can
/// reuse the cached token.
fn handle<F, T>(&self, handle: &mut F) -> T
where
F: FnMut() -> T,
{
let mut done = false;
let mut ret = None;
self.first.call_once(|| {
done = true;
// FIXME: when handle() returns Err, we need to reset the Once instance,
// so that the retry logic still be executed in the Once.
ret = Some(handle());
});
if !done {
ret = Some(handle());
}
// It's safe to unwrap because the ret must be Some.
ret.unwrap()
}
}

impl BlobReader for RegistryReader {
fn blob_size(&self) -> BackendResult<u64> {
let url = format!("/blobs/sha256:{}", self.blob_id);
let url = self
.state
.url(&url, &[])
.map_err(|e| RegistryError::Url(url, e))?;

let resp =
match self.request::<&[u8]>(Method::HEAD, url.as_str(), None, HeaderMap::new(), true) {
self.handle(&mut || -> BackendResult<u64> {
let url = format!("/blobs/sha256:{}", self.blob_id);
let url = self
.state
.url(&url, &[])
.map_err(|e| RegistryError::Url(url, e))?;

let resp = match self.request::<&[u8]>(
Method::HEAD,
url.as_str(),
None,
HeaderMap::new(),
true,
) {
Ok(res) => res,
Err(RegistryError::Request(ConnectionError::Common(e)))
if self.state.needs_fallback_http(&e) =>
Expand All @@ -665,21 +700,26 @@ impl BlobReader for RegistryReader {
return Err(BackendError::Registry(e));
}
};
let content_length = resp
.headers()
.get(CONTENT_LENGTH)
.ok_or_else(|| RegistryError::Common("invalid content length".to_string()))?;
let content_length = resp
.headers()
.get(CONTENT_LENGTH)
.ok_or_else(|| RegistryError::Common("invalid content length".to_string()))?;

Ok(content_length
.to_str()
.map_err(|err| RegistryError::Common(format!("invalid content length: {:?}", err)))?
.parse::<u64>()
.map_err(|err| RegistryError::Common(format!("invalid content length: {:?}", err)))?)
Ok(content_length
.to_str()
.map_err(|err| RegistryError::Common(format!("invalid content length: {:?}", err)))?
.parse::<u64>()
.map_err(|err| {
RegistryError::Common(format!("invalid content length: {:?}", err))
})?)
})
}

fn try_read(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize> {
self._try_read(buf, offset, true)
.map_err(BackendError::Registry)
self.handle(&mut || -> BackendResult<usize> {
self._try_read(buf, offset, true)
.map_err(BackendError::Registry)
})
}

fn metrics(&self) -> &BackendMetrics {
Expand All @@ -696,6 +736,7 @@ pub struct Registry {
connection: Arc<Connection>,
state: Arc<RegistryState>,
metrics: Arc<BackendMetrics>,
first: Arc<Once>,
}

impl Registry {
Expand Down Expand Up @@ -751,6 +792,7 @@ impl Registry {
connection,
state,
metrics: BackendMetrics::new(id, "registry"),
first: Arc::new(Once::new()),
};

for mirror in mirrors.iter() {
Expand Down Expand Up @@ -851,6 +893,7 @@ impl BlobBackend for Registry {
state: self.state.clone(),
connection: self.connection.clone(),
metrics: self.metrics.clone(),
first: self.first.clone(),
}))
}
}
Expand Down

0 comments on commit 73f5703

Please sign in to comment.