feat: add implicit updates of channel records to update_user (#734)
in order to refresh TTLs of all the router row data, allowing automatic
garbage collection of inactive router records by Bigtable

and update openssl per RUSTSEC-2024-0357 

Closes SYNC-4221
pjenvey authored Jul 22, 2024
1 parent 843eeed commit 4aef280
Showing 9 changed files with 233 additions and 61 deletions.
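The idea, in rough terms: every cell in a user's Bigtable router row carries an expiry timestamp, and update_user previously refreshed only the non-channel cells, leaving the chid:<uuid> channel cells behind with stale timestamps. By rewriting the channel cells as well, one update_user call pushes the whole row's expiry forward, so a max-age garbage-collection policy on the router column family can eventually drop the rows of users who never reconnect. A toy Rust sketch of that refresh follows; the TTL value and the names used here are illustrative assumptions, not the actual autopush-common definitions.

use std::collections::HashMap;
use std::time::{Duration, SystemTime};

// Assumed TTL, for illustration only; the real constants live in autopush-common.
const ROUTER_TTL: Duration = Duration::from_secs(60 * 60 * 24 * 60);

// Toy stand-in for a Bigtable cell: only the timestamp matters here, since
// that is what a max-age garbage-collection policy compares against.
#[derive(Debug)]
struct Cell {
    timestamp: SystemTime,
}

// What this commit effectively makes update_user do: rewrite *every* router
// cell -- including each chid:<uuid> channel cell -- with a fresh "now + TTL"
// timestamp, so an inactive user's whole row ages out together.
fn refresh_router_row(row: &mut HashMap<String, Cell>) {
    let expiry = SystemTime::now() + ROUTER_TTL;
    for cell in row.values_mut() {
        cell.timestamp = expiry;
    }
}

fn main() {
    let stale = SystemTime::now() - Duration::from_secs(60 * 60 * 24 * 30);
    let mut row = HashMap::from([
        ("connected_at".to_owned(), Cell { timestamp: stale }),
        ("chid:00000000-0000-0000-0000-000000000000".to_owned(), Cell { timestamp: stale }),
    ]);
    refresh_router_row(&mut row);
    println!("{row:#?}");
}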
40 changes: 36 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

12 changes: 6 additions & 6 deletions autoconnect/autoconnect-common/src/test_support.rs
@@ -30,12 +30,12 @@ pub fn hello_db() -> MockDbClient {
pub fn hello_again_db(uaid: Uuid) -> MockDbClient {
let mut db = MockDbClient::new();
db.expect_get_user().times(1).return_once(move |_| {
Ok(Some(User {
uaid,
// Last connected 10 minutes ago
connected_at: ms_since_epoch() - (10 * 60 * 1000),
..Default::default()
}))
let user = User::builder()
.uaid(uaid)
.connected_at(ms_since_epoch() - (10 * 60 * 1000))
.build()
.unwrap();
Ok(Some(user))
});

db.expect_update_user().times(1).return_once(|_| Ok(true));
10 changes: 5 additions & 5 deletions autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs
@@ -154,11 +154,11 @@ impl UnidentifiedClient {
// change from the previous state machine impl)
}

let user = User {
node_id: Some(self.app_state.router_url.to_owned()),
connected_at,
..Default::default()
};
let user = User::builder()
.node_id(self.app_state.router_url.to_owned())
.connected_at(connected_at)
.build()
.map_err(|e| SMErrorKind::Internal(format!("User::builder error: {e}")))?;
Ok(GetOrCreateUser {
user,
existing_user: false,
11 changes: 6 additions & 5 deletions autoendpoint/src/routers/common.rs
@@ -228,14 +228,15 @@ pub mod tests {
data: Option<String>,
router_type: RouterType,
) -> Notification {
let user = User::builder()
.router_data(router_data)
.router_type(router_type.to_string())
.build()
.unwrap();
Notification {
message_id: "test-message-id".to_string(),
subscription: Subscription {
user: User {
router_data: Some(router_data),
router_type: router_type.to_string(),
..Default::default()
},
user,
channel_id: channel_id(),
vapid: None,
},
10 changes: 5 additions & 5 deletions autoendpoint/src/routes/registration.rs
@@ -35,11 +35,11 @@ pub async fn register_uaid_route(
incr_metric("ua.command.register", &app_state.metrics, &request);

// Register user and channel in database
let user = User {
router_type: path_args.router_type.to_string(),
router_data: Some(router_data),
..Default::default()
};
let user = User::builder()
.router_type(path_args.router_type.to_string())
.router_data(router_data)
.build()
.map_err(|e| ApiErrorKind::General(format!("User::builder error: {e}")))?;
let channel_id = router_data_input.channel_id.unwrap_or_else(Uuid::new_v4);
trace!("🌍 Creating user with UAID {}", user.uaid);
trace!("🌍 user = {:?}", user);
1 change: 1 addition & 0 deletions autopush-common/Cargo.toml
@@ -45,6 +45,7 @@ url.workspace = true

again = "0.1"
async-trait = "0.1"
derive_builder = "0.20"
gethostname = "0.4"
num_cpus = "1.16"
woothee = "0.13"
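The new derive_builder dependency backs the User::builder() calls in the hunks above: the derive macro generates a UserBuilder whose build() returns a Result, which is why the call sites either unwrap() or map the error into their own error kinds. A rough sketch of how a struct can wire this up; the field set, defaults, and the builder() helper below are assumptions for illustration, not the real autopush-common User definition.

use derive_builder::Builder;
use uuid::Uuid; // assumes the uuid crate with the "v4" feature enabled

// Hypothetical, trimmed-down stand-in for autopush-common's User record.
#[derive(Clone, Debug, Builder)]
pub struct User {
    // Unset fields fall back to a per-field default rather than failing build().
    #[builder(default = "Uuid::new_v4()")]
    pub uaid: Uuid,
    #[builder(default)]
    pub connected_at: u64,
    #[builder(default)]
    pub node_id: Option<String>,
    #[builder(default)]
    pub router_type: String,
}

impl User {
    // Convenience constructor matching the User::builder() spelling used in this
    // diff; derive_builder itself only generates UserBuilder::default().
    pub fn builder() -> UserBuilder {
        UserBuilder::default()
    }
}

fn main() {
    // build() returns Result<User, UserBuilderError>, hence the unwrap()/map_err()
    // at the call sites in this commit.
    let user = User::builder()
        .router_type("webpush".to_owned())
        .build()
        .unwrap();
    println!("{user:?}");
}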
168 changes: 134 additions & 34 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
@@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt;
use std::fmt::Display;
@@ -28,7 +29,7 @@ use crate::db::{
};

pub use self::metadata::MetadataBuilder;
use self::row::Row;
use self::row::{Row, RowCells};
use super::pool::BigTablePool;
use super::BigTableDbSettings;

@@ -201,6 +202,46 @@ fn to_string(value: Vec<u8>, name: &str) -> Result<String, DbError> {
})
}

/// Parse the "set" (see [DbClient::add_channels]) of channel ids in a bigtable Row.
///
/// Cells should solely contain the set of channels otherwise an Error is returned.
fn channels_from_cells(cells: &RowCells) -> DbResult<HashSet<Uuid>> {
let mut result = HashSet::new();
for cells in cells.values() {
let Some(cell) = cells.last() else {
continue;
};
let Some((_, chid)) = cell.qualifier.split_once("chid:") else {
return Err(DbError::Integrity(
"get_channels expected: chid:<chid>".to_owned(),
None,
));
};
result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?);
}
Ok(result)
}

/// Convert the [HashSet] of channel ids to cell entries for a bigtable Row
fn channels_to_cells(channels: Cow<HashSet<Uuid>>, expiry: SystemTime) -> Vec<cell::Cell> {
let channels = channels.into_owned();
let mut cells = Vec::with_capacity(channels.len().min(100_000));
for (i, channel_id) in channels.into_iter().enumerate() {
// There is a limit of 100,000 mutations per batch for bigtable.
// https://cloud.google.com/bigtable/quotas
// If you have 100,000 channels, you have too many.
if i >= 100_000 {
break;
}
cells.push(cell::Cell {
qualifier: format!("chid:{}", channel_id.as_hyphenated()),
timestamp: expiry,
..Default::default()
});
}
cells
}

pub fn retry_policy(max: usize) -> RetryPolicy {
RetryPolicy::default()
.with_max_retries(max)
@@ -281,7 +322,7 @@ pub fn retryable_error(metrics: Arc<StatsdClient>) -> impl Fn(&grpcio::Error) ->
/// 2) When router TTLs are eventually enabled: `add_channel` and
/// `increment_storage` can write cells with later expiry times than the other
/// router cells
fn is_incomplete_router_record(cells: &HashMap<String, Vec<cell::Cell>>) -> bool {
fn is_incomplete_router_record(cells: &RowCells) -> bool {
cells
.keys()
.all(|k| ["current_timestamp", "version"].contains(&k.as_str()) || k.starts_with("chid:"))
@@ -770,6 +811,11 @@ impl BigTableClientImpl {
});
};

cells.extend(channels_to_cells(
Cow::Borrowed(&user.priv_channels),
expiry,
));

row.add_cells(ROUTER_FAMILY, cells);
row
}
@@ -942,6 +988,9 @@ impl DbClient for BigTableClientImpl {
result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?)
}

// Read the channels last, after removal of all non channel cells
result.priv_channels = channels_from_cells(&row.cells)?;

Ok(Some(result))
}

@@ -976,24 +1025,13 @@ impl DbClient for BigTableClientImpl {
let mut row = Row::new(row_key);
let expiry = std::time::SystemTime::now() + Duration::from_secs(MAX_CHANNEL_TTL);

let mut cells = Vec::with_capacity(channels.len().min(100_000));
for (i, channel_id) in channels.into_iter().enumerate() {
// There is a limit of 100,000 mutations per batch for bigtable.
// https://cloud.google.com/bigtable/quotas
// If you have 100,000 channels, you have too many.
if i >= 100_000 {
break;
}
cells.push(cell::Cell {
qualifier: format!("chid:{}", channel_id.as_hyphenated()),
timestamp: expiry,
..Default::default()
});
}
// Note: updating the version column isn't necessary here because this
// write only adds a new (or updates an existing) column with a 0 byte
// value
row.add_cells(ROUTER_FAMILY, cells);
row.add_cells(
ROUTER_FAMILY,
channels_to_cells(Cow::Owned(channels), expiry),
);

self.write_row(row).await?;
Ok(())
@@ -1011,23 +1049,10 @@ impl DbClient for BigTableClientImpl {
cq_filter,
]));

let mut result = HashSet::new();
if let Some(record) = self.read_row(req).await? {
for mut cells in record.cells.into_values() {
let Some(cell) = cells.pop() else {
continue;
};
let Some((_, chid)) = cell.qualifier.split_once("chid:") else {
return Err(DbError::Integrity(
"get_channels expected: chid:<chid>".to_owned(),
None,
));
};
result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?);
}
}

Ok(result)
let Some(row) = self.read_row(req).await? else {
return Ok(Default::default());
};
channels_from_cells(&row.cells)
}

/// Delete the channel. Does not delete its associated pending messages.
@@ -1769,4 +1794,79 @@ mod tests {

client.remove_user(&uaid).await.unwrap();
}

#[actix_rt::test]
async fn channel_and_current_timestamp_ttl_updates() {
let client = new_client().unwrap();
let uaid = gen_test_uaid();
let chid = Uuid::parse_str(TEST_CHID).unwrap();
client.remove_user(&uaid).await.unwrap();

// Setup a user with some channels and a current_timestamp
let user = User {
uaid,
..Default::default()
};
client.add_user(&user).await.unwrap();

client.add_channel(&uaid, &chid).await.unwrap();
client
.add_channel(&uaid, &uuid::Uuid::new_v4())
.await
.unwrap();

client
.increment_storage(
&uaid,
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
)
.await
.unwrap();

let req = client.read_row_request(&uaid.as_simple().to_string());
let Some(mut row) = client.read_row(req).await.unwrap() else {
panic!("Expected row");
};

// Ensure the initial expiry (timestamp) of all the cells in the row
let expiry = row.take_required_cell("connected_at").unwrap().timestamp;
for mut cells in row.cells.into_values() {
let Some(cell) = cells.pop() else {
continue;
};
assert!(
cell.timestamp >= expiry,
"{} cell timestamp should >= connected_at's",
cell.qualifier
);
}

let mut user = client.get_user(&uaid).await.unwrap().unwrap();
client.update_user(&mut user).await.unwrap();

// Ensure update_user updated the expiry (timestamp) of every cell in the row
let req = client.read_row_request(&uaid.as_simple().to_string());
let Some(mut row) = client.read_row(req).await.unwrap() else {
panic!("Expected row");
};

let expiry2 = row.take_required_cell("connected_at").unwrap().timestamp;
assert!(expiry2 > expiry);

for mut cells in row.cells.into_values() {
let Some(cell) = cells.pop() else {
continue;
};
assert_eq!(
cell.timestamp, expiry2,
"{} cell timestamp should match connected_at's",
cell.qualifier
);
}

client.remove_user(&uaid).await.unwrap();
}
}
4 changes: 3 additions & 1 deletion autopush-common/src/db/bigtable/bigtable_client/row.rs
@@ -4,6 +4,8 @@ use crate::db::error::{DbError, DbResult};

use super::{cell::Cell, RowKey};

pub type RowCells = HashMap<String, Vec<Cell>>;

/// A Bigtable storage row. Bigtable stores by Family ID which isn't
/// very useful for us later, so we overload this structure a bit.
/// When we read data back out of Bigtable, we index cells by
@@ -19,7 +21,7 @@ pub struct Row {
pub row_key: RowKey,
/// The row's collection of cells, indexed by either the
/// FamilyID (for write) or Qualifier (for read).
pub cells: HashMap<String, Vec<Cell>>,
pub cells: RowCells,
}

impl Row {