Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: store the channel_id metadata in a single bigtable row #574

Merged
merged 1 commit into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 146 additions & 120 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use uuid::Uuid;
use crate::db::{
client::{DbClient, FetchMessageResponse},
error::{DbError, DbResult},
DbSettings, Notification, User,
DbSettings, Notification, User, MAX_CHANNEL_TTL,
};

use self::row::Row;
Expand Down Expand Up @@ -137,26 +137,84 @@ impl BigTableClientImpl {
})
}

/// Return a ReadRowsRequest for a given row key
///
/// Builds a request against the configured table (`self.settings.table_name`)
/// that targets exactly one row: the UTF-8 bytes of `row_key`. No row filter
/// or row limit is attached here; callers add those as needed.
fn read_row_request(&self, row_key: &str) -> bigtable::ReadRowsRequest {
let mut req = bigtable::ReadRowsRequest::default();
req.set_table_name(self.settings.table_name.clone());

// A RowSet containing the single, explicit key (raw bytes of the string).
let mut row_keys = RepeatedField::default();
row_keys.push(row_key.as_bytes().to_vec());
let mut row_set = data::RowSet::default();
row_set.set_row_keys(row_keys);
req.set_rows(row_set);

req
}

/// Read a given row from the row key.
///
/// Returns `Ok(None)` when no row exists for `row_key`.
/// `timestamp_filter` is forwarded to `read_rows` unchanged; its exact
/// semantics are defined there (presumably a cell-timestamp cutoff —
/// see `read_rows`).
async fn read_row(
&self,
row_key: &str,
timestamp_filter: Option<u64>,
) -> Result<Option<row::Row>, error::BigTableError> {
debug!("🉑 Row key: {}", row_key);
let req = self.read_row_request(row_key);
// read_rows returns rows indexed by row key; pull out our single row.
let mut rows = self.read_rows(req, timestamp_filter, None).await?;
Ok(rows.remove(row_key))
}

let mut row_keys = RepeatedField::default();
row_keys.push(row_key.as_bytes().to_vec());

let mut row_set = data::RowSet::default();
row_set.set_row_keys(row_keys);
/// Perform a MutateRowsRequest
async fn mutate_rows(
&self,
req: bigtable::MutateRowsRequest,
) -> Result<(), error::BigTableError> {
let bigtable = self.pool.get().await?;
// ClientSStreamReceiver will cancel an operation if it's dropped before it's done.
let resp = bigtable
.conn
.mutate_rows(&req)
.map_err(error::BigTableError::Write)?;

let mut req = bigtable::ReadRowsRequest::default();
req.set_table_name(self.settings.table_name.clone());
req.set_rows(row_set);
// Scan the returned stream looking for errors.
// As I understand, the returned stream contains chunked MutateRowsResponse structs. Each
// struct contains the result of the row mutation, and contains a `status` (non-zero on error)
// and an optional message string (empty if none).
// The structure also contains an overall `status` but that does not appear to be exposed.
// Status codes are defined at https://grpc.github.io/grpc/core/md_doc_statuscodes.html
let mut stream = Box::pin(resp);
let mut cnt = 0;
loop {
let (result, remainder) = stream.into_future().await;
if let Some(result) = result {
debug!("🎏 Result block: {}", cnt);
match result {
Ok(r) => {
for e in r.get_entries() {
if e.has_status() {
let status = e.get_status();
// See status code definitions: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
let code = error::MutateRowStatus::from(status.get_code());
if !code.is_ok() {
return Err(error::BigTableError::Status(
code,
status.get_message().to_owned(),
));
}
debug!("🎏 Response: {} OK", e.index);
}
}
}
Err(e) => return Err(error::BigTableError::Write(e)),
};
cnt += 1;
} else {
debug!("🎏 Done!");
break;
}
stream = remainder;
}

let mut rows = self.read_rows(req, timestamp_filter, None).await?;
Ok(rows.remove(row_key))
Ok(())
}

