From 3e81ef14566a91ea4f89a1699090367f9450cabd Mon Sep 17 00:00:00 2001 From: Ethan Donowitz <8703826+ethowitz@users.noreply.github.com> Date: Mon, 13 Jun 2022 12:08:45 -0400 Subject: [PATCH] bug: fix Spanner node query (#1332) Closes #1331 --- syncstorage/src/tokenserver/db/models.rs | 168 ++++++++++++++++++---- syncstorage/src/tokenserver/db/params.rs | 2 + syncstorage/src/tokenserver/extractors.rs | 2 + syncstorage/src/tokenserver/mod.rs | 2 + syncstorage/src/tokenserver/settings.rs | 3 + 5 files changed, 149 insertions(+), 28 deletions(-) diff --git a/syncstorage/src/tokenserver/db/models.rs b/syncstorage/src/tokenserver/db/models.rs index 19fa862d05..6fa34269de 100644 --- a/syncstorage/src/tokenserver/db/models.rs +++ b/syncstorage/src/tokenserver/db/models.rs @@ -192,6 +192,7 @@ impl TokenserverDb { /// Gets the least-loaded node that has available slots. fn get_best_node_sync(&self, params: params::GetBestNode) -> DbResult { + const DEFAULT_CAPACITY_RELEASE_RATE: f32 = 0.1; const GET_BEST_NODE_QUERY: &str = r#" SELECT id, node FROM nodes @@ -211,43 +212,60 @@ impl TokenserverDb { AND capacity > current_load AND downed = 0 "#; - const DEFAULT_CAPACITY_RELEASE_RATE: f32 = 0.1; + const SPANNER_QUERY: &str = r#" + SELECT id, node + FROM nodes + WHERE id = ? + LIMIT 1 + "#; let mut metrics = self.metrics.clone(); metrics.start_timer("storage.get_best_node", None); - // We may have to retry the query if we need to release more capacity. This loop allows - // a maximum of five retries before bailing out. - for _ in 0..5 { - let maybe_result = diesel::sql_query(GET_BEST_NODE_QUERY) - .bind::(params.service_id) + if let Some(spanner_node_id) = params.spanner_node_id { + diesel::sql_query(SPANNER_QUERY) + .bind::(spanner_node_id) .get_result::(&self.inner.conn) - .optional()?; + .map_err(|e| { + let mut db_error = + DbError::internal(&format!("unable to get Spanner node: {}", e)); + db_error.status = StatusCode::SERVICE_UNAVAILABLE; + db_error + }) + } else { + // We may have to retry the query if we need to release more capacity. This loop allows + // a maximum of five retries before bailing out. + for _ in 0..5 { + let maybe_result = diesel::sql_query(GET_BEST_NODE_QUERY) + .bind::(params.service_id) + .get_result::(&self.inner.conn) + .optional()?; + + if let Some(result) = maybe_result { + return Ok(result); + } - if let Some(result) = maybe_result { - return Ok(result); + // There were no available nodes. Try to release additional capacity from any nodes + // that are not fully occupied. + let affected_rows = diesel::sql_query(RELEASE_CAPACITY_QUERY) + .bind::( + params + .capacity_release_rate + .unwrap_or(DEFAULT_CAPACITY_RELEASE_RATE), + ) + .bind::(params.service_id) + .execute(&self.inner.conn)?; + + // If no nodes were affected by the last query, give up. + if affected_rows == 0 { + break; + } } - // There were no available nodes. Try to release additional capacity from any nodes - // that are not fully occupied. - let affected_rows = diesel::sql_query(RELEASE_CAPACITY_QUERY) - .bind::( - params - .capacity_release_rate - .unwrap_or(DEFAULT_CAPACITY_RELEASE_RATE), - ) - .bind::(params.service_id) - .execute(&self.inner.conn)?; - - // If no nodes were affected by the last query, give up. - if affected_rows == 0 { - break; - } + let mut db_error = DbError::internal("unable to get a node"); + db_error.status = StatusCode::SERVICE_UNAVAILABLE; + Err(db_error) } - - let mut db_error = DbError::internal("unable to get a node"); - db_error.status = StatusCode::SERVICE_UNAVAILABLE; - Err(db_error) } fn add_user_to_node_sync( @@ -367,6 +385,7 @@ impl TokenserverDb { client_state: raw_user.client_state.clone(), keys_changed_at: raw_user.keys_changed_at, capacity_release_rate: params.capacity_release_rate, + spanner_node_id: params.spanner_node_id, })? }; @@ -414,6 +433,7 @@ impl TokenserverDb { let node = self.get_best_node_sync(params::GetBestNode { service_id: params.service_id, capacity_release_rate: params.capacity_release_rate, + spanner_node_id: params.spanner_node_id, })?; // Decrement `available` and increment `current_load` on the node assigned to the user. @@ -1200,6 +1220,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() })?; assert_eq!(user.node, "https://node1"); @@ -1253,6 +1274,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() })?; let user2 = db.allocate_user_sync(params::AllocateUser { @@ -1262,6 +1284,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() })?; // Because users are always assigned to the least-loaded node, the users should have been @@ -1305,6 +1328,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }); let error = result.unwrap_err(); assert_eq!(error.to_string(), "Unexpected error: unable to get a node"); @@ -1346,6 +1370,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }); let error = result.unwrap_err(); assert_eq!(error.to_string(), "Unexpected error: unable to get a node"); @@ -1386,6 +1411,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() })?; let user1 = db .get_user(params::GetUser { @@ -1409,6 +1435,7 @@ mod tests { client_state: "bbbb".to_owned(), keys_changed_at: Some(1235), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1459,6 +1486,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1470,6 +1498,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1529,6 +1558,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1540,6 +1570,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1551,6 +1582,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1562,6 +1594,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1594,6 +1627,7 @@ mod tests { client_state: user.client_state.clone(), keys_changed_at: user.keys_changed_at, capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1623,6 +1657,7 @@ mod tests { client_state: user.client_state.clone(), keys_changed_at: user.keys_changed_at, capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1681,6 +1716,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1698,6 +1734,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1717,6 +1754,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1734,6 +1772,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1752,6 +1791,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1769,6 +1809,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1787,6 +1828,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await; @@ -1834,6 +1876,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1853,6 +1896,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1896,6 +1940,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1912,6 +1957,7 @@ mod tests { client_state: "aaaa".to_owned(), keys_changed_at: Some(1234), capacity_release_rate: None, + ..Default::default() }) .await?; @@ -1921,6 +1967,72 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_get_spanner_node() -> DbResult<()> { + let pool = db_pool().await?; + let db = pool.get().await?; + + // Add a service + let service_id = db + .post_service(params::PostService { + service: "sync-1.5".to_owned(), + pattern: "{node}/1.5/{uid}".to_owned(), + }) + .await? + .id; + + // Add a node with capacity and available set to 0 + let spanner_node_id = db + .post_node(params::PostNode { + service_id, + node: "https://spanner_node".to_owned(), + current_load: 1000, + capacity: 0, + available: 0, + ..Default::default() + }) + .await? + .id; + + // Add another node with available capacity + db.post_node(params::PostNode { + service_id, + node: "https://another_node".to_owned(), + current_load: 0, + capacity: 1000, + available: 1000, + ..Default::default() + }) + .await?; + + // Ensure the Spanner node is selected if the Spanner node ID is provided as a parameter + assert_eq!( + db.get_best_node(params::GetBestNode { + service_id, + capacity_release_rate: None, + spanner_node_id: Some(spanner_node_id as i32) + }) + .await? + .id, + spanner_node_id + ); + + // Ensure the node with available capacity is selected if the Spanner node ID is not + // provided as a parameter + assert_ne!( + db.get_best_node(params::GetBestNode { + service_id, + capacity_release_rate: None, + spanner_node_id: None + }) + .await? + .id, + spanner_node_id + ); + + Ok(()) + } + async fn db_pool() -> DbResult { let _ = env_logger::try_init(); diff --git a/syncstorage/src/tokenserver/db/params.rs b/syncstorage/src/tokenserver/db/params.rs index a95f0bb1f8..bff137d2f4 100644 --- a/syncstorage/src/tokenserver/db/params.rs +++ b/syncstorage/src/tokenserver/db/params.rs @@ -35,6 +35,7 @@ pub struct GetOrCreateUser { pub client_state: String, pub keys_changed_at: Option, pub capacity_release_rate: Option, + pub spanner_node_id: Option, } pub type AllocateUser = GetOrCreateUser; @@ -84,6 +85,7 @@ pub struct GetNodeId { pub struct GetBestNode { pub service_id: i32, pub capacity_release_rate: Option, + pub spanner_node_id: Option, } #[derive(Default)] diff --git a/syncstorage/src/tokenserver/extractors.rs b/syncstorage/src/tokenserver/extractors.rs index c3a9fccd99..4ab297d75e 100644 --- a/syncstorage/src/tokenserver/extractors.rs +++ b/syncstorage/src/tokenserver/extractors.rs @@ -232,6 +232,7 @@ impl FromRequest for TokenserverRequest { client_state: auth_data.client_state.clone(), keys_changed_at: auth_data.keys_changed_at, capacity_release_rate: state.node_capacity_release_rate, + spanner_node_id: state.spanner_node_id, }) .await?; log_items_mutator.insert("first_seen_at".to_owned(), user.first_seen_at.to_string()); @@ -1299,6 +1300,7 @@ mod tests { ) .unwrap(), ), + spanner_node_id: None, } } } diff --git a/syncstorage/src/tokenserver/mod.rs b/syncstorage/src/tokenserver/mod.rs index 872c3036fa..bff3151103 100644 --- a/syncstorage/src/tokenserver/mod.rs +++ b/syncstorage/src/tokenserver/mod.rs @@ -36,6 +36,7 @@ pub struct ServerState { pub node_type: NodeType, pub service_id: Option, pub metrics: Box, + pub spanner_node_id: Option, } impl ServerState { @@ -72,6 +73,7 @@ impl ServerState { node_type: settings.node_type, metrics: Box::new(metrics), service_id, + spanner_node_id: settings.spanner_node_id, } }) .map_err(Into::into) diff --git a/syncstorage/src/tokenserver/settings.rs b/syncstorage/src/tokenserver/settings.rs index 981f8d69c6..35057a2d0c 100644 --- a/syncstorage/src/tokenserver/settings.rs +++ b/syncstorage/src/tokenserver/settings.rs @@ -52,6 +52,8 @@ pub struct Settings { pub statsd_label: String, /// Whether or not to run the Tokenserver migrations upon startup. pub run_migrations: bool, + /// The database ID of the Spanner node. + pub spanner_node_id: Option, } #[derive(Clone, Debug, Deserialize)] @@ -88,6 +90,7 @@ impl Default for Settings { node_type: NodeType::Spanner, statsd_label: "syncstorage.tokenserver".to_owned(), run_migrations: cfg!(test), + spanner_node_id: None, } } }