Skip to content

Commit

Permalink
storage: adjust token refresh interval automatically
Browse files Browse the repository at this point in the history
- Make registry mirror log pretty;
- Adjust token refresh interval automatically;

Signed-off-by: Yan Song <[email protected]>
  • Loading branch information
imeoer committed Jul 19, 2023
1 parent 0bb64c3 commit dc83210
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 31 deletions.
24 changes: 15 additions & 9 deletions storage/src/backend/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,14 +403,17 @@ impl Connection {
} else {
mirror_cloned.config.ping_url.clone()
};
info!("Mirror health checking url: {}", mirror_health_url);
info!(
"[mirror] start health check, ping url: {}",
mirror_health_url
);

let client = Client::new();
loop {
// Try to recover the mirror server when it is unavailable.
if !mirror_cloned.status.load(Ordering::Relaxed) {
info!(
"Mirror server {} unhealthy, try to recover",
"[mirror] server unhealthy, try to recover: {}",
mirror_cloned.config.host
);

Expand All @@ -422,14 +425,17 @@ impl Connection {
// If the response status is less than StatusCode::INTERNAL_SERVER_ERROR,
// the mirror server is recovered.
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
info!("Mirror server {} recovered", mirror_cloned.config.host);
info!(
"[mirror] server recovered: {}",
mirror_cloned.config.host
);
mirror_cloned.failed_times.store(0, Ordering::Relaxed);
mirror_cloned.status.store(true, Ordering::Relaxed);
}
})
.map_err(|e| {
warn!(
"Mirror server {} is not recovered: {}",
"[mirror] failed to recover server: {}, {}",
mirror_cloned.config.host, e
);
});
Expand Down Expand Up @@ -530,7 +536,7 @@ impl Connection {
}

let current_url = mirror.mirror_url(url)?;
debug!("mirror server url {}", current_url);
debug!("[mirror] replace to: {}", current_url);