/// Take a big table ReadRowsRequest (containing the keys and filters) and return a set of row data indexed by row key.
Expand Down Expand Up @@ -261,7 +319,7 @@ impl BigTableClientImpl {
&self,
row_key: &str,
family: &str,
column_names: &Vec<&str>,
column_names: &[&str],
time_range: Option<&data::TimestampRange>,
) -> Result<(), error::BigTableError> {
let mut req = bigtable::MutateRowRequest::default();
Expand Down Expand Up @@ -622,13 +680,13 @@ impl DbClient for BigTableClientImpl {
}

async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
let key = uaid.as_simple().to_string();
let row_key = uaid.as_simple().to_string();
let mut result = User {
uaid: *uaid,
..Default::default()
};

if let Some(mut record) = self.read_row(&key, None).await? {
if let Some(mut record) = self.read_row(&row_key, None).await? {
trace!("🉑 Found a record for that user");
if let Some(mut cells) = record.take_cells("connected_at") {
if let Some(cell) = cells.pop() {
Expand Down Expand Up @@ -699,24 +757,31 @@ impl DbClient for BigTableClientImpl {
}

async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
let key = format!("{}@{}", uaid.simple(), channel_id.as_hyphenated());

// We can use the default timestamp here because there shouldn't be a time
// based GC for router records.
let row_key = uaid.simple().to_string();
// channel_ids are stored as a set within one Bigtable row
//
// Bigtable allows "millions of columns in a table, as long as no row
// exceeds the maximum limit of 256 MB per row" enabling the use of
// column qualifiers as data.
//
// The "set" of channel_ids consists of column qualifiers named
// "chid:<chid value>" as set member entries (with their cell values
// being a single 0 byte).
//
// Storing the full set in a single row makes batch updates
// (particularly to reset the GC expiry timestamps) potentially more
// easy/efficient
let mut row = Row {
row_key: key,
row_key,
..Default::default()
};
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| DbError::General(e.to_string()))?
.as_millis();
let expiry = std::time::SystemTime::now() + Duration::from_secs(MAX_CHANNEL_TTL);
row.cells.insert(
ROUTER_FAMILY.to_owned(),
vec![cell::Cell {
family: ROUTER_FAMILY.to_owned(),
qualifier: "updated".to_owned(),
value: now.to_be_bytes().to_vec(),
qualifier: format!("chid:{}", channel_id.as_hyphenated()),
timestamp: expiry,
..Default::default()
}],
);
Expand All @@ -726,36 +791,34 @@ impl DbClient for BigTableClientImpl {
/// Add channels in bulk (used mostly during migration)
///
async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| DbError::General(e.to_string()))?
let row_key = uaid.simple().to_string();
let expiry = (std::time::SystemTime::now() + Duration::from_secs(MAX_CHANNEL_TTL))
.duration_since(SystemTime::UNIX_EPOCH)
.map_err(error::BigTableError::WriteTime)?
.as_millis();

let mut entries = protobuf::RepeatedField::default();
let mut req = bigtable::MutateRowsRequest::default();
let mut limit: u32 = 0;
req.set_table_name(self.settings.table_name.clone());

// Create entries that define rows that contain mutations to hold the updated value which
// will create/update the channels.
for channel in channels {
let mut entry = bigtable::MutateRowsRequest_Entry::default();
let key = format!("{}@{}", uaid.simple(), channel.as_hyphenated());
entry.set_row_key(key.into_bytes());
let mut entry = bigtable::MutateRowsRequest_Entry::default();
entry.set_row_key(row_key.into_bytes());
let mut cell_mutations = protobuf::RepeatedField::default();

let mut cell_mutations = protobuf::RepeatedField::default();
// Create entries that define rows that contain mutations to
// create/update the channels.
for channel in channels {
let mut mutation = data::Mutation::default();
let mut set_cell = data::Mutation_SetCell {
let set_cell = data::Mutation_SetCell {
family_name: ROUTER_FAMILY.to_owned(),
column_qualifier: format!("chid:{}", channel.as_hyphenated()).into_bytes(),
timestamp_micros: (expiry * 1000) as i64,
..Default::default()
};
set_cell.set_column_qualifier("updated".as_bytes().to_vec());
set_cell.set_value(now.to_be_bytes().to_vec());
set_cell.set_timestamp_micros((now * 1000) as i64);

mutation.set_set_cell(set_cell);
cell_mutations.push(mutation);
entry.set_mutations(cell_mutations);
entries.push(entry);
// There is a limit of 100,000 mutations per batch for bigtable.
// https://cloud.google.com/bigtable/quotas
// If you have 100,000 channels, you have too many.
Expand All @@ -764,96 +827,64 @@ impl DbClient for BigTableClientImpl {
break;
}
}
entry.set_mutations(cell_mutations);
entries.push(entry);
req.set_entries(entries);

let bigtable = self.pool.get().await?;

// ClientSStreamReceiver will cancel an operation if it's dropped before it's done.
let resp = bigtable
.conn
.mutate_rows(&req)
.map_err(error::BigTableError::Write)?;

// Scan the returned stream looking for errors.
// As I understand, the returned stream contains chunked MutateRowsResponse structs. Each
// struct contains the result of the row mutation, and contains a `status` (non-zero on error)
// and an optional message string (empty if none).
// The structure also contains an overall `status` but that does not appear to be exposed.
// Status codes are defined at https://grpc.github.io/grpc/core/md_doc_statuscodes.html
let mut stream = Box::pin(resp);
let mut cnt = 0;
loop {
let (result, remainder) = stream.into_future().await;
if let Some(result) = result {
debug!("🎏 Result block: {}", cnt);
match result {
Ok(r) => {
for e in r.get_entries() {
if e.has_status() {
let status = e.get_status();
// See status code definitions: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
let code = error::MutateRowStatus::from(status.get_code());
if !code.is_ok() {
return Err(error::BigTableError::Status(
code,
status.get_message().to_owned(),
)
.into());
}
debug!("🎏 Response: {} OK", e.index);
}
}
}
Err(e) => return Err(error::BigTableError::Write(e).into()),
};
cnt += 1;
} else {
debug!("🎏 Done!");
break;
}
stream = remainder;
}
self.mutate_rows(req).await?;
Ok(())
}

/// Delete all the rows that start with the given prefix. NOTE: this may be metered and should
/// be used with caution.
async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
let mut result = HashSet::new();
let row_key = uaid.simple().to_string();
let mut req = self.read_row_request(&row_key);

let mut req = bigtable::ReadRowsRequest::default();
req.set_table_name(self.settings.table_name.clone());
let mut filter_set: RepeatedField<RowFilter> = RepeatedField::default();

let start_key = format!("{}@", uaid.simple());
// '[' is the char after '@'
let end_key = format!("{}[", uaid.simple());
let mut rows = data::RowSet::default();
let mut row_range = data::RowRange::default();
row_range.set_start_key_open(start_key.into_bytes());
row_range.set_end_key_open(end_key.into_bytes());
let mut row_ranges = RepeatedField::default();
row_ranges.push(row_range);
rows.set_row_ranges(row_ranges);
req.set_rows(rows);
let mut family_filter = data::RowFilter::default();
family_filter.set_family_name_regex_filter(format!("^{ROUTER_FAMILY}$"));

let mut strip_filter = data::RowFilter::default();
strip_filter.set_strip_value_transformer(true);
req.set_filter(strip_filter);
let mut cq_filter = data::RowFilter::default();
cq_filter.set_column_qualifier_regex_filter("^chid:.*$".as_bytes().to_vec());

filter_set.push(family_filter);
filter_set.push(cq_filter);

let mut filter_chain = RowFilter_Chain::default();
filter_chain.set_filters(filter_set);

let mut filter = data::RowFilter::default();
filter.set_chain(filter_chain);
req.set_filter(filter);

let rows = self.read_rows(req, None, None).await?;
for key in rows.keys().map(|v| v.to_owned()).collect::<Vec<String>>() {
if let Some(chid) = key.split('@').last() {
let mut rows = self.read_rows(req, None, None).await?;
let mut result = HashSet::new();
if let Some(record) = rows.remove(&row_key) {
for mut cells in record.cells.into_values() {
let Some(cell) = cells.pop() else {
continue;
};
let Some(chid) = cell.qualifier.split("chid:").last() else {
return Err(DbError::Integrity(
"get_channels expected: chid:<chid>".to_owned(),
));
};
result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?);
}
}

Ok(result)
}

/// Delete the channel and all associated pending messages.
/// Delete the channel. Does not delete its associated pending messages.
async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
let row_key = format!("{}@{}", uaid.simple(), channel_id.as_hyphenated());
Ok(self.delete_rows(&row_key).await?)
let row_key = uaid.simple().to_string();
let column = format!("chid:{}", channel_id.as_hyphenated());
self.delete_cells(&row_key, ROUTER_FAMILY, &[column.as_ref()], None)
.await?;
// XXX: Can we determine if the cq was actually removed (existed) from
// mutate_row's response?
Ok(true)
}

/// Remove the node_id. Can't really "surgically strike" this
Expand All @@ -872,13 +903,8 @@ impl DbClient for BigTableClientImpl {
let mut time_range = data::TimestampRange::default();
// convert connected at seconds into microseconds
time_range.set_end_timestamp_micros((connected_at * 1000000) as i64);
self.delete_cells(
&row_key,
ROUTER_FAMILY,
&["node_id"].to_vec(),
Some(&time_range),
)
.await?;
self.delete_cells(&row_key, ROUTER_FAMILY, &["node_id"], Some(&time_range))
.await?;
Ok(true)
}

Expand Down
3 changes: 3 additions & 0 deletions autopush-common/src/db/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ pub enum DbError {
#[error("Connection failure {0}")]
ConnectionError(String),

#[error("Database integrity error: {}", _0)]
Integrity(String),

#[error("Unknown Database Error {0}")]
General(String),
}
Expand Down