Skip to content

Commit

Permalink
bug: fix Spanner node query (#1332)
Browse files Browse the repository at this point in the history
Closes #1331
  • Loading branch information
ethowitz authored Jun 13, 2022
1 parent 7d02a3a commit 3e81ef1
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 28 deletions.
168 changes: 140 additions & 28 deletions syncstorage/src/tokenserver/db/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ impl TokenserverDb {

/// Gets the least-loaded node that has available slots.
fn get_best_node_sync(&self, params: params::GetBestNode) -> DbResult<results::GetBestNode> {
const DEFAULT_CAPACITY_RELEASE_RATE: f32 = 0.1;
const GET_BEST_NODE_QUERY: &str = r#"
SELECT id, node
FROM nodes
Expand All @@ -211,43 +212,60 @@ impl TokenserverDb {
AND capacity > current_load
AND downed = 0
"#;
const DEFAULT_CAPACITY_RELEASE_RATE: f32 = 0.1;
const SPANNER_QUERY: &str = r#"
SELECT id, node
FROM nodes
WHERE id = ?
LIMIT 1
"#;

let mut metrics = self.metrics.clone();
metrics.start_timer("storage.get_best_node", None);

// We may have to retry the query if we need to release more capacity. This loop allows
// a maximum of five retries before bailing out.
for _ in 0..5 {
let maybe_result = diesel::sql_query(GET_BEST_NODE_QUERY)
.bind::<Integer, _>(params.service_id)
if let Some(spanner_node_id) = params.spanner_node_id {
diesel::sql_query(SPANNER_QUERY)
.bind::<Integer, _>(spanner_node_id)
.get_result::<results::GetBestNode>(&self.inner.conn)
.optional()?;
.map_err(|e| {
let mut db_error =
DbError::internal(&format!("unable to get Spanner node: {}", e));
db_error.status = StatusCode::SERVICE_UNAVAILABLE;
db_error
})
} else {
// We may have to retry the query if we need to release more capacity. This loop allows
// a maximum of five retries before bailing out.
for _ in 0..5 {
let maybe_result = diesel::sql_query(GET_BEST_NODE_QUERY)
.bind::<Integer, _>(params.service_id)
.get_result::<results::GetBestNode>(&self.inner.conn)
.optional()?;

if let Some(result) = maybe_result {
return Ok(result);
}

if let Some(result) = maybe_result {
return Ok(result);
// There were no available nodes. Try to release additional capacity from any nodes
// that are not fully occupied.
let affected_rows = diesel::sql_query(RELEASE_CAPACITY_QUERY)
.bind::<Float, _>(
params
.capacity_release_rate
.unwrap_or(DEFAULT_CAPACITY_RELEASE_RATE),
)
.bind::<Integer, _>(params.service_id)
.execute(&self.inner.conn)?;

// If no nodes were affected by the last query, give up.
if affected_rows == 0 {
break;
}
}

// There were no available nodes. Try to release additional capacity from any nodes
// that are not fully occupied.
let affected_rows = diesel::sql_query(RELEASE_CAPACITY_QUERY)
.bind::<Float, _>(
params
.capacity_release_rate
.unwrap_or(DEFAULT_CAPACITY_RELEASE_RATE),
)
.bind::<Integer, _>(params.service_id)
.execute(&self.inner.conn)?;

// If no nodes were affected by the last query, give up.
if affected_rows == 0 {
break;
}
let mut db_error = DbError::internal("unable to get a node");
db_error.status = StatusCode::SERVICE_UNAVAILABLE;
Err(db_error)
}

let mut db_error = DbError::internal("unable to get a node");
db_error.status = StatusCode::SERVICE_UNAVAILABLE;
Err(db_error)
}

fn add_user_to_node_sync(
Expand Down Expand Up @@ -367,6 +385,7 @@ impl TokenserverDb {
client_state: raw_user.client_state.clone(),
keys_changed_at: raw_user.keys_changed_at,
capacity_release_rate: params.capacity_release_rate,
spanner_node_id: params.spanner_node_id,
})?
};

Expand Down Expand Up @@ -414,6 +433,7 @@ impl TokenserverDb {
let node = self.get_best_node_sync(params::GetBestNode {
service_id: params.service_id,
capacity_release_rate: params.capacity_release_rate,
spanner_node_id: params.spanner_node_id,
})?;

// Decrement `available` and increment `current_load` on the node assigned to the user.
Expand Down Expand Up @@ -1200,6 +1220,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})?;
assert_eq!(user.node, "https://node1");

Expand Down Expand Up @@ -1253,6 +1274,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})?;

let user2 = db.allocate_user_sync(params::AllocateUser {
Expand All @@ -1262,6 +1284,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})?;

// Because users are always assigned to the least-loaded node, the users should have been
Expand Down Expand Up @@ -1305,6 +1328,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
});
let error = result.unwrap_err();
assert_eq!(error.to_string(), "Unexpected error: unable to get a node");
Expand Down Expand Up @@ -1346,6 +1370,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
});
let error = result.unwrap_err();
assert_eq!(error.to_string(), "Unexpected error: unable to get a node");
Expand Down Expand Up @@ -1386,6 +1411,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})?;
let user1 = db
.get_user(params::GetUser {
Expand All @@ -1409,6 +1435,7 @@ mod tests {
client_state: "bbbb".to_owned(),
keys_changed_at: Some(1235),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand Down Expand Up @@ -1459,6 +1486,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand All @@ -1470,6 +1498,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand Down Expand Up @@ -1529,6 +1558,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand All @@ -1540,6 +1570,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand All @@ -1551,6 +1582,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand All @@ -1562,6 +1594,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand Down Expand Up @@ -1594,6 +1627,7 @@ mod tests {
client_state: user.client_state.clone(),
keys_changed_at: user.keys_changed_at,
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand Down Expand Up @@ -1623,6 +1657,7 @@ mod tests {
client_state: user.client_state.clone(),
keys_changed_at: user.keys_changed_at,
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand Down Expand Up @@ -1681,6 +1716,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand All @@ -1698,6 +1734,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand All @@ -1717,6 +1754,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand All @@ -1734,6 +1772,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand All @@ -1752,6 +1791,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand All @@ -1769,6 +1809,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand All @@ -1787,6 +1828,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await;

Expand Down Expand Up @@ -1834,6 +1876,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand All @@ -1853,6 +1896,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand Down Expand Up @@ -1896,6 +1940,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand All @@ -1912,6 +1957,7 @@ mod tests {
client_state: "aaaa".to_owned(),
keys_changed_at: Some(1234),
capacity_release_rate: None,
..Default::default()
})
.await?;

Expand All @@ -1921,6 +1967,72 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_get_spanner_node() -> DbResult<()> {
let pool = db_pool().await?;
let db = pool.get().await?;

// Add a service
let service_id = db
.post_service(params::PostService {
service: "sync-1.5".to_owned(),
pattern: "{node}/1.5/{uid}".to_owned(),
})
.await?
.id;

// Add a node with capacity and available set to 0
let spanner_node_id = db
.post_node(params::PostNode {
service_id,
node: "https://spanner_node".to_owned(),
current_load: 1000,
capacity: 0,
available: 0,
..Default::default()
})
.await?
.id;

// Add another node with available capacity
db.post_node(params::PostNode {
service_id,
node: "https://another_node".to_owned(),
current_load: 0,
capacity: 1000,
available: 1000,
..Default::default()
})
.await?;

// Ensure the Spanner node is selected if the Spanner node ID is provided as a parameter
assert_eq!(
db.get_best_node(params::GetBestNode {
service_id,
capacity_release_rate: None,
spanner_node_id: Some(spanner_node_id as i32)
})
.await?
.id,
spanner_node_id
);

// Ensure the node with available capacity is selected if the Spanner node ID is not
// provided as a parameter
assert_ne!(
db.get_best_node(params::GetBestNode {
service_id,
capacity_release_rate: None,
spanner_node_id: None
})
.await?
.id,
spanner_node_id
);

Ok(())
}

async fn db_pool() -> DbResult<TokenserverPool> {
let _ = env_logger::try_init();

Expand Down
Loading

0 comments on commit 3e81ef1

Please sign in to comment.