Skip to content

Commit

Permalink
bug: copy over the channels when migrating the user (#539)
Browse files Browse the repository at this point in the history
This will attempt to copy the existing channels for a user. This is
important because desktop does not check and rebuild channels if the
server doesn't have them. (Might want to make that a standardized
feature in the future.)

Closes: SYNC-4050
  • Loading branch information
jrconlin authored Dec 20, 2023
1 parent ac5c691 commit e36b3fe
Show file tree
Hide file tree
Showing 8 changed files with 399 additions and 186 deletions.
367 changes: 183 additions & 184 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion autopush-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ actix-rt = "2.8"

[features]
# for testing:
# default = ["emulator"]
# default = ["emulator", "bigtable", "dynamodb"]
default = ["dynamodb"]
bigtable = ["dep:google-cloud-rust-raw", "dep:grpcio", "dep:protobuf"]
dynamodb = ["dep:rusoto_core", "dep:rusoto_credential", "dep:rusoto_dynamodb"]
Expand Down
86 changes: 86 additions & 0 deletions autopush-common/src/db/bigtable/bigtable_client/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,83 @@ use thiserror::Error;

use crate::errors::ReportableError;

/// Per-row result status for a Bigtable `MutateRows` operation.
///
/// Variants mirror the standard gRPC status codes
/// (<https://grpc.github.io/grpc/core/md_doc_statuscodes.html>); the
/// numeric mapping lives in the `From<i32>` impl below.
#[derive(PartialEq, Eq, Debug)]
pub enum MutateRowStatus {
    OK,
    Cancelled,
    Unknown,
    InvalidArgument,
    DeadlineExceeded,
    NotFound,
    AlreadyExists,
    PermissionDenied,
    ResourceExhausted,
    FailedPrecondition,
    Aborted,
    OutOfRange,
    Unimplemented,
    Internal,
    Unavailable,
    DataLoss,
    Unauthenticated,
}

impl MutateRowStatus {
    /// True when this status represents a successful mutation.
    pub fn is_ok(&self) -> bool {
        matches!(self, Self::OK)
    }
}

impl From<i32> for MutateRowStatus {
fn from(v: i32) -> Self {
match v {
0 => Self::OK,
1 => Self::Cancelled,
2 => Self::Unknown,
3 => Self::InvalidArgument,
4 => Self::DeadlineExceeded,
5 => Self::NotFound,
6 => Self::AlreadyExists,
7 => Self::PermissionDenied,
8 => Self::ResourceExhausted,
9 => Self::FailedPrecondition,
10 => Self::Aborted,
11 => Self::OutOfRange,
12 => Self::Unimplemented,
13 => Self::Internal,
14 => Self::Unavailable,
15 => Self::DataLoss,
16 => Self::Unauthenticated,
_ => Self::Unknown,
}
}
}

/// Human-readable status names, used in error reports and metric tags.
///
/// Implemented as `Display` rather than `ToString`: the standard library
/// documents that `ToString` should never be implemented directly — the
/// blanket `impl<T: Display> ToString for T` still gives callers
/// `.to_string()` for free, so this change is backward compatible.
impl std::fmt::Display for MutateRowStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            MutateRowStatus::OK => "Ok",
            MutateRowStatus::Cancelled => "Cancelled",
            MutateRowStatus::Unknown => "Unknown",
            MutateRowStatus::InvalidArgument => "Invalid Argument",
            MutateRowStatus::DeadlineExceeded => "Deadline Exceeded",
            MutateRowStatus::NotFound => "Not Found",
            MutateRowStatus::AlreadyExists => "Already Exists",
            MutateRowStatus::PermissionDenied => "Permission Denied",
            MutateRowStatus::ResourceExhausted => "Resource Exhausted",
            MutateRowStatus::FailedPrecondition => "Failed Precondition",
            MutateRowStatus::Aborted => "Aborted",
            MutateRowStatus::OutOfRange => "Out of Range",
            MutateRowStatus::Unimplemented => "Unimplemented",
            MutateRowStatus::Internal => "Internal",
            MutateRowStatus::Unavailable => "Unavailable",
            MutateRowStatus::DataLoss => "Data Loss",
            MutateRowStatus::Unauthenticated => "Unauthenticated",
        })
    }
}

#[derive(Debug, Error)]
pub enum BigTableError {
#[error("Invalid Row Response")]
Expand All @@ -20,6 +97,11 @@ pub enum BigTableError {
#[error("Bigtable write error")]
Write(grpcio::Error),

/// Return a GRPC status code and any message.
/// See https://grpc.github.io/grpc/core/md_doc_statuscodes.html
#[error("Bigtable status response")]
Status(MutateRowStatus, String),

#[error("BigTable Admin Error")]
Admin(String, Option<String>),

Expand Down Expand Up @@ -52,6 +134,7 @@ impl ReportableError for BigTableError {
BigTableError::InvalidChunk(_) => "storage.bigtable.error.invalid_chunk",
BigTableError::Read(_) => "storage.bigtable.error.read",
BigTableError::Write(_) => "storage.bigtable.error.write",
BigTableError::Status(_, _) => "storage.bigtable.error.status",
BigTableError::WriteTime(_) => "storage.bigtable.error.writetime",
BigTableError::Admin(_, _) => "storage.bigtable.error.admin",
BigTableError::Recycle => "storage.bigtable.error.recycle",
Expand All @@ -66,6 +149,9 @@ impl ReportableError for BigTableError {
BigTableError::InvalidChunk(s) => vec![("error", s.to_string())],
BigTableError::Read(s) => vec![("error", s.to_string())],
BigTableError::Write(s) => vec![("error", s.to_string())],
BigTableError::Status(code, s) => {
vec![("code", code.to_string()), ("error", s.to_string())]
}
BigTableError::WriteTime(s) => vec![("error", s.to_string())],
BigTableError::Admin(s, raw) => {
let mut x = vec![("error", s.to_owned())];
Expand Down
108 changes: 107 additions & 1 deletion autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};

use async_trait::async_trait;
use cadence::StatsdClient;
use futures_util::StreamExt;
use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest;
use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient;
use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
Expand Down Expand Up @@ -650,6 +651,98 @@ impl DbClient for BigTableClientImpl {
self.write_row(row).await.map_err(|e| e.into())
}

/// Add channels in bulk (used mostly during migration)
///
async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| DbError::General(e.to_string()))?
.as_millis();
let mut entries = protobuf::RepeatedField::default();
let mut req = bigtable::MutateRowsRequest::default();
let mut limit: u32 = 0;
req.set_table_name(self.settings.table_name.clone());

// Create entries that define rows that contain mutations to hold the updated value which
// will create/update the channels.
for channel in channels {
let mut entry = bigtable::MutateRowsRequest_Entry::default();
let key = as_key(uaid, Some(&channel), None);
entry.set_row_key(key.into_bytes());

let mut cell_mutations = protobuf::RepeatedField::default();
let mut mutation = data::Mutation::default();
let mut set_cell = data::Mutation_SetCell {
family_name: ROUTER_FAMILY.to_owned(),
..Default::default()
};
set_cell.set_column_qualifier("updated".to_owned().into_bytes().to_vec());
set_cell.set_value(now.to_be_bytes().to_vec());

mutation.set_set_cell(set_cell);
cell_mutations.push(mutation);
entry.set_mutations(cell_mutations);
entries.push(entry);
// There is a limit of 100,000 mutations per batch for bigtable.
// https://cloud.google.com/bigtable/quotas
// If you have 100,000 channels, you have too many.
limit += 1;
if limit >= 100_000 {
break;
}
}
req.set_entries(entries);

let bigtable = self.pool.get().await?;

// ClientSStreamReceiver will cancel an operation if it's dropped before it's done.
let resp = bigtable
.conn
.mutate_rows(&req)
.map_err(error::BigTableError::Write)?;

// Scan the returned stream looking for errors.
// As I understand, the returned stream contains chunked MutateRowsResponse structs. Each
// struct contains the result of the row mutation, and contains a `status` (non-zero on error)
// and an optional message string (empty if none).
// The structure also contains an overall `status` but that does not appear to be exposed.
// Status codes are defined at https://grpc.github.io/grpc/core/md_doc_statuscodes.html
let mut stream = Box::pin(resp);
let mut cnt = 0;
loop {
let (result, remainder) = stream.into_future().await;
if let Some(result) = result {
debug!("🎏 Result block: {}", cnt);
match result {
Ok(r) => {
for e in r.get_entries() {
if e.has_status() {
let status = e.get_status();
// See status code definitions: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
let code = error::MutateRowStatus::from(status.get_code());
if !code.is_ok() {
return Err(error::BigTableError::Status(
code,
status.get_message().to_owned(),
)
.into());
}
debug!("🎏 Response: {} OK", e.index);
}
}
}
Err(e) => return Err(error::BigTableError::Write(e).into()),
};
cnt += 1;
} else {
debug!("🎏 Done!");
break;
}
stream = remainder;
}
Ok(())
}

/// Get the set of channel IDs for a user.
/// NOTE(review): the previous comment here described a metered
/// prefix-delete operation and appears to belong to a different method —
/// confirm and move it to the row-deletion helper.
async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
Expand Down Expand Up @@ -1070,6 +1163,7 @@ mod tests {
//!
use std::sync::Arc;
use std::time::SystemTime;
use uuid;

use super::*;
use cadence::StatsdClient;
Expand Down Expand Up @@ -1153,6 +1247,19 @@ mod tests {
let channels = client.get_channels(&uaid).await;
assert!(channels.unwrap().contains(&chid));

// can we add lots of channels?
let mut new_channels: HashSet<Uuid> = HashSet::new();
new_channels.insert(chid);
for _ in 1..10 {
new_channels.insert(uuid::Uuid::new_v4());
}
client
.add_channels(&uaid, new_channels.clone())
.await
.unwrap();
let channels = client.get_channels(&uaid).await.unwrap();
assert_eq!(channels, new_channels);

// can we modify the user record?
let updated = User {
connected_at: now() + 3,
Expand Down Expand Up @@ -1222,7 +1329,6 @@ mod tests {
.await
.unwrap()
.messages;
print!("Messages: {:?}", &msgs);
assert!(msgs.is_empty());

assert!(client.remove_user(&uaid).await.is_ok());
Expand Down
3 changes: 3 additions & 0 deletions autopush-common/src/db/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub trait DbClient: Send + Sync {
/// Add a channel to a user
async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()>;

/// Add a batch of channels to a user
async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()>;

/// Get the set of channel IDs for a user
async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>>;

Expand Down
7 changes: 7 additions & 0 deletions autopush-common/src/db/dual/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ impl DbClient for DualClientImpl {
// copy the user record over to the new data store.
debug!("⚖ Found user record in secondary, moving to primary");
self.primary.add_user(&user).await?;
let channels = self.secondary.get_channels(uaid).await?;
self.primary.add_channels(uaid, channels).await?;
return Ok(Some(user));
}
}
Expand All @@ -165,6 +167,11 @@ impl DbClient for DualClientImpl {
target.add_channel(uaid, channel_id).await
}

async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
    // Route the batch write to whichever store `allot` selects for this
    // user; whether that store is the primary is irrelevant here.
    let (db, _is_primary) = self.allot(uaid).await?;
    db.add_channels(uaid, channels).await
}

async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
let (target, is_primary) = self.allot(uaid).await?;
let mut channels = target.get_channels(uaid).await?;
Expand Down
8 changes: 8 additions & 0 deletions autopush-common/src/db/dynamodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,14 @@ impl DbClient for DdbClientImpl {
Ok(())
}

/// Hopefully, this is never called. It is provided for completeness.
async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
    // DynamoDB has no dedicated batch path here: write each channel
    // individually, failing fast on the first error.
    for chid in channels.iter() {
        self.add_channel(uaid, chid).await?;
    }
    Ok(())
}

async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
// Channel IDs are stored in a special row in the message table, where
// chidmessageid = " "
Expand Down
4 changes: 4 additions & 0 deletions autopush-common/src/db/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ impl DbClient for Arc<MockDbClient> {
Arc::as_ref(self).add_channel(uaid, channel_id).await
}

async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
Arc::as_ref(self).add_channels(uaid, channels).await
}

async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
Arc::as_ref(self).get_channels(uaid).await
}
Expand Down

0 comments on commit e36b3fe

Please sign in to comment.