diff --git a/storage/src/backend/connection.rs b/storage/src/backend/connection.rs index f5f889e0bcf..6b6b2e69e43 100644 --- a/storage/src/backend/connection.rs +++ b/storage/src/backend/connection.rs @@ -403,14 +403,17 @@ impl Connection { } else { mirror_cloned.config.ping_url.clone() }; - info!("Mirror health checking url: {}", mirror_health_url); + info!( + "[mirror] start health check, ping url: {}", + mirror_health_url + ); let client = Client::new(); loop { // Try to recover the mirror server when it is unavailable. if !mirror_cloned.status.load(Ordering::Relaxed) { info!( - "Mirror server {} unhealthy, try to recover", + "[mirror] server unhealthy, try to recover: {}", mirror_cloned.config.host ); @@ -422,14 +425,17 @@ impl Connection { // If the response status is less than StatusCode::INTERNAL_SERVER_ERROR, // the mirror server is recovered. if resp.status() < StatusCode::INTERNAL_SERVER_ERROR { - info!("Mirror server {} recovered", mirror_cloned.config.host); + info!( + "[mirror] server recovered: {}", + mirror_cloned.config.host + ); mirror_cloned.failed_times.store(0, Ordering::Relaxed); mirror_cloned.status.store(true, Ordering::Relaxed); } }) .map_err(|e| { warn!( - "Mirror server {} is not recovered: {}", + "[mirror] failed to recover server: {}, {}", mirror_cloned.config.host, e ); }); @@ -530,7 +536,7 @@ impl Connection { } let current_url = mirror.mirror_url(url)?; - debug!("mirror server url {}", current_url); + debug!("[mirror] replace to: {}", current_url); let result = self.call_inner( &self.client, @@ -552,14 +558,14 @@ impl Connection { } Err(err) => { warn!( - "request mirror server failed, mirror: {:?}, error: {:?}", - mirror, err + "[mirror] request failed, server: {:?}, {:?}", + mirror.config.host, err ); mirror.failed_times.fetch_add(1, Ordering::Relaxed); if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit { warn!( - "reach to failure limit {}, disable mirror: {:?}", + "[mirror] exceed failure limit {}, server disabled: {:?}", mirror.failure_limit, mirror ); mirror.status.store(false, Ordering::Relaxed); @@ -575,7 +581,7 @@ impl Connection { } if mirror_enabled { - warn!("Request to all mirror server failed, fallback to original server."); + warn!("[mirror] request all servers failed, fallback to original server."); } self.call_inner( diff --git a/storage/src/backend/registry.rs b/storage/src/backend/registry.rs index b25fb624e0b..9c8d9f9540c 100644 --- a/storage/src/backend/registry.rs +++ b/storage/src/backend/registry.rs @@ -36,6 +36,8 @@ const REDIRECTED_STATUS_CODE: [StatusCode; 2] = [ StatusCode::TEMPORARY_REDIRECT, ]; +const REGISTRY_DEFAULT_TOKEN_EXPIRATION: u64 = 10 * 60; // in seconds + /// Error codes related to registry storage backend operations. #[derive(Debug)] pub enum RegistryError { @@ -116,13 +118,15 @@ impl HashCache { #[derive(Clone, serde::Deserialize)] struct TokenResponse { + /// Registry token string. token: String, + /// Registry token period of validity, in seconds. #[serde(default = "default_expires_in")] expires_in: u64, } fn default_expires_in() -> u64 { - 10 * 60 + REGISTRY_DEFAULT_TOKEN_EXPIRATION } #[derive(Debug)] @@ -189,8 +193,8 @@ struct RegistryState { // Example: RwLock", "">> cached_redirect: HashCache, - // The expiration time of the token, which is obtained from the registry server. - refresh_token_time: ArcSwapOption, + // The epoch timestamp of token expiration, which is obtained from the registry server. + token_expired_at: ArcSwapOption, // Cache bearer auth for refreshing token. cached_bearer_auth: ArcSwapOption, } @@ -233,7 +237,7 @@ impl RegistryState { } /// Request registry authentication server to get bearer token - fn get_token(&self, auth: BearerAuth, connection: &Arc) -> Result { + fn get_token(&self, auth: BearerAuth, connection: &Arc) -> Result { // The information needed for getting token needs to be placed both in // the query and in the body to be compatible with different registry // implementations, which have been tested on these platforms: @@ -274,7 +278,7 @@ impl RegistryState { )) })?; if let Ok(now_timestamp) = SystemTime::now().duration_since(UNIX_EPOCH) { - self.refresh_token_time + self.token_expired_at .store(Some(Arc::new(now_timestamp.as_secs() + ret.expires_in))); debug!( "cached bearer auth, next time: {}", @@ -285,7 +289,7 @@ impl RegistryState { // Cache bearer auth for refreshing token. self.cached_bearer_auth.store(Some(Arc::new(auth))); - Ok(ret.token) + Ok(ret) } fn get_auth_header(&self, auth: Auth, connection: &Arc) -> Result { @@ -297,7 +301,7 @@ impl RegistryState { .ok_or_else(|| einval!("invalid auth config")), Auth::Bearer(auth) => { let token = self.get_token(auth, connection)?; - Ok(format!("Bearer {}", token)) + Ok(format!("Bearer {}", token.token)) } } } @@ -804,7 +808,7 @@ impl Registry { blob_url_scheme: config.blob_url_scheme.clone(), blob_redirected_host: config.blob_redirected_host.clone(), cached_redirect: HashCache::new(), - refresh_token_time: ArcSwapOption::new(None), + token_expired_at: ArcSwapOption::new(None), cached_bearer_auth: ArcSwapOption::new(None), }); @@ -851,30 +855,39 @@ impl Registry { fn start_refresh_token_thread(&self) { let conn = self.connection.clone(); let state = self.state.clone(); - // The default refresh token internal is 10 minutes. - let refresh_check_internal = 10 * 60; + // FIXME: we'd better allow users to specify the expiration time. + let mut refresh_interval = REGISTRY_DEFAULT_TOKEN_EXPIRATION; thread::spawn(move || { loop { if let Ok(now_timestamp) = SystemTime::now().duration_since(UNIX_EPOCH) { - if let Some(next_refresh_timestamp) = state.refresh_token_time.load().as_deref() - { - // If the token will expire in next refresh check internal, get new token now. - // Add 20 seconds to handle critical cases. - if now_timestamp.as_secs() + refresh_check_internal + 20 - >= *next_refresh_timestamp - { + if let Some(token_expired_at) = state.token_expired_at.load().as_deref() { + // If the token will expire within the next refresh interval, + // refresh it immediately. + if now_timestamp.as_secs() + refresh_interval >= *token_expired_at { if let Some(cached_bearer_auth) = state.cached_bearer_auth.load().as_deref() { if let Ok(token) = state.get_token(cached_bearer_auth.to_owned(), &conn) { - let new_cached_auth = format!("Bearer {}", token); - info!("Authorization token for registry has been refreshed."); - // Refresh authorization token + let new_cached_auth = format!("Bearer {}", token.token); + debug!( + "[refresh_token_thread] registry token has been refreshed" + ); + // Refresh cached token. state .cached_auth .set(&state.cached_auth.get(), new_cached_auth); + // Reset refresh interval according to real expiration time, + // and advance 20s to handle the unexpected cases. + refresh_interval = token + .expires_in + .checked_sub(20) + .unwrap_or(token.expires_in); + } else { + error!( + "[refresh_token_thread] failed to refresh registry token" + ); } } } @@ -884,7 +897,7 @@ impl Registry { if conn.shutdown.load(Ordering::Acquire) { break; } - thread::sleep(Duration::from_secs(refresh_check_internal)); + thread::sleep(Duration::from_secs(refresh_interval)); if conn.shutdown.load(Ordering::Acquire) { break; } @@ -977,7 +990,7 @@ mod tests { blob_redirected_host: "oss.alibaba-inc.com".to_string(), cached_auth: Default::default(), cached_redirect: Default::default(), - refresh_token_time: ArcSwapOption::new(None), + token_expired_at: ArcSwapOption::new(None), cached_bearer_auth: ArcSwapOption::new(None), };