let result = self.call_inner(
&self.client,
Expand All @@ -552,14 +558,14 @@ impl Connection {
}
Err(err) => {
warn!(
"request mirror server failed, mirror: {:?}, error: {:?}",
mirror, err
"[mirror] request failed, server: {:?}, {:?}",
mirror.config.host, err
);
mirror.failed_times.fetch_add(1, Ordering::Relaxed);

if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit {
warn!(
"reach to failure limit {}, disable mirror: {:?}",
"[mirror] exceed failure limit {}, server disabled: {:?}",
mirror.failure_limit, mirror
);
mirror.status.store(false, Ordering::Relaxed);
Expand All @@ -575,7 +581,7 @@ impl Connection {
}

if mirror_enabled {
warn!("Request to all mirror server failed, fallback to original server.");
warn!("[mirror] request all servers failed, fallback to original server.");
}

self.call_inner(
Expand Down
57 changes: 35 additions & 22 deletions storage/src/backend/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const REDIRECTED_STATUS_CODE: [StatusCode; 2] = [
StatusCode::TEMPORARY_REDIRECT,
];

const REGISTRY_DEFAULT_TOKEN_EXPIRATION: u64 = 10 * 60; // in seconds

/// Error codes related to registry storage backend operations.
#[derive(Debug)]
pub enum RegistryError {
Expand Down Expand Up @@ -116,13 +118,15 @@ impl HashCache {

#[derive(Clone, serde::Deserialize)]
struct TokenResponse {
/// Registry token string.
token: String,
/// Registry token period of validity, in seconds.
#[serde(default = "default_expires_in")]
expires_in: u64,
}

fn default_expires_in() -> u64 {
10 * 60
REGISTRY_DEFAULT_TOKEN_EXPIRATION
}

#[derive(Debug)]
Expand Down Expand Up @@ -189,8 +193,8 @@ struct RegistryState {
// Example: RwLock<HashMap<"<blob_id>", "<redirected_url>">>
cached_redirect: HashCache,

// The expiration time of the token, which is obtained from the registry server.
refresh_token_time: ArcSwapOption<u64>,
// The epoch timestamp of token expiration, which is obtained from the registry server.
token_expired_at: ArcSwapOption<u64>,
// Cache bearer auth for refreshing token.
cached_bearer_auth: ArcSwapOption<BearerAuth>,
}
Expand Down Expand Up @@ -235,7 +239,7 @@ impl RegistryState {
}

/// Request registry authentication server to get bearer token
fn get_token(&self, auth: BearerAuth, connection: &Arc<Connection>) -> Result<String> {
fn get_token(&self, auth: BearerAuth, connection: &Arc<Connection>) -> Result<TokenResponse> {
// The information needed for getting token needs to be placed both in
// the query and in the body to be compatible with different registry
// implementations, which have been tested on these platforms:
Expand Down Expand Up @@ -276,7 +280,7 @@ impl RegistryState {
))
})?;
if let Ok(now_timestamp) = SystemTime::now().duration_since(UNIX_EPOCH) {
self.refresh_token_time
self.token_expired_at
.store(Some(Arc::new(now_timestamp.as_secs() + ret.expires_in)));
debug!(
"cached bearer auth, next time: {}",
Expand All @@ -287,7 +291,7 @@ impl RegistryState {
// Cache bearer auth for refreshing token.
self.cached_bearer_auth.store(Some(Arc::new(auth)));

Ok(ret.token)
Ok(ret)
}

fn get_auth_header(&self, auth: Auth, connection: &Arc<Connection>) -> Result<String> {
Expand All @@ -299,7 +303,7 @@ impl RegistryState {
.ok_or_else(|| einval!("invalid auth config")),
Auth::Bearer(auth) => {
let token = self.get_token(auth, connection)?;
Ok(format!("Bearer {}", token))
Ok(format!("Bearer {}", token.token))
}
}
}
Expand Down Expand Up @@ -806,7 +810,7 @@ impl Registry {
blob_url_scheme: config.blob_url_scheme.clone(),
blob_redirected_host: config.blob_redirected_host.clone(),
cached_redirect: HashCache::new(),
refresh_token_time: ArcSwapOption::new(None),
token_expired_at: ArcSwapOption::new(None),
cached_bearer_auth: ArcSwapOption::new(None),
});

Expand Down Expand Up @@ -853,30 +857,39 @@ impl Registry {
fn start_refresh_token_thread(&self) {
let conn = self.connection.clone();
let state = self.state.clone();
// The default refresh token internal is 10 minutes.
let refresh_check_internal = 10 * 60;
// FIXME: we'd better allow users to specify the expiration time.
let mut refresh_interval = REGISTRY_DEFAULT_TOKEN_EXPIRATION;
thread::spawn(move || {
loop {
if let Ok(now_timestamp) = SystemTime::now().duration_since(UNIX_EPOCH) {
if let Some(next_refresh_timestamp) = state.refresh_token_time.load().as_deref()
{
// If the token will expire in next refresh check internal, get new token now.
// Add 20 seconds to handle critical cases.
if now_timestamp.as_secs() + refresh_check_internal + 20
>= *next_refresh_timestamp
{
if let Some(token_expired_at) = state.token_expired_at.load().as_deref() {
// If the token will expire within the next refresh interval,
// refresh it immediately.
if now_timestamp.as_secs() + refresh_interval >= *token_expired_at {
if let Some(cached_bearer_auth) =
state.cached_bearer_auth.load().as_deref()
{
if let Ok(token) =
state.get_token(cached_bearer_auth.to_owned(), &conn)
{
let new_cached_auth = format!("Bearer {}", token);
info!("Authorization token for registry has been refreshed.");
// Refresh authorization token
let new_cached_auth = format!("Bearer {}", token.token);
debug!(
"[refresh_token_thread] registry token has been refreshed"
);
// Refresh cached token.
state
.cached_auth
.set(&state.cached_auth.get(), new_cached_auth);
// Reset refresh interval according to real expiration time,
// and advance 20s to handle the unexpected cases.
refresh_interval = token
.expires_in
.checked_sub(20)
.unwrap_or(token.expires_in);
} else {
error!(
"[refresh_token_thread] failed to refresh registry token"
);
}
}
}
Expand All @@ -886,7 +899,7 @@ impl Registry {
if conn.shutdown.load(Ordering::Acquire) {
break;
}
thread::sleep(Duration::from_secs(refresh_check_internal));
thread::sleep(Duration::from_secs(refresh_interval));
if conn.shutdown.load(Ordering::Acquire) {
break;
}
Expand Down Expand Up @@ -979,7 +992,7 @@ mod tests {
blob_redirected_host: "oss.alibaba-inc.com".to_string(),
cached_auth: Default::default(),
cached_redirect: Default::default(),
refresh_token_time: ArcSwapOption::new(None),
token_expired_at: ArcSwapOption::new(None),
cached_bearer_auth: ArcSwapOption::new(None),
};

Expand Down

0 comments on commit dc83210

Please sign in to comment.