Skip to content

Commit

Permalink
fix: fetch messages shouldn't read the current timestamp's notif (#577)
Browse files Browse the repository at this point in the history
and remove the manual sortkey_timestamp filtering which is no longer
necessary w/ the row range reading (and it covered up this bug)

also fix flakey hello_again tests (connected_at should be in the past)

Issue: SYNC-4068
  • Loading branch information
pjenvey authored Jan 26, 2024
1 parent 09fe7ac commit fc99e77
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 37 deletions.
7 changes: 6 additions & 1 deletion autoconnect/autoconnect-common/src/test_support.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use uuid::Uuid;

use autopush_common::db::{mock::MockDbClient, User};
use autopush_common::{
db::{mock::MockDbClient, User},
util::timing::ms_since_epoch,
};

pub const UA: &str =
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0";
Expand Down Expand Up @@ -33,6 +36,8 @@ pub fn hello_again_db(uaid: Uuid) -> MockDbClient {
db.expect_get_user().times(1).return_once(move |_| {
Ok(Some(User {
uaid,
// Last connected 10 minutes ago
connected_at: ms_since_epoch() - (10 * 60 * 1000),
current_month: Some(CURRENT_MONTH.to_owned()),
..Default::default()
}))
Expand Down
22 changes: 0 additions & 22 deletions autopush-common/src/db/bigtable/bigtable_client/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,6 @@ impl RowMerger {
/// Iterate through all the returned chunks and compile them into a hash of finished cells indexed by row_key
pub async fn process_chunks(
mut stream: ClientSStreamReceiver<ReadRowsResponse>,
timestamp_filter: Option<u64>,
limit: Option<usize>,
) -> Result<BTreeMap<RowKey, Row>, BigTableError> {
// Work object
Expand Down Expand Up @@ -440,28 +439,7 @@ impl RowMerger {
}
if merger.state == ReadState::RowComplete {
debug! {"🟧 row complete"};
// Check to see if we can add this row, or if it's blocked by the timestamp filter.
let finished_row = merger.row_complete(&mut chunk)?;
if let Some(timestamp) = timestamp_filter {
if let Some(sk_ts) = finished_row.clone().take_cell("sortkey_timestamp") {
let ts_val = crate::db::bigtable::bigtable_client::to_u64(
sk_ts.value,
"sortkey_timestamp",
)
.map_err(|_| {
BigTableError::InvalidChunk("Invalid timestamp".to_owned())
})?;
if ts_val <= timestamp {
trace!(
"⚖ {}: Skipping {} <= {}",
&finished_row.row_key,
ts_val,
timestamp
);
continue;
}
}
}
rows.insert(finished_row.row_key.clone(), finished_row);
} else if chunk.has_commit_row() {
return Err(BigTableError::InvalidChunk(format!(
Expand Down
25 changes: 11 additions & 14 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,10 @@ impl BigTableClientImpl {
}

/// Read a given row from the row key.
async fn read_row(
&self,
row_key: &str,
timestamp_filter: Option<u64>,
) -> Result<Option<row::Row>, error::BigTableError> {
async fn read_row(&self, row_key: &str) -> Result<Option<row::Row>, error::BigTableError> {
debug!("🉑 Row key: {}", row_key);
let req = self.read_row_request(row_key);
let mut rows = self.read_rows(req, timestamp_filter, None).await?;
let mut rows = self.read_rows(req, None).await?;
Ok(rows.remove(row_key))
}

Expand Down Expand Up @@ -227,15 +223,14 @@ impl BigTableClientImpl {
async fn read_rows(
&self,
req: ReadRowsRequest,
sortkey_filter: Option<u64>,
limit: Option<usize>,
) -> Result<BTreeMap<RowKey, row::Row>, error::BigTableError> {
let bigtable = self.pool.get().await?;
let resp = bigtable
.conn
.read_rows(&req)
.map_err(error::BigTableError::Read)?;
merge::RowMerger::process_chunks(resp, sortkey_filter, limit).await
merge::RowMerger::process_chunks(resp, limit).await
}

/// write a given row.
Expand Down Expand Up @@ -684,7 +679,7 @@ impl DbClient for BigTableClientImpl {
..Default::default()
};

if let Some(mut record) = self.read_row(&row_key, None).await? {
if let Some(mut record) = self.read_row(&row_key).await? {
trace!("🉑 Found a record for that user");
if let Some(mut cells) = record.take_cells("connected_at") {
if let Some(cell) = cells.pop() {
Expand Down Expand Up @@ -854,7 +849,7 @@ impl DbClient for BigTableClientImpl {
filter.set_chain(filter_chain);
req.set_filter(filter);

let mut rows = self.read_rows(req, None, None).await?;
let mut rows = self.read_rows(req, None).await?;
let mut result = HashSet::new();
if let Some(record) = rows.remove(&row_key) {
for mut cells in record.cells.into_values() {
Expand Down Expand Up @@ -1100,7 +1095,7 @@ impl DbClient for BigTableClientImpl {
req.set_rows_limit(limit as i64);
}
// */
let rows = self.read_rows(req, None, Some(limit)).await?;
let rows = self.read_rows(req, Some(limit)).await?;
debug!(
"🉑 Fetch Topic Messages. Found {} row(s) of {}",
rows.len(),
Expand All @@ -1124,7 +1119,9 @@ impl DbClient for BigTableClientImpl {
let mut row_range = data::RowRange::default();

let start_key = if let Some(ts) = timestamp {
format!("{}#02:{}", uaid.simple(), ts)
// Fetch everything after the last message with timestamp: the "z"
// moves past the last message's channel_id's 1st hex digit
format!("{}#02:{}z", uaid.simple(), ts)
} else {
format!("{}#02:", uaid.simple())
};
Expand Down Expand Up @@ -1159,7 +1156,7 @@ impl DbClient for BigTableClientImpl {
req.set_rows_limit(limit as i64);
}
// */
let rows = self.read_rows(req, timestamp, Some(limit)).await?;
let rows = self.read_rows(req, Some(limit)).await?;
debug!(
"🉑 Fetch Timestamp Messages ({:?}) Found {} row(s) of {}",
timestamp,
Expand Down Expand Up @@ -1491,7 +1488,7 @@ mod tests {
}],
);
client.write_row(row).await.unwrap();
let Some(row) = client.read_row(&row_key, None).await.unwrap() else {
let Some(row) = client.read_row(&row_key).await.unwrap() else {
panic!("Expected row");
};
assert_eq!(row.cells.len(), 1);
Expand Down

0 comments on commit fc99e77

Please sign in to comment